SPARK-27514: Empty window expression results in error in optimizer


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: SQL
    • Labels: None

    Description

      Currently, the optimizer will break on the following code:

      import java.util

      import org.apache.spark.sql.Row
      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions.sum
      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

      val schema = StructType(Seq(
        StructField("colA", StringType, true),
        StructField("colB", IntegerType, true)
      ))

      // assumes an existing SQLContext (e.g. from a test SparkSession)
      var df = sqlContext.sparkSession.createDataFrame(new util.ArrayList[Row](), schema)
      val w = Window.partitionBy("colA")
      df = df.withColumn("col1", sum("colB").over(w))
      df = df.withColumn("col3", sum("colB").over(w))
      df = df.withColumn("col4", sum("col3").over(w))
      df = df.withColumn("col2", sum("col1").over(w))
      // only col2 is selected, so column pruning can strip every expression from an intermediate Window node
      df = df.select("col2")
      df.explain(true)
      

      with the following stacktrace:

      next on empty iterator
      java.util.NoSuchElementException: next on empty iterator
      at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
      at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
      at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
      at scala.collection.IterableLike$class.head(IterableLike.scala:107)
      at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
      at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
      at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
      at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$$anonfun$apply$15.applyOrElse(Optimizer.scala:803)
      at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$$anonfun$apply$15.applyOrElse(Optimizer.scala:798)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:282)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:282)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:281)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$.apply(Optimizer.scala:798)
      at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$.apply(Optimizer.scala:797)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:109)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:106)
      at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
      at scala.collection.immutable.List.foldLeft(List.scala:84)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:106)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:98)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:98)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$executeAndTrack$1.apply(RuleExecutor.scala:77)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$executeAndTrack$1.apply(RuleExecutor.scala:77)
      at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:76)
      at org.apache.spark.sql.execution.QueryExecution$$anonfun$optimizedPlan$1.apply(QueryExecution.scala:74)
      at org.apache.spark.sql.execution.QueryExecution$$anonfun$optimizedPlan$1.apply(QueryExecution.scala:74)
      at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
      at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
      at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
      at org.apache.spark.sql.execution.QueryExecution$$anonfun$org$apache$spark$sql$execution$QueryExecution$$writePlans$3.apply(QueryExecution.scala:135)
      at org.apache.spark.sql.execution.QueryExecution$$anonfun$org$apache$spark$sql$execution$QueryExecution$$writePlans$3.apply(QueryExecution.scala:135)
      at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:316)
      at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:135)
      at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:142)
      at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:168)
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
      at org.apache.spark.sql.Dataset.explain(Dataset.scala:492)
      at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2$Tests$1.test(TestSparkSessionSuite.scala:51)
      at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply$mcV$sp(TestSparkSessionSuite.scala:56)
      at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply(TestSparkSessionSuite.scala:35)
      at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply(TestSparkSessionSuite.scala:35)
      at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      at org.scalatest.Transformer.apply(Transformer.scala:22)
      at org.scalatest.Transformer.apply(Transformer.scala:20)
      at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
      at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
      at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
      at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
      at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
      at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
      at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
      at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
      at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
      at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
      at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
      at org.scalatest.Suite$class.run(Suite.scala:1147)
      at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
      at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
      at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
      at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:53)
      at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
      at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
      at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:53)
      at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
      at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
      at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
      at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
      at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
      at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
      at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
      at org.scalatest.tools.Runner$.run(Runner.scala:850)
      at org.scalatest.tools.Runner.run(Runner.scala)
      at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
      at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

      We believe this has to do with this change: https://github.com/apache/spark/pull/23343/files#diff-a636a87d8843eeccca90140be91d4fafL606, which moved the removal of empty Window nodes from the ColumnPruning rule into the RemoveNoopOperators rule, which is applied later. As a result, Window nodes with empty window expression lists are no longer removed before the CollapseWindow rule runs, so CollapseWindow can encounter an empty windowExpressions list.
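      For reference, the shape of the failure can be sketched roughly as follows. This is a simplified illustration based on the stack trace above, not the actual Spark source: the object name CollapseWindowSketch is made up, and the head-based comparison is only a stand-in for whatever check the real rule performs on the first window expression of each node.

      import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window}
      import org.apache.spark.sql.catalyst.rules.Rule

      object CollapseWindowSketch extends Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
          // Two adjacent Window nodes with identical partition/order specs get merged.
          case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
              if ps1 == ps2 && os1 == os2 &&
                // Stand-in for the rule's check on the heads of the two expression lists;
                // calling .head on an empty windowExpressions list is where
                // "next on empty iterator" comes from.
                we1.head.getClass == we2.head.getClass =>
            w1.copy(windowExpressions = we2 ++ we1, child = grandChild)
        }
      }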

      There are a few ways we could go about solving this:

      1. Check whether windowExpressions is empty in the CollapseWindow rule. If it is empty, don't collapse the windows, and let the later RemoveNoopOperators rule remove the no-op Window node (see the sketch below).
      2. Add the removal of empty window expressions back into ColumnPruning.
      3. Move the RemoveNoopOperators rule after ColumnPruning and before CollapseWindow.

      #2 feels like a hack, and I don't have enough familiarity with the Optimizer to understand all the implications of reordering rules (#3).
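      As a sketch of option 1, reusing the simplified rule shape from above (again illustrative, with a made-up object name, rather than an actual patch): the extra guard simply skips the rewrite when either windowExpressions list is empty, so the no-op Window node survives until RemoveNoopOperators drops it.

      import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window}
      import org.apache.spark.sql.catalyst.rules.Rule

      object CollapseWindowWithEmptyGuard extends Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
          case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
              // The nonEmpty checks keep the rule from ever touching the head of an
              // empty expression list; an empty Window is left for RemoveNoopOperators.
              if we1.nonEmpty && we2.nonEmpty && ps1 == ps2 && os1 == os2 =>
            w1.copy(windowExpressions = we2 ++ we1, child = grandChild)
        }
      }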

       

    People

      Assignee: Yifei Huang
      Reporter: Yifei Huang
