With push-based shuffle and adaptive merge finalization enabled, there are scenarios where a re-attempt of a ShuffleMapStage may never complete.
With adaptive merge finalization, a stage may be triggered for finalization when it is in the following state:
- The stage is not running (it is not in the running set of the DAGScheduler) because it has failed, was canceled, or is waiting, and
- The stage has no pending partitions (all of its tasks have completed at least once).
For such a stage, when the finalization completes, the stage is still not marked as mergeFinalized.
The state of the stage will be:
- stage.shuffleDependency.mergeFinalized = false
- stage.shuffleDependency.getFinalizeTask = finalizeTask
- Merged statuses of the stage are unregistered
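
The trigger condition and the leftover state described above can be modelled with a small Python sketch. This is a toy model under stated assumptions, not Spark code: `Dep`, `Stage`, `eligible_for_adaptive_finalization`, and `finalize_non_running_stage` are hypothetical names, and only the fields loosely mirror the ones named in this description.

```python
from dataclasses import dataclass, field
from typing import Optional, Set


@dataclass
class Dep:
    # Loosely mirrors the shuffleDependency fields named in this description.
    merge_finalized: bool = False
    finalize_task: Optional[str] = None
    merged_statuses_registered: bool = True


@dataclass
class Stage:
    stage_id: int
    dep: Dep = field(default_factory=Dep)
    pending_partitions: Set[int] = field(default_factory=set)


def eligible_for_adaptive_finalization(stage, running_stages):
    # Both conditions from the list above: not running, nothing pending.
    return stage.stage_id not in running_stages and not stage.pending_partitions


def finalize_non_running_stage(stage):
    # Assumed behaviour, per the description: the finalize task is recorded and
    # the merged statuses are unregistered, but the stage is NOT marked finalized.
    stage.dep.finalize_task = "finalizeTask"
    stage.dep.merged_statuses_registered = False


stage = Stage(stage_id=1)
running = set()  # stage 1 has failed, so it is not in the running set
if eligible_for_adaptive_finalization(stage, running):
    finalize_non_running_stage(stage)
```

After this runs, the modelled stage holds a finalize task and has no merged statuses registered, yet `merge_finalized` is still false, which is the inconsistent state the next attempt inherits.
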
When the stage is resubmitted, the newer attempt will never complete even though all of its tasks may have completed. This is because the newer attempt has shuffleMergeEnabled = true (the previous attempt never marked the stage as mergeFinalized), while the finalizeTask from the previous attempt's finalization is still present.
So, when all the tasks of the newer attempt complete, the following conditions hold:
- The stage is running
- There are no pending partitions, since all the tasks have completed
- stage.shuffleDependency.shuffleMergeEnabled = true
- stage.shuffleDependency.shuffleMergeFinalized = false
- stage.shuffleDependency.getFinalizeTask is not empty
This leads the DAGScheduler to try scheduling finalization instead of triggering completion of the stage. However, because of the last condition, it never actually schedules the finalization, so the stage never completes.
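
The decision described above can be walked through with a short Python sketch. It is a simplified model of the reasoning in this description, not the DAGScheduler's actual code: `Dep`, `on_all_tasks_completed`, and the returned strings are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Dep:
    shuffle_merge_enabled: bool
    shuffle_merge_finalized: bool
    finalize_task: Optional[str]


def on_all_tasks_completed(dep, running, pending_partitions):
    """Return what the modelled scheduler does once the last task finishes."""
    if not running or pending_partitions:
        return "nothing to do"
    if dep.shuffle_merge_enabled and not dep.shuffle_merge_finalized:
        # Finalization is attempted instead of marking the stage complete...
        if dep.finalize_task is None:
            return "schedule finalization"
        # ...but a finalize task already exists, so nothing is scheduled.
        return "stuck"
    return "mark stage complete"


# State inherited from the previous attempt, as listed above.
stale = Dep(shuffle_merge_enabled=True, shuffle_merge_finalized=False,
            finalize_task="finalizeTask")
outcome = on_all_tasks_completed(stale, running=True, pending_partitions=0)

# For contrast: the same stage without a leftover finalize task.
fresh = Dep(shuffle_merge_enabled=True, shuffle_merge_finalized=False,
            finalize_task=None)
fresh_outcome = on_all_tasks_completed(fresh, running=True, pending_partitions=0)
```

In this model the stale state falls into the branch that neither schedules finalization nor marks the stage complete, while the fresh state proceeds to schedule finalization.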