
SPARK-33822: TPCDS Q5 fails if spark.sql.adaptive.enabled=true


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Parent: SPARK-33828 (SQL Adaptive Query Execution QA)
    • Affects Version/s: 3.0.0, 3.0.1, 3.1.0, 3.2.0
    • Fix Version/s: 3.0.2, 3.1.0
    • Component/s: SQL

    Description

      *PROBLEM STATEMENT*

      >>> tables = ['call_center', 'catalog_page', 'catalog_returns', 'catalog_sales', 'customer', 'customer_address', 'customer_demographics', 'date_dim', 'household_demographics', 'income_band', 'inventory', 'item', 'promotion', 'reason', 'ship_mode', 'store', 'store_returns', 'store_sales', 'time_dim', 'warehouse', 'web_page', 'web_returns', 'web_sales', 'web_site']
      
      >>> for t in tables:
      ...     spark.sql("CREATE TABLE %s USING PARQUET LOCATION '/Users/dongjoon/data/10g/%s'" % (t, t))
      
      >>> spark.sql(spark.sparkContext.wholeTextFiles("/Users/dongjoon/data/query/q5.sql").take(1)[0][1]).show(10000)
      +---------------+--------------------+-------------+-----------+-------------+
      |        channel|                  id|        sales|    returns|       profit|
      +---------------+--------------------+-------------+-----------+-------------+
      |           null|                null|1143646603.07|30617460.71|-317540732.87|
      |catalog channel|                null| 393609478.06| 9451732.79| -44801262.72|
      |catalog channel|catalog_pageAAAAA...|         0.00|   39037.48|    -25330.29|
      ...
      +---------------+--------------------+-------------+-----------+-------------+
      
      >>> sql("set spark.sql.adaptive.enabled=true")
      
      >>> spark.sql(spark.sparkContext.wholeTextFiles("/Users/dongjoon/data/query/q5.sql").take(1)[0][1]).show(10000)
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File "/Users/dongjoon/APACHE/spark-release/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/dataframe.py", line 440, in show
          print(self._jdf.showString(n, 20, vertical))
        File "/Users/dongjoon/APACHE/spark-release/spark-3.0.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
        File "/Users/dongjoon/APACHE/spark-release/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/utils.py", line 128, in deco
          return f(*a, **kw)
        File "/Users/dongjoon/APACHE/spark-release/spark-3.0.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
      py4j.protocol.Py4JJavaError: An error occurred while calling o160.showString.
      : java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path.
      	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:190)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
      	at org.apache.spark.sql.execution.exchange.ReusedExchangeExec.doExecute(Exchange.scala:61)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
      	at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
      	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
      	at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:392)
      	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:120)
      	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:182)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
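
      Until a fixed release is available, a practical workaround implied by this report (my inference, not an official recommendation) is to turn AQE back off for the affected query. A minimal Scala sketch, reusing the reporter's query file path:

        // Disable AQE, then rerun the query that previously failed
        spark.sql("SET spark.sql.adaptive.enabled=false")
        val q5 = spark.sparkContext.wholeTextFiles("/Users/dongjoon/data/query/q5.sql").take(1)(0)._2
        spark.sql(q5).show(10000)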
      

      *maropu's Analysis*
      This ticket aims to fix a bug that throws an UnsupportedOperationException when running TPCDS q5 with AQE enabled (the option is enabled by default now):

      java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path.
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:189)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.exchange.ReusedExchangeExec.doExecute(Exchange.scala:60)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:115)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:397)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        ...
      

      I've checked the AQE code and found that `EnsureRequirements` wrongly puts a `BroadcastExchange` on top of a `BroadcastQueryStage` in the `reOptimize` phase, as follows:

      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#2183]
        +- BroadcastQueryStage 2
          +- ReusedExchange [d_date_sk#1086], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#1963]
      

      The root cause is that the `Cast` expression in the required child distribution does not have its `timeZoneId` field set (`timeZoneId=None`), while the `Cast` in `child.outputPartitioning` does. This difference makes the distribution requirement check fail in `EnsureRequirements`:
      https://github.com/apache/spark/blob/1e85707738a830d33598ca267a6740b3f06b1861/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L47-L50
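
      For illustration, here is a minimal Scala sketch (mine, not code from this ticket) of that mismatch in the affected versions: two `Cast` expressions that differ only in `timeZoneId` are not semantically equal, so the requirement built from the timezone-less `Cast` is not recognized as satisfied by the child's output partitioning, and `EnsureRequirements` inserts the redundant `BroadcastExchange` shown above.

        import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast}
        import org.apache.spark.sql.types.{IntegerType, LongType}

        val dateSk = AttributeReference("d_date_sk", IntegerType)()
        // Cast as it appears in the required child distribution: no time zone set
        val required = Cast(dateSk, LongType)
        // Cast as it appears in child.outputPartitioning, after ResolveTimeZone has
        // filled in the session time zone (the zone id here is only an example)
        val produced = Cast(dateSk, LongType, Some("America/Los_Angeles"))

        // In the affected versions the canonicalized forms still carry timeZoneId,
        // so this returns false and the distribution check in EnsureRequirements fails
        assert(!required.semanticEquals(produced))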

People

    Assignee: maropu (Takeshi Yamamuro)
    Reporter: dongjoon (Dongjoon Hyun)
    Votes: 0
    Watchers: 4
