Details
-
Bug
-
Status: Resolved
-
Trivial
-
Resolution: Fixed
-
1.3.0, 2.3.0
-
None
Description
SparkContext.emptyRDD is an unpartitioned RDD. Clearly it's empty so whether it's partitioned or not should be just a academic debate. Unfortunately it doesn't seem to be like this and the issue has side effects.
Namely, it confuses the RDD union.
When there are N classic RDDs partitioned the same way, the union is implemented with the optimized PartitionerAwareUnionRDD, that retains the common partitioner in the result. If one of the N RDDs happens to be an emptyRDD, as it doesn't have a partitioner, the union is implemented by just appending all the partitions of the N RDDs, dropping the partitioner. But there's no need for this, as the emptyRDD contains no elements. This results in further unneeded shuffles once the result of the union is used.
See for example:
val p = new HashPartitioner(3)
val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 10).partitionBy(p)
val b1 = a.mapValues(_ + 1)
val b2 = a.mapValues(_ - 1)
val e = context.emptyRDD[(Int, Int)]
val x = context.union(a, b1, b2, e)
val y = x.reduceByKey(_ + _)
assert(x.partitioner.contains(p))
y.collect()
The assert fails. Disabling it, it's possible to see that reduceByKey introduced a shuffles, although all the input RDDs are already partitioned the same way, but the emptyRDD.
Forcing a partitioner on the emptyRDD:
val e = context.emptyRDD[(Int, Int)].partitionBy(p)
solves the problem with the assert and doesn't introduce the unneeded extra stage and shuffle.
Union implementation should be changed to ignore the partitioner of emptyRDDs and consider those as partitioned in a way compatible with any partitioner, basically ignoring them.
Present since 1.3 at least.