Spark / SPARK-41792

Shuffle merge finalization removes the wrong finalization state from the DB


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.4.0
    • Component/s: Shuffle
    • Labels: None

    Description

      During `finalizeShuffleMerge` in the external shuffle service, if the finalization request is for a newer shuffle merge id, we clean up the existing (older) shuffle details and add the newer entry (for which no blocks were pushed) to the DB.

      Unfortunately, the DB cleanup uses the wrong `AppAttemptShuffleMergeId`: it removes the entry for the latest shuffle merge id instead of the existing (older) entry.
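
      To make the effect concrete, here is a minimal, self-contained sketch of the key mix-up. It is not the Spark implementation: `MergeKey` is a simplified stand-in for `AppAttemptShuffleMergeId`, the `HashMap` is a hypothetical in-memory substitute for the shuffle service's DB, and the cleanup runs inline rather than as a submitted task.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

class FinalizeCleanupSketch {

  /** Simplified stand-in for AppAttemptShuffleMergeId: only the four key fields. */
  static final class MergeKey {
    final String appId;
    final int appAttemptId;
    final int shuffleId;
    final int shuffleMergeId;

    MergeKey(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
      this.appId = appId;
      this.appAttemptId = appAttemptId;
      this.shuffleId = shuffleId;
      this.shuffleMergeId = shuffleMergeId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof MergeKey)) {
        return false;
      }
      MergeKey k = (MergeKey) o;
      return appId.equals(k.appId) && appAttemptId == k.appAttemptId
          && shuffleId == k.shuffleId && shuffleMergeId == k.shuffleMergeId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, appAttemptId, shuffleId, shuffleMergeId);
    }
  }

  /**
   * Models a finalize request for merge id 2 while the DB still holds merge id 1.
   * Returns the shuffleMergeIds left in the hypothetical in-memory DB afterwards.
   */
  static Set<Integer> mergeIdsAfterFinalize(boolean useFix) {
    Map<MergeKey, String> db = new HashMap<>();
    MergeKey existing = new MergeKey("app-1", 1, 0, 1);
    db.put(existing, "partitions of merge id 1");

    // Key built from the incoming finalize message (newer merge id).
    MergeKey incoming = new MergeKey("app-1", 1, 0, 2);

    // Cleanup step: the choice of key here is the whole bug.
    MergeKey toRemove = useFix
        ? new MergeKey(incoming.appId, incoming.appAttemptId, incoming.shuffleId,
            existing.shuffleMergeId)   // key of the older, existing entry
        : incoming;                    // buggy: key of the entry being added
    db.remove(toRemove);

    // Record the newer merge id, for which no blocks were pushed.
    db.put(incoming, "finalized, no pushed blocks");

    Set<Integer> ids = new TreeSet<>();
    for (MergeKey k : db.keySet()) {
      ids.add(k.shuffleMergeId);
    }
    return ids;
  }

  public static void main(String[] args) {
    System.out.println("wrong key:   " + mergeIdsAfterFinalize(false)); // [1, 2]
    System.out.println("correct key: " + mergeIdsAfterFinalize(true));  // [2]
  }
}
```

      With the wrong key, the stale entry for merge id 1 is never deleted. Because the real cleanup is submitted as a separate task, it could presumably also run after the new entry is written and delete that entry instead; the sketch does not model that ordering.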

      Proposed Fix:

      diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
      index 816d1082850..551104d0eba 100644
      --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
      +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
      @@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
               } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
                 // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
                 // empty MergeStatuses but cleanup the older shuffleMergeId files.
      +          AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
      +                  msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
                 submitCleanupTask(() ->
                     closeAndDeleteOutdatedPartitions(
      -                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
      +                  currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
               } else {
                 // This block covers:
                 //  1. finalization of determinate stage
      

          People

            Assignee: mridulm80 Mridul Muralidharan
            Reporter: mridulm80 Mridul Muralidharan
            Votes: 0
            Watchers: 3
