Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Cannot Reproduce
-
1.15.2
-
None
-
None
Description
//代码占位符 Configuration configuration = new Configuration(); configuration.setInteger(RestOptions.PORT, 8082); configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 10000000); configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g")); configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 10000000); configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 10000000); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setParallelism(1); List<Integer> list = new ArrayList<>(10); for (int i = 1; i < 10000; i++) { list.add(i); } DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list); DataStream<byte[]> map = integerDataStreamSource.map(i -> new byte[10000000]).setParallelism(1).name("map to byte[]").shuffle(); IterativeStream<byte[]> iterate = map.iterate(); DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() { @Override public void processElement(byte[] value, ProcessFunction<byte[], byte[]>.Context ctx, Collector<byte[]> out) throws Exception { out.collect(value); } }).name("multi collect"); DataStream<byte[]> filter = map1.filter(i -> true ).setParallelism(1).name("feedback"); iterate.closeWith(filter); map1.map(bytes -> bytes.length).name("map to length").print(); env.execute();
my code is above.
when i use iterate with big record , the iterate will suspend at a random place. when i saw the stack, it has a suspicious thread
it seems like a network related problem. so i increse the network buffer memory and num. but it only delay the suspend point, it will still suspend after iterate a little more times than before.
i want to know if this is a bug or i have some error in my code or configuration.
looking forward to your reply. thanks in advance.