Details
-
Improvement
-
Status: Closed
-
Major
-
Resolution: Done
-
None
Description
Currently, flink will set the correct key context(by call setKeyContextElement) before processing each record, which is typically used to extract key from record and pass that key to the state backends.
However, the "setKeyContextElement" is obviously not need for non-keyed stream/operator, in which case we can omit the "setKeyContextElement" calls to improve performance. Note that "setKeyContextElement" is an interface method, it requires looking up the interface table when calling, which will further increase the method call overhead.
We run the following program as benchmark with parallelism=1 and object re-use enabled. The benchmark results are averaged across 5 runs for each setup. Before and after applying the proposed change, the average execution time changed from 88.39 s to 78.76 s, which increases throughput by 10.8%.
env.fromSequence(1, 1000000000L)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.map(x -> x).addSink(new DiscardingSink<>());
Attachments
Issue Links
- links to