Spark / SPARK-32708

Query optimization fails to reuse exchange with DataSourceV2

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.7
    • Fix Version/s: 2.4.8
    • Component/s: SQL
    • Labels:
      None

      Description

      Repro query:

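      # Exchange reuse must be enabled for ReusedExchange to appear in the plan.
      spark.conf.set("spark.sql.exchange.reuse","true")
      # Parquet source; swap in the commented line below to reproduce with DataSourceV2.
      df = spark.read.format('parquet').load('gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim')
      #df = spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table",'tpcds_1G.date_dim').load()
      # The query below refers to the view by the name date_dim.
      df.createOrReplaceTempView("date_dim")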
       
      df = spark.sql(""" 
      WITH t1 AS (
       SELECT 
       d_year, d_month_seq
       FROM (
       SELECT t1.d_year , t2.d_month_seq 
       FROM 
       date_dim t1
       cross join
       date_dim t2
       where t1.d_day_name = "Monday" and t1.d_fy_year > 2000
       and t2.d_day_name = "Monday" and t2.d_fy_year > 2000
       )
       GROUP BY d_year, d_month_seq)
       
       SELECT
       prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
       FROM t1 curr_yr cross join t1 prev_yr
       WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002 
       ORDER BY d_month_seq
       LIMIT 100
       
       """)
      df.explain()
      #df.show()
      

       

      The query above produces different physical plans with Parquet and with DataSourceV2. Both plans are correct, but the DataSourceV2 plan is less optimized.

      Sub-plan [5-7] is identical to sub-plan [1-3]: an aggregate over the broadcast nested-loop join of two scans of date_dim that are filtered and projected the same way. The numbers are the whole-stage codegen ids shown as *(N) in the plans.

      In the Parquet plan below, the exchange produced after [1-3] is therefore reused in place of [5-7], which shows up as a ReusedExchange node.

      The DataSourceV2 plan fails to do so and computes both sub-plans in full.
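
      A quick way to compare the two plans is to count shuffle exchanges and reused exchanges in the explain output. The helper below is a minimal sketch that works on the plan text only; the function name exchange_summary and the sample plan are made up for illustration. Obtaining the text through df._jdf.queryExecution().executedPlan().toString() is an assumption about PySpark's internal accessor, not a public API.

```python
import re

def exchange_summary(plan_text):
    """Count shuffle Exchange and ReusedExchange nodes in a physical plan dump."""
    counts = {"exchange": 0, "reused": 0}
    for line in plan_text.splitlines():
        # Strip the tree-drawing prefix (':', '+', '-', spaces) and an
        # optional whole-stage codegen marker such as '*(3) '.
        node = re.sub(r"^[\s:+\-]*(\*\(\d+\)\s*)?", "", line)
        if node.startswith("ReusedExchange"):
            counts["reused"] += 1
        elif node.startswith("Exchange "):
            # BroadcastExchange is deliberately not counted here.
            counts["exchange"] += 1
    return counts

# Hypothetical, shortened plan in the same shape as the Parquet plan.
sample_plan = """\
:- *(4) HashAggregate(keys=[a#1L], functions=[])
:  +- Exchange hashpartitioning(a#1L, 200)
:     +- BroadcastExchange IdentityBroadcastMode
+- *(8) HashAggregate(keys=[a#2L], functions=[])
   +- ReusedExchange [a#2L], Exchange hashpartitioning(a#1L, 200)
"""

print(exchange_summary(sample_plan))
```

      Applied to the full plans in this report, the Parquet plan should yield one shuffle exchange and one reused exchange, while the DataSourceV2 plan should yield two shuffle exchanges and no reuse.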

       

      Parquet:

      == Physical Plan ==
      TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#21456L ASC NULLS FIRST], output=[prev_year#21451L,year#21452L,d_month_seq#21456L])
      +- *(9) Project [d_year#21487L AS prev_year#21451L, d_year#20481L AS year#21452L, d_month_seq#21456L]
         +- CartesianProduct
            :- *(4) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
            :  +- Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
            :     +- *(3) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
            :        +- BroadcastNestedLoopJoin BuildRight, Cross
            :           :- *(1) Project [d_year#20481L]
            :           :  +- *(1) Filter (((((isnotnull(d_year#20481L) && isnotnull(d_day_name#20489)) && isnotnull(d_fy_year#20486L)) && (d_day_name#20489 = Monday)) && (d_fy_year#20486L > 2000)) && (d_year#20481L = 2002))
            :           :     +- *(1) FileScan parquet [d_year#20481L,d_fy_year#20486L,d_day_name#20489] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), Grea..., ReadSchema: struct<d_year:bigint,d_fy_year:bigint,d_day_name:string>
            :           +- BroadcastExchange IdentityBroadcastMode
            :              +- *(2) Project [d_month_seq#21456L]
            :                 +- *(2) Filter (((isnotnull(d_day_name#21467) && isnotnull(d_fy_year#21464L)) && (d_day_name#21467 = Monday)) && (d_fy_year#21464L > 2000))
            :                    +- *(2) FileScan parquet [d_month_seq#21456L,d_fy_year#21464L,d_day_name#21467] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), GreaterThan(d_fy_year,2..., ReadSchema: struct<d_month_seq:bigint,d_fy_year:bigint,d_day_name:string>
            +- *(8) HashAggregate(keys=[d_year#21487L, d_month_seq#21540L], functions=[])
               +- ReusedExchange [d_year#21487L, d_month_seq#21540L], Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)

       

      DataSourceV2:

      == Physical Plan ==
      TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#22325L ASC NULLS FIRST], output=[prev_year#22320L,year#22321L,d_month_seq#22325L])
      +- *(9) Project [d_year#22356L AS prev_year#22320L, d_year#21696L AS year#22321L, d_month_seq#22325L]
         +- CartesianProduct
            :- *(4) HashAggregate(keys=[d_year#21696L, d_month_seq#22325L], functions=[])
            :  +- Exchange hashpartitioning(d_year#21696L, d_month_seq#22325L, 200)
            :     +- *(3) HashAggregate(keys=[d_year#21696L, d_month_seq#22325L], functions=[])
            :        +- BroadcastNestedLoopJoin BuildRight, Cross
            :           :- *(1) Project [d_year#21696L]
            :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#21696L] (Filters: [isnotnull(d_day_name#21704), (d_day_name#21704 = Monday), isnotnull(d_fy_year#21701L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
            :           +- BroadcastExchange IdentityBroadcastMode
            :              +- *(2) Project [d_month_seq#22325L]
            :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#22325L] (Filters: [isnotnull(d_day_name#22336), (d_day_name#22336 = Monday), isnotnull(d_fy_year#22333L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
            +- *(8) HashAggregate(keys=[d_year#22356L, d_month_seq#22409L], functions=[])
               +- Exchange hashpartitioning(d_year#22356L, d_month_seq#22409L, 200)
                  +- *(7) HashAggregate(keys=[d_year#22356L, d_month_seq#22409L], functions=[])
                     +- BroadcastNestedLoopJoin BuildRight, Cross
                        :- *(5) Project [d_year#22356L]
                        :  +- *(5) ScanV2 BigQueryDataSourceV2[d_year#22356L] (Filters: [isnotnull(d_day_name#22364), (d_day_name#22364 = Monday), isnotnull(d_fy_year#22361L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
                        +- BroadcastExchange IdentityBroadcastMode
                           +- *(6) Project [d_month_seq#22409L]
                              +- *(6) ScanV2 BigQueryDataSourceV2[d_month_seq#22409L] (Filters: [isnotnull(d_day_name#22420), (d_day_name#22420 = Monday), isnotnull(d_fy_year#22417L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
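
      Exchange reuse depends on two sub-plans comparing as equal once their expression ids (the #NNNN suffixes) are normalized. The two branches above differ only in those ids, and the DataSourceV2 scan nodes embed them in their pushed filters. One plausible explanation, which this report does not confirm, is that the scan node compares those filters without normalizing them. The sketch below illustrates the normalization idea on plan text only; it is a toy model, not Spark's implementation, and normalize_expr_ids is a hypothetical helper.

```python
import re

def normalize_expr_ids(plan_text):
    """Renumber each distinct '#<id>' by order of first appearance."""
    seen = {}

    def repl(match):
        expr_id = match.group(1)
        if expr_id not in seen:
            seen[expr_id] = len(seen)
        return "#{}".format(seen[expr_id])

    return re.sub(r"#(\d+)", repl, plan_text)

# The two Exchange nodes from the DataSourceV2 plan in this report.
left = "Exchange hashpartitioning(d_year#21696L, d_month_seq#22325L, 200)"
right = "Exchange hashpartitioning(d_year#22356L, d_month_seq#22409L, 200)"

# They differ as written, but match once the ids are positional.
print(left == right)
print(normalize_expr_ids(left) == normalize_expr_ids(right))
```

      If any node in a sub-plan keeps raw expression ids in the state used for comparison, the two branches never compare as equal and the second exchange cannot be replaced by a ReusedExchange.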

       

            People

            • Assignee: Mingjia Liu (mingjial)
            • Reporter: Mingjia Liu (mingjial)