Spark / SPARK-37547

Unexpected NullPointerException when Aggregator.finish returns null


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.2, 3.2.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      I'm migrating existing Java 8 code from Spark 2.4 to Spark 3, and I get a NullPointerException when an Aggregator whose output type is a custom bean class returns null from its finish method.

      I've created a simple snippet to reproduce the issue:

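      import java.util.Arrays;
      import java.util.List;

      import org.apache.spark.SparkConf;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Encoder;
      import org.apache.spark.sql.Encoders;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.expressions.Aggregator;

      public class SparkTest {
        public static void main(String[] args) {
          SparkConf conf = new SparkConf().setAppName("name").setMaster("local[*]");
          SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
          List<String> data = Arrays.asList("1", "2", "3");
          Dataset<String> dataset = spark.createDataset(data, Encoders.STRING());
          // Group by the "value" column and apply the typed Aggregator as a column named "agg".
          Dataset<Row> aggDataset = dataset.groupBy("value").agg(new EntityAggregator().toColumn().name("agg"));
          aggDataset.show();
        }

        // Nested as static classes so the repro compiles as a single file.
        // Buffer and output types are both the EntityAgg bean; finish() deliberately returns null.
        public static class EntityAggregator extends Aggregator<Row, EntityAgg, EntityAgg> {
          public EntityAgg zero() { return new EntityAgg(0L); }
          public EntityAgg reduce(EntityAgg agg, Row row) { return agg; }
          public EntityAgg merge(EntityAgg e1, EntityAgg e2) { return e1; }
          public Encoder<EntityAgg> bufferEncoder() { return Encoders.bean(EntityAgg.class); }
          public Encoder<EntityAgg> outputEncoder() { return Encoders.bean(EntityAgg.class); }
          public EntityAgg finish(EntityAgg reduction) { return null; } // fails with an NPE on Spark 3
        }

        // JavaBean with a public no-arg constructor and getter/setter, as Encoders.bean expects.
        public static class EntityAgg {
          private long field;
          public EntityAgg() { }
          public EntityAgg(long field) { this.field = field; }
          public long getField() { return field; }
          public void setField(long field) { this.field = field; }
        }
      }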

      The expected behavior is to print a table like this:

      +-----+----+
      |value| agg|
      +-----+----+
      |    3|null|
      |    1|null|
      |    2|null|
      +-----+----+
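      The expected result can be modeled without Spark. The plain-Java sketch below assumes a hypothetical MiniAggregator interface (not Spark's API) that mirrors the zero/reduce/merge/finish contract of Aggregator. It groups the same three values, reduces each group into two partial buffers, merges them, and stores whatever finish returns, so a null result becomes a null "agg" value for that group, which is the behavior this report expects from Spark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the Aggregator lifecycle; MiniAggregator is hypothetical, not Spark's API.
public class AggregatorModel {

    // Hypothetical stand-in for org.apache.spark.sql.expressions.Aggregator.
    interface MiniAggregator<IN, BUF, OUT> {
        BUF zero();
        BUF reduce(BUF buffer, IN input);
        BUF merge(BUF b1, BUF b2);
        OUT finish(BUF reduction);
    }

    // Mirrors the EntityAgg bean from the report.
    static final class EntityAgg {
        final long field;
        EntityAgg(long field) { this.field = field; }
    }

    // Mirrors EntityAggregator: the buffer is passed through and finish() returns null.
    static final class EntityAggregator implements MiniAggregator<String, EntityAgg, EntityAgg> {
        public EntityAgg zero() { return new EntityAgg(0L); }
        public EntityAgg reduce(EntityAgg agg, String row) { return agg; }
        public EntityAgg merge(EntityAgg e1, EntityAgg e2) { return e1; }
        public EntityAgg finish(EntityAgg reduction) { return null; }
    }

    // Groups rows by key, folds each group as two partial buffers, merges them,
    // and keeps whatever finish() returns, including null.
    static <IN, K, BUF, OUT> Map<K, OUT> aggregateByKey(
            List<IN> rows, Function<IN, K> key, MiniAggregator<IN, BUF, OUT> agg) {
        Map<K, List<IN>> groups = new LinkedHashMap<>();
        for (IN row : rows) {
            groups.computeIfAbsent(key.apply(row), k -> new ArrayList<>()).add(row);
        }
        Map<K, OUT> result = new LinkedHashMap<>();
        for (Map.Entry<K, List<IN>> e : groups.entrySet()) {
            List<IN> group = e.getValue();
            int mid = group.size() / 2;
            BUF left = agg.zero();
            for (IN row : group.subList(0, mid)) left = agg.reduce(left, row);
            BUF right = agg.zero();
            for (IN row : group.subList(mid, group.size())) right = agg.reduce(right, row);
            // A null finish() result is stored as-is: the "agg" value for this key is null.
            result.put(e.getKey(), agg.finish(agg.merge(left, right)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("1", "2", "3");
        Map<String, EntityAgg> out = aggregateByKey(data, s -> s, new EntityAggregator());
        out.forEach((value, agg) -> System.out.println(value + " -> " + agg));
    }
}
```

      Running main prints one "value -> null" line per group, in insertion order, matching the null agg column in the expected table.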
      

      This code works on Spark 2.4 but fails on Spark 3 (tested with 3.1.2 and 3.2.0) with the following stack trace:

      Caused by: java.lang.NullPointerException
          at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:49)
          at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:259)
          at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:85)
          at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:32)
          at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
          at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
          at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
          at org.apache.spark.scheduler.Task.run(Task.scala:131)
          at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
          at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)

      Another observation: if I replace EntityAgg with String in the Aggregator, it works fine.

      I found a test on GitHub that should cover this behavior: https://github.com/apache/spark/blob/branch-3.1/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala#L338

      I haven't found a similar issue, so please point me to an existing ticket if there is one.

          People

            Assignee: Unassigned
            Reporter: Andrei (andreiharbunou)
