Spark / SPARK-32609

Incorrect exchange reuse with DataSourceV2


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0, 2.4.1, 2.4.2, 2.4.3, 2.4.4, 2.4.5, 2.4.6
    • Fix Version/s: 2.4.7
    • Component/s: SQL
    • Important

    Description

       

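      from pyspark.sql import SparkSession

      spark = SparkSession.builder.getOrCreate()

      # Exchange reuse must be enabled for the bug to show up.
      spark.conf.set("spark.sql.exchange.reuse", "true")

      # Load TPC-DS date_dim through the BigQuery DataSourceV2 connector and
      # register it under the name the query below refers to.
      df = (spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2")
            .option("table", "tpcds_1G.date_dim")
            .load())
      df.createOrReplaceTempView("date_dim")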
          
      df = spark.sql(""" 
      WITH t1 AS (
          SELECT 
              d_year, d_month_seq
          FROM (
              SELECT t1.d_year , t2.d_month_seq          
              FROM 
              date_dim t1
              cross join
              date_dim t2
              where t1.d_day_name = "Monday" and t1.d_fy_year > 2000
              and t2.d_day_name = "Monday" and t2.d_fy_year > 2000
              )
          GROUP BY d_year, d_month_seq)
         
       SELECT
          prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
       FROM t1 curr_yr cross join t1 prev_yr
       WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002-1
       ORDER BY d_month_seq
       LIMIT 100
       
       """)
      
      df.explain()
      df.show()

       

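      The repro query:
      A. defines a CTE t1 that groups the cross join of two scans of date_dim (each filtered to Mondays with d_fy_year > 2000) by (d_year, d_month_seq);
      B. cross joins t1 with itself, as curr_yr (year 2002) and prev_yr (year 2001).

      With exchange reuse enabled, the planner incorrectly reuses the shuffle output of A filtered on year 2002 for the year-2001 side: the ReusedExchange under HashAggregate *(8) (the prev_yr branch) points back to the Exchange under *(4) (the curr_yr branch).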
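      The reuse decision described above can be illustrated with a toy model. This is a sketch under explicit assumptions, not Spark's code: it assumes the two branches differ only in the year filter pushed into the d_year scan (the Filters list in the plan is truncated, so this is not directly visible), and that the identity used to match exchanges for reuse leaves pushed filters out. The Scan class, both key functions, and the filter strings are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Hashable, List, Tuple


@dataclass(frozen=True)
class Scan:
    """Hypothetical, simplified stand-in for a DataSourceV2 scan under an Exchange."""
    table: str
    columns: Tuple[str, ...]
    pushed_filters: Tuple[str, ...]


def key_ignoring_filters(scan: Scan) -> Hashable:
    # Assumed faulty identity: same table and columns compare equal,
    # regardless of which filters were pushed into the source.
    return (scan.table, scan.columns)


def key_with_filters(scan: Scan) -> Hashable:
    # Identity that also compares the pushed filters.
    return (scan.table, scan.columns, scan.pushed_filters)


def reuse_exchanges(scans: List[Scan], key: Callable[[Scan], Hashable]) -> List[Scan]:
    """For each scan, return the scan whose exchange output is actually read.

    The first scan registered under a key is executed; a later scan with an
    equal key is replaced by a reference to it, like a ReusedExchange node.
    """
    executed: Dict[Hashable, Scan] = {}
    return [executed.setdefault(key(scan), scan) for scan in scans]


# Hypothetical pushed filters for the two branches of the repro query.
curr_yr = Scan("tpcds_1G.date_dim", ("d_year",), ("d_day_name = Monday", "d_year = 2002"))
prev_yr = Scan("tpcds_1G.date_dim", ("d_year",), ("d_day_name = Monday", "d_year = 2001"))

buggy = reuse_exchanges([curr_yr, prev_yr], key_ignoring_filters)
fixed = reuse_exchanges([curr_yr, prev_yr], key_with_filters)

print(buggy[1] is curr_yr)  # True: the 2001 branch reads the 2002 branch's output
print(fixed[1] is prev_yr)  # True: the 2001 branch gets its own exchange
```

      Under these assumptions, any identity that ignores pushed filters makes the two branches look interchangeable, which matches the symptom reported here.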

      == Physical Plan ==
      TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#24371L ASC NULLS FIRST], output=[prev_year#24366L,year#24367L,d_month_seq#24371L])
      +- *(9) Project [d_year#24402L AS prev_year#24366L, d_year#23551L AS year#24367L, d_month_seq#24371L]
         +- CartesianProduct
            :- *(4) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
            :  +- Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
            :     +- *(3) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
            :        +- BroadcastNestedLoopJoin BuildRight, Cross
            :           :- *(1) Project [d_year#23551L]
            :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#23551L] (Filters: [isnotnull(d_day_name#23559), (d_day_name#23559 = Monday), isnotnull(d_fy_year#23556L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
            :           +- BroadcastExchange IdentityBroadcastMode
            :              +- *(2) Project [d_month_seq#24371L]
            :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#24371L] (Filters: [isnotnull(d_day_name#24382), (d_day_name#24382 = Monday), isnotnull(d_fy_year#24379L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
            +- *(8) HashAggregate(keys=[d_year#24402L, d_month_seq#24455L], functions=[])
               +- ReusedExchange [d_year#24402L, d_month_seq#24455L], Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
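      To spot this pattern in a regression check, the text of a physical plan can be scanned for ReusedExchange lines and the exchange each one points back to. The sketch below is a hypothetical helper working on plain plan text (for example, copied from the df.explain() output above); it does not use any Spark API. The regex assumes the "ReusedExchange [attrs], <exchange>" line format seen in this plan.

```python
import re
from typing import List, Tuple

# Matches plan lines such as:
#   +- ReusedExchange [a#1L, b#2L], Exchange hashpartitioning(a#3L, b#4L, 200)
_REUSED = re.compile(r"ReusedExchange \[([^\]]*)\], (.+)$", re.MULTILINE)


def find_reused_exchanges(plan_text: str) -> List[Tuple[List[str], str]]:
    """Return (output attributes, referenced exchange) for every ReusedExchange line."""
    found = []
    for match in _REUSED.finditer(plan_text):
        attrs = [a.strip() for a in match.group(1).split(",") if a.strip()]
        found.append((attrs, match.group(2).strip()))
    return found


# The two relevant lines of the physical plan from this issue.
plan = """\
+- *(8) HashAggregate(keys=[d_year#24402L, d_month_seq#24455L], functions=[])
   +- ReusedExchange [d_year#24402L, d_month_seq#24455L], Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
"""

for attrs, target in find_reused_exchanges(plan):
    print(attrs, "->", target)
```

      Differing attribute IDs between a ReusedExchange and its target are normal for legitimate reuse, so the useful check is which exchange is referenced: here it is the hash-partitioned shuffle of the year-2002 branch.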

       

       

      The result is clearly incorrect: prev_year should be 2001, but every row shown has prev_year = 2002.

      +---------+----+-----------+
      |prev_year|year|d_month_seq|
      +---------+----+-----------+
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      |     2002|2002|       1212|
      +---------+----+-----------+
      only showing top 20 rows
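      The WHERE clause (curr_yr.d_year=2002 AND prev_yr.d_year=2002-1) means every output row must have year = 2002 and prev_year = 2001. A minimal sketch of that check, applied to the rows printed above; the function name is hypothetical, and rows are assumed to be (prev_year, year, d_month_seq) tuples in the SELECT-list order.

```python
from typing import Iterable, List, Tuple

# (prev_year, year, d_month_seq), in the column order of the query's SELECT list.
Row = Tuple[int, int, int]


def rows_violating_filter(rows: Iterable[Row], year: int = 2002) -> List[Row]:
    """Rows that cannot satisfy WHERE curr_yr.d_year = year AND prev_yr.d_year = year - 1."""
    return [row for row in rows if row[1] != year or row[0] != year - 1]


# The 20 rows printed by df.show() in this issue.
reported: List[Row] = [(2002, 2002, 1212)] * 20

bad = rows_violating_filter(reported)
print(f"{len(bad)} of {len(reported)} shown rows violate the WHERE clause")
```

      Against a live DataFrame, the same check could be fed with [tuple(r) for r in df.collect()], since PySpark Row objects are tuples.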
      

       

       


          People

            Assignee: Mingjia Liu (mingjial)
            Reporter: Mingjia Liu (mingjial)
            Votes: 0
            Watchers: 8


              Time Tracking

                Original Estimate: 48h
                Remaining Estimate: 48h
                Time Spent: Not Specified