Spark / SPARK-7081

Faster sort-based shuffle path using binary processing cache-aware sort

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.4.0
    • Component/s: Shuffle, Spark Core
    • Labels:
      None

      Description

      (Description copied from GitHub):

      This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records.

      The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf.

      The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles.

      UnsafeShuffleManager's optimizations will apply when all of the following conditions hold:

      • The shuffle dependency specifies no aggregation or output ordering.
      • The shuffle serializer supports relocation of serialized values (this is currently supported
        by KryoSerializer and Spark SQL's custom serializers).
      • The shuffle produces fewer than 16777216 output partitions.
      • No individual record is larger than 128 MB when serialized.
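The conditions above amount to a simple predicate that decides whether the optimized path applies. The following is a hypothetical sketch in Python; the function and parameter names are illustrative and not Spark's actual API:

```python
# Hypothetical sketch of UnsafeShuffleManager's eligibility check.
# The names below are illustrative; they are not Spark's actual API.

MAX_PARTITIONS = 1 << 24              # 16777216-partition limit
MAX_RECORD_SIZE = 128 * 1024 * 1024   # 128 MB limit, checked per record at write time

def can_use_unsafe_shuffle(has_aggregation, has_ordering,
                           serializer_supports_relocation, num_partitions):
    """Return True if the optimized binary shuffle path can be used."""
    if has_aggregation or has_ordering:
        return False  # must be a plain partitioned shuffle with no map-side combine
    if not serializer_supports_relocation:
        return False  # e.g. KryoSerializer qualifies; Java serialization does not
    return num_partitions < MAX_PARTITIONS
```

When the predicate is false, the manager falls back to the regular SortShuffleManager path, so enabling `tungsten-sort` is safe even for shuffles that cannot use the optimizations.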

      In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF compression codec.

      At a high level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output are spilled to disk and those on-disk files are merged to produce the final output file.
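The map-output layout described above can be modeled in a few lines. This is a toy, in-memory sketch (not Spark code): records are sorted by target partition id and concatenated into one blob, and an index records each partition's contiguous byte range so a reducer can fetch just its region:

```python
# Toy model of sort-based shuffle's map output. Purely illustrative.

def write_map_output(records):
    """records: list of (partition_id, payload_bytes).
    Returns (blob, index), where index[p] = (offset, length) of
    partition p's contiguous region within blob."""
    # Python's sort is stable, so records within a partition keep
    # their original relative order.
    records = sorted(records, key=lambda r: r[0])
    blob, index, offset = b"", {}, 0
    for pid, payload in records:
        start, length = index.get(pid, (offset, 0))
        index[pid] = (start, length + len(payload))
        blob += payload
        offset += len(payload)
    return blob, index
```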

      UnsafeShuffleManager optimizes this process in several ways:

      • Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details.
      • It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache.
      • The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge.
      • When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge.
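The 8-bytes-per-record sort entry mentioned above can be illustrated with a bit-packing sketch: a 24-bit partition id in the high bits and a 40-bit record pointer in the low bits of one 64-bit word, so that sorting the words numerically orders records by partition id. The exact bit layout in Spark's implementation may differ; this only shows the idea:

```python
# Illustrative 8-byte sort entry: 24-bit partition id packed with a
# 40-bit record pointer. Because the partition id occupies the high
# bits, comparing packed words compares partition ids first.

PARTITION_BITS = 24
POINTER_BITS = 40
POINTER_MASK = (1 << POINTER_BITS) - 1

def pack(partition_id, pointer):
    assert 0 <= partition_id < (1 << PARTITION_BITS)
    assert 0 <= pointer < (1 << POINTER_BITS)
    return (partition_id << POINTER_BITS) | pointer

def partition_of(word):
    return word >> POINTER_BITS

def pointer_of(word):
    return word & POINTER_MASK
```

Sorting a plain array of such words is far more cache-friendly than sorting an array of object references, since each entry is a fixed 8 bytes and no pointer chasing is needed during comparisons.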

      The shuffle read path is unchanged.

      This patch is similar to http://issues.apache.org/jira/browse/SPARK-4550 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators.

        Issue Links

          Activity

          apachespark Apache Spark added a comment -

          User 'JoshRosen' has created a pull request for this issue:
          https://github.com/apache/spark/pull/5868
          joshrosen Josh Rosen added a comment - edited

          Cross-posting a comment from my PR:

          Based on some additional discussions, I think that we should use a specialized sorter implementation that is specific to sort-based shuffle and design a separate sorter for more general-purpose record sorting. By using a specialized sorter, we can benefit from several performance optimizations that would be difficult to implement in a more general-purpose sorter:

          • We may not need a full 32 bits to record the partition id; since shuffles are unlikely to have more than, say, 10 million partitions, we can use fewer bits to encode the partition id, allowing us to pack it into a single word that uses the remaining bits to store the record pointer (exploiting the fact that we can choose a reasonable upper bound on our addressable memory (say 1 terabyte) and can save address bits due to word-alignment). This should give us excellent cache-locality benefits, since the sort array will only require 8 bytes of space per record.
          • Because sort-shuffle's sort keys are partition ids, we can expect to encounter long runs of records with the same sort key in the sorted file; this may not be the case in a more general-purpose sort, where there might be cases in which all of the sort keys are distinct. As a result, this allows external sort to use a specialized merge procedure that operates on runs of records with the same key rather than individual records.
          • Ignoring the spilling/merging case for a moment, the sort shuffle writer only needs to know individual records' lengths during the in-memory sort: we need to know a record's length in order to copy it to the sorted output file, but once a partition's records are adjacent in the output file we no longer need to know their individual lengths. This is a consequence of the fact that the sorted data will be consumed by a serializer that knows how to identify record boundaries, building on our assumption that the serializer supports reordering of serialized records in its serialization stream.
          • If we go further and assume that our IO compression codec supports concatenation of compressed data (which Snappy seems to support), then we can implement an IO-efficient merging procedure for external sort. If we store an index file for each spill file, identifying the offsets of each partition within the sorted file, then the merge procedure can simply traverse these indices and concatenate the partitions' serialized data without interpreting it. This would let us use methods like `transferTo` to implement copying without requiring data to be buffered in the JVM, sidestepping the complexity of managing IO buffers during the merge.
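The index-based concatenation merge in the last bullet can be sketched as follows. This is a hypothetical, in-memory model (bytes objects stand in for spill files, and a plain slice-and-append stands in for `transferTo`); the real implementation would stream file regions instead:

```python
# Sketch of an index-based spill merge: each spill carries an index of
# per-partition (offset, length) ranges, and the merge concatenates each
# partition's raw bytes across all spills without interpreting (or
# decompressing) them.

def merge_spills(spills, num_partitions):
    """spills: list of (blob, index) pairs, where index[p] = (offset, length).
    Returns (merged_blob, merged_index) in partition-major order."""
    merged, index, offset = b"", {}, 0
    for p in range(num_partitions):
        start = offset
        for blob, idx in spills:
            off, length = idx.get(p, (0, 0))
            merged += blob[off:off + length]  # raw byte copy, like transferTo
            offset += length
        index[p] = (start, offset - start)
    return merged, index
```

Note that correctness here rests on the two assumptions stated above: the serializer can identify record boundaries on its own, and the compression codec tolerates concatenated compressed streams.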

          In light of this, I'm going to work on refactoring my external sorting branch to perform these optimizations and will update this pull request with those changes. I'm going to remove the SPARK-7078 JIRA link, since that JIRA seems to be concerned with a more general-purpose record sorter for use in SQL joins.

          joshrosen Josh Rosen added a comment -

          I've updated the issue description based on the commit message.

          joshrosen Josh Rosen added a comment -

          Also, note that I posted some simple performance benchmarks at https://github.com/apache/spark/pull/5868#issuecomment-101837095

          lirui Rui Li added a comment -

          Hi Josh Rosen, requiring the dependency to have no aggregation or key ordering seems to prevent many shuffles from leveraging this new optimization, e.g. reduceByKey, sortByKey. Do you have any plans to relax this limitation?


            People

            • Assignee:
              joshrosen Josh Rosen
            • Reporter:
              rxin Reynold Xin
            • Votes:
              0
            • Watchers:
              15