Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Duplicate
-
3.3.0
-
None
-
None
Description
I used databricks spark-sql-pref + Spark 3.3 to run 3TB TPCDS q24a or q24b, the test code as follows:
val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T" val databaseName = "tpcds_database" val scaleFactor = "3072" val format = "parquet" import com.databricks.spark.sql.perf.tpcds.TPCDSTables val tables = new TPCDSTables( spark.sqlContext,dsdgenDir = "./tpcds-kit/tools", scaleFactor = scaleFactor, useDoubleForDecimal = false,useStringForDate = false) spark.sql(s"create database $databaseName") tables.createTemporaryTables(rootDir, format) spark.sql(s"use $databaseName")// TPCDS 24a or 24b val result = spark.sql(""" with ssales as (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid from store_sales, store_returns, store, item, customer, customer_address where ss_ticket_number = sr_ticket_number and ss_item_sk = sr_item_sk and ss_customer_sk = c_customer_sk and ss_item_sk = i_item_sk and ss_store_sk = s_store_sk and c_birth_country = upper(ca_country) and s_zip = ca_zip and s_market_id = 8 group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, i_current_price, i_manager_id, i_units, i_size) select c_last_name, c_first_name, s_store_name, sum(netpaid) paid from ssales where i_color = 'pale' group by c_last_name, c_first_name, s_store_name having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect() sc.stop()
The above test may failed due to `Stage cancelled because SparkContext was shut down` of stage 31 and stage 36 when AQE enabled as follows:
The DAG corresponding to sql is as follows:
The details as follows:
== Physical Plan == AdaptiveSparkPlan (42) +- == Final Plan == LocalTableScan (1) +- == Initial Plan == Filter (41) +- HashAggregate (40) +- Exchange (39) +- HashAggregate (38) +- HashAggregate (37) +- Exchange (36) +- HashAggregate (35) +- Project (34) +- BroadcastHashJoin Inner BuildRight (33) :- Project (29) : +- BroadcastHashJoin Inner BuildRight (28) : :- Project (24) : : +- BroadcastHashJoin Inner BuildRight (23) : : :- Project (19) : : : +- BroadcastHashJoin Inner BuildRight (18) : : : :- Project (13) : : : : +- SortMergeJoin Inner (12) : : : : :- Sort (6) : : : : : +- Exchange (5) : : : : : +- Project (4) : : : : : +- Filter (3) : : : : : +- Scan parquet (2) : : : : +- Sort (11) : : : : +- Exchange (10) : : : : +- Project (9) : : : : +- Filter (8) : : : : +- Scan parquet (7) : : : +- BroadcastExchange (17) : : : +- Project (16) : : : +- Filter (15) : : : +- Scan parquet (14) : : +- BroadcastExchange (22) : : +- Filter (21) : : +- Scan parquet (20) : +- BroadcastExchange (27) : +- Filter (26) : +- Scan parquet (25) +- BroadcastExchange (32) +- Filter (31) +- Scan parquet (30) (1) LocalTableScan Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850] Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, paid#850] (2) Scan parquet Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152] Batched: true Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales] PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)] ReadSchema: struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)> (3) Filter Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152] Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132)) (4) Project Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149] Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152] (5) Exchange Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149] Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), ENSURE_REQUIREMENTS, [id=#309] (6) Sort Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149] Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS FIRST], false, 0 (7) Scan parquet Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195] Batched: true Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns] PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)] ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint> (8) Filter Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195] Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177)) (9) Project Output [2]: [sr_item_sk#177, sr_ticket_number#184L] Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195] (10) Exchange Input [2]: [sr_item_sk#177, sr_ticket_number#184L] Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), ENSURE_REQUIREMENTS, [id=#310] (11) Sort Input [2]: [sr_item_sk#177, sr_ticket_number#184L] Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS FIRST], false, 0 (12) SortMergeJoin Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131] Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177] Join condition: None (13) Project Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149] Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L] (14) Scan parquet Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689] Batched: true Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store] PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), IsNotNull(s_store_sk), IsNotNull(s_zip)] ReadSchema: struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string> (15) Filter Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689] Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689)) (16) Project Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689] Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689] (17) BroadcastExchange Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#316] (18) BroadcastHashJoin Left keys [1]: [ss_store_sk#136] Right keys [1]: [s_store_sk#664] Join condition: None (19) Project Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689] Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689] (20) Scan parquet Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584] Batched: true Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item] PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)] ReadSchema: struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int> (21) Filter Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584] Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND isnotnull(i_item_sk#564)) (22) BroadcastExchange Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#320] (23) BroadcastHashJoin Left keys [1]: [ss_item_sk#131] Right keys [1]: [i_item_sk#564] Join condition: None (24) Project Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584] Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584] (25) Scan parquet Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426] Batched: true Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer] PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)] ReadSchema: struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string> (26) Filter Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426] Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426)) (27) BroadcastExchange Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#324] (28) BroadcastHashJoin Left keys [1]: [ss_customer_sk#132] Right keys [1]: [c_customer_sk#412] Join condition: None (29) Project Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426] Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426] (30) Scan parquet Output [3]: [ca_state#456, ca_zip#457, ca_country#458] Batched: true Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address] PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)] ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string> (31) Filter Input [3]: [ca_state#456, ca_zip#457, ca_country#458] Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457)) (32) BroadcastExchange Input [3]: [ca_state#456, ca_zip#457, ca_country#458] Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), input[1, string, false]),false), [id=#328] (33) BroadcastHashJoin Left keys [2]: [c_birth_country#426, s_zip#689] Right keys [2]: [upper(ca_country#458), ca_zip#457] Join condition: None (34) Project Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456] Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, ca_zip#457, ca_country#458] (35) HashAggregate Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456] Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579] Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))] Aggregate Attributes [1]: [sum#870L] Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L] (36) Exchange Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L] Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333] (37) HashAggregate Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L] Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579] Functions [1]: [sum(UnscaledValue(ss_net_paid#149))] Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L] Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852] (38) HashAggregate Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852] Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669] Functions [1]: [partial_sum(netpaid#852)] Aggregate Attributes [2]: [sum#866, isEmpty#867] Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869] (39) Exchange Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869] Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337] (40) HashAggregate Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869] Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669] Functions [1]: [sum(netpaid#852)] Aggregate Attributes [1]: [sum(netpaid#852)#854] Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, sum(netpaid#852)#854 AS paid#850] (41) Filter Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850] Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > cast(Subquery subquery#851, [id=#294] as decimal(33,8)))) (42) AdaptiveSparkPlan Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850] Arguments: isFinalPlan=true
And I manually revert SPARK-35442, the problem no longer exists.
The DAG corresponding to sql is as follows:
The details as follows:
Attachments
Issue Links
- relates to
-
SPARK-40834 Use SparkListenerSQLExecutionEnd to track final SQL status in UI
- Resolved