Spark / SPARK-21782

Repartition creates skews when numPartitions is a power of 2

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.3.0
    • Component/s: Spark Core

      Description

      Problem:
      When an RDD (particularly one with a low items-per-partition ratio) is repartitioned to a numPartitions that is a power of 2, the resulting partitions are very unevenly sized. This affects both repartition() and coalesce(shuffle=true).

      Steps to reproduce:

      $ spark-shell
      
      scala> sc.parallelize(0 until 1000, 250).repartition(64).glom().map(_.length).collect()
      res0: Array[Int] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 144, 250, 250, 250, 106, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
      

      Explanation:
      Currently, the algorithm for repartition (shuffle-enabled coalesce) is as follows:

      • for each initial partition index, generate position as (new Random(index)).nextInt(numPartitions)
      • then, for element number k in initial partition index, put it in the new partition position + k (modulo numPartitions).

      So, essentially elements are smeared roughly equally over numPartitions buckets - starting from the one with number position+1.
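      The two steps above can be modelled without a Spark cluster. The following is a minimal sketch based only on the description in this ticket, not on Spark's actual source; the names `RepartitionSketch` and `simulate` are hypothetical.

```scala
import java.util.Random

object RepartitionSketch {
  // Returns the size of every output partition, given the number of
  // elements in each input partition and the target partition count.
  // Assumption: element number k (counting from 1) of input partition
  // `index` lands in bucket (position + k) modulo numPartitions.
  def simulate(inputSizes: Seq[Int], numPartitions: Int): Array[Int] = {
    val out = new Array[Int](numPartitions)
    for ((size, index) <- inputSizes.zipWithIndex) {
      // A fresh generator per input partition, seeded with its index.
      val position = new Random(index).nextInt(numPartitions)
      for (k <- 1 to size) {
        out((position + k) % numPartitions) += 1
      }
    }
    out
  }
}
```

      Calling `RepartitionSketch.simulate(Seq.fill(250)(4), 64)` mirrors the reproduction case (250 input partitions of 4 elements each, 64 output partitions) and can be compared against the array printed by spark-shell.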

      Note that a new instance of Random is created for every initial partition index, with the fixed seed index, and then discarded. So the position is deterministic for every index, for any RDD in the world. Also, the nextInt(bound) implementation has a special case when bound is a power of 2, which basically takes the several highest bits of the generator state after a single step from the initial seed, with only minimal scrambling.
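      To make the power-of-two special case concrete, the first draw can be written out by hand. This is a sketch that follows the linear congruential formulas given in the java.util.Random Javadoc; `FirstDrawSketch` and `firstDrawPow2` are hypothetical names, and the function only covers the power-of-two branch.

```scala
import java.util.Random

object FirstDrawSketch {
  private val Multiplier = 0x5DEECE66DL
  private val Addend = 0xBL
  private val Mask = (1L << 48) - 1

  // First nextInt(bound) of new Random(seed), for a power-of-two bound only.
  def firstDrawPow2(seed: Long, bound: Int): Int = {
    require(bound > 0 && (bound & (bound - 1)) == 0, "bound must be a power of two")
    val scrambled = (seed ^ Multiplier) & Mask           // initial seed scrambling (a single XOR)
    val next = (scrambled * Multiplier + Addend) & Mask  // one generator step
    val bits31 = (next >>> 17).toInt                     // equivalent of next(31)
    ((bound.toLong * bits31) >> 31).toInt                // keeps only the top log2(bound) bits
  }

  // Checks the hand-written version against java.util.Random for small seeds.
  def agreesWithJdk(numSeeds: Int, bound: Int): Boolean =
    (0 until numSeeds).forall(i => firstDrawPow2(i, bound) == new Random(i).nextInt(bound))
}
```

      Because small seeds differ only in their low bits, and the result is read from the high bits after one multiplication, nearby seeds tend to produce the same or adjacent values.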

      Because the seed is deterministic, the generator is used only once, and there is little scrambling, the position values for power-of-two numPartitions always end up almost the same regardless of the index, making some buckets much more popular than others. So repartition will in fact actively produce skewed partitions even when the input partitions were roughly equal in size.
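      The clustering of starting positions can be checked directly from a Scala REPL. This is a small sketch assuming the position formula quoted above; `StartPositionSketch` and `startPositions` are hypothetical names.

```scala
import java.util.Random

object StartPositionSketch {
  // Distinct starting positions chosen for input partitions 0 until numInputs
  // when each one uses a fresh Random seeded with its own index.
  def startPositions(numInputs: Int, numPartitions: Int): Set[Int] =
    (0 until numInputs).map(i => new Random(i).nextInt(numPartitions)).toSet
}
```

      Comparing `StartPositionSketch.startPositions(250, 64)` with `StartPositionSketch.startPositions(250, 63)` shows how few distinct positions the power-of-two case yields relative to a neighbouring bound that does not take the special-case branch.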

      The behavior seems to have been introduced in SPARK-1770 by https://github.com/apache/spark/pull/727/

      The load balancing is not perfect: a given output partition
      can have up to N more elements than the average if there are N input
      partitions. However, some randomization is used to minimize the
      probability that this happens.

      Another related ticket: SPARK-17817 - https://github.com/apache/spark/pull/15445

        Attachments

        1. Screen Shot 2017-08-16 at 3.40.01 PM.png
          73 kB
          Sergey Serebryakov

            People

            • Assignee:
              Sergey Serebryakov (megaserg)
            • Reporter:
              Sergey Serebryakov (megaserg)
