Details
-
Sub-task
-
Status: Closed
-
Major
-
Resolution: Duplicate
-
3.0.1, 3.1.0, 3.2.0
-
None
-
None
Description
This ticket aims at fixing the bug that throws a unsupported exception when running the TPCDS q5 with AQE enabled (this option is enabled by default now):
java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path. at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:189) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.exchange.ReusedExchangeExec.doExecute(Exchange.scala:60) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:115) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321) at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:397) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ...
I've checked the AQE code and I found `EnsureRequirements` wrongly puts `BroadcastExchange` on a top of `BroadcastQueryStage` in the `reOptimize` phase as follows:
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#2183] +- BroadcastQueryStage 2 +- ReusedExchange [d_date_sk#1086], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#1963]
A root cause is that a `Cast` class in a required child's distribution does not have a `timeZoneId` field (`timeZoneId=None`), and a `Cast` class in `child.outputPartitioning` has it. So, this difference can make the distribution requirement check fail in `EnsureRequirements`:
https://github.com/apache/spark/blob/1e85707738a830d33598ca267a6740b3f06b1861/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L47-L50
Attachments
Issue Links
- duplicates
-
SPARK-33822 TPCDS Q5 fails if spark.sql.adaptive.enabled=true
- Resolved
- links to