Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 2.4.0, 2.4.1, 2.4.2, 2.4.3, 2.4.4, 2.4.5, 2.4.6
Flags: Important
Description
# Exchange reuse is the setting under test
spark.conf.set("spark.sql.exchange.reuse", "true")

# Load the table through the BigQuery DataSource V2 connector
df = (spark.read
      .format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2")
      .option("table", "tpcds_1G.date_dim")
      .load())
# Register it under the name the SQL below refers to
df.createOrReplaceTempView("date_dim")

df = spark.sql("""
WITH t1 AS (
  SELECT d_year, d_month_seq
  FROM (
    SELECT t1.d_year, t2.d_month_seq
    FROM date_dim t1
    CROSS JOIN date_dim t2
    WHERE t1.d_day_name = "Monday" AND t1.d_fy_year > 2000
      AND t2.d_day_name = "Monday" AND t2.d_fy_year > 2000
  )
  GROUP BY d_year, d_month_seq
)
SELECT prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
FROM t1 curr_yr
CROSS JOIN t1 prev_yr
WHERE curr_yr.d_year = 2002 AND prev_yr.d_year = 2002 - 1
ORDER BY d_month_seq
LIMIT 100
""")

df.explain()
df.show()
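The repro query:
A. defines t1 as a common table expression over date_dim
B. cross joins t1 filtered on year 2002 (curr_yr) with t1 filtered on year 2001 (prev_yr)
With exchange reuse enabled, the planner incorrectly "decides" to reuse the persisted shuffle output of A that was computed with the filter on year 2002 for the side that needs year 2001.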
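The failure mode described above can be sketched with a toy model in plain Python. This is not Spark code: the `Scan` class, the two key functions, and `plan_exchanges` are hypothetical names, and the sketch assumes the wrong reuse comes from a plan comparison that leaves pushed-down filters out of account, which the report itself does not state.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Scan:
    """Toy stand-in for a data source scan node (hypothetical, not Spark's class)."""
    table: str
    columns: tuple
    pushed_filters: tuple


def key_ignoring_filters(scan):
    # Assumed faulty comparison: pushed-down filters are not part of the key.
    return (scan.table, scan.columns)


def key_with_filters(scan):
    # Comparison that also distinguishes scans by their pushed-down filters.
    return (scan.table, scan.columns, scan.pushed_filters)


def plan_exchanges(scans, key_fn):
    """For each scan, return the scan whose shuffle output would be read."""
    cache = {}
    # setdefault keeps the first scan seen for a key and reuses it afterwards.
    return [cache.setdefault(key_fn(scan), scan) for scan in scans]


curr_yr = Scan("date_dim", ("d_year", "d_month_seq"), ("d_year = 2002",))
prev_yr = Scan("date_dim", ("d_year", "d_month_seq"), ("d_year = 2001",))

buggy = plan_exchanges([curr_yr, prev_yr], key_ignoring_filters)
fixed = plan_exchanges([curr_yr, prev_yr], key_with_filters)

print(buggy[1].pushed_filters)  # ('d_year = 2002',)
print(fixed[1].pushed_filters)  # ('d_year = 2001',)
```

In the toy model, the prev_yr side ends up reading data filtered for 2002 whenever the key omits the filters, which matches the symptom reported here.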
== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#24371L ASC NULLS FIRST], output=[prev_year#24366L,year#24367L,d_month_seq#24371L])
+- *(9) Project [d_year#24402L AS prev_year#24366L, d_year#23551L AS year#24367L, d_month_seq#24371L]
   +- CartesianProduct
      :- *(4) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
      :  +- Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
      :     +- *(3) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
      :        +- BroadcastNestedLoopJoin BuildRight, Cross
      :           :- *(1) Project [d_year#23551L]
      :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#23551L] (Filters: [isnotnull(d_day_name#23559), (d_day_name#23559 = Monday), isnotnull(d_fy_year#23556L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
      :           +- BroadcastExchange IdentityBroadcastMode
      :              +- *(2) Project [d_month_seq#24371L]
      :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#24371L] (Filters: [isnotnull(d_day_name#24382), (d_day_name#24382 = Monday), isnotnull(d_fy_year#24379L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
      +- *(8) HashAggregate(keys=[d_year#24402L, d_month_seq#24455L], functions=[])
         +- ReusedExchange [d_year#24402L, d_month_seq#24455L], Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
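To check other queries for the same pattern, the plan text can be scanned for ReusedExchange nodes. The helper below is a minimal sketch: `find_reused_exchanges` is a hypothetical name, and it assumes the plan is available as a string in the textual format shown above (for example, copied from the df.explain() output).

```python
import re


def find_reused_exchanges(plan_text):
    """Return (output attributes, reused exchange) pairs found in a plan string."""
    # Matches e.g. "ReusedExchange [a#1L, b#2L], Exchange hashpartitioning(...)".
    pattern = re.compile(r"ReusedExchange \[([^\]]*)\], (.*)")
    return [(m.group(1), m.group(2).strip()) for m in pattern.finditer(plan_text)]


sample = (
    "+- ReusedExchange [d_year#24402L, d_month_seq#24455L], "
    "Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)"
)
for attrs, exchange in find_reused_exchanges(sample):
    print(attrs, "<-", exchange)
```

A reported match only means that an exchange was reused. Whether the reuse is legitimate still has to be judged against the filters of the two branches.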
The query result is incorrect: prev_year should be 2001, but every row shows 2002.
+---------+----+-----------+
|prev_year|year|d_month_seq|
+---------+----+-----------+
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
+---------+----+-----------+
only showing top 20 rows
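Since the problem is reported with exchange reuse enabled, one way to cross-check a suspicious result is to rerun the query with spark.sql.exchange.reuse set to "false". The context manager below is a sketch only: `exchange_reuse_disabled` is a hypothetical helper, it assumes an existing SparkSession is passed in, and it has not been verified against the affected versions.

```python
from contextlib import contextmanager


@contextmanager
def exchange_reuse_disabled(spark):
    """Temporarily set spark.sql.exchange.reuse to "false" on the given session."""
    key = "spark.sql.exchange.reuse"
    # Remember the current value; "true" is assumed when the key is unset.
    previous = spark.conf.get(key, "true")
    spark.conf.set(key, "false")
    try:
        yield spark
    finally:
        # Restore the earlier value even if the query raised an error.
        spark.conf.set(key, previous)


# Usage (requires a live SparkSession named `spark` and the repro query text):
#     with exchange_reuse_disabled(spark):
#         spark.sql(query).show()
```

The action (show() in the usage comment) has to run inside the with block, because the physical plan is built lazily when the action is triggered.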