Description
We identified 3 issues in the handling of merge finalization requests in the RemoteBlockPushResolver:
1. Empty merge data
If the shuffle is finalized while a reducer partition is still receiving its first block, then when the merger finalizes that partition the files end up with no data, because the data file is truncated to the last good position (which is 0 in this case).
Even though no data exists for the reducer, we still add it to the result (merged reducerIds).
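One way to picture the fix is to skip partitions whose last good position is 0 when building the finalization result. The sketch below is only an illustration under that assumption: `EmptyPartitionFilter`, `nonEmptyReducers`, and the map of positions are hypothetical names, not the actual RemoteBlockPushResolver code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the real RemoteBlockPushResolver implementation.
final class EmptyPartitionFilter {

    // Assumption: the merger knows, per reducer, the last good position
    // its data file is truncated to at finalization.
    static List<Integer> nonEmptyReducers(Map<Integer, Long> lastGoodPositionByReducer) {
        List<Integer> merged = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : lastGoodPositionByReducer.entrySet()) {
            // A position of 0 means the file holds no merged data,
            // so the reducer is left out of the reported result.
            if (entry.getValue() > 0L) {
                merged.add(entry.getKey());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<Integer, Long> positions = new LinkedHashMap<>();
        positions.put(100, 4096L); // one or more blocks fully merged
        positions.put(200, 0L);    // first block still in flight at finalization
        System.out.println(nonEmptyReducers(positions)); // prints [100]
    }
}
```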
2. Overwriting of the merged data file of a reduce partition after it is finalized
This is a more involved issue that requires a specific sequence of events, and it starts with how our check for a too-late block is done.
The example below gives more details, but in a nutshell we have the following for a DETERMINATE shuffle:
- Merge starts, blocks are accepted.
- Merge is finalized.
- Files closed, status reported to driver, appShuffleInfo.shuffles cleaned up.
- Late block push from an executor received.
- The request is for a reducer for which the merger had not received any data until then, so there are no on-disk files.
- Our check does not catch this case - we end up (re-) starting merge.
- Executor could now push blocks for reducers which were finalized earlier.
- Files are truncated.
- Reads will see inconsistent state due to the ongoing writes.
To explain this with an example, consider a DETERMINATE shuffle with shuffleId 1, shuffleMergeId 0, and reduce partitions 100 and 200:
- shufflePush_1_0_0_100 is received by the RemoteBlockPushResolver.
- No meta information existed for shuffle 1, so the shuffle service creates an AppShuffleMergePartitionsInfo for shuffle 1 and shuffleMergeId 0 to start the merge.
- The merge starts in RemoteBlockPushResolver, which creates the data file for the merge request, shuffleMerged_$APP_ID_1_0_100.data (along with the index/meta files).
- A FinalizeShuffleMerge message for shuffleId 1 and shuffleMergeId 0 is received by RemoteBlockPushResolver. In a thread-safe manner:
- AppShuffleMergePartitionsInfo for shuffle 1 is removed from the map in memory.
- shuffleMerged_$APP_ID_1_0_100.data/index/meta files are closed.
- Driver is informed that partition 100 of shuffleId 1/mergeId 0 was merged.
- shufflePush_1_0_0_200 is received by the RemoteBlockPushResolver.
- A new AppShuffleMergePartitionsInfo is added since:
- There is no AppShuffleMergePartitionsInfo for shuffleId 1/mergeId 0, as it was removed during finalization, and
- The merger had never received data for partition 200 until then.
- With this, shuffleMerged…200.data is created, and on that merger, merge for shuffleId 1/mergeId 0 starts again.
- shufflePush_1_0_5_100 is received by the RemoteBlockPushResolver. We randomize the order of pushes, so late pushes from an executor can end up pushing reducer 200 followed by data for reducer 100.
- The AppShuffleMergePartitionsInfo that was created for shuffle 1 and shuffleMergeId 0 when shufflePush_1_0_0_200 arrived has no entry for reduce id 100, so the data/index/meta files for that partition are recreated, overwriting the already finalized files.
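The sequence above hinges on the finalized state being forgotten once the in-memory entry is removed. A minimal sketch of one possible guard follows, assuming finalization leaves a marker entry behind instead of removing it. `LatePushGuard`, `MergeState`, `acceptPush`, and `finalizeMerge` are hypothetical names for illustration and do not reflect the actual fix. The sketch is exercised single-threaded only; a real implementation would also need to synchronize the push with finalization.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the real RemoteBlockPushResolver implementation.
final class LatePushGuard {

    // Per-shuffle state; a finalized state is kept as a marker after finalization.
    static final class MergeState {
        final int shuffleMergeId;
        final boolean finalized;
        final Set<Integer> reduceIds = ConcurrentHashMap.newKeySet();

        MergeState(int shuffleMergeId, boolean finalized) {
            this.shuffleMergeId = shuffleMergeId;
            this.finalized = finalized;
        }
    }

    private final ConcurrentHashMap<Integer, MergeState> shuffles = new ConcurrentHashMap<>();

    // Returns true if the pushed block is accepted, false if it is too late or stale.
    boolean acceptPush(int shuffleId, int shuffleMergeId, int reduceId) {
        MergeState state = shuffles.compute(shuffleId, (id, current) ->
            (current == null || current.shuffleMergeId < shuffleMergeId)
                ? new MergeState(shuffleMergeId, false) // first push for this merge id
                : current);
        if (state.finalized || state.shuffleMergeId != shuffleMergeId) {
            return false; // merge already finalized, or push belongs to an older merge id
        }
        state.reduceIds.add(reduceId);
        return true;
    }

    // Replaces the entry with a finalized marker instead of removing it.
    void finalizeMerge(int shuffleId, int shuffleMergeId) {
        shuffles.compute(shuffleId, (id, current) ->
            (current == null || current.shuffleMergeId <= shuffleMergeId)
                ? new MergeState(shuffleMergeId, true)
                : current);
    }

    public static void main(String[] args) {
        LatePushGuard guard = new LatePushGuard();
        System.out.println(guard.acceptPush(1, 0, 100)); // true: merge starts
        guard.finalizeMerge(1, 0);
        System.out.println(guard.acceptPush(1, 0, 200)); // false: late push, new reducer
        System.out.println(guard.acceptPush(1, 0, 100)); // false: finalized reducer
    }
}
```

With the marker in place, the late push for reducer 200 no longer restarts the merge, so the later push for reducer 100 cannot recreate its files.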
3. Throwing exception in the finalization of a shuffle for which the shuffle server didn't receive any blocks.
For very small stages with low minCompletedPushRatio/minShuffleSizeToWait, the driver can initiate the finalization of a shuffle right away. The shuffle server may not have received any pushed blocks, so there will be no AppShuffleMergePartitionsInfo instance for the shuffle in its state. In this case, we should mark the shuffle as finalized and return empty results.
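The intended behavior can be sketched as follows: a finalize request for an unknown shuffle records the shuffle as finalized and returns an empty result rather than throwing. All names here (`EmptyFinalizeSketch`, `recordPush`, `finalizeMerge`, `isFinalized`) are hypothetical and chosen only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not the real RemoteBlockPushResolver implementation.
final class EmptyFinalizeSketch {
    private final Map<Integer, List<Integer>> activeMerges = new HashMap<>();
    private final Set<Integer> finalizedShuffles = new HashSet<>();

    // Records that a block for the given reducer was merged.
    void recordPush(int shuffleId, int reduceId) {
        activeMerges.computeIfAbsent(shuffleId, id -> new ArrayList<>()).add(reduceId);
    }

    // Finalizes the shuffle and returns the merged reducer ids.
    // If no block was ever received, the result is empty and no exception is thrown.
    List<Integer> finalizeMerge(int shuffleId) {
        List<Integer> reducers = activeMerges.remove(shuffleId);
        finalizedShuffles.add(shuffleId); // marked finalized in both cases
        return reducers == null ? Collections.emptyList() : reducers;
    }

    boolean isFinalized(int shuffleId) {
        return finalizedShuffles.contains(shuffleId);
    }

    public static void main(String[] args) {
        EmptyFinalizeSketch server = new EmptyFinalizeSketch();
        System.out.println(server.finalizeMerge(7)); // prints []
        System.out.println(server.isFinalized(7));   // prints true
    }
}
```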
Issue Links
- fixes SPARK-42834 Divided by zero occurs in PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse (Closed)