Spark / SPARK-4459

JavaRDDLike.groupBy[K](f: JFunction[T, K]) may fail with typechecking errors



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.2, 1.1.0
    • Fix Version/s: 1.1.2, 1.2.1
    • Component/s: Java API
    • Labels: None


      I believe this issue is essentially the same as SPARK-668.
      Original error:

      [ERROR] /Users/saldaal1/workspace/JavaSparkSimpleApp/src/main/java/SimpleApp.java:[29,105] no suitable method found for groupBy(org.apache.spark.api.java.function.Function<scala.Tuple2<java.lang.String,java.lang.Long>,java.lang.Long>)
      [ERROR] method org.apache.spark.api.java.JavaPairRDD.<K>groupBy(org.apache.spark.api.java.function.Function<scala.Tuple2<K,java.lang.Long>,K>) is not applicable
      [ERROR] (inferred type does not conform to equality constraint(s)

      From core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala:

        /**
         * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
         * mapping to that key.
         */
        def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
          implicit val ctagK: ClassTag[K] = fakeClassTag
          implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
          JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
        }

      Then in core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala:

        class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
                               (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
          extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {

      The problem is that the type parameter T in JavaRDDLike is Tuple2[K,V], which means the combined signature for groupBy in the JavaPairRDD is

      groupBy[K](f: JFunction[Tuple2[K,V], K])

      which ties the grouping function's input type to its return type: the return type of the grouping function must be the same as the key type (the first type parameter) of the JavaPairRDD. Grouping a JavaPairRDD<String, Long> by its Long value, as in the error above, therefore fails to typecheck.

      If we compare the method signature to flatMap:

        /**
         *  Return a new RDD by first applying a function to all elements of this
         *  RDD, and then flattening the results.
         */
        def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
          import scala.collection.JavaConverters._
          def fn = (x: T) => f.call(x).asScala
          JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
        }

      we see that its type parameter is named U, which does not clash with the K of JavaPairRDD, so there should be an easy fix: change the type parameter of the groupBy function from K to U.
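
      To make the difference between the two signatures concrete, here is a minimal Java sketch that does not depend on Spark. Fn, Pair, PairBox and GroupBySketch are hypothetical stand-ins (for the Spark Function interface, scala.Tuple2 and JavaPairRDD respectively), and the list-backed groupBy is only an illustration of the signature shape, not the actual patch. The problematic shape is kept as a comment; the live method uses a fresh type parameter U, as proposed above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.spark.api.java.function.Function.
interface Fn<T, R> {
    R call(T t);
}

// Hypothetical stand-in for scala.Tuple2.
final class Pair<A, B> {
    final A first;
    final B second;

    Pair(A first, B second) {
        this.first = first;
        this.second = second;
    }
}

// Hypothetical stand-in for JavaPairRDD[K, V], backed by a plain list.
final class PairBox<K, V> {
    private final List<Pair<K, V>> items;

    PairBox(List<Pair<K, V>> items) {
        this.items = items;
    }

    // Signature shape described in this issue (comment only). The same K
    // appears in the element type and in the return type, so for an argument
    // of type Fn<Pair<String, Long>, Long> the compiler would need
    // K = String and K = Long at the same time:
    //   <K> Map<K, List<Pair<K, V>>> groupBy(Fn<Pair<K, V>, K> f)

    // Proposed shape: a fresh name U keeps the grouping key independent of K.
    <U> Map<U, List<Pair<K, V>>> groupBy(Fn<Pair<K, V>, U> f) {
        Map<U, List<Pair<K, V>>> groups = new LinkedHashMap<>();
        for (Pair<K, V> item : items) {
            groups.computeIfAbsent(f.call(item), key -> new ArrayList<>()).add(item);
        }
        return groups;
    }
}

class GroupBySketch {
    static Map<Long, List<Pair<String, Long>>> demo() {
        List<Pair<String, Long>> data = new ArrayList<>();
        data.add(new Pair<>("a", 1L));
        data.add(new Pair<>("b", 2L));
        data.add(new Pair<>("c", 1L));
        PairBox<String, Long> box = new PairBox<>(data);

        // Typed like the function in the original error: Pair<String, Long> -> Long.
        Fn<Pair<String, Long>, Long> bySecond = p -> p.second;
        return box.groupBy(bySecond);
    }
}
```

      With the U-based signature, grouping a (String, Long) pair collection by its Long value typechecks, because the key type of the result is inferred separately from the pair's own K.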




            Assignee: alokito Alok Saldanha
            Reporter: alokito Alok Saldanha