Details
Description
_val df = spark.range(0, 100, 1, 50).repartition(4)_
_val v = df.rdd.mapPartitions { iter => {_
_Iterator.single(iter.length)_
{_}}{_}{_}.collect(){_}
_println(v.mkString(","))_
The above simple code outputs `50,0,0,50`, which means there is no data in partition 1 and partition 2.
I just debugged it and found the RoundRobin seems to ensure to distribute the records evenly *in the same partition*, and not guarantee it between partitions.
Below is the code to generate the key
case RoundRobinPartitioning(numPartitions) => // Distributes elements evenly across output partitions, starting from a random partition. var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions) (row: InternalRow) => { // The HashPartitioner will handle the `mod` by the number of partitions position += 1 position }
In this case, There are 50 partitions, each partition will only compute 2 elements. The issue for RoundRobin here is it always starts with position=2 to do the Roundrobin.
See the output of Random
scala> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " ")) // the position is always 2. 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
Similarly, the below Random code also outputs the same value,
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(2) + " ")) (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " ")) (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(8) + " ")) (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(16) + " ")) (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " "))
Let's go back to this case,
Consider partition 0, the total elements are [0, 1], so when shuffle writes, for element 0, the key will be (position + 1) = 2 + 1 = 3%4=3, the element 1, the key will be (position + 1)=(3+1)=4%4 = 0
consider partition 1, the total elements are [2, 3], so when shuffle writes, for element 2, the key will be (position + 1) = 2 + 1 = 3%4=3, the element 3, the key will be (position + 1)=(3+1)=4%4 = 0
The calculation is also applied for other left partitions since the starting position is always 2 for this case.
So, as you can see, each partition will write its elements to Partition [0, 3], which results in Partition [1, 2] without any data.
I will try to provide the patch to fix this issue.