Details

Type: Bug

Status: Resolved

Priority: Major

Resolution: Fixed

Affects Version/s: 2.2.0

Fix Version/s: 2.3.0

Component/s: Spark Core

Labels:
Description
Problem:
When an RDD (particularly with a low itemperpartition ratio) is repartitioned to numPartitions = power of 2, the resulting partitions are very unevensized. This affects both repartition() and coalesce(shuffle=true).
Steps to reproduce:
$ sparkshell scala> sc.parallelize(0 until 1000, 250).repartition(64).glom().map(_.length).collect() res0: Array[Int] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 144, 250, 250, 250, 106, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
Explanation:
Currently, the algorithm for repartition (shuffleenabled coalesce) is as follows:
 for each initial partition index, generate position as (new Random(index)).nextInt(numPartitions)
 then, for element number k in initial partition index, put it in the new partition position + k (modulo numPartitions).
So, essentially elements are smeared roughly equally over numPartitions buckets  starting from the one with number position+1.
Note that a new instance of Random is created for every initial partition index, with a fixed seed index, and then discarded. So the position is deterministic for every index for any RDD in the world. Also, nextInt(bound) implementation has a special case when bound is a power of 2, which is basically taking several highest bits from the initial seed, with only a minimal scrambling.
Due to deterministic seed, using the generator only once, and lack of scrambling, the position values for poweroftwo numPartitions always end up being almost the same regardless of the index, causing some buckets to be much more popular than others. So, repartition will in fact intentionally produce skewed partitions even when before the partition were roughly equal in size.
The behavior seems to have been introduced in SPARK1770 by https://github.com/apache/spark/pull/727/
The load balancing is not perfect: a given output partition
can have up to N more elements than the average if there are N input
partitions. However, some randomization is used to minimize the
probabiliy that this happens.
Another related ticket: SPARK17817  https://github.com/apache/spark/pull/15445