  1. Spark
  2. SPARK-19471

A confusing NullPointerException when creating table

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.3.0
    • Component/s: SQL
    • Labels:
      None

      Description

      After upgrading Spark from 1.6.2 to 2.1.0, I get a confusing NullPointerException when creating a table under Spark 2.1.0. The problem does not exist in Spark 1.6.1.

      Environment: Hive 1.2.1, Hadoop 2.6.4

      ==================== Code ==================== 
      // spark is an instance of HiveContext 
      // merge is a Hive UDF 
      val df = spark.sql("SELECT merge(field_a, null) AS new_a, field_b AS new_b FROM tb_1 group by field_a, field_b") 
      df.createTempView("tb_temp") 
      // Each fragment ends with a space so the concatenated SQL stays valid.
      spark.sql("create table tb_result stored as parquet as " +
        "SELECT new_a " +
        "FROM tb_temp " +
        "LEFT JOIN `tb_2` ON " +
        "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), concat('GrLSRwZE_', cast((rand() * 200) AS int)), (`tb_temp`.`new_b`)) = `tb_2`.`fka6862f17`")
      
      ==================== Physical Plan ==================== 
      *Project [new_a] 
      +- *BroadcastHashJoin [if (((new_b = ) || isnull(new_b))) concat(GrLSRwZE_, cast(cast((_nondeterministic * 200.0) as int) as string)) else new_b], [fka6862f17], LeftOuter, BuildRight 
         :- HashAggregate(keys=[field_a, field_b], functions=[], output=[new_a, new_b, _nondeterministic]) 
         :  +- Exchange(coordinator ) hashpartitioning(field_a, field_b, 180), coordinator[target post-shuffle partition size: 1024880] 
         :     +- *HashAggregate(keys=[field_a, field_b], functions=[], output=[field_a, field_b]) 
         :        +- *FileScan parquet bdp.tb_1[field_a,field_b] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct 
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])) 
            +- *Project [fka6862f17] 
               +- *FileScan parquet bdp.tb_2[fka6862f17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct 
      
      What does '*' mean before HashAggregate? 
      
      ==================== Exception ==================== 
      org.apache.spark.SparkException: Task failed while writing rows 
      ... 
      java.lang.NullPointerException 
              at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown Source) 
              at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
              at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$3.apply(AggregationIterator.scala:260) 
              at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$3.apply(AggregationIterator.scala:259) 
              at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:392) 
              at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79) 
              at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
              at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
              at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
              at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:252) 
              at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:199) 
              at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:197) 
              at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) 
              at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:202) 
              at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$4.apply(FileFormatWriter.scala:138) 
              at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$4.apply(FileFormatWriter.scala:137) 
              at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
              at org.apache.spark.scheduler.Task.run(Task.scala:99) 
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
              at java.lang.Thread.run(Thread.java:745) 
      

      I also found that when I change my code as follows:

      // Variant 1: select new_b instead of new_a.
      spark.sql("create table tb_result stored as parquet as " +
        "SELECT new_b " +
        "FROM tb_temp " +
        "LEFT JOIN `tb_2` ON " +
        "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), concat('GrLSRwZE_', cast((rand() * 200) AS int)), (`tb_temp`.`new_b`)) = `tb_2`.`fka6862f17`")
      
      or 
      
      // Variant 2: keep new_a but replace rand() * 200 with the constant 200.
      spark.sql("create table tb_result stored as parquet as " +
        "SELECT new_a " +
        "FROM tb_temp " +
        "LEFT JOIN `tb_2` ON " +
        "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), concat('GrLSRwZE_', cast((200) AS int)), (`tb_temp`.`new_b`)) = `tb_2`.`fka6862f17`")
      
      the problem does not occur.
      
      == Physical Plan of select new_b ... == 
      *Project [new_b] 
      +- *BroadcastHashJoin [if (((new_b = ) || isnull(new_b))) concat(GrLSRwZE_, cast(cast((_nondeterministic * 200.0) as int) as string)) else new_b], [fka6862f17], LeftOuter, BuildRight 
         :- *HashAggregate(keys=[field_a, field_b], functions=[], output=[new_b, _nondeterministic]) 
         :  +- Exchange(coordinator ) hashpartitioning(field_a, field_b, 180), coordinator[target post-shuffle partition size: 1024880] 
         :     +- *HashAggregate(keys=[field_a, field_b], functions=[], output=[field_a, field_b]) 
         :        +- *FileScan parquet bdp.tb_1[field_a,field_b] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct 
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])) 
            +- *Project [fka6862f17] 
               +- *FileScan parquet bdp.tb_2[fka6862f17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct 
      

      The difference is that in this plan `HashAggregate(keys=[field_a, field_b], functions=[], output=[new_b, _nondeterministic])` has a '*' before it, whereas the corresponding HashAggregate in the failing plan does not.
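
In Spark 2.x `explain()` output, a leading '*' marks an operator that runs inside whole-stage code generation. To compare two plans without reading them by eye, a small helper along the following lines can list the operators that lack the marker. This is only a sketch: `PlanMarkers`, `operators`, and `withoutStar` are hypothetical names, not part of Spark, and the parsing assumes the plain-text plan layout shown in this report.

```scala
object PlanMarkers {
  // Skips tree-drawing characters (spaces, ':', '+', '-'), then captures
  // an optional '*' marker and the operator name that follows it.
  private val OperatorLine = """^[\s:+\-]*(\*?)([A-Za-z]\w*).*$""".r

  /** Returns (operatorName, hasStar) for each operator line of a plan dump. */
  def operators(plan: String): Seq[(String, Boolean)] =
    plan.split("\n").toSeq.collect {
      case OperatorLine(star, name) => (name, star == "*")
    }

  /** Names of operators that are NOT prefixed with '*'. */
  def withoutStar(plan: String): Seq[String] =
    operators(plan).collect { case (name, false) => name }
}
```

Calling `PlanMarkers.withoutStar` on each plan text and comparing the results should show `HashAggregate` only for the failing plan. Exchange operators never carry the marker, so they appear in both lists and can be ignored.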

      It looks like something is wrong with WholeStageCodegen when a Hive UDF, rand(), GROUP BY, and a join are combined.
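
One way to test this hypothesis is to re-run the failing statement with whole-stage code generation switched off through the `spark.sql.codegen.wholeStage` setting. The sketch below assumes `spark` is a `SparkSession` (the report uses a HiveContext); `CodegenToggle` and `withoutWholeStageCodegen` are hypothetical names. Nothing in this report confirms that the NullPointerException goes away with the setting disabled, so treat it as a diagnostic step rather than a fix.

```scala
import org.apache.spark.sql.SparkSession

object CodegenToggle {
  // Runs `body` with whole-stage codegen disabled, then restores the
  // previous setting even if `body` throws.
  def withoutWholeStageCodegen[T](spark: SparkSession)(body: => T): T = {
    val key = "spark.sql.codegen.wholeStage"
    val previous = spark.conf.get(key, "true") // assumes the default is "true"
    spark.conf.set(key, "false")
    try body
    finally spark.conf.set(key, previous)
  }
}
```

For example, wrapping the failing `spark.sql("create table tb_result ...")` call in `CodegenToggle.withoutWholeStageCodegen(spark) { ... }` would show whether the exception depends on generated code.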

            People

            • Assignee: donnyzone Feng Zhu
            • Reporter: stanzhai StanZhai
            • Votes: 0
            • Watchers: 4
