Flink / FLINK-30131

Flink iterate hangs when records are fairly large


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 1.15.2
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Labels: None

    Description

       

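      import static org.apache.flink.configuration.NettyShuffleEnvironmentOptions.NETWORK_MAX_BUFFERS_PER_CHANNEL;
      import java.util.ArrayList;
      import java.util.List;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.configuration.MemorySize;
      import org.apache.flink.configuration.RestOptions;
      import org.apache.flink.configuration.TaskManagerOptions;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.IterativeStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.ProcessFunction;
      import org.apache.flink.util.Collector;

      // Minimal reproduction: 10,000,000-byte records circulating in a DataStream iteration (Flink 1.15.2).
      public class IterateLargeRecordRepro {
          public static void main(String[] args) throws Exception {
              Configuration configuration = new Configuration();
              configuration.setInteger(RestOptions.PORT, 8082);
              // Attempts to give the network stack more buffers and more memory.
              configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 10000000);
              configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
              configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 10000000);
              configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 10000000);
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
              env.setParallelism(1);
              // 9,999 source elements: 1 .. 9999.
              List<Integer> list = new ArrayList<>();
              for (int i = 1; i < 10000; i++) {
                  list.add(i);
              }
              DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
              // Each element becomes a 10,000,000-byte array.
              DataStream<byte[]> map = integerDataStreamSource.map(i -> new byte[10000000]).setParallelism(1).name("map to byte[]").shuffle();
              IterativeStream<byte[]> iterate = map.iterate();
              // Iteration body: pass every record through unchanged.
              DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
                  @Override
                  public void processElement(byte[] value, ProcessFunction<byte[], byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
                      out.collect(value);
                  }
              }).name("multi collect");
              // filter(i -> true) feeds every record back, so no record ever leaves the loop.
              DataStream<byte[]> filter = map1.filter(i -> true).setParallelism(1).name("feedback");
              iterate.closeWith(filter);
              // Outside the loop: print the length of each record.
              map1.map(bytes -> bytes.length).name("map to length").print();
              env.execute();
          }
      }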

      My code is above.
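A back-of-the-envelope check on the job above may help frame the question. It assumes each record is exactly `new byte[10000000]` (10,000,000 bytes), that all 9,999 source elements eventually sit inside the loop because the feedback filter `i -> true` never drops anything, and that `MemorySize.parse("4g")` means 4 GiB; it ignores serialization overhead and any memory outside the configured network maximum. `LoopVolumeEstimate` is a hypothetical helper written for this sketch, not a Flink API.

```java
/**
 * Back-of-the-envelope estimate for the reproduction job (hypothetical helper, not a Flink API).
 * Assumes every source element stays in the loop and ignores serialization overhead.
 */
public class LoopVolumeEstimate {
    static final long RECORD_BYTES = 10_000_000L;    // new byte[10000000]
    static final long SOURCE_RECORDS = 9_999L;       // for (int i = 1; i < 10000; i++)
    static final long NETWORK_MEMORY_MAX = 4L << 30; // MemorySize.parse("4g"), i.e. 4 GiB

    /** Bytes the loop would hold once every source record has been admitted. */
    static long bytesInLoop() {
        return RECORD_BYTES * SOURCE_RECORDS;
    }

    /** Upper bound on whole records that fit in the configured maximum network memory. */
    static long recordsThatFit() {
        return NETWORK_MEMORY_MAX / RECORD_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("bytes in loop:    " + bytesInLoop());    // value: 99990000000
        System.out.println("records that fit: " + recordsThatFit()); // value: 429
    }
}
```

Under these assumptions the loop would eventually have to hold roughly 100 GB, while 4 GiB can hold at most about 429 such records at once. This is only an estimate of data volume; it does not show where Flink itself blocks.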

       

      When I use iterate with large records, the iteration hangs at a random point. When I looked at the stack dump, there was a suspicious thread.

      It seems to be a network-related problem, so I increased the network buffer memory and the number of buffers. However, that only delays the hang: the job still hangs, just after a few more iterations than before.
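The observation that more buffers only move the hang point is what one would expect from a cyclic dataflow with bounded buffers when new records keep entering and none leave. The sketch below is a hypothetical toy model, not Flink's scheduler: a "head" that per step admits one source record and re-emits one fed-back record into a bounded forward queue, and a "tail" that moves records into a bounded feedback queue. Queue capacity stands in for network buffers; `FeedbackLoopModel` and `admittedBeforeStall` are invented names, and whether Flink's iteration head and tail block exactly this way is an assumption.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical toy model of a feedback loop with bounded queues (not Flink's implementation).
 * Records enter from a source, circulate head -> tail -> head, and never leave,
 * mirroring filter(i -> true) in the reproduction job.
 */
public class FeedbackLoopModel {

    /**
     * Returns how many source records had been admitted when no stage could move any more,
     * or -1 if the loop was still moving after maxSteps.
     */
    static int admittedBeforeStall(int capacity, int sourceRecords, int maxSteps) {
        ArrayDeque<Integer> forward = new ArrayDeque<>();  // head -> tail, at most `capacity`
        ArrayDeque<Integer> feedback = new ArrayDeque<>(); // tail -> head, at most `capacity`
        int admitted = 0;
        for (int step = 0; step < maxSteps; step++) {
            boolean moved = false;
            // Tail: move one record into the feedback queue if it has room.
            if (!forward.isEmpty() && feedback.size() < capacity) {
                feedback.addLast(forward.pollFirst());
                moved = true;
            }
            // Head, source input: admit one new record if the forward queue has room.
            if (admitted < sourceRecords && forward.size() < capacity) {
                forward.addLast(admitted++);
                moved = true;
            }
            // Head, feedback input: re-emit one fed-back record if the forward queue has room.
            if (!feedback.isEmpty() && forward.size() < capacity) {
                forward.addLast(feedback.pollFirst());
                moved = true;
            }
            if (!moved) {
                // No stage can move; with records in the loop this means both queues are full.
                return admitted;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        for (int capacity : new int[] {1, 10, 100}) {
            System.out.println("capacity " + capacity + " -> stalled after "
                    + admittedBeforeStall(capacity, 9_999, 1_000_000) + " admitted records");
        }
    }
}
```

With 9,999 source records, `main` reports stalls after 2, 20 and 200 admitted records for capacities 1, 10 and 100. In this model the stall point is always twice the per-queue capacity, so raising the capacity postpones the stall without removing it; if the source emits fewer records than the loop can hold, the records simply keep circulating.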

      I would like to know whether this is a bug or whether there is an error in my code or configuration.

      Looking forward to your reply. Thanks in advance.


          People

            Assignee: Unassigned
            Reporter: landlord Lu
            Votes: 0
            Watchers: 4
