[SPARK-34079] Merge non-correlated scalar subqueries


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.2.0
    • Fix Version/s: 3.3.0
    • Component/s: SQL
    • Labels: None

    Description

      Prepare the tables:

      CREATE TABLE store_sales (
        ss_sold_date_sk INT, ss_sold_time_sk INT, ss_item_sk INT, ss_customer_sk INT,
        ss_cdemo_sk INT, ss_hdemo_sk INT, ss_addr_sk INT, ss_store_sk INT,
        ss_promo_sk INT, ss_ticket_number INT, ss_quantity INT,
        ss_wholesale_cost DECIMAL(7,2), ss_list_price DECIMAL(7,2), ss_sales_price DECIMAL(7,2),
        ss_ext_discount_amt DECIMAL(7,2), ss_ext_sales_price DECIMAL(7,2), ss_ext_wholesale_cost DECIMAL(7,2),
        ss_ext_list_price DECIMAL(7,2), ss_ext_tax DECIMAL(7,2), ss_coupon_amt DECIMAL(7,2),
        ss_net_paid DECIMAL(7,2), ss_net_paid_inc_tax DECIMAL(7,2), ss_net_profit DECIMAL(7,2));
      CREATE TABLE reason (
        r_reason_sk INT, r_reason_id varchar(255), r_reason_desc varchar(255));
      

      SQL:

      WITH bucket_result AS (
      SELECT
          CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_quantity END)) > 62316685
            THEN (avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_ext_discount_amt END))
          ELSE (avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_net_paid END)) END bucket1,
          CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_quantity END)) > 19045798
            THEN (avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_ext_discount_amt END))
          ELSE (avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_net_paid END)) END bucket2,
          CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_quantity END)) > 365541424
            THEN (avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_ext_discount_amt END))
          ELSE (avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_net_paid END)) END bucket3,
          CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_quantity END)) > 19045798
            THEN (avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_ext_discount_amt END))
          ELSE (avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_net_paid END)) END bucket4,
          CASE WHEN (count (CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_quantity END)) > 365541424
            THEN (avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_ext_discount_amt END))
          ELSE (avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_net_paid END)) END bucket5
        FROM store_sales
      )
      SELECT
        (SELECT bucket1 FROM bucket_result) as bucket1,
        (SELECT bucket2 FROM bucket_result) as bucket2,
        (SELECT bucket3 FROM bucket_result) as bucket3,
        (SELECT bucket4 FROM bucket_result) as bucket4,
        (SELECT bucket5 FROM bucket_result) as bucket5
      FROM reason
      WHERE r_reason_sk = 1;
      
      

      Spark SQL plan:

      == Physical Plan ==
      AdaptiveSparkPlan isFinalPlan=false
      +- Project [Subquery subquery#0, [id=#23] AS bucket1#1, Subquery subquery#2, [id=#34] AS bucket2#3, Subquery subquery#4, [id=#45] AS bucket3#5, Subquery subquery#6, [id=#56] AS bucket4#7, Subquery subquery#8, [id=#67] AS bucket5#9]
         :  :- Subquery subquery#0, [id=#23]
         :  :  +- AdaptiveSparkPlan isFinalPlan=false
         :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null), avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_net_paid#38 else null))])
         :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#21]
         :  :           +- HashAggregate(keys=[], functions=[partial_count(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_quantity#28 else null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 1) AND (ss_quantity#28 <= 20))) ss_net_paid#38 else null))])
         :  :              +- FileScan parquet default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
         :  :- Subquery subquery#2, [id=#34]
         :  :  +- AdaptiveSparkPlan isFinalPlan=false
         :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else null), avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_net_paid#38 else null))])
         :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#32]
         :  :           +- HashAggregate(keys=[], functions=[partial_count(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_quantity#28 else null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 21) AND (ss_quantity#28 <= 40))) ss_net_paid#38 else null))])
         :  :              +- FileScan parquet default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
         :  :- Subquery subquery#4, [id=#45]
         :  :  +- AdaptiveSparkPlan isFinalPlan=false
         :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else null), avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_net_paid#38 else null))])
         :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#43]
         :  :           +- HashAggregate(keys=[], functions=[partial_count(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_quantity#28 else null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 41) AND (ss_quantity#28 <= 60))) ss_net_paid#38 else null))])
         :  :              +- FileScan parquet default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
         :  :- Subquery subquery#6, [id=#56]
         :  :  +- AdaptiveSparkPlan isFinalPlan=false
         :  :     +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else null), avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_net_paid#38 else null))])
         :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#54]
         :  :           +- HashAggregate(keys=[], functions=[partial_count(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_quantity#28 else null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 61) AND (ss_quantity#28 <= 80))) ss_net_paid#38 else null))])
         :  :              +- FileScan parquet default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
         :  +- Subquery subquery#8, [id=#67]
         :     +- AdaptiveSparkPlan isFinalPlan=false
         :        +- HashAggregate(keys=[], functions=[count(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else null), avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_ext_discount_amt#32 else null)), avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_net_paid#38 else null))])
         :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#65]
         :              +- HashAggregate(keys=[], functions=[partial_count(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_quantity#28 else null), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_ext_discount_amt#32 else null)), partial_avg(UnscaledValue(if (((ss_quantity#28 >= 81) AND (ss_quantity#28 <= 100))) ss_net_paid#38 else null))])
         :                 +- FileScan parquet default.store_sales[ss_quantity#28,ss_ext_discount_amt#32,ss_net_paid#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_quantity:int,ss_ext_discount_amt:decimal(7,2),ss_net_paid:decimal(7,2)>
         +- Filter (isnotnull(r_reason_sk#15) AND (r_reason_sk#15 = 1))
            +- FileScan parquet default.reason[r_reason_sk#15] Batched: true, DataFilters: [isnotnull(r_reason_sk#15), (r_reason_sk#15 = 1)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [IsNotNull(r_reason_sk), EqualTo(r_reason_sk,1)], ReadSchema: struct<r_reason_sk:int>
      

      PostgreSQL plan:

                                            QUERY PLAN
      --------------------------------------------------------------------------------------
       Seq Scan on reason  (cost=51.80..62.67 rows=1 width=160)
         Filter: (r_reason_sk = 1)
         CTE bucket_result
           ->  Aggregate  (cost=51.68..51.70 rows=1 width=160)
                 ->  Seq Scan on store_sales  (cost=0.00..13.40 rows=340 width=32)
         InitPlan 2 (returns $1)
           ->  CTE Scan on bucket_result  (cost=0.00..0.02 rows=1 width=32)
         InitPlan 3 (returns $2)
           ->  CTE Scan on bucket_result bucket_result_1  (cost=0.00..0.02 rows=1 width=32)
         InitPlan 4 (returns $3)
           ->  CTE Scan on bucket_result bucket_result_2  (cost=0.00..0.02 rows=1 width=32)
         InitPlan 5 (returns $4)
           ->  CTE Scan on bucket_result bucket_result_3  (cost=0.00..0.02 rows=1 width=32)
         InitPlan 6 (returns $5)
           ->  CTE Scan on bucket_result bucket_result_4  (cost=0.00..0.02 rows=1 width=32)
      (15 rows)
      

      Spark SQL scans store_sales five times, once per scalar subquery, while PostgreSQL scans store_sales only once and serves all five buckets from the single-row CTE result.
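
      For comparison, the query can be rewritten by hand so that a single scan of store_sales suffices: keep the same bucket_result CTE, but replace the five scalar subqueries with one CROSS JOIN against it. The sketch below is only an equivalent workaround, not the optimizer change proposed by this ticket; it relies on bucket_result being a global aggregate, which always returns exactly one row, so the join preserves the original semantics.

      -- Hand-merged rewrite (workaround sketch, not the proposed optimizer rule).
      -- bucket_result is a global aggregate and always yields exactly one row,
      -- so the CROSS JOIN returns the same rows as the five scalar subqueries
      -- above while store_sales is read only once.
      WITH bucket_result AS (
        SELECT
          CASE WHEN count(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_quantity END) > 62316685
            THEN avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_ext_discount_amt END)
            ELSE avg(CASE WHEN ss_quantity BETWEEN 1 AND 20 THEN ss_net_paid END) END AS bucket1,
          CASE WHEN count(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_quantity END) > 19045798
            THEN avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_ext_discount_amt END)
            ELSE avg(CASE WHEN ss_quantity BETWEEN 21 AND 40 THEN ss_net_paid END) END AS bucket2,
          CASE WHEN count(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_quantity END) > 365541424
            THEN avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_ext_discount_amt END)
            ELSE avg(CASE WHEN ss_quantity BETWEEN 41 AND 60 THEN ss_net_paid END) END AS bucket3,
          CASE WHEN count(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_quantity END) > 19045798
            THEN avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_ext_discount_amt END)
            ELSE avg(CASE WHEN ss_quantity BETWEEN 61 AND 80 THEN ss_net_paid END) END AS bucket4,
          CASE WHEN count(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_quantity END) > 365541424
            THEN avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_ext_discount_amt END)
            ELSE avg(CASE WHEN ss_quantity BETWEEN 81 AND 100 THEN ss_net_paid END) END AS bucket5
        FROM store_sales
      )
      SELECT b.bucket1, b.bucket2, b.bucket3, b.bucket4, b.bucket5
      FROM reason
      CROSS JOIN bucket_result b
      WHERE r_reason_sk = 1;

      In this form both Spark SQL and PostgreSQL should need only one scan of store_sales, which is effectively what merging the non-correlated scalar subqueries into a single shared subquery would achieve automatically.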



          People

            Assignee: Peter Toth (petertoth)
            Reporter: Yuming Wang (yumwang)
            Votes: 0
            Watchers: 8
