Description
Repro query:
spark.conf.set("spark.sql.exchange.reuse","true") spark.read.format('parquet').load('gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim') #spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table",'tpcds_1G.date_dim').load() df.createOrReplaceTempView(table) df = spark.sql(""" WITH t1 AS ( SELECT d_year, d_month_seq FROM ( SELECT t1.d_year , t2.d_month_seq FROM date_dim t1 cross join date_dim t2 where t1.d_day_name = "Monday" and t1.d_fy_year > 2000 and t2.d_day_name = "Monday" and t2.d_fy_year > 2000 ) GROUP BY d_year, d_month_seq) SELECT prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq FROM t1 curr_yr cross join t1 prev_yr WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002 ORDER BY d_month_seq LIMIT 100 """) df.explain() #df.show()
The above query has different plans with Parquet and DataSourceV2. Both plans are correct tho. However, the DataSourceV2 plan is less optimized :
Sub-plan [5-7] is exactly the same as sub-plan [1-3]( Aggregate on BHJed dataset of two tables that are filtered, projected the same way).
Therefore, in the below parquet plan, exchange that happens after [1-3] is reused to replace [5-6].
However, the DataSourceV2 plan failed to do so.
Parquet:
== Physical Plan == TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#21456L ASC NULLS FIRST], output=[prev_year#21451L,year#21452L,d_month_seq#21456L]) +- *(9) Project [d_year#21487L AS prev_year#21451L, d_year#20481L AS year#21452L, d_month_seq#21456L] +- CartesianProduct :- *(4) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[]) : +- Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200) : +- *(3) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[]) : +- BroadcastNestedLoopJoin BuildRight, Cross : :- *(1) Project [d_year#20481L] : : +- *(1) Filter (((((isnotnull(d_year#20481L) && isnotnull(d_day_name#20489)) && isnotnull(d_fy_year#20486L)) && (d_day_name#20489 = Monday)) && (d_fy_year#20486L > 2000)) && (d_year#20481L = 2002)) : : +- *(1) FileScan parquet [d_year#20481L,d_fy_year#20486L,d_day_name#20489] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), Grea..., ReadSchema: struct<d_year:bigint,d_fy_year:bigint,d_day_name:string> : +- BroadcastExchange IdentityBroadcastMode : +- *(2) Project [d_month_seq#21456L] : +- *(2) Filter (((isnotnull(d_day_name#21467) && isnotnull(d_fy_year#21464L)) && (d_day_name#21467 = Monday)) && (d_fy_year#21464L > 2000)) : +- *(2) FileScan parquet [d_month_seq#21456L,d_fy_year#21464L,d_day_name#21467] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), GreaterThan(d_fy_year,2..., ReadSchema: struct<d_month_seq:bigint,d_fy_year:bigint,d_day_name:string> +- *(8) HashAggregate(keys=[d_year#21487L, d_month_seq#21540L], functions=[]) +- ReusedExchange [d_year#21487L, d_month_seq#21540L], Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
DataSourceV2:
== Physical Plan == TakeOrderedAndProject(limit=100, orderBy=d_month_seq#22325L ASC NULLS FIRST, output=prev_year#22320L,year#22321L,d_month_seq#22325L) +- *(9) Project d_year#22356L AS prev_year#22320L, d_year#21696L AS year#22321L, d_month_seq#22325L +- CartesianProduct :- *(4) HashAggregate(keys=d_year#21696L, d_month_seq#22325L, functions=[]) : +- Exchange hashpartitioning(d_year#21696L, d_month_seq#22325L, 200) : +- *(3) HashAggregate(keys=d_year#21696L, d_month_seq#22325L, functions=[]) : +- BroadcastNestedLoopJoin BuildRight, Cross : :- *(1) Project d_year#21696L : : +- *(1) ScanV2 BigQueryDataSourceV2d_year#21696L (Filters: [isnotnull(d_day_name#21704), (d_day_name#21704 = Monday), isnotnull(d_fy_year#21701L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]]) : +- BroadcastExchange IdentityBroadcastMode : +- *(2) Project d_month_seq#22325L : +- *(2) ScanV2 BigQueryDataSourceV2d_month_seq#22325L (Filters: [isnotnull(d_day_name#22336), (d_day_name#22336 = Monday), isnotnull(d_fy_year#22333L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]]) +- *(8) HashAggregate(keys=d_year#22356L, d_month_seq#22409L, functions=[]) +- Exchange hashpartitioning(d_year#22356L, d_month_seq#22409L, 200) +- *(7) HashAggregate(keys=d_year#22356L, d_month_seq#22409L, functions=[]) +- BroadcastNestedLoopJoin BuildRight, Cross :- *(5) Project d_year#22356L : +- *(5) ScanV2 BigQueryDataSourceV2d_year#22356L (Filters: [isnotnull(d_day_name#22364), (d_day_name#22364 = Monday), isnotnull(d_fy_year#22361L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]]) +- BroadcastExchange IdentityBroadcastMode +- *(6) Project d_month_seq#22409L +- *(6) ScanV2 BigQueryDataSourceV2d_month_seq#22409L (Filters: [isnotnull(d_day_name#22420), (d_day_name#22420 = Monday), isnotnull(d_fy_year#22417L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])