Description
To reproduce:
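// Run in spark-shell, where spark is predefined
import org.apache.spark.sql.catalyst.plans.logical._
// Logical Range over [1, i) with step 1 and a single partition
def range(i: Int) = Range(1, i, 1, 1)
// Union whose children are passed as a Stream rather than a List
val union = Union(Stream(range(3), range(5), range(7)))
spark.sessionState.planner.plan(union).next().execute()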
produces:
java.lang.UnsupportedOperationException
    at org.apache.spark.sql.execution.PlanLater.doExecute(SparkStrategies.scala:55)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
The SparkPlan looks like this:
:- Range (1, 3, step=1, splits=1)
:- PlanLater Range (1, 5, step=1, splits=Some(1))
+- PlanLater Range (1, 7, step=1, splits=Some(1))
So the plan was only partially converted: two PlanLater placeholders are still in it, and executing a PlanLater throws the exception above.
This appears to be a longstanding issue.
I traced it to the mutable flag (a var named changed) that TreeNode uses to record whether any child was rewritten.
For example, in mapChildren:
case args: Traversable[_] => args.map {
  case arg: TreeNode[_] if containsChild(arg) =>
    val newChild = f(arg.asInstanceOf[BaseType])
    if (!(newChild fastEquals arg)) {
      changed = true
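If args is a Stream, this map is lazy: Stream.map applies the function to the head eagerly but defers the tail, so the closure that sets changed has not yet run for the remaining children when changed is checked. Unless the rewritten child happens to be the head, changed stays false and the method ultimately returns the original plan. This matches the output above, where only the first child was planned.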
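The mechanism can be reproduced without Spark. The sketch below is a hypothetical stand-in for the mapChildren pattern (the names StreamFlagDemo, mapWithFlag and bumpThird are illustrative, not Spark code): it maps over a Seq with a closure that sets a var flag, then reads the flag. With a List every element is mapped before the flag is read; with a Stream only the head is. Forcing the result (here via toList) is shown as one possible remedy, not necessarily the fix adopted in Spark.

```scala
object StreamFlagDemo {
  // Hypothetical stand-in for the pattern in TreeNode.mapChildren:
  // map over the children and record in a mutable flag whether any changed.
  def mapWithFlag(children: Seq[Int], f: Int => Int, force: Boolean): (Seq[Int], Boolean) = {
    var changed = false
    val mapped = children.map { c =>
      val n = f(c)
      if (n != c) changed = true // side effect inside the mapping closure
      n
    }
    // Optionally force a lazy collection so f runs on every element
    // before the flag is read.
    val newChildren = if (force) mapped.toList else mapped
    (newChildren, changed)
  }

  // Rewrites only the element 3, mirroring a placeholder that is not the head.
  val bumpThird: Int => Int = x => if (x == 3) x * 10 else x

  def main(args: Array[String]): Unit = {
    println(mapWithFlag(List(1, 2, 3), bumpThird, force = false)._2)   // true
    println(mapWithFlag(Stream(1, 2, 3), bumpThird, force = false)._2) // false: tail not yet mapped
    println(mapWithFlag(Stream(3, 2, 1), bumpThird, force = false)._2) // true: head is mapped eagerly
    println(mapWithFlag(Stream(1, 2, 3), bumpThird, force = true)._2)  // true
  }
}
```

The third case corresponds to the first Union child in the report, which did get planned because it is the head of the Stream.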
Issue Links
- relates to
  - SPARK-33260 SortExec produces incorrect results if sortOrder is a Stream (Resolved)