Description
Currently, the optimizer will break on the following code:
import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("colA", StringType, true),
  StructField("colB", IntegerType, true)
))
var df = sqlContext.sparkSession.createDataFrame(new util.ArrayList[Row](), schema)
val w = Window.partitionBy("colA")
df = df.withColumn("col1", sum("colB").over(w))
df = df.withColumn("col3", sum("colB").over(w))
df = df.withColumn("col4", sum("col3").over(w))
df = df.withColumn("col2", sum("col1").over(w))
df = df.select("col2")
df.explain(true)
with the following stack trace:
java.util.NoSuchElementException: next on empty iterator
  at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
  at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
  at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
  at scala.collection.IterableLike$class.head(IterableLike.scala:107)
  at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
  at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
  at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
  at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$$anonfun$apply$15.applyOrElse(Optimizer.scala:803)
  at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$$anonfun$apply$15.applyOrElse(Optimizer.scala:798)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:282)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:282)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:281)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$.apply(Optimizer.scala:798)
  at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$.apply(Optimizer.scala:797)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:109)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:106)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:106)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:98)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:98)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$executeAndTrack$1.apply(RuleExecutor.scala:77)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$executeAndTrack$1.apply(RuleExecutor.scala:77)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:76)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$optimizedPlan$1.apply(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$optimizedPlan$1.apply(QueryExecution.scala:74)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$org$apache$spark$sql$execution$QueryExecution$$writePlans$3.apply(QueryExecution.scala:135)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$org$apache$spark$sql$execution$QueryExecution$$writePlans$3.apply(QueryExecution.scala:135)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:316)
  at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:135)
  at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:142)
  at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:168)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.explain(Dataset.scala:492)
  at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2$Tests$1.test(TestSparkSessionSuite.scala:51)
  at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply$mcV$sp(TestSparkSessionSuite.scala:56)
  at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply(TestSparkSessionSuite.scala:35)
  at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply(TestSparkSessionSuite.scala:35)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
  at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
  at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
  at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
  at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
  at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
  at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
  at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
  at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
  at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
  at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
  at org.scalatest.Suite$class.run(Suite.scala:1147)
  at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
  at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:53)
  at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
  at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
  at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:53)
  at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
  at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
  at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
  at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
  at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
  at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
  at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
  at org.scalatest.tools.Runner$.run(Runner.scala:850)
  at org.scalatest.tools.Runner.run(Runner.scala)
  at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
  at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
We believe this is caused by moving this rewrite (https://github.com/apache/spark/pull/23343/files#diff-a636a87d8843eeccca90140be91d4fafL606) out of the ColumnPruning rule and into the RemoveNoopOperators rule, which runs later in the optimizer. As a result, Window nodes whose windowExpressions list is empty are no longer removed before CollapseWindow runs, and CollapseWindow then fails when it encounters them.
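For context, the rewrite in question (paraphrased from the linked diff; the object name here is just for illustration) drops a Window node that no longer computes anything:

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window}
import org.apache.spark.sql.catalyst.rules.Rule

// Paraphrase of the rewrite moved by PR #23343: a Window node whose
// windowExpressions list is empty computes nothing, so it can be
// replaced by its child. Since the move, this runs in
// RemoveNoopOperators, i.e. after CollapseWindow has already seen
// the empty node.
object RemoveEmptyWindows extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case w: Window if w.windowExpressions.isEmpty => w.child
  }
}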
There are a few ways we could go about solving this:
- Check whether windowExpressions is empty in the CollapseWindow rule. If it is, don't collapse the windows, and let the later RemoveNoopOperators rule remove the empty Window node (see the sketch below).
- Add the removal of empty window expressions back into ColumnPruning.
- Move the RemoveNoopOperators rule so it runs after ColumnPruning and before CollapseWindow.
#2 feels like a hack, and I don't have enough familiarity with the Optimizer to understand all the implications of reordering rules as in #3.
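For what it's worth, #1 could be as small as an extra guard on the rule's pattern match. A minimal sketch, assuming the four-field Window node and the general shape of CollapseWindow in current Spark (simplified: the real rule also compares the window function types of the two nodes via we1.head/we2.head, which is where the empty list blows up above):

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch of #1: refuse to collapse when either Window node has an empty
// windowExpressions list, and let RemoveNoopOperators drop the empty
// node later. The object name is illustrative; the fix would go into
// the existing CollapseWindow rule in Optimizer.scala.
object CollapseWindowSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
        if we1.nonEmpty && we2.nonEmpty &&        // new guard: skip empty Window nodes
           ps1 == ps2 && os1 == os2 &&
           w1.references.intersect(w2.windowOutputSet).isEmpty =>
      w1.copy(windowExpressions = we2 ++ we1, child = grandChild)
  }
}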
Issue Links
- relates to SPARK-26390: ColumnPruning rule should only do column pruning (Resolved)