Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-18711

NPE in generated SpecificMutableProjection for Aggregator

    XML · Word · Printable · JSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.1.0
    • 2.1.0
    • SQL
    • None

    Description

      This is a bug in branch-2.1, but I don't think it was present in 2.1.0-rc1.
      Code to reproduce (contrived, but based on real code we run):

        // Single-field wrapper around an Int; it is the payload type of the aggregation buffer
        // (Tuple1[Option[Holder]]) used by the aggregator in this reproduction.
        case class Holder(i: Int)
      
        // Typed Aggregator over Int inputs, with buffer type Tuple1[Option[Holder]] and output
        // type Seq[(String, Int, Int)]. Each callback prints the values it is working with.
        val agg1 = new Aggregator[Int, Tuple1[Option[Holder]], Seq[(String, Int, Int)]] {
          // Initial buffer: a Tuple1 wrapping None (no Holder yet).
          def zero: Tuple1[Option[Holder]] = {
            val x = Tuple1(None)
            println(s"zero ${x}")
            x
          }
      
          // Folds one input into the buffer: if the buffer already holds a Holder, its value
          // plus `a` is stored; otherwise `a` itself is stored. The result is always Some(...).
          def reduce(b: Tuple1[Option[Holder]], a: Int): Tuple1[Option[Holder]] = {
            println(s"reduce ${b} ${a}")
            Tuple1(Some(Holder(b._1.map(_.i + a).getOrElse(a))))
          }
      
          // Combines two buffers: sums the values when both are present, keeps whichever one is
          // present when only one is, and yields None when neither is.
          def merge(b1: Tuple1[Option[Holder]], b2: Tuple1[Option[Holder]]): Tuple1[Option[Holder]] = {
            println(s"merge ${b1} ${b2}")
            (b1._1, b2._1) match {
              case (Some(Holder(i1)), Some(Holder(i2))) => Tuple1(Some(Holder(i1 + i2)))
              case (Some(Holder(i1)), _) => Tuple1(Some(Holder(i1)))
              case (_, Some(Holder(i2))) => Tuple1(Some(Holder(i2)))
              case _ => Tuple1(None)
            }
          }
      
          // Produces a one-element Seq of ("ha", <buffered value>, 0).
          // Note: this calls .get on the Option, so a buffer holding None would throw here.
          def finish(reduction: Tuple1[Option[Holder]]): Seq[(String, Int, Int)] = {
            println(s"finish ${reduction}")
            Seq(("ha", reduction._1.get.i, 0))
          }
      
          // Encoder for the buffer type, obtained from ExpressionEncoder.
          def bufferEncoder: Encoder[Tuple1[Option[Holder]]] = ExpressionEncoder[Tuple1[Option[Holder]]]()
      
          // Encoder for the output type, obtained from ExpressionEncoder.
          def outputEncoder: Encoder[Seq[(String, Int, Int)]] = ExpressionEncoder[Seq[(String, Int, Int)]]()
        }
      
        // Driver for the reproduction: two (String, Int) rows sharing the key "a" are turned
        // into a Dataset, grouped by the first tuple element, reduced to the second element as
        // the value, and aggregated with agg1.
        val x = Seq(("a", 1), ("a", 2))
          .toDS
          .groupByKey(_._1)
          .mapValues(_._2)
          .agg(agg1.toColumn)
        // Print the schema of the aggregated Dataset, then display its rows. Per the report,
        // running this snippet produces the NullPointerException listed under the result.
        x.printSchema
        x.show
      

      The result is:

      org.apache.spark.executor.Executor: Exception in task 1.0 in stage 146.0 (TID 423)
      java.lang.NullPointerException
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221)
      	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
      	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      	at org.apache.spark.scheduler.Task.run(Task.scala:99)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      

      The error seems to be in the code generation for the aggregator result.

      Attachments

        Activity

          People

            cloud_fan Wenchen Fan
            koert koert kuipers
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: