Details
Type: Improvement
Status: Closed
Priority: Major
Resolution: Fixed
Description
Benchmark results show that Flink is more than 3X slower than Spark at executing simple programs. For example, the following program, run with object reuse enabled and parallelism=1, takes roughly 120 sec on a MacBook, whereas Spark takes less than 40 sec to run the same logic on the same machine.
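    // addSink() returns a DataStreamSink rather than a DataStream<Long>,
    // so the result is not assigned to a DataStream variable here.
    env.fromSequence(1, 1000000000L)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .addSink(new DiscardingSink<>());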
It turns out that the operator chain overhead introduced by Flink is surprisingly high. For the above example program, Flink runtime goes through a call stack of 24 functions to produce 1 element. And each extra map(...) operation introduces 3 extra functions in the call stack.
Here are the 24 functions in the call stack:
    StreamTask#processInput
    StreamOneInputProcessor#processInput
    StreamTaskSourceInput#emitNext
    SourceOperator#emitNext
    IteratorSourceReaderBase#pollNext
    SourceOutputWithWatermarks#collect
    AsyncDataOutputToOutput#emitRecord
    ChainingOutput#collect
    StreamMap#processElement
    CountingOutput#collect
    ChainingOutput#collect
    StreamMap#processElement
    CountingOutput#collect
    ChainingOutput#collect
    StreamMap#processElement
    CountingOutput#collect
    ChainingOutput#collect
    StreamMap#processElement
    CountingOutput#collect
    ChainingOutput#collect
    StreamMap#processElement
    CountingOutput#collect
    ChainingOutput#collect
    StreamSink#processElement
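The relationship between the number of map(...) operations and the stack depth can be sketched as follows. The frame names are taken from the stack listed above; the class CallStackDepth and its methods are hypothetical helpers written for this illustration, and extrapolating the pattern to other numbers of maps is an assumption based on the statement that each extra map(...) adds 3 frames.

```java
import java.util.ArrayList;
import java.util.List;

public class CallStackDepth {

    // Builds the list of frames needed to produce one element for a chain of
    // numMaps map(...) operations followed by a sink. Hypothetical helper:
    // it only replays the pattern visible in the stack listed in the issue.
    static List<String> framesFor(int numMaps) {
        List<String> frames = new ArrayList<>(List.of(
                "StreamTask#processInput",
                "StreamOneInputProcessor#processInput",
                "StreamTaskSourceInput#emitNext",
                "SourceOperator#emitNext",
                "IteratorSourceReaderBase#pollNext",
                "SourceOutputWithWatermarks#collect",
                "AsyncDataOutputToOutput#emitRecord"));
        // Each map(...) contributes three frames.
        for (int i = 0; i < numMaps; i++) {
            frames.add("ChainingOutput#collect");
            frames.add("StreamMap#processElement");
            frames.add("CountingOutput#collect");
        }
        // The sink contributes the last two frames.
        frames.add("ChainingOutput#collect");
        frames.add("StreamSink#processElement");
        return frames;
    }

    static int depth(int numMaps) {
        return framesFor(numMaps).size();
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 5, 10}) {
            System.out.println(n + " map(s): " + depth(n) + " frames");
        }
    }
}
```

Under this assumption the depth grows as 9 + 3n for n map(...) operations, which gives the 24 frames observed for the example program.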
Given the observation described above, here are the explanations for why Flink is slow for programs with low computation overhead:
- For each record produced, Flink runtime currently incurs an unnecessarily deep function call stack. It can be more than 24 for a simple program consisting of 5 map() operations.
- Java's maximum inline level is less than 18 [2]. An operator chain's call stack can easily exceed this limit, which prevents the JIT compiler from inlining the function calls and further increases the function call overhead.
- Function calls that are not inlined require a virtual table lookup, since most of these functions are virtual.
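To see which inline limit applies on a given machine, the JVM flag can be read at runtime. This is a minimal sketch that assumes a HotSpot-based JVM and assumes that the limit discussed in [2] corresponds to the MaxInlineLevel flag; the class name InlineLevelCheck is hypothetical. The value printed depends on the JDK version and on any -XX options passed to the JVM.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

public class InlineLevelCheck {

    // Reads the current value of the HotSpot MaxInlineLevel flag.
    // Assumption: the JVM is HotSpot-based and exposes this flag.
    static String maxInlineLevel() {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        return bean.getVMOption("MaxInlineLevel").getValue();
    }

    public static void main(String[] args) {
        System.out.println("MaxInlineLevel = " + maxInlineLevel());
    }
}
```

Comparing the printed value with the 24-frame stack above gives a rough indication of whether the whole chain can be inlined on that JVM.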
Given the above explanations of the performance issue, here are the ideas to reduce Flink's runtime overhead:
- Update SourceOperator#emitNext() to push records to DataOutput in a while loop. This can reduce the depth of the call stack needed to produce a record by 3 functions. See FLINK-30533 for more information.
- Fuse some functions (e.g. ChainingOutput, StreamMap, CountingOutput) to reduce the call stack depth required for each extra operation (e.g. map(...)).
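The two ideas above can be illustrated with a small standalone sketch. It does not use Flink's classes: FusionSketch, MapStage, Counting, chained, fused and emitAll are all hypothetical names, and the code only contrasts one wrapper object per operation with a single fused stage that is fed from a loop.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class FusionSketch {

    // Hypothetical stand-in for a per-operator metrics wrapper.
    static final class Counting<T> implements Consumer<T> {
        long count;
        final Consumer<T> next;
        Counting(Consumer<T> next) { this.next = next; }
        @Override public void accept(T record) { count++; next.accept(record); }
    }

    // Hypothetical stand-in for one map operator that forwards its result.
    static final class MapStage<T> implements Consumer<T> {
        final Function<T, T> fn;
        final Consumer<T> next;
        MapStage(Function<T, T> fn, Consumer<T> next) { this.fn = fn; this.next = next; }
        @Override public void accept(T record) { next.accept(fn.apply(record)); }
    }

    // Chained style: every function adds nested calls between source and sink.
    static <T> Consumer<T> chained(List<Function<T, T>> fns, Consumer<T> sink) {
        Consumer<T> out = sink;
        for (int i = fns.size() - 1; i >= 0; i--) {
            out = new MapStage<T>(fns.get(i), new Counting<T>(out));
        }
        return out;
    }

    // Fused style: all functions are applied in one frame, so the depth no
    // longer grows with the number of functions. Per-operator counting is
    // omitted here; a real change would have to preserve such metrics.
    static <T> Consumer<T> fused(List<Function<T, T>> fns, Consumer<T> sink) {
        List<Function<T, T>> copy = List.copyOf(fns);
        return record -> {
            T value = record;
            for (Function<T, T> fn : copy) {
                value = fn.apply(value);
            }
            sink.accept(value);
        };
    }

    // Source side: push records from a loop instead of returning to the
    // caller after every record.
    static void emitAll(long from, long to, Consumer<Long> out) {
        for (long i = from; i <= to; i++) {
            out.accept(i);
        }
    }

    public static void main(String[] args) {
        List<Function<Long, Long>> fns =
                List.of(x -> x, x -> x, x -> x, x -> x, x -> x);
        long[] chainedSum = new long[1];
        long[] fusedSum = new long[1];
        emitAll(1, 1000, chained(fns, v -> chainedSum[0] += v));
        emitAll(1, 1000, fused(fns, v -> fusedSum[0] += v));
        System.out.println("same result: " + (chainedSum[0] == fusedSum[0]));
    }
}
```

Both variants produce the same output; the difference is that the fused variant keeps the per-record call depth constant regardless of how many functions are in the chain.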
[1] https://arxiv.org/pdf/1610.09166.pdf
[2] https://bugs.openjdk.org/browse/JDK-8234863