  1. Spark
  2. SPARK-30602

SPIP: Support push-based shuffle to improve shuffle efficiency




      In large deployments of Spark compute infrastructure, Spark shuffle is becoming a potential scaling bottleneck and a source of cluster inefficiency. When running Spark on YARN at scale, operators usually enable the Spark external shuffle service and store intermediate shuffle files on HDDs. The number of blocks generated for a shuffle grows quadratically with the size of the shuffled data: the numbers of mappers and reducers each grow linearly with the data size, but the number of blocks is # mappers * # reducers. As a result, one general trend we have observed is that the more data a Spark application processes, the smaller its shuffle blocks become. In a few production clusters we have seen, the average shuffle block size is only tens of KB. Because random reads of small amounts of data are inefficient on HDDs, the overall efficiency of the Spark external shuffle services serving these blocks degrades as a growing number of Spark applications process a growing amount of data. In addition, because the Spark external shuffle service is shared across tenants in a multi-tenant cluster, the inefficiency caused by one Spark application can propagate to other applications as well.
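
      The quadratic growth described above can be made concrete with a little arithmetic. The following is a minimal sketch, not a measurement: it assumes every map and reduce task handles a fixed amount of data (the 256 MiB figure and the function name `shuffle_block_stats` are hypothetical, chosen only for illustration) and that the number of reducers equals the number of mappers.

```python
import math

MIB = 1024 ** 2
TIB = 1024 ** 4


def shuffle_block_stats(shuffle_bytes, bytes_per_task=256 * MIB):
    """Estimate block count and average block size for one shuffle.

    Assumption (hypothetical): each mapper and each reducer handles
    `bytes_per_task` bytes, so both task counts grow linearly with
    the amount of shuffled data.
    """
    mappers = max(1, math.ceil(shuffle_bytes / bytes_per_task))
    reducers = mappers  # assumption: same per-task data size on both sides
    blocks = mappers * reducers  # one block per (mapper, reducer) pair
    avg_block_bytes = shuffle_bytes / blocks
    return mappers, reducers, blocks, avg_block_bytes


if __name__ == "__main__":
    for size in (1 * TIB, 4 * TIB, 16 * TIB):
        m, r, b, avg = shuffle_block_stats(size)
        print(f"{size // TIB:>3} TiB: {m} mappers x {r} reducers = "
              f"{b} blocks, avg block {avg / 1024:.0f} KiB")
```

      Under these assumptions the average block size equals bytes_per_task squared divided by the shuffle size, so quadrupling the shuffled data cuts the average block to a quarter of its size.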

      In this ticket, we propose push-based shuffle as a solution to improve Spark shuffle efficiency in the environments described above. With push-based shuffle, shuffle blocks are pushed at the end of map tasks, pre-merged, and moved towards the reducers. In our prototype implementation, we have seen significant efficiency improvements when performing large shuffles. We take a Spark-native approach to achieve this, i.e., extending Spark’s existing Netty-based shuffle protocol and the behaviors of Spark mappers, reducers, and drivers. This way, we can bring the benefits of more efficient shuffle to Spark without incurring the dependency or overhead of either a specialized storage layer or external infrastructure pieces.
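
      To illustrate the idea of pre-merging, here is a toy in-memory model. It is only a sketch under simplifying assumptions and does not reflect Spark's actual classes or RPC protocol: the names `pull_based_reads` and `push_and_merge` are hypothetical, every block is assumed to be pushed successfully, and merging is plain concatenation in mapper order. The sub-tasks listed on this ticket indicate that the real feature also has to handle fallback to the original blocks, corruption, and stage retries, none of which is modeled here.

```python
from collections import defaultdict


def pull_based_reads(num_mappers, num_reducers):
    """Without merging, each reducer fetches one block from every mapper."""
    return num_mappers * num_reducers


def push_and_merge(map_outputs):
    """Merge per-mapper blocks into one buffer per reduce partition.

    map_outputs: list with one dict per mapper, {reducer_id: block_bytes}.
    Returns {reducer_id: merged_bytes}, blocks appended in mapper order.
    Toy model: assumes every push succeeds.
    """
    merged = defaultdict(bytearray)
    for blocks in map_outputs:
        for reducer_id, block in blocks.items():
            merged[reducer_id] += block
    return {rid: bytes(buf) for rid, buf in merged.items()}


if __name__ == "__main__":
    # Three mappers, two reduce partitions.
    outputs = [
        {0: b"a0", 1: b"a1"},
        {0: b"b0", 1: b"b1"},
        {0: b"c0", 1: b"c1"},
    ]
    merged = push_and_merge(outputs)
    print("small-block reads without merging:", pull_based_reads(3, 2))
    print("merged reads:", len(merged))
    print(merged)
```

      In this model the six small random reads collapse into two larger sequential reads, one per reduce partition, which is the access pattern that suits HDDs better.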


      Link to dev mailing list discussion: http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html


        1. Screen Shot 2020-06-23 at 11.31.22 AM.jpg
          253 kB
          Min Shen
        2. vldb_magnet_final.pdf
          728 kB
          Min Shen

        Issue Links

          Summary | Type | Status | Assignee
          Push-based shuffle documentation | Sub-task | Resolved | Venkata krishnan Sowrirajan
          RPC implementation to support pushing and merging shuffle blocks | Sub-task | Resolved | Min Shen
          Add support for external shuffle service in YARN deployment mode to leverage push-based shuffle | Sub-task | Resolved | Chandni Singh
          Add support for executors to push shuffle blocks after successful map task completion | Sub-task | Resolved | Chandni Singh
          RPC implementation to support control plane coordination for push-based shuffle | Sub-task | Resolved | Ye Zhou
          Add support in Spark driver to coordinate the shuffle map stage in push-based shuffle by selecting external shuffle services for merging shuffle partitions | Sub-task | Resolved | Venkata krishnan Sowrirajan
          Add support in Spark driver to coordinate the finalization of the push/merge phase in push-based shuffle for a given shuffle and the initiation of the reduce stage | Sub-task | Resolved | Venkata krishnan Sowrirajan
          Extend MapOutputTracker to support tracking and serving the metadata about each merged shuffle partitions for a given shuffle in push-based shuffle scenario | Sub-task | Resolved | Venkata krishnan Sowrirajan
          Add support for ShuffleBlockFetcherIterator to read from merged shuffle partitions and to fallback to original shuffle blocks if encountering failures | Sub-task | Resolved | Chandni Singh
          Add support to properly handle different type of stage retries | Sub-task | Resolved | Venkata krishnan Sowrirajan
          Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data | Sub-task | Resolved | Ye Zhou
          Fix cases of corruption in merged shuffle blocks that are pushed | Sub-task | Resolved | Chandni Singh
          Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way | Sub-task | Resolved | Ye Zhou
          Add Support in the ESS to serve merged shuffle block meta and data to executors | Sub-task | Resolved | Chandni Singh
          Disable push-based shuffle until the feature is complete | Sub-task | Resolved | Unassigned
          FileNotFoundException from the shuffle push can cause the executor to terminate | Sub-task | Resolved | Chandni Singh
          Rename classes in shuffle RPC used for block push operations | Sub-task | Resolved | Min Shen
          Avoid finalizing when there's no push at all in a shuffle | Sub-task | Resolved | Unassigned
          Stage has all tasks finished but with ongoing finalization can cause job hang | Sub-task | Resolved | Unassigned
          Disable push based shuffle when IO encryption is enabled or serializer is not relocatable | Sub-task | Resolved | Minchu Yang
          Handle fallback when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true | Sub-task | Resolved | Aravind Patnam



              mshen Min Shen
              mshen Min Shen
              Mridul Muralidharan Mridul Muralidharan