Description
How to reproduce this issue:
spark.range(1000).write.bucketBy(432, "id").saveAsTable("t1")
spark.range(1000).write.bucketBy(34, "id").saveAsTable("t2")
sql("set spark.sql.shuffle.partitions=600")
sql("set spark.sql.autoBroadcastJoinThreshold=-1")
sql("select * from t1 join t2 on t1.id = t2.id").explain()
== Physical Plan ==
*(5) SortMergeJoin [id#26L], [id#27L], Inner
:- *(2) Sort [id#26L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#26L, 600), true, [id=#65]
:     +- *(1) Filter isnotnull(id#26L)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
+- *(4) Sort [id#27L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#27L, 600), true, [id=#74]
      +- *(3) Filter isnotnull(id#27L)
         +- *(3) ColumnarToRow
            +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
Expected:
== Physical Plan ==
*(4) SortMergeJoin [id#26L], [id#27L], Inner
:- *(1) Sort [id#26L ASC NULLS FIRST], false, 0
:  +- *(1) Filter isnotnull(id#26L)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
+- *(3) Sort [id#27L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#27L, 432), true, [id=#69]
      +- *(2) Filter isnotnull(id#27L)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
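
The difference between the two plans comes down to how many Exchange nodes appear and with how many partitions. As a rough sketch, this could also be checked programmatically instead of by reading the explain() output. The snippet assumes it is run in spark-shell (so `spark` is already defined), that tables t1 and t2 from the reproduction steps exist, and that adaptive query execution is disabled so the executed plan is not wrapped. The name `shufflePartitions` is only illustrative.

```scala
// Intended for spark-shell, where `spark` is already defined.
// Assumes t1 and t2 were created by the reproduction steps and AQE is off.
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

spark.sql("set spark.sql.shuffle.partitions=600")
spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")

val df = spark.sql("select * from t1 join t2 on t1.id = t2.id")

// Collect the partition count of every shuffle exchange in the physical plan.
val shufflePartitions: Seq[Int] = df.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e.outputPartitioning.numPartitions
}

println(s"exchanges: ${shufflePartitions.size}, partitions: ${shufflePartitions.mkString(", ")}")
```

Going by the plans in this report, the current behavior corresponds to two exchanges with 600 partitions each, while the expected plan has a single exchange with 432 partitions on the t2 side.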