Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-30602

SPIP: Support push-based shuffle to improve shuffle efficiency

    XMLWordPrintableJSON

    Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.0
    • Fix Version/s: None
    • Component/s: Shuffle, Spark Core
    • Labels:
    • Target Version/s:

      Description

      In a large deployment of a Spark compute infrastructure, Spark shuffle is becoming a potential scaling bottleneck and a source of inefficiency in the cluster. When doing Spark on YARN for a large-scale deployment, people usually enable Spark external shuffle service and store the intermediate shuffle files on HDD. Because the number of blocks generated for a particular shuffle grows quadratically compared to the size of shuffled data (# mappers and reducers grows linearly with the size of shuffled data, but # blocks is # mappers * # reducers), one general trend we have observed is that the more data a Spark application processes, the smaller the block size becomes. In a few production clusters we have seen, the average shuffle block size is only 10s of KBs. Because of the inefficiency of performing random reads on HDD for small amount of data, the overall efficiency of the Spark external shuffle services serving the shuffle blocks degrades as we see an increasing # of Spark applications processing an increasing amount of data. In addition, because Spark external shuffle service is a shared service in a multi-tenancy cluster, the inefficiency with one Spark application could propagate to other applications as well.

      In this ticket, we propose a solution to improve Spark shuffle efficiency in above mentioned environments with push-based shuffle. With push-based shuffle, shuffle is performed at the end of mappers and blocks get pre-merged and move towards reducers. In our prototype implementation, we have seen significant efficiency improvements when performing large shuffles. We take a Spark-native approach to achieve this, i.e., extending Spark’s existing shuffle netty protocol, and the behaviors of Spark mappers, reducers and drivers. This way, we can bring the benefits of more efficient shuffle in Spark without incurring the dependency or overhead of either specialized storage layer or external infrastructure pieces.

       

      Link to dev mailing list discussion: http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html

        Attachments

        1. vldb_magnet_final.pdf
          728 kB
          Min Shen
        2. Screen Shot 2020-06-23 at 11.31.22 AM.jpg
          253 kB
          Min Shen

          Issue Links

          1.
          RPC implementation to support pushing and merging shuffle blocks Sub-task Resolved Min Shen
          2.
          Add support for external shuffle service in YARN deployment mode to leverage push-based shuffle Sub-task Resolved Chandni Singh
          3.
          Add support for executors to push shuffle blocks after successful map task completion Sub-task Resolved Chandni Singh
          4.
          RPC implementation to support control plane coordination for push-based shuffle Sub-task Resolved Ye Zhou
          5.
          Add support in Spark driver to coordinate the shuffle map stage in push-based shuffle by selecting external shuffle services for merging shuffle partitions Sub-task Resolved Venkata krishnan Sowrirajan
          6.
          Add support in Spark driver to coordinate the finalization of the push/merge phase in push-based shuffle for a given shuffle and the initiation of the reduce stage Sub-task Resolved Venkata krishnan Sowrirajan
          7.
          Extend MapOutputTracker to support tracking and serving the metadata about each merged shuffle partitions for a given shuffle in push-based shuffle scenario Sub-task Resolved Venkata krishnan Sowrirajan
          8.
          Add support for ShuffleBlockFetcherIterator to read from merged shuffle partitions and to fallback to original shuffle blocks if encountering failures Sub-task In Progress Unassigned
          9.
          Add support to properly handle different type of stage retries Sub-task Open Unassigned
          10.
          Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data Sub-task Resolved Ye Zhou
          11.
          Fix cases of corruption in merged shuffle blocks that are pushed Sub-task Resolved Chandni Singh
          12.
          Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way Sub-task Open Unassigned
          13.
          Add Support in the ESS to serve merged shuffle block meta and data to executors Sub-task In Progress Unassigned

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                mshen Min Shen
              • Votes:
                19 Vote for this issue
                Watchers:
                89 Start watching this issue

                Dates

                • Created:
                  Updated: