  1. Spark
  2. SPARK-33235 Push-based Shuffle Improvement Tasks
  3. SPARK-37675

Prevent overwriting of push shuffle merged files once the shuffle is finalized



    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.2.0
    • Fix Version/s: 3.3.0, 3.2.2
    • Component/s: Shuffle
    • Labels: None


      We identified 3 issues in the handling of merge finalization requests in the RemoteBlockPushResolver:
      1. Empty merge data
      If the shuffle is finalized while a reducer partition is still receiving its first block, the merger truncates that partition's data file to the last good position when it finalizes the partition. Since no block has completed, the last good position is 0, so the file ends up empty.
      Even though no data exists for the reducer, we still add it to the result (merged reducerIds).
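      The intended behaviour can be sketched as follows. This is a simplified, hypothetical model, not the actual RemoteBlockPushResolver internals: the class and field names (PartitionState, lastGoodPos, etc.) are illustrative. The point is that a partition truncated to position 0 should not be reported as merged.

```java
import java.util.ArrayList;
import java.util.List;

public class FinalizeSketch {
    // Minimal stand-in for a merged shuffle partition's state.
    static final class PartitionState {
        final int reduceId;
        long currentPos;   // bytes written so far, including a partial block
        long lastGoodPos;  // position after the last fully received block

        PartitionState(int reduceId, long currentPos, long lastGoodPos) {
            this.reduceId = reduceId;
            this.currentPos = currentPos;
            this.lastGoodPos = lastGoodPos;
        }
    }

    // Truncate each partition to its last good position and collect only the
    // reducers that still contain data; a first block still in flight leaves
    // lastGoodPos == 0, so that partition is skipped rather than reported.
    static List<Integer> finalizePartitions(List<PartitionState> partitions) {
        List<Integer> mergedReduceIds = new ArrayList<>();
        for (PartitionState p : partitions) {
            p.currentPos = p.lastGoodPos; // "truncate" the data file
            if (p.lastGoodPos > 0) {
                mergedReduceIds.add(p.reduceId);
            }
        }
        return mergedReduceIds;
    }

    public static void main(String[] args) {
        List<PartitionState> parts = new ArrayList<>();
        parts.add(new PartitionState(100, 4096, 4096)); // complete blocks
        parts.add(new PartitionState(200, 512, 0));     // first block in flight
        System.out.println(finalizePartitions(parts));  // [100]
    }
}
```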

      2. Overwriting of the merged data file of a reduce partition after it is finalized
      This is a more involved issue that requires a specific sequence of events to occur, and it starts with how the check for a too-late block push is performed.
      The example below gives more details, but in a nutshell we have the following for a DETERMINATE shuffle:

      1. Merge starts, blocks are accepted.
      2. Merge is finalized.
        • Files closed, status reported to driver, appShuffleInfo.shuffles cleaned up.
      3. Late block push from an executor received.
        • The request is for a reducer for which the merger never received any data until then, so no on-disk files exist.
        • Our check does not catch this case, so we end up (re-)starting the merge.
      4. Executor could now push blocks for reducers which were finalized earlier.
        • Files are truncated.
      5. Reads will see inconsistent state due to the ongoing writes.

      Explaining this with an example for a DETERMINATE shuffleId 1, shuffleMergeId 0, and reduce partitions 100 and 200:

      1. shufflePush_1_0_0_100 is received by the RemoteBlockPushResolver.
        1. No meta information existed for shuffle 1, so the shuffle service creates AppShuffleMergePartitionsInfo for shuffle 1 and shuffleMergeId 0 to start the merge.
        2. The merge starts in the RemoteBlockPushResolver, which creates the data file shuffleMerged_$APP_ID_1_0_100.data (along with index/meta files) for the merge request.
      2. FinalizeShuffleMerge message for shuffleId 1 and shuffleMergeId 0 is received by the RemoteBlockPushResolver. In a thread-safe manner:
        1. AppShuffleMergePartitionsInfo for shuffle 1 is removed from the map in memory.
        2. shuffleMerged_$APP_ID_1_0_100.data/index/meta files are closed.
        3. Driver is informed that partition 100 of shuffleId 1/mergeId 0 was merged.
      3. shufflePush_1_0_0_200 is received by the RemoteBlockPushResolver.
        1. A new AppShuffleMergePartitionsInfo is added since:
          1. There is no AppShuffleMergePartitionsInfo for shuffle 1/mergeId 0, as it was removed during finalization, and
          2. The merger had never received any data for partition 200 until then.
        2. With this, shuffleMerged…200.data is created, and on that merger the merge for shuffleId 1/mergeId 0 effectively starts again.
      4. shufflePush_1_0_5_100 is received by the RemoteBlockPushResolver. Since we randomize the order of pushes, late pushes from an executor can push data for reducer 200 followed by data for reducer 100.
        1. The AppShuffleMergePartitionsInfo created for shuffle 1 and shuffleMergeId 0 in step 3.1 does not contain reduce id 100, so the data/index/meta files for that partition are recreated (see the referenced code).
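      The guard the scenario above argues for can be sketched as remembering finalized (shuffleId, shuffleMergeId) pairs, so that "no in-memory state" is no longer conflated with "merge never started". This is a hypothetical illustration: the class and method names below are assumptions, not the actual resolver API, and the real fix may record finalization differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LatePushGuard {
    // Set of finalized (shuffleId, shuffleMergeId) pairs.
    private final Map<String, Boolean> finalizedShuffles = new ConcurrentHashMap<>();

    private static String key(int shuffleId, int shuffleMergeId) {
        return shuffleId + "_" + shuffleMergeId;
    }

    // Called at FinalizeShuffleMerge time, after the files are closed.
    public void markFinalized(int shuffleId, int shuffleMergeId) {
        finalizedShuffles.put(key(shuffleId, shuffleMergeId), Boolean.TRUE);
    }

    // Called for every incoming block push. The buggy behaviour was to treat
    // missing in-memory state as "merge never started" and recreate it;
    // checking the finalized set first closes that hole, so a late push for
    // a finalized shuffle merge is rejected instead of restarting the merge.
    public boolean shouldAcceptBlock(int shuffleId, int shuffleMergeId) {
        return !finalizedShuffles.containsKey(key(shuffleId, shuffleMergeId));
    }
}
```

      With this guard, the late shufflePush_1_0_0_200 in step 3 would be rejected rather than creating a fresh AppShuffleMergePartitionsInfo, so the already-finalized files for reducer 100 are never truncated in step 4.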

      3. Throwing an exception during finalization of a shuffle for which the shuffle server didn't receive any blocks
      For very small stages with a low minCompletedPushRatio/minShuffleSizeToWait, the driver can initiate finalization of a shuffle right away. The shuffle server may not have received any pushed blocks, so there will be no AppShuffleMergePartitionsInfo instance for that shuffle in its state. In this case, we should mark the shuffle as finalized and return empty results.
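      The proposed behaviour for this case can be sketched as below. The types are simplified stand-ins (a real MergeStatuses carries more than reduce ids), and the names are illustrative assumptions rather than the actual API.

```java
import java.util.HashMap;
import java.util.Map;

public class EmptyFinalizeSketch {
    // Simplified stand-in for the merge result reported to the driver.
    static final class MergeStatuses {
        final int[] reduceIds;
        MergeStatuses(int[] reduceIds) { this.reduceIds = reduceIds; }
    }

    // Stand-in for the per-shuffle in-memory state the server keeps.
    private final Map<Integer, int[]> shuffleState = new HashMap<>();

    public MergeStatuses finalizeShuffleMerge(int shuffleId) {
        int[] merged = shuffleState.remove(shuffleId);
        if (merged == null) {
            // No state exists because the server never received a push for
            // this shuffle: mark it finalized and return empty results
            // instead of throwing an exception.
            return new MergeStatuses(new int[0]);
        }
        return new MergeStatuses(merged);
    }
}
```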





              csingh Chandni Singh
              chengpan Cheng Pan