Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-23778

SparkContext.emptyRDD confuses SparkContext.union

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Trivial
    • Resolution: Fixed
    • Affects Version/s: 1.3.0, 2.3.0
    • Fix Version/s: 2.4.0
    • Component/s: Spark Core
    • Labels:
      None

      Description

      SparkContext.emptyRDD is an unpartitioned RDD. Clearly it's empty so whether it's partitioned or not should be just a academic debate. Unfortunately it doesn't seem to be like this and the issue has side effects.

      Namely, it confuses the RDD union.

      When there are N classic RDDs partitioned the same way, the union is implemented with the optimized PartitionerAwareUnionRDD, that retains the common partitioner in the result. If one of the N RDDs happens to be an emptyRDD, as it doesn't have a partitioner, the union is implemented by just appending all the partitions of the N RDDs, dropping the partitioner. But there's no need for this, as the emptyRDD contains no elements. This results in further unneeded shuffles once the result of the union is used.

      See for example:

      val p = new HashPartitioner(3)
      val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 10).partitionBy(p)
      val b1 = a.mapValues(_ + 1)
      val b2 = a.mapValues(_ - 1)
      val e = context.emptyRDD[(Int, Int)]
      val x = context.union(a, b1, b2, e)
      val y = x.reduceByKey(_ + _)
      assert(x.partitioner.contains(p))
      y.collect()

      The assert fails. Disabling it, it's possible to see that reduceByKey introduced a shuffles, although all the input RDDs are already partitioned the same way, but the emptyRDD.

      Forcing a partitioner on the emptyRDD:

      val e = context.emptyRDD[(Int, Int)].partitionBy(p)

      solves the problem with the assert and doesn't introduce the unneeded extra stage and shuffle.

      Union implementation should be changed to ignore the partitioner of emptyRDDs and consider those as partitioned in a way compatible with any partitioner, basically ignoring them.

      Present since 1.3 at least.

        Attachments

        1. as_it_should_be.png
          198 kB
          Stefano Pettini
        2. partitioner_lost_and_unneeded_extra_stage.png
          164 kB
          Stefano Pettini

          Activity

            People

            • Assignee:
              mgaido Marco Gaido
              Reporter:
              ragazzojp Stefano Pettini
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: