  Spark / SPARK-23731

FileSourceScanExec throws NullPointerException in subexpression elimination


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.1, 2.3.0, 2.3.1
    • Fix Version/s: 2.3.2, 2.4.0
    • Component/s: SQL
    • Labels:
      None

      Description

      While working with a SQL query that contains many CASE WHEN clauses and ScalarSubqueries, I hit the following exception (in Spark 2.3.0):

      Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:167)
        at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:502)
        at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:158)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:257)
        at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
        at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:358)
        at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
        at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:136)
        at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:132)
        at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.get(HashMap.scala:70)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1168)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1168)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1168)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1218)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:310)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:373)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:362)
        at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:161)
        at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
        at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

      I have the query and the Parquet files that lead to the NPE, but cannot disclose them due to an NDA. I have tried to replicate the failure with different queries, but failed, and after more than three days of exploring subexpression elimination I gave up.
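
      Since the actual query cannot be shared, below is a purely illustrative sketch of the query shape described above: many CASE WHEN branches whose operands are uncorrelated scalar subqueries, evaluated over Parquet-backed tables. The paths, view names, and columns are made up, and this is not a confirmed reproduction of the NPE (it assumes a spark-shell session where spark is in scope).

        // Hypothetical data: any Parquet-backed views will do for the shape.
        spark.read.parquet("/path/to/facts").createOrReplaceTempView("facts")
        spark.read.parquet("/path/to/thresholds").createOrReplaceTempView("thresholds")

        // Many CASE WHEN branches, each comparing a column against an
        // uncorrelated scalar subquery over another Parquet-backed table.
        val result = spark.sql("""
          SELECT
            CASE WHEN amount > (SELECT max(limit_a) FROM thresholds) THEN 'high'
                 WHEN amount > (SELECT avg(limit_a) FROM thresholds) THEN 'medium'
                 ELSE 'low'
            END AS bucket_a,
            CASE WHEN quantity > (SELECT max(limit_b) FROM thresholds) THEN 'high'
                 ELSE 'low'
            END AS bucket_b
          FROM facts
        """)
        result.show()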

      Technically, the NPE is caused by the @transient relation: HadoopFsRelation constructor field, which is not re-created on executors. I don't know why this @transient is required in the first place, nor, more importantly, how to write a test that reproduces the issue.
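
      For context on that @transient field, here is a minimal, self-contained Scala sketch (not Spark source code) of the underlying behaviour: a @transient constructor field is skipped during Java serialization, so after the object is deserialized on an executor the field is null and dereferencing it throws a NullPointerException.

        import java.io._

        case class Relation(path: String)

        // `relation` is marked @transient, so it is not written out when the
        // instance is serialized (as happens when a task is shipped to an executor).
        class Scan(@transient val relation: Relation) extends Serializable {
          def describe(): String = relation.path.toUpperCase
        }

        object TransientDemo {
          def main(args: Array[String]): Unit = {
            val scan = new Scan(Relation("/data/parquet"))
            println(scan.describe())                  // works before serialization

            // Round-trip through Java serialization, like shipping a task to an executor.
            val bos = new ByteArrayOutputStream()
            val oos = new ObjectOutputStream(bos)
            oos.writeObject(scan)
            oos.close()
            val copy = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
              .readObject().asInstanceOf[Scan]

            println(copy.describe())                  // NullPointerException: relation is null
          }
        }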

      FYI: disabling subexpression elimination via the internal Spark configuration property spark.sql.subexpressionElimination.enabled works around the problem.
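
      A minimal sketch of that workaround, assuming spark is an active SparkSession; the property is an internal one, so whether it can be changed on a live session may depend on the Spark version, and it can also be passed at submit time.

        // Disable subexpression elimination for this session.
        spark.conf.set("spark.sql.subexpressionElimination.enabled", "false")
        println(spark.conf.get("spark.sql.subexpressionElimination.enabled"))  // false

        // Or at submit time:
        //   spark-submit --conf spark.sql.subexpressionElimination.enabled=false ...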

            People

            • Assignee:
              hyukjin.kwon Hyukjin Kwon
            • Reporter:
              jlaskowski Jacek Laskowski
            • Votes:
              0
            • Watchers:
              4
