Currently, each invocation of SourceOperator#emitNext() pushes at most one record to the given DataOutput. This unnecessarily increases the average Java call stack depth that must be entered to produce each record.
Take the following program as an example. For each element produced by this program, the Flink runtime needs to place these 3 function calls on the call stack:
This ticket proposes updating SourceOperator#emitNext() to push records to DataOutput in a while loop. This improves Flink performance by removing, on average, 3 function calls from the call stack needed to produce a record.
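A minimal sketch of the idea, assuming heavily simplified, hypothetical stand-ins for Flink's SourceReader, DataOutput and InputStatus (the names `IteratorReader`, `Output`, `emitNextOnce`, `emitNextLoop`, `drain` and `yieldRequested` are illustrative, not Flink APIs). It contrasts emitting one record per call with polling in a while loop, and counts how often the outer driver has to re-enter the call chain:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model; not the actual Flink SourceOperator code.
public class EmitLoopSketch {

    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    // Stand-in for DataOutput: receives one record at a time.
    interface Output<T> { void emitRecord(T record); }

    // Stand-in for a SourceReader that pushes at most one record per pollNext() call.
    static final class IteratorReader<T> {
        private final Iterator<T> it;
        IteratorReader(Iterator<T> it) { this.it = it; }

        InputStatus pollNext(Output<T> output) {
            if (!it.hasNext()) return InputStatus.END_OF_INPUT;
            output.emitRecord(it.next());
            return it.hasNext() ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT;
        }
    }

    // Before: one record per emitNext(); the caller must re-enter the whole call chain.
    static <T> InputStatus emitNextOnce(IteratorReader<T> reader, Output<T> output) {
        return reader.pollNext(output);
    }

    // After: keep polling while records are immediately available. The hypothetical
    // yieldRequested hook is an assumption: a real loop would likely need some way to
    // return control (e.g. for checkpoints) instead of draining the source unboundedly.
    static <T> InputStatus emitNextLoop(IteratorReader<T> reader, Output<T> output,
                                        BooleanSupplier yieldRequested) {
        InputStatus status;
        do {
            status = reader.pollNext(output);
        } while (status == InputStatus.MORE_AVAILABLE && !yieldRequested.getAsBoolean());
        return status;
    }

    // Outer driver (stand-in for the task's processing loop); returns how many times
    // it had to call into emitNext to drain all records. This reader never returns
    // NOTHING_AVAILABLE, so the loop always terminates.
    static int drain(List<Integer> data, boolean useLoop, List<Integer> sink) {
        IteratorReader<Integer> reader = new IteratorReader<>(data.iterator());
        Output<Integer> out = sink::add;
        int outerCalls = 0;
        InputStatus status;
        do {
            outerCalls++;
            status = useLoop ? emitNextLoop(reader, out, () -> false)
                             : emitNextOnce(reader, out);
        } while (status != InputStatus.END_OF_INPUT);
        return outerCalls;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5);
        List<Integer> a = new ArrayList<>();
        List<Integer> b = new ArrayList<>();
        System.out.println(drain(data, false, a) + " outer calls, records=" + a);
        System.out.println(drain(data, true, b) + " outer calls, records=" + b);
    }
}
```

With five records, the one-record-per-call version re-enters the outer call chain five times, while the looping version enters it once; both deliver the same records in the same order.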
Here are the benchmark results obtained by running the above program with parallelism=1 and object reuse enabled. The results are averaged across 5 runs for each setup.
- Prior to the proposed change, the average execution time is 46.1 sec with std=5.1 sec.
- After the proposed change, the average execution time is 33.3 sec with std=0.9 sec.
- The proposed change increases throughput by 38.4%.
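The 38.4% figure follows from the two mean execution times, assuming the program processes a fixed number of records so that throughput is proportional to 1 / execution time. A small sketch of the arithmetic (the class and method names are illustrative only), which also shows the corresponding reduction in execution time, a different percentage that is easy to confuse with the throughput gain:

```java
// Illustrative arithmetic only; assumes a fixed workload, so throughput ∝ 1 / time.
public class ThroughputGain {

    // Relative throughput increase, in percent: before/after - 1.
    static double throughputGainPercent(double beforeSec, double afterSec) {
        return (beforeSec / afterSec - 1.0) * 100.0;
    }

    // Relative reduction in execution time, in percent: (before - after) / before.
    static double timeReductionPercent(double beforeSec, double afterSec) {
        return (beforeSec - afterSec) / beforeSec * 100.0;
    }

    public static void main(String[] args) {
        System.out.printf("throughput gain: %.1f%%%n", throughputGainPercent(46.1, 33.3));
        System.out.printf("time reduction:  %.1f%%%n", timeReductionPercent(46.1, 33.3));
    }
}
```

Running it prints a throughput gain of 38.4% and an execution-time reduction of 27.8%.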