Details
- Type: Sub-task
- Status: Closed
- Priority: Major
- Resolution: Fixed
None
Description
Currently, each invocation of SourceOperator#emitNext() pushes at most one record to the given DataOutput. This unnecessarily increases the average number of Java function calls on the call stack needed to produce a record.
Take the following program as an example:
env.fromSequence(1, 1000000000L).map(x -> x).addSink(new DiscardingSink<>());
For each element produced by this program, the Flink runtime has to go through these three function calls:
- StreamTask#processInput()
- StreamOneInputProcessor#processInput()
- StreamTaskSourceInput#emitNext()
This ticket proposes updating SourceOperator#emitNext() to push records to DataOutput in a while loop, so that a single invocation can emit multiple records. This improves Flink performance by removing an average of 3 function calls from the call stack needed to produce a record.
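To illustrate why looping inside emitNext() helps, here is a hypothetical, self-contained Java model. It is not Flink code: `EmitLoopSketch`, `SequenceSource`, `Stats` and the three small methods only mimic the shape of the StreamTask#processInput() -> StreamOneInputProcessor#processInput() -> StreamTaskSourceInput#emitNext() chain. The `maxRecordsPerCall` bound is also an assumption: a real task still has to return control periodically (for example, to handle checkpoints), and this ticket does not describe how Flink decides when to stop the loop.

```java
import java.util.function.LongConsumer;

/**
 * Hypothetical, simplified model of a source task's processing loop.
 * None of these types are Flink classes; the three nested methods only mimic
 * StreamTask#processInput() -> StreamOneInputProcessor#processInput()
 * -> StreamTaskSourceInput#emitNext() -> SourceOperator#emitNext().
 */
public class EmitLoopSketch {

    /** Counts how often the call chain is entered and how many records are pushed. */
    static final class Stats {
        long chainEntries; // times the 3-call chain above emitNext() was entered
        long records;      // records pushed to the output
    }

    /** Stand-in for a source such as env.fromSequence(1, last). */
    static final class SequenceSource {
        private long next = 1;
        private final long last;
        private final boolean loopInEmitNext;
        private final int maxRecordsPerCall; // hypothetical bound so the task can yield

        SequenceSource(long last, boolean loopInEmitNext, int maxRecordsPerCall) {
            this.last = last;
            this.loopInEmitNext = loopInEmitNext;
            this.maxRecordsPerCall = maxRecordsPerCall;
        }

        /** Pushes records to the output; returns true while more records are available. */
        boolean emitNext(LongConsumer output) {
            int emitted = 0;
            // Old behaviour (loopInEmitNext == false): push at most one record per call.
            // New behaviour: keep pushing in a while loop until the source is
            // exhausted or the hypothetical per-call bound is reached.
            while (next <= last) {
                output.accept(next++);
                emitted++;
                if (!loopInEmitNext || emitted >= maxRecordsPerCall) {
                    break;
                }
            }
            return next <= last;
        }
    }

    // The three methods below mirror the three calls listed in the ticket.
    static boolean taskProcessInput(SequenceSource src, Stats stats) {
        stats.chainEntries++;
        return processorProcessInput(src, stats);
    }

    static boolean processorProcessInput(SequenceSource src, Stats stats) {
        return sourceInputEmitNext(src, stats);
    }

    static boolean sourceInputEmitNext(SequenceSource src, Stats stats) {
        return src.emitNext(record -> stats.records++);
    }

    /** Drives the task loop until the source is exhausted. */
    static Stats run(long n, boolean loopInEmitNext, int maxRecordsPerCall) {
        SequenceSource src = new SequenceSource(n, loopInEmitNext, maxRecordsPerCall);
        Stats stats = new Stats();
        boolean more = n >= 1;
        while (more) {
            more = taskProcessInput(src, stats);
        }
        return stats;
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        Stats oneByOne = run(n, false, 1);
        Stats looped = run(n, true, 1_000);
        System.out.printf("one record per call: %d chain entries for %d records%n",
                oneByOne.chainEntries, oneByOne.records);
        System.out.printf("while loop:          %d chain entries for %d records%n",
                looped.chainEntries, looped.records);
    }
}
```

Running `main` prints 1000000 chain entries for 1000000 records in the one-record-per-call mode, versus 1000 chain entries for the same 1000000 records when emitNext() loops with a bound of 1000. The outer calls are thus amortized over many records, which is the effect the proposed change relies on.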
Here are the benchmark results obtained by running the above program with parallelism=1 and object re-use enabled. The results are averaged across 5 runs for each setup.
- Prior to the proposed change, the average execution time is 46.1 sec with std=5.1 sec.
- After the proposed change, the average execution time is 33.3 sec with std=0.9 sec.
- The proposed change increases throughput by 38.4%.
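Because the program processes a fixed number of elements (fromSequence(1, 1000000000L), i.e. 10^9 records), throughput is inversely proportional to execution time, so the 38.4% figure follows directly from the mean times. The per-record figures below are a rough approximation that attributes the whole execution time, including job startup, to record processing:

```latex
\begin{aligned}
\frac{T_{\text{before}}}{T_{\text{after}}} &= \frac{46.1\,\text{s}}{33.3\,\text{s}} \approx 1.384
  \quad\Rightarrow\quad \text{throughput gain} \approx 38.4\% \\
\text{time per record (before)} &\approx \frac{46.1\,\text{s}}{10^{9}} = 46.1\,\text{ns} \\
\text{time per record (after)} &\approx \frac{33.3\,\text{s}}{10^{9}} = 33.3\,\text{ns}
\end{aligned}
```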
Issue Links
- relates to: FLINK-30623 Performance regression in checkpointSingleInput.UNALIGNED on 04.01.2023 (Closed)