FLINK-30531

Reduce operator chain call stack depth

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.17.0
    • Component/s: Runtime / Task
    • Labels: None

    Description

      Benchmark results show that Flink takes more than 3X as long as Spark to execute simple programs. For example, the following program, run with object reuse enabled and parallelism=1, takes roughly 120 sec on a MacBook, whereas Spark runs the same logic on the same machine in less than 40 sec.

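      // Five identity map(...) operators chained between the source and the sink.
      DataStream<Long> stream = env.fromSequence(1, 1000000000L)
          .map(x -> x)
          .map(x -> x)
          .map(x -> x)
          .map(x -> x)
          .map(x -> x);
      // addSink(...) returns a DataStreamSink, so it is kept as a separate statement.
      stream.addSink(new DiscardingSink<>());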
      

       

      It turns out that the operator chain overhead in Flink is surprisingly high. For the example program above, the Flink runtime goes through a call stack of 24 functions to produce one element, and each extra map(...) operation adds 3 more functions to the call stack.

      Here are the 24 functions in the call stack:

      StreamTask#processInput
      StreamOneInputProcessor#processInput
      StreamTaskSourceInput#emitNext
      SourceOperator#emitNext
      IteratorSourceReaderBase#pollNext
      SourceOutputWithWatermarks#collect
      AsyncDataOutputToOutput#emitRecord
      ChainingOutput#collect
      StreamMap#processElement
      CountingOutput#collect
      ChainingOutput#collect
      StreamMap#processElement
      CountingOutput#collect
      ChainingOutput#collect
      StreamMap#processElement
      CountingOutput#collect
      ChainingOutput#collect
      StreamMap#processElement
      CountingOutput#collect
      ChainingOutput#collect
      StreamMap#processElement
      CountingOutput#collect
      ChainingOutput#collect
      StreamSink#processElement
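
The listing above follows a regular pattern: 8 functions from StreamTask#processInput through the first ChainingOutput#collect, then 3 functions per map(...), then StreamSink#processElement. As a small worked example of that arithmetic (the class and constant names here are illustrative and not part of Flink, and values for other numbers of maps are extrapolated from this one listing):

```java
public class ChainDepth {
    // StreamTask#processInput through the first ChainingOutput#collect in the listing.
    static final int SOURCE_FRAMES = 8;
    // StreamMap#processElement, CountingOutput#collect, ChainingOutput#collect.
    static final int FRAMES_PER_MAP = 3;
    // StreamSink#processElement.
    static final int SINK_FRAMES = 1;

    /** Call stack depth implied by the listing's pattern for a chain of numMaps map(...) calls. */
    static int depth(int numMaps) {
        return SOURCE_FRAMES + FRAMES_PER_MAP * numMaps + SINK_FRAMES;
    }

    public static void main(String[] args) {
        System.out.println("5 maps  -> " + depth(5) + " functions");   // 8 + 15 + 1 = 24
        System.out.println("10 maps -> " + depth(10) + " functions");  // 8 + 30 + 1 = 39
    }
}
```

With 5 maps this gives the 24 functions listed above.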
      

       

      Given these observations, the following factors explain why Flink is slow for programs with little per-record computation:

      • For each record produced, the Flink runtime currently incurs an unnecessarily deep function call stack. It can be more than 24 for a simple program consisting of 5 map() operations.
      • Java's maximum inline level is less than 18 [2]. An operator chain's call stack can easily exceed this limit, which prevents the JVM from inlining the function calls and further increases the function call overhead.
      • Each function call that is not inlined requires a virtual table lookup, since most of these functions are virtual.
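
To make the per-operator cost concrete, here is a minimal, self-contained sketch that mimics the layering with plain Java classes and measures the stack depth at the sink. ChainingOut, CountingOut, MapOp and DepthSink are hypothetical stand-ins written for this illustration. They are not Flink's classes and omit everything the real ones do beyond forwarding a record.

```java
public class LayeredChainDemo {
    interface Output { void collect(long record); }
    interface Operator { void processElement(long record); }

    /** Stand-in for ChainingOutput: hands the record to the next operator. */
    static final class ChainingOut implements Output {
        private final Operator next;
        ChainingOut(Operator next) { this.next = next; }
        @Override public void collect(long record) { next.processElement(record); }
    }

    /** Stand-in for CountingOutput: bumps a counter, then delegates. */
    static final class CountingOut implements Output {
        private final Output inner;
        long count;
        CountingOut(Output inner) { this.inner = inner; }
        @Override public void collect(long record) { count++; inner.collect(record); }
    }

    /** Stand-in for StreamMap applying the identity function x -> x. */
    static final class MapOp implements Operator {
        private final Output out;
        MapOp(Output out) { this.out = out; }
        @Override public void processElement(long record) { out.collect(record); }
    }

    /** Stand-in for StreamSink: records the stack depth when a record arrives. */
    static final class DepthSink implements Operator {
        int depth;
        @Override public void processElement(long record) {
            depth = new Throwable().getStackTrace().length;
        }
    }

    /** Number of frames between this method and the sink for a chain of numMaps maps. */
    static int framesFor(int numMaps) {
        DepthSink sink = new DepthSink();
        Output head = new ChainingOut(sink);
        for (int i = 0; i < numMaps; i++) {
            // Each map adds three layers: chaining -> map -> counting.
            head = new ChainingOut(new MapOp(new CountingOut(head)));
        }
        int base = new Throwable().getStackTrace().length;
        head.collect(1L);
        return sink.depth - base;
    }

    public static void main(String[] args) {
        for (int n = 0; n <= 5; n++) {
            System.out.println(n + " maps -> " + framesFor(n) + " frames");
        }
    }
}
```

In this sketch each additional map adds three frames, mirroring the StreamMap / CountingOutput / ChainingOutput triple in the listing. It only illustrates call depth; it says nothing about what the JIT actually inlines in a real job.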

      Given the above explanations of the performance issue, here are the ideas to reduce Flink's runtime overhead:

      • Update SourceOperator#emitNext() to push records to DataOutput in a while loop. This can reduce the depth of the call stack needed to produce a record by 3 functions. See FLINK-30533 for more information.
      • Fuse some functions (e.g. ChainingOutput, StreamMap, CountingOutput) to reduce the call stack depth required for each extra operation (e.g. map(...)).
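
Both ideas can be sketched in plain Java. The code below is an assumption-laden illustration, not the change that was made in Flink: FusedMapOutput, DepthSink and emitAll are hypothetical names, and fusing counting, mapping and forwarding into a single collect() call is just one possible way to fuse the three functions.

```java
import java.util.function.LongUnaryOperator;

public class FusedChainDemo {
    interface Output { void collect(long record); }

    /** Hypothetical fused stage: counts, applies the map function, and forwards in one method. */
    static final class FusedMapOutput implements Output {
        private final LongUnaryOperator fn;
        private final Output next;
        long numRecordsOut;
        FusedMapOutput(LongUnaryOperator fn, Output next) {
            this.fn = fn;
            this.next = next;
        }
        @Override public void collect(long record) {
            numRecordsOut++;
            // fn returns before next.collect is invoked, so it adds no frame below the sink.
            next.collect(fn.applyAsLong(record));
        }
    }

    /** Sink that remembers how many records it saw and the stack depth of the last one. */
    static final class DepthSink implements Output {
        long seen;
        int depth;
        @Override public void collect(long record) {
            seen++;
            depth = new Throwable().getStackTrace().length;
        }
    }

    /** Builds numMaps fused identity stages in front of the given sink. */
    static Output build(int numMaps, Output sink) {
        Output head = sink;
        for (int i = 0; i < numMaps; i++) {
            head = new FusedMapOutput(x -> x, head);
        }
        return head;
    }

    /** Source-side idea: push records to the output in a loop instead of one call per record. */
    static void emitAll(long from, long to, Output out) {
        long value = from;
        while (value <= to) {
            out.collect(value++);
        }
    }

    /** Number of frames between this method and the sink for numMaps fused stages. */
    static int framesFor(int numMaps) {
        DepthSink sink = new DepthSink();
        Output head = build(numMaps, sink);
        int base = new Throwable().getStackTrace().length;
        head.collect(1L);
        return sink.depth - base;
    }

    public static void main(String[] args) {
        DepthSink sink = new DepthSink();
        emitAll(1, 3, build(5, sink));
        System.out.println("records seen: " + sink.seen);
        System.out.println("frames for 5 fused maps: " + framesFor(5));
    }
}
```

In this sketch each extra map adds one frame instead of three. Whether a real fusion of ChainingOutput, StreamMap and CountingOutput can be this aggressive depends on details (metrics, watermarks, object reuse semantics) that the sketch ignores.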

      [1] https://arxiv.org/pdf/1610.09166.pdf

      [2] https://bugs.openjdk.org/browse/JDK-8234863

       

       

       

          People

            Assignee: Dong Lin (lindong)
            Reporter: Dong Lin (lindong)