Description
This bug is present in branch-2.1, but I don't think it was present in 2.1.0-rc1.
Code to reproduce (contrived, but based on real code we run):
// Buffer element type: the aggregation buffer is a Tuple1 wrapping an Option,
// which is the nesting that triggers the codegen NPE.
case class Holder(i: Int)

val agg1 = new Aggregator[Int, Tuple1[Option[Holder]], Seq[(String, Int, Int)]] {

  // Initial buffer: Tuple1(None). The println calls trace which callbacks run.
  def zero: Tuple1[Option[Holder]] = {
    val x = Tuple1(None)
    println(s"zero ${x}")
    x
  }

  // Fold one input value into the buffer, summing into Holder.i.
  def reduce(b: Tuple1[Option[Holder]], a: Int): Tuple1[Option[Holder]] = {
    println(s"reduce ${b} ${a}")
    Tuple1(Some(Holder(b._1.map(_.i + a).getOrElse(a))))
  }

  // Combine two partial buffers, summing when both sides are defined.
  def merge(b1: Tuple1[Option[Holder]], b2: Tuple1[Option[Holder]]): Tuple1[Option[Holder]] = {
    println(s"merge ${b1} ${b2}")
    (b1._1, b2._1) match {
      case (Some(Holder(i1)), Some(Holder(i2))) => Tuple1(Some(Holder(i1 + i2)))
      case (Some(Holder(i1)), _)                => Tuple1(Some(Holder(i1)))
      case (_, Some(Holder(i2)))                => Tuple1(Some(Holder(i2)))
      case _                                    => Tuple1(None)
    }
  }

  // Produce the final result from the buffer. The .get is intentional for the
  // reproduction: reduce() always leaves a Some, so it should not throw here.
  def finish(reduction: Tuple1[Option[Holder]]): Seq[(String, Int, Int)] = {
    println(s"finish ${reduction}")
    Seq(("ha", reduction._1.get.i, 0))
  }

  def bufferEncoder: Encoder[Tuple1[Option[Holder]]] =
    ExpressionEncoder[Tuple1[Option[Holder]]]()

  def outputEncoder: Encoder[Seq[(String, Int, Int)]] =
    ExpressionEncoder[Seq[(String, Int, Int)]]()
}

// Two rows under one key so zero/reduce/merge/finish all fire.
val x = Seq(("a", 1), ("a", 2))
  .toDS
  .groupByKey(_._1)
  .mapValues(_._2)
  .agg(agg1.toColumn)

x.printSchema
x.show
The result is:
org.apache.spark.executor.Executor: Exception in task 1.0 in stage 146.0 (TID 423) java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source) at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223) at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221) at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159) at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
The error appears to be in the code generated for the aggregator's result projection.