Spark / SPARK-7263

Add new shuffle manager which stores shuffle blocks in Parquet


Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Later
    • Component/s: Shuffle, Spark Core

    Description

      I have a working prototype of this feature that can be viewed at
      https://github.com/apache/spark/compare/master...massie:parquet-shuffle?expand=1

      Setting "spark.shuffle.manager" to "parquet" enables this shuffle manager.
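
      For example, the manager can be selected on the SparkConf before the
      SparkContext is created (a minimal sketch; only the "parquet" value comes
      from this issue, the application name is illustrative):

        import org.apache.spark.{SparkConf, SparkContext}

        // Select the Parquet-backed shuffle manager for this application.
        val conf = new SparkConf()
          .setAppName("parquet-shuffle-example")
          .set("spark.shuffle.manager", "parquet")
        val sc = new SparkContext(conf)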

      The dictionary support that Parquet provides appreciably reduces the amount of
      memory that objects use; however, once Parquet data is shuffled, all the
      dictionary information is lost and the column-oriented data is written to shuffle
      blocks in a record-oriented fashion. This shuffle manager addresses this issue
      by reading and writing all shuffle blocks in the Parquet format.

      If the shuffle objects are Avro records, the Avro $SCHEMA is converted to a
      Parquet schema and used directly; otherwise, the Parquet schema is generated
      via reflection. Currently, primitive types are the only non-Avro keys
      supported. The reflection code can be improved (or replaced) to support
      complex records.
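
      As a rough illustration of the Avro path, parquet-avro's AvroSchemaConverter
      can derive a Parquet MessageType from an Avro schema; the sketch below is an
      assumption about the approach, not the prototype's actual code:

        import org.apache.avro.generic.IndexedRecord
        import org.apache.parquet.avro.AvroSchemaConverter
        import org.apache.parquet.schema.MessageType

        // Sketch: Avro records carry their own schema, which converts directly
        // to a Parquet schema; anything else would fall back to reflection
        // (only primitive key types are handled by the prototype's reflection code).
        def parquetSchemaFor(record: Any): MessageType = record match {
          case avro: IndexedRecord =>
            new AvroSchemaConverter().convert(avro.getSchema)
          case other =>
            sys.error(s"reflection-based schema generation not shown for ${other.getClass}")
        }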

      The ParquetShufflePair class allows the shuffle key and value to be stored in
      Parquet blocks as a single record with a single schema.
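
      Conceptually, such a wrapper could be as simple as the hypothetical sketch
      below (the prototype's actual class may differ):

        // Hypothetical sketch: one (key, value) record per shuffled pair, so an
        // entire shuffle block is a Parquet file with a single combined schema.
        case class ParquetShufflePair[K, V](key: K, value: V)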

      This commit adds the following new Spark configuration options:
      "spark.shuffle.parquet.compression" - sets the Parquet compression codec
      "spark.shuffle.parquet.blocksize" - sets the Parquet block size
      "spark.shuffle.parquet.pagesize" - set the Parquet page size
      "spark.shuffle.parquet.enabledictionary" - turns dictionary encoding on/off

      Parquet does not (and has no plans to) support a streaming API. Metadata sections
      are scattered throughout a Parquet file, making a streaming API difficult. As such,
      the ShuffleBlockFetcherIterator has been modified to fetch the entire contents
      of map outputs into temporary blocks before loading the data into the reducer.
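
      The buffering idea can be sketched roughly as follows (an assumption about the
      approach, not the actual ShuffleBlockFetcherIterator change): each fetched map
      output is drained to a local temporary block before a Parquet reader is opened
      on it, since Parquet's scattered metadata requires a complete, seekable copy.

        import java.io.{File, FileOutputStream, InputStream}

        // Sketch: materialize a remote shuffle stream into a temporary local block
        // so that a file-based Parquet reader can be used on the reduce side.
        def materialize(remote: InputStream): File = {
          val tmp = File.createTempFile("parquet-shuffle-", ".tmp")
          val out = new FileOutputStream(tmp)
          try {
            val buf = new Array[Byte](64 * 1024)
            var n = remote.read(buf)
            while (n != -1) {
              out.write(buf, 0, n)
              n = remote.read(buf)
            }
          } finally {
            out.close()
            remote.close()
          }
          tmp
        }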

      Interesting future asides:
      o There is no need to define a data serializer (although Spark requires one)
      o Parquet supports predicate pushdown and projection, which could be used
      between shuffle stages to improve performance in the future (sketched below)
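
      For the second aside, Parquet's filter API gives a flavor of what pushdown
      between stages might look like; the reader call and column path below are
      hypothetical, shown only to illustrate the capability:

        import org.apache.avro.generic.GenericRecord
        import org.apache.hadoop.fs.Path
        import org.apache.parquet.avro.AvroParquetReader
        import org.apache.parquet.filter2.compat.FilterCompat
        import org.apache.parquet.filter2.predicate.FilterApi

        // Only read shuffled records whose (hypothetical) "value.count" column exceeds 10,
        // letting Parquet skip row groups and pages that its statistics rule out.
        val predicate = FilterApi.gt(FilterApi.intColumn("value.count"), Integer.valueOf(10))
        val reader = AvroParquetReader.builder[GenericRecord](new Path("/tmp/shuffle_0_0_0.parquet"))
          .withFilter(FilterCompat.get(predicate))
          .build()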

People

    Assignee: Unassigned
    Reporter: Matt Massie (massie)
    Votes: 3
    Watchers: 20
