Details
Bug
Status: Resolved
Major
Resolution: Fixed
2.2.0
Description
Problem:
When an RDD (particularly one with a low item-per-partition ratio) is repartitioned to a numPartitions that is a power of 2, the resulting partitions are very unevenly sized. This affects both repartition() and coalesce(shuffle=true).
Steps to reproduce:
$ spark-shell scala> sc.parallelize(0 until 1000, 250).repartition(64).glom().map(_.length).collect() res0: Array[Int] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 144, 250, 250, 250, 106, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
Explanation:
Currently, the algorithm for repartition (shuffle-enabled coalesce) is as follows:
- for each initial partition index, generate position as (new Random(index)).nextInt(numPartitions)
- then, for element number k (counting from 1) in initial partition index, put it in the new partition (position + k) modulo numPartitions.
So, essentially elements are smeared roughly equally over numPartitions buckets - starting from the one with number position+1.
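The steps above can be sketched in plain Scala without a Spark cluster. This is only a simulation of the described placement rule, not Spark's actual code: the object name RepartitionSkewSketch and the method simulate are made up for illustration, and it assumes that element k (counting from 1) of an input partition lands in bucket (position + k) modulo numPartitions.

```scala
import java.util.Random

// Hypothetical stand-alone simulation of the placement rule described above.
object RepartitionSkewSketch {

  /** Returns the number of elements that end up in each output partition. */
  def simulate(inputPartitionSizes: Seq[Int], numPartitions: Int): Array[Int] = {
    val sizes = new Array[Int](numPartitions)
    for ((size, index) <- inputPartitionSizes.zipWithIndex) {
      // A fresh generator per input partition, seeded with the partition index.
      val position = new Random(index).nextInt(numPartitions)
      // Element k (k = 1, 2, ...) goes to bucket (position + k) mod numPartitions.
      for (k <- 1 to size) {
        sizes((position + k) % numPartitions) += 1
      }
    }
    sizes
  }

  def main(args: Array[String]): Unit = {
    // 250 input partitions of 4 elements each, mirroring
    // sc.parallelize(0 until 1000, 250).repartition(64).
    val sizes = simulate(Seq.fill(250)(4), 64)
    println(sizes.mkString(", "))
  }
}
```

Under these assumptions the simulation should show the same pattern as the reproduction steps: only buckets 47 through 51 receive any elements.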
Note that a new instance of Random is created for every initial partition index, with the fixed seed index, and then discarded. So the position is deterministic for every index, for any RDD in the world. Also, the nextInt(bound) implementation has a special case when bound is a power of 2, which basically takes the several highest bits derived from the initial seed, with only minimal scrambling.
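To make the power-of-two special case concrete, here is a small sketch that recomputes the first nextInt(bound) of a freshly seeded java.util.Random by hand, following the linear congruential formula documented in the java.util.Random Javadoc. The object name PowerOfTwoNextInt and the helper firstNextIntPow2 are hypothetical names used only for this illustration.

```scala
import java.util.Random

// Hypothetical helper that mirrors the documented java.util.Random algorithm
// for the very first nextInt(bound) call when bound is a power of two.
object PowerOfTwoNextInt {
  private val Multiplier = 0x5DEECE66DL
  private val Addend = 0xBL
  private val Mask = (1L << 48) - 1

  def firstNextIntPow2(seed: Long, bound: Int): Int = {
    require(bound > 0 && (bound & (bound - 1)) == 0, "bound must be a power of two")
    // setSeed: the seed is XORed with the multiplier and truncated to 48 bits.
    val scrambled = (seed ^ Multiplier) & Mask
    // One step of the linear congruential generator.
    val state = (scrambled * Multiplier + Addend) & Mask
    // next(31): the top 31 bits of the 48-bit state.
    val next31 = (state >>> (48 - 31)).toInt
    // Power-of-two case: keep only the top log2(bound) bits of next(31).
    ((bound.toLong * next31) >> 31).toInt
  }

  def main(args: Array[String]): Unit = {
    val byHand = (0 until 250).map(i => firstNextIntPow2(i, 64))
    val byLibrary = (0 until 250).map(i => new Random(i).nextInt(64))
    println(s"hand-computed values match the library: ${byHand == byLibrary}")
    println(s"distinct positions for seeds 0 until 250: ${byLibrary.distinct.sorted.mkString(", ")}")
  }
}
```

Seeds 0 until 250 differ only in their low 8 bits, and a single multiplication step does not carry that difference far enough into the top bits, so the 250 seeds yield only a couple of distinct positions.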
Due to the deterministic seed, the generator being used only once, and the lack of scrambling, the position values for power-of-two numPartitions always end up being almost the same regardless of the index, causing some buckets to be much more popular than others. So repartition will in fact produce skewed partitions even when the partitions were roughly equal in size beforehand.
The behavior seems to have been introduced in SPARK-1770 by https://github.com/apache/spark/pull/727/
The load balancing is not perfect: a given output partition can have up to N more elements than the average if there are N input partitions. However, some randomization is used to minimize the probability that this happens.
Another related ticket: SPARK-17817 - https://github.com/apache/spark/pull/15445