Spark / SPARK-40278

Running the 3TB TPC-DS test with databricks spark-sql-perf on Spark 3.3 fails

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 3.3.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      I used databricks spark-sql-perf with Spark 3.3 to run 3TB TPC-DS q24a or q24b. The test code is as follows:

      val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T"
      val databaseName = "tpcds_database"
      val scaleFactor = "3072"
      val format = "parquet"
      import com.databricks.spark.sql.perf.tpcds.TPCDSTables
      val tables = new TPCDSTables(
        spark.sqlContext,
        dsdgenDir = "./tpcds-kit/tools",
        scaleFactor = scaleFactor,
        useDoubleForDecimal = false,
        useStringForDate = false)
      spark.sql(s"create database $databaseName")
      tables.createTemporaryTables(rootDir, format)
      spark.sql(s"use $databaseName")
      // TPCDS 24a or 24b
      val result = spark.sql(""" with ssales as
       (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
              i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
       from store_sales, store_returns, store, item, customer, customer_address
       where ss_ticket_number = sr_ticket_number
         and ss_item_sk = sr_item_sk
         and ss_customer_sk = c_customer_sk
         and ss_item_sk = i_item_sk
         and ss_store_sk = s_store_sk
         and c_birth_country = upper(ca_country)
         and s_zip = ca_zip
         and s_market_id = 8
       group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
                i_current_price, i_manager_id, i_units, i_size)
       select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
       from ssales
       where i_color = 'pale'
       group by c_last_name, c_first_name, s_store_name
       having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()
       sc.stop() 

      The above test may fail with `Stage cancelled because SparkContext was shut down` in stage 31 and stage 36 when AQE is enabled.
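
      Because the failure is reported with AQE enabled, one way to narrow it down is to run the same query with `spark.sql.adaptive.enabled` switched on and off and compare. The following is a minimal sketch, not part of the original report: `runWithAqe` is a hypothetical helper, and it assumes the TPC-DS temporary tables from the test code are already registered in the session.

      ```scala
      import org.apache.spark.sql.{Row, SparkSession}

      // Hypothetical helper (not part of Spark or spark-sql-perf):
      // run one query with AQE forced on or off, then restore the
      // session's previous setting so later queries are unaffected.
      def runWithAqe(spark: SparkSession, query: String, aqeEnabled: Boolean): Array[Row] = {
        val key = "spark.sql.adaptive.enabled"
        val previous = spark.conf.getOption(key)
        spark.conf.set(key, aqeEnabled.toString)
        try {
          spark.sql(query).collect()
        } finally {
          previous match {
            case Some(value) => spark.conf.set(key, value)
            case None        => spark.conf.unset(key)
          }
        }
      }
      ```

      For example, `runWithAqe(spark, q24, aqeEnabled = false)` would run the query without AQE, where `q24` is a hypothetical variable holding the SQL text of the query above.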

       

       

      The DAG corresponding to the SQL is as follows:

      The details are as follows:

      == Physical Plan ==
      AdaptiveSparkPlan (42)
      +- == Final Plan ==
         LocalTableScan (1)
      +- == Initial Plan ==
         Filter (41)
         +- HashAggregate (40)
            +- Exchange (39)
               +- HashAggregate (38)
                  +- HashAggregate (37)
                     +- Exchange (36)
                        +- HashAggregate (35)
                           +- Project (34)
                              +- BroadcastHashJoin Inner BuildRight (33)
                                 :- Project (29)
                                 :  +- BroadcastHashJoin Inner BuildRight (28)
                                 :     :- Project (24)
                                 :     :  +- BroadcastHashJoin Inner BuildRight (23)
                                 :     :     :- Project (19)
                                 :     :     :  +- BroadcastHashJoin Inner BuildRight (18)
                                 :     :     :     :- Project (13)
                                 :     :     :     :  +- SortMergeJoin Inner (12)
                                 :     :     :     :     :- Sort (6)
                                 :     :     :     :     :  +- Exchange (5)
                                 :     :     :     :     :     +- Project (4)
                                 :     :     :     :     :        +- Filter (3)
                                 :     :     :     :     :           +- Scan parquet  (2)
                                 :     :     :     :     +- Sort (11)
                                 :     :     :     :        +- Exchange (10)
                                 :     :     :     :           +- Project (9)
                                 :     :     :     :              +- Filter (8)
                                 :     :     :     :                 +- Scan parquet  (7)
                                 :     :     :     +- BroadcastExchange (17)
                                 :     :     :        +- Project (16)
                                 :     :     :           +- Filter (15)
                                 :     :     :              +- Scan parquet  (14)
                                 :     :     +- BroadcastExchange (22)
                                 :     :        +- Filter (21)
                                 :     :           +- Scan parquet  (20)
                                 :     +- BroadcastExchange (27)
                                 :        +- Filter (26)
                                 :           +- Scan parquet  (25)
                                 +- BroadcastExchange (32)
                                    +- Filter (31)
                                       +- Scan parquet  (30)
      
      
      (1) LocalTableScan
      Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
      Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
      
      (2) Scan parquet 
      Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
      Batched: true
      Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
      PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
      ReadSchema: struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>
      
      (3) Filter
      Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
      Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132))
      
      (4) Project
      Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
      Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
      
      (5) Exchange
      Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
      Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), ENSURE_REQUIREMENTS, [id=#309]
      
      (6) Sort
      Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
      Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS FIRST], false, 0
      
      (7) Scan parquet 
      Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
      Batched: true
      Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
      PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
      ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>
      
      (8) Filter
      Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
      Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))
      
      (9) Project
      Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
      Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
      
      (10) Exchange
      Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
      Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), ENSURE_REQUIREMENTS, [id=#310]
      
      (11) Sort
      Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
      Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS FIRST], false, 0
      
      (12) SortMergeJoin
      Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
      Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
      Join condition: None
      
      (13) Project
      Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149]
      Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]
      
      (14) Scan parquet 
      Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
      Batched: true
      Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
      PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), IsNotNull(s_store_sk), IsNotNull(s_zip)]
      ReadSchema: struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>
      
      (15) Filter
      Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
      Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))
      
      (16) Project
      Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
      Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
      
      (17) BroadcastExchange
      Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
      Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#316]
      
      (18) BroadcastHashJoin
      Left keys [1]: [ss_store_sk#136]
      Right keys [1]: [s_store_sk#664]
      Join condition: None
      
      (19) Project
      Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689]
      Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
      
      (20) Scan parquet 
      Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
      Batched: true
      Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
      PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)]
      ReadSchema: struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>
      
      (21) Filter
      Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
      Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND isnotnull(i_item_sk#564))
      
      (22) BroadcastExchange
      Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
      Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#320]
      
      (23) BroadcastHashJoin
      Left keys [1]: [ss_item_sk#131]
      Right keys [1]: [i_item_sk#564]
      Join condition: None
      
      (24) Project
      Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
      Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
      
      (25) Scan parquet 
      Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
      Batched: true
      Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
      PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)]
      ReadSchema: struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>
      
      (26) Filter
      Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
      Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))
      
      (27) BroadcastExchange
      Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
      Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#324]
      
      (28) BroadcastHashJoin
      Left keys [1]: [ss_customer_sk#132]
      Right keys [1]: [c_customer_sk#412]
      Join condition: None
      
      (29) Project
      Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426]
      Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
      
      (30) Scan parquet 
      Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
      Batched: true
      Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
      PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
      ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>
      
      (31) Filter
      Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
      Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))
      
      (32) BroadcastExchange
      Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
      Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), input[1, string, false]),false), [id=#328]
      
      (33) BroadcastHashJoin
      Left keys [2]: [c_birth_country#426, s_zip#689]
      Right keys [2]: [upper(ca_country#458), ca_zip#457]
      Join condition: None
      
      (34) Project
      Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
      Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, ca_zip#457, ca_country#458]
      
      (35) HashAggregate
      Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
      Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
      Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
      Aggregate Attributes [1]: [sum#870L]
      Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
      
      (36) Exchange
      Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
      Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]
      
      (37) HashAggregate
      Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
      Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
      Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
      Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
      Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]
      
      (38) HashAggregate
      Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
      Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
      Functions [1]: [partial_sum(netpaid#852)]
      Aggregate Attributes [2]: [sum#866, isEmpty#867]
      Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
      
      (39) Exchange
      Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
      Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]
      
      (40) HashAggregate
      Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
      Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
      Functions [1]: [sum(netpaid#852)]
      Aggregate Attributes [1]: [sum(netpaid#852)#854]
      Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, sum(netpaid#852)#854 AS paid#850]
      
      (41) Filter
      Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
      Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > cast(Subquery subquery#851, [id=#294] as decimal(33,8))))
      
      (42) AdaptiveSparkPlan
      Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
      Arguments: isFinalPlan=true 
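
      A listing in this layout can be printed from the shell with `Dataset.explain("formatted")`, available since Spark 3.0. The following is a minimal sketch under the assumption of a live `SparkSession`; `printFormattedPlan` is a hypothetical helper name and is not taken from the original report.

      ```scala
      import org.apache.spark.sql.SparkSession

      // Hypothetical helper: print the formatted physical plan of a query.
      // With AQE enabled, a plan printed before the query has executed is
      // reported as isFinalPlan=false; the listing above shows the plan
      // after execution (isFinalPlan=true).
      def printFormattedPlan(spark: SparkSession, query: String): Unit =
        spark.sql(query).explain("formatted")
      ```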

       

       

      After I manually reverted SPARK-35442, the problem no longer occurred.

       

      The DAG corresponding to the SQL is as follows:

      The details are as follows:

            People

              Assignee: Unassigned
              Reporter: Yang Jie (LuciferYang)