Description
During `finalizeShuffleMerge` in the external shuffle service, if the finalization request is for a newer shuffle merge id, we clean up the existing (older) shuffle details and add the newer entry (for which we received no pushed blocks) to the DB.
Unfortunately, the DB cleanup uses the wrong AppAttemptShuffleMergeId: it is built from the latest (requested) shuffle merge id instead of the existing one, so the removal targets the new entry and the older entry is left behind in the DB.
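To illustrate the effect, here is a minimal, self-contained sketch, not the actual resolver code. `MergeKey`, the in-memory map standing in for the DB, and `remainingMergeIds` are hypothetical names invented for this example. The order of "remove, then add" is also a simplifying assumption, since the real cleanup runs as a submitted task.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

public class ShuffleMergeCleanupSketch {

  /** Hypothetical stand-in for AppAttemptShuffleMergeId, used here as the DB key. */
  static final class MergeKey {
    final String appId;
    final int appAttemptId;
    final int shuffleId;
    final int shuffleMergeId;

    MergeKey(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
      this.appId = appId;
      this.appAttemptId = appAttemptId;
      this.shuffleId = shuffleId;
      this.shuffleMergeId = shuffleMergeId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof MergeKey)) return false;
      MergeKey k = (MergeKey) o;
      return appAttemptId == k.appAttemptId && shuffleId == k.shuffleId
          && shuffleMergeId == k.shuffleMergeId && appId.equals(k.appId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, appAttemptId, shuffleId, shuffleMergeId);
    }
  }

  /**
   * Simulates finalization for a newer merge id against a toy in-memory "DB"
   * and returns the shuffle merge ids still stored afterwards.
   */
  static TreeSet<Integer> remainingMergeIds(
      int existingMergeId, int requestedMergeId, boolean applyFix) {
    Map<MergeKey, String> db = new HashMap<>();
    MergeKey existing = new MergeKey("app-1", 1, 0, existingMergeId);
    MergeKey requested = new MergeKey("app-1", 1, 0, requestedMergeId);
    db.put(existing, "older merge details");

    // Buggy path removes the key built from the requested (latest) merge id;
    // the fixed path removes the key built from the existing merge id.
    db.remove(applyFix ? existing : requested);
    db.put(requested, "finalized with no pushed blocks");

    TreeSet<Integer> ids = new TreeSet<>();
    for (MergeKey key : db.keySet()) {
      ids.add(key.shuffleMergeId);
    }
    return ids;
  }

  public static void main(String[] args) {
    System.out.println("buggy: " + remainingMergeIds(1, 2, false)); // prints "buggy: [1, 2]"
    System.out.println("fixed: " + remainingMergeIds(1, 2, true));  // prints "fixed: [2]"
  }
}
```

In this sketch the buggy path leaves the stale entry for merge id 1 in the store, while the fixed path leaves only the newer entry.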
Proposed Fix:
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 816d1082850..551104d0eba 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         }
       } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
         // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
         // empty MergeStatuses but cleanup the older shuffleMergeId files.
+        AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+            msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
         submitCleanupTask(() -> closeAndDeleteOutdatedPartitions(
-            appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+            currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
       } else {
         // This block covers:
         // 1. finalization of determinate stage