  1. Spark
  2. SPARK-37063 SQL Adaptive Query Execution QA: Phase 2
  3. SPARK-38030

Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1

Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.1
    • Fix Version/s: 3.3.0, 3.1.4, 3.2.2
    • Component/s: SQL
    • Labels: None

    Description

      One of our user queries failed in Spark 3.1.1 with AQE enabled, producing the stack trace below (some parts of the plan have been redacted, but the structure is preserved).

      While debugging this issue, we found that the failure occurs when AQE calls QueryPlan.canonicalized.

      The query contains a cast over a column with non-nullable struct fields. Canonicalization removes nullability information from the Cast's child AttributeReference, but it does not remove nullability information from the Cast's target dataType. As a result, checkInputDataTypes fails because the child is now nullable while the cast target data type is not. The Cast then has resolved=false, which leads to the UnresolvedException.
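
      The mismatch can be illustrated with a small self-contained model. This is only a sketch: DType, Field, StructT, canCast and asNullable below are simplified stand-ins invented for illustration, not Spark's real classes, and the rule in canCast is an assumption about how the struct nullability check behaves (a nullable source field cannot be cast to a non-nullable target field).

```scala
// Simplified stand-ins for Catalyst data types; NOT Spark's real classes.
sealed trait DType
case object StringT extends DType
final case class Field(name: String, dataType: DType, nullable: Boolean)
final case class StructT(fields: Seq[Field]) extends DType

object CastNullabilityModel {
  // Assumed rule: field names are ignored, but a nullable source field
  // cannot be cast to a non-nullable target field.
  def canCast(from: DType, to: DType): Boolean = (from, to) match {
    case (StringT, StringT) => true
    case (StructT(fs), StructT(ts)) =>
      fs.length == ts.length && fs.zip(ts).forall { case (f, t) =>
        canCast(f.dataType, t.dataType) && (!f.nullable || t.nullable)
      }
    case _ => false
  }

  // Models what canonicalization does to the child attribute's type:
  // every nested field is marked nullable.
  def asNullable(dt: DType): DType = dt match {
    case StructT(fs) =>
      StructT(fs.map(f => f.copy(dataType = asNullable(f.dataType), nullable = true)))
    case other => other
  }

  // Types mirroring the repro: struct<firstName> cast to struct<fname>.
  val childType: DType = StructT(Seq(Field("firstName", StringT, nullable = false)))
  val targetType: DType = StructT(Seq(Field("fname", StringT, nullable = false)))

  // Original plan: both sides are non-nullable, so the cast is accepted.
  val beforeCanonicalization: Boolean = canCast(childType, targetType)

  // Canonicalized plan: only the child was made nullable, so the check fails.
  val afterCanonicalization: Boolean = canCast(asNullable(childType), targetType)
}
```

      In this model the cast is accepted on the original types and rejected once only the child side has been made nullable, which matches the behaviour described above.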

      org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
      Exchange RoundRobinPartitioning(1), REPARTITION_BY_NUM, [id=#232]
      +- Union
         :- Project [cast(columnA#30) as struct<...>]
         :  +- BatchScan[columnA#30] hive.tbl 
         +- Project [cast(columnA#35) as struct<...>]
            +- BatchScan[columnA#35] hive.tbl
      
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:475)
        at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:464)
        at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:87)
        at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:58)
        at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:301)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:405)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:404)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:184)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:179)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:279)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:825)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
        ... 85 elided
      Caused by: java.lang.reflect.InvocationTargetException: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object, tree: '
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:508)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:77)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:507)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 127 more
      Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object, tree: '
        at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.nullable(unresolved.scala:150)
        at org.apache.spark.sql.execution.UnionExec.$anonfun$output$5(basicPhysicalOperators.scala:655)
        at org.apache.spark.sql.execution.UnionExec.$anonfun$output$5$adapted(basicPhysicalOperators.scala:655)
        at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
        at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
        at scala.collection.immutable.List.exists(List.scala:89)
        at org.apache.spark.sql.execution.UnionExec.$anonfun$output$4(basicPhysicalOperators.scala:655)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.execution.UnionExec.output(basicPhysicalOperators.scala:653)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.<init>(ShuffleExchangeExec.scala:121)
        ... 135 more
      

      Repro in Spark 3.1.1

        test("SPARK-XXXXX: Repro: Query with cast containing non-nullable columns fails with AQE") {
          import scala.collection.JavaConverters._
          withSQLConf(
            SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
              (ConvertToLocalRelation.ruleName + "," + PropagateEmptyRelation.ruleName),
            SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
            val nameType = StructType(Seq(StructField("firstName", StringType, nullable = false)))
            val schema = StructType(Seq(StructField("name", nameType, nullable = false)))
            // change column name from firstName to fname, should be irrelevant for Cast
            val newNameType = StructType(Seq(StructField("fname", StringType, nullable = false)))
      
            val df = spark.createDataFrame(List.empty[Row].asJava, schema)
            val df1 = df.withColumn("newName", 'name.cast(newNameType))
            val df2 = df1.union(df1).repartition(1)
            df2.show()
          }
        }
      

      I believe that during canonicalization, the nullability hints should also be removed from the target data type of Cast so that they match with the child's canonicalized representation.
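
      As a rough sketch of what removing the nullability hints from a target data type could look like, the helper below recursively marks every nested field, array element and map value as nullable using Spark's public types API. The names NullabilityStripping and stripNullability are hypothetical and chosen for illustration; this is not the actual patch, and where such a step would be wired into canonicalization is left open.

```scala
import org.apache.spark.sql.types._

object NullabilityStripping {
  // Hypothetical helper: returns a copy of `dt` in which every nested
  // struct field, array element and map value is marked nullable.
  def stripNullability(dt: DataType): DataType = dt match {
    case StructType(fields) =>
      StructType(fields.map { f =>
        f.copy(dataType = stripNullability(f.dataType), nullable = true)
      })
    case ArrayType(elementType, _) =>
      ArrayType(stripNullability(elementType), containsNull = true)
    case MapType(keyType, valueType, _) =>
      MapType(stripNullability(keyType), stripNullability(valueType), valueContainsNull = true)
    case other => other
  }
}
```

      Applied to the repro's newNameType, this would yield a struct whose fname field is nullable, so the target type would agree with the canonicalized child.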

      --------

      This exact issue is not reproducible on master, because the code that previously triggered access on an unresolved object is now lazy. However, the root cause is still present on master, and any other code path that depends on canonicalized representations could trigger the same issue, although I haven't been able to come up with a good example yet.

          People

            Assignee: Shardul Mahadik
            Reporter: Shardul Mahadik
            Votes: 0
            Watchers: 5
