  1. Spark
  2. SPARK-1296

Make RDDs Covariant



    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None


      First, what is the problem with RDDs not being covariant?

      // Consider a function that takes a Seq of some trait.
      scala> trait A { val a = 1 }
      scala> def f(as: Seq[A]) = as.map(_.a)
      // A list of a concrete version of that trait can be used in this function.
      scala> class B extends A
      scala> f(new B :: Nil)
      res0: Seq[Int] = List(1)
      // Now let's try the same thing with RDDs.
      scala> def f(as: org.apache.spark.rdd.RDD[A]) = as.map(_.a)
      scala> val rdd = sc.parallelize(new B :: Nil)
      rdd: org.apache.spark.rdd.RDD[B] = ParallelCollectionRDD[2] at parallelize at <console>:42
      // :(
      scala> f(rdd)
      <console>:45: error: type mismatch;
       found   : org.apache.spark.rdd.RDD[B]
       required: org.apache.spark.rdd.RDD[A]
      Note: B <: A, but class RDD is invariant in type T.
      You may wish to define T as +T instead. (SLS 4.5)

      Is it possible to make RDDs covariant?

      Probably. In terms of the public user interface, RDDs are already mostly covariant. Internally, we use the type parameter T in a lot of mutable state, which breaks the covariance contract, but I think that with casting we can 'promise' the compiler that we are behaving. There are also complications with the invariant types that we return from public functions.

      What will it take to make RDDs covariant?

      As I mentioned above, all of our mutable internal state is going to require casting to avoid using T. This seems to be okay; it makes our life only slightly harder. The extra work is required because we are basically promising the compiler that, even if an RDD is implicitly upcast, internally we keep all the checkpointed data at the correct type. Since an RDD is immutable, we are okay!
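
      A minimal sketch of that casting idea, using a hypothetical stand-in class (Dataset is not Spark's RDD, and its checkpoint and collect methods are simplified assumptions): the mutable cache is stored untyped and cast back on read, so T never appears in the type of mutable state.

```scala
// Hypothetical stand-in for RDD, used only to illustrate the casting idea.
object CovariantStateSketch {
  trait A { val a = 1 }
  class B extends A

  final class Dataset[+T](elems: Seq[T]) {
    // The mutable cache is typed without T, so the variance checker has nothing to reject.
    private var cached: Option[Any] = None

    def checkpoint(): Unit = { cached = Some(elems.toVector) }

    // The cast is the "promise": only a Seq[T] is ever stored in `cached`.
    def collect(): Seq[T] =
      cached.map(_.asInstanceOf[Seq[T]]).getOrElse(elems)

    def map[U](f: T => U): Dataset[U] = new Dataset(collect().map(f))
  }

  def f(as: Dataset[A]): Seq[Int] = as.map(_.a).collect()

  val ds: Dataset[B] = new Dataset(Seq(new B))
  ds.checkpoint()
  // Accepted because Dataset[B] is a subtype of Dataset[A].
  val result: Seq[Int] = f(ds)
}
```

      The promise holds here only because nothing can write a non-T value into the cache after construction, which is the same immutability argument made above.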

      We also need to modify all the places where we use T in function parameters. So for example:

      def ++[U >: T : ClassTag](other: RDD[U]): RDD[U] = this.union(other).asInstanceOf[RDD[U]]

      We are now allowing you to append an RDD of a less specific type, and then returning a less specific new RDD. This I would argue is a good change. We are strictly improving the power of the RDD interface, while maintaining reasonable type semantics.
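
      To see the lower-bound signature at work outside Spark, here is a small sketch with a hypothetical covariant Dataset class (an assumption for illustration; the ClassTag context bound from the real signature is omitted because this toy class does not need it):

```scala
// Hypothetical stand-in for RDD; only the shape of `++` mirrors the proposal.
object LowerBoundUnionSketch {
  trait A { val a = 1 }
  class B extends A
  class C extends A

  final class Dataset[+T](val elems: Seq[T]) {
    // Taking `other: Dataset[T]` would put T in a contravariant position,
    // so the parameter is typed with a supertype U instead.
    def ++[U >: T](other: Dataset[U]): Dataset[U] =
      new Dataset[U](elems ++ other.elems)
  }

  val bs: Dataset[B] = new Dataset(Seq(new B))
  val cs: Dataset[C] = new Dataset(Seq(new C, new C))

  // U is inferred as A, the common supertype of B and C.
  val all: Dataset[A] = bs ++ cs
  val total: Int = all.elems.map(_.a).sum
}
```

      Appending a Dataset[C] to a Dataset[B] yields a Dataset[A], which is the "less specific new RDD" behavior described above.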

      So, why wouldn't we do it?

      There are a lot of places where we interact with invariant types. We return both Maps and Arrays from a lot of public functions. Arrays are invariant (if we returned immutable sequences instead, we would be fine), and Maps are invariant in the key type (once again, immutable sequences of tuples would work well here).
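
      The difference between these return types can be checked with plain Scala collections, independent of Spark. The object and value names below are hypothetical; the commented-out lines are the ones the compiler rejects.

```scala
object InvariantReturnTypesSketch {
  trait A { val a = 1 }
  class B extends A

  val bSeq: Seq[B] = Seq(new B)
  val aSeq: Seq[A] = bSeq                  // compiles: Seq is covariant

  val bArr: Array[B] = Array(new B)
  // val aArr: Array[A] = bArr             // does not compile: Array is invariant

  val byKey: Map[B, Int] = Map(new B -> 1)
  // val widened: Map[A, Int] = byKey      // does not compile: Map is invariant in its key type
  val pairs: Seq[(A, Int)] = byKey.toSeq   // compiles: Seq and Tuple2 are both covariant
}
```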

      I don't think this is a deal breaker, and we may even be able to get away with it without changing the return types of these functions. For example, I think that this should work, though once again it requires making promises to the compiler:

        /** Return an array that contains all of the elements in this RDD. */
        def collect[U >: T](): Array[U] = {
          val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
          // The cast is a promise to the compiler that the Array[T] can be viewed as an Array[U].
          Array.concat(results: _*).asInstanceOf[Array[U]]
        }
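
      As a rough check of what that cast promises, the sketch below applies the same asInstanceOf to a plain array of reference types on the JVM. The widen helper and the class C are hypothetical, and primitive element types are not covered here.

```scala
import scala.util.Try

object ArrayCastSketch {
  trait A { val a = 1 }
  class B extends A
  class C extends A

  // Hypothetical helper mirroring the cast used in `collect`.
  def widen[T, U >: T](xs: Array[T]): Array[U] = xs.asInstanceOf[Array[U]]

  val bs: Array[B] = Array(new B)

  // Reading through the widened view works: on the JVM a B[] is also an A[].
  val widened: Array[A] = widen[B, A](bs)
  val firstA: Int = widened(0).a

  // Writing a C through the widened view fails at runtime with an
  // ArrayStoreException, because the underlying array is still a B[].
  val store: Try[Unit] = Try { widened(0) = new C }
}
```

      Reads are safe in this sketch, while writes through the widened array are where the promise can be broken, so callers would need to treat the returned array as read-only.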

      I started working on this here. Thoughts / suggestions are welcome!


              marmbrus Michael Armbrust