Spark / SPARK-37547

Unexpected NullPointerException when Aggregator.finish returns null


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.2, 3.2.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      I'm migrating existing Java 8 code from Spark 2.4 to Spark 3, and I get a NullPointerException when an Aggregator whose output type is a custom class returns null from its finish method.

      I've created a simple snippet to reproduce the issue:

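      import java.util.Arrays;
      import java.util.List;
      import org.apache.spark.SparkConf;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Encoder;
      import org.apache.spark.sql.Encoders;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.expressions.Aggregator;

      public class SparkTest {
        public static void main(String[] args) {
          SparkConf conf = new SparkConf().setAppName("name").setMaster("local[*]");
          SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
          List<String> data = Arrays.asList("1", "2", "3");
          Dataset<String> dataset = spark.createDataset(data, Encoders.STRING());
          // Untyped groupBy on the "value" column, then the typed Aggregator used as a column
          Dataset<Row> aggDataset = dataset.groupBy("value").agg(new EntityAggregator().toColumn().name("agg"));
          aggDataset.show();
        }

        public static class EntityAggregator extends Aggregator<Row, EntityAgg, EntityAgg> {
          public EntityAgg zero() { return new EntityAgg(0L); }
          public EntityAgg reduce(EntityAgg agg, Row row) { return agg; }
          public EntityAgg merge(EntityAgg e1, EntityAgg e2) { return e1; }
          public Encoder<EntityAgg> bufferEncoder() { return Encoders.bean(EntityAgg.class); }
          public Encoder<EntityAgg> outputEncoder() { return Encoders.bean(EntityAgg.class); }
          // Returning null for the bean-typed output triggers the NullPointerException on Spark 3
          public EntityAgg finish(EntityAgg reduction) { return null; }
        }

        // Plain JavaBean: public no-arg constructor plus getter/setter, as Encoders.bean expects
        public static class EntityAgg {
          private long field;
          public EntityAgg() { }
          public EntityAgg(long field) { this.field = field; }
          public long getField() { return field; }
          public void setField(long field) { this.field = field; }
        }
      }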

      The expected behavior is to print a table like this:

      +-----+----+
      |value| agg|
      +-----+----+
      |    3|null|
      |    1|null|
      |    2|null|
      +-----+----+
      

      This code works fine on Spark 2.4 but fails on Spark 3 (tested with 3.1.2 and 3.2.0) with the following stack trace:

      Caused by: java.lang.NullPointerException
          at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:49)
          at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:259)
          at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:85)
          at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:32)
          at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
          at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
          at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
          at org.apache.spark.scheduler.Task.run(Task.scala:131)
          at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
          at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)

      Another observation: if I replace EntityAgg with String in the Aggregator, it works fine.
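For comparison, the String variant mentioned above might look like the following sketch. It assumes that "replace EntityAgg with String" means swapping both the buffer and the output type; the class name StringAggregator and the empty-string zero value are hypothetical choices, not taken from the report. Only the encoders and types differ from EntityAggregator; finish still returns null.

```java
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Aggregator;

// Hypothetical comparison aggregator: same shape as EntityAggregator,
// but with String buffer and output types instead of the EntityAgg bean.
public class StringAggregator extends Aggregator<Row, String, String> {
  // Hypothetical zero value; the report does not say what was used.
  @Override
  public String zero() { return ""; }

  // Ignores the input row, mirroring EntityAggregator.reduce.
  @Override
  public String reduce(String agg, Row row) { return agg; }

  // Keeps the first buffer, mirroring EntityAggregator.merge.
  @Override
  public String merge(String s1, String s2) { return s1; }

  @Override
  public Encoder<String> bufferEncoder() { return Encoders.STRING(); }

  @Override
  public Encoder<String> outputEncoder() { return Encoders.STRING(); }

  // Returning null here is reported to work on Spark 3, unlike the bean-typed case.
  @Override
  public String finish(String reduction) { return null; }
}
```

It would be plugged into the same pipeline as the original repro, e.g. `dataset.groupBy("value").agg(new StringAggregator().toColumn().name("agg")).show();`, which according to the observation above prints null in the agg column instead of failing.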

      I found a test on GitHub that should cover this behavior: https://github.com/apache/spark/blob/branch-3.1/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala#L338

      I haven't found a similar issue, so please point me to an open ticket if there is one.

People

    • Assignee: Unassigned
    • Reporter: Andrei (andreiharbunou)
    • Votes: 0
    • Watchers: 3