SPARK-32920 implements a simple approach for shuffle merge finalization, the step that moves a job from the shuffle map stage to the reduce stage when push-based shuffle is enabled.
This simple approach waits for a fixed period of time after all map tasks have finished before initiating shuffle merge finalization. It is not well suited to jobs whose shuffles vary in size. For a small shuffle, we want merge finalization to happen as early and as quickly as possible. For a large shuffle, we might want to wait longer to achieve a better merge ratio. A single static configuration for the entire job cannot adapt to such varying needs.
This raises the need for adaptive shuffle merge finalization, where the time to wait before merge finalization adapts to the size of the shuffle. We have implemented an effective adaptive shuffle merge finalization mechanism that introduces two new configuration parameters: spark.shuffle.push.minShuffleSizeToWait and spark.shuffle.push.minPushRatio. Together with spark.shuffle.push.finalize.time, adaptive shuffle merge finalization works as follows:
- Whenever a ShuffleBlockPusher finishes pushing all the shuffle data generated by a mapper, it notifies the Spark driver.
- When the Spark driver receives a push completion notification, it updates the state maintained in the corresponding ShuffleDependency.
- If the ratio of completed pushes (# completed pushes / # map tasks) reaches minPushRatio, the driver immediately schedules shuffle merge finalization.
- If the driver is instead first notified that all map tasks have finished, it gathers the total shuffle size from MapOutputStatistics. If the total shuffle size is smaller than minShuffleSizeToWait, the driver ignores the pushed shuffle partitions, treats the shuffle as a regular shuffle, and starts scheduling the reduce stage. It also asynchronously schedules shuffle merge finalization immediately, but ignores all the responses.
- If the total shuffle size is larger than minShuffleSizeToWait, the driver schedules shuffle merge finalization after waiting for finalize.time. If, during this wait, the driver receives enough push completion notifications to reach minPushRatio, it reschedules shuffle merge finalization for immediate execution.
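The driver-side decision logic in the list above can be sketched in Python. This is a minimal, hypothetical model, not Spark's implementation: the class `ShuffleMergeState`, its fields and methods, and the use of explicit timestamps instead of real timers are all assumptions made for illustration. The fields `min_push_ratio`, `min_size_to_wait`, and `finalize_wait_secs` stand in for minPushRatio, minShuffleSizeToWait, and finalize.time respectively.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ShuffleMergeState:
    """Hypothetical per-shuffle driver state (a stand-in for what ShuffleDependency tracks)."""
    num_map_tasks: int
    min_push_ratio: float        # stands in for spark.shuffle.push.minPushRatio
    min_size_to_wait: int        # bytes; stands in for spark.shuffle.push.minShuffleSizeToWait
    finalize_wait_secs: float    # stands in for spark.shuffle.push.finalize.time
    # A set, so a duplicate notification for the same mapper is counted only once.
    completed_pushes: set = field(default_factory=set)
    finalize_at: Optional[float] = None   # scheduled finalization time, None if not scheduled
    merge_results_ignored: bool = False   # True when the reduce stage runs as a regular shuffle

    def push_ratio(self) -> float:
        # (# completed pushes) / (# map tasks)
        return len(self.completed_pushes) / self.num_map_tasks

    def schedule_finalize(self, at: float) -> None:
        # Rescheduling only ever moves finalization earlier, never later.
        if self.finalize_at is None or at < self.finalize_at:
            self.finalize_at = at

    def on_push_completed(self, map_index: int, now: float) -> None:
        # A mapper's ShuffleBlockPusher reported that it finished pushing.
        self.completed_pushes.add(map_index)
        if self.push_ratio() >= self.min_push_ratio:
            self.schedule_finalize(now)  # enough pushes: finalize immediately

    def on_all_maps_finished(self, total_shuffle_bytes: int, now: float) -> None:
        # total_shuffle_bytes would come from MapOutputStatistics.
        if total_shuffle_bytes < self.min_size_to_wait:
            # Small shuffle: treat it as a regular shuffle and finalize right away,
            # ignoring whatever the merge finalization responses contain.
            self.merge_results_ignored = True
            self.schedule_finalize(now)
        else:
            # Large shuffle: wait for finalize_wait_secs unless pushes complete sooner.
            self.schedule_finalize(now + self.finalize_wait_secs)
```

For example, with four map tasks, a minPushRatio of 1.0, and a large shuffle, finalization is first scheduled finalize.time after the last map task finishes, then pulled forward as soon as the fourth push completion arrives. Letting `schedule_finalize` only move the deadline earlier is one simple way to model the "reschedule for immediate execution" step.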
In addition, per SPARK-36530, we should check whether no blocks were pushed at all because every block is larger than spark.shuffle.push.maxBlockSizeToPush. If so, shuffle merge finalization should also be skipped. Whether any blocks from a mapper were pushed can be included in the new executor-to-driver RPC that notifies the driver of push completion.
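The SPARK-36530 check can be sketched the same way. The `PushCompleted` message and the helpers `build_push_completed` and `should_finalize` are hypothetical; the sketch only assumes that the push completion RPC carries a boolean saying whether the mapper pushed at least one block, and that the driver skips finalization when no notification received so far reports a pushed block. Treating empty blocks as "not pushed" is an additional assumption.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class PushCompleted:
    """Hypothetical executor-to-driver RPC payload: one mapper finished pushing."""
    shuffle_id: int
    map_index: int
    pushed_any_block: bool


def build_push_completed(shuffle_id: int, map_index: int,
                         block_sizes: Iterable[int],
                         max_block_size_to_push: int) -> PushCompleted:
    # Blocks larger than max_block_size_to_push (spark.shuffle.push.maxBlockSizeToPush)
    # are not pushed. Assumption: empty blocks have nothing to push either.
    pushed_any = any(0 < size <= max_block_size_to_push for size in block_sizes)
    return PushCompleted(shuffle_id, map_index, pushed_any)


def should_finalize(notifications: List[PushCompleted]) -> bool:
    # Skip merge finalization unless at least one mapper reported a pushed block.
    return any(n.pushed_any_block for n in notifications)
```

A mapper whose blocks all exceed the limit reports `pushed_any_block=False`; as soon as any mapper reports `True`, finalization proceeds as usual.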