Description
The following example of joining two bucketed tables introduces a full shuffle:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")

val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")

val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain(true)

== Physical Plan ==
*(5) SortMergeJoin [i#44], [i#50], Inner
:- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
:     +- *(1) Project [i#44, j#45, k#46]
:        +- *(1) Filter isnotnull(i#44)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
+- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#50, 200), true, [id=#115]
      +- *(3) Project [i#50, j#51, k#52]
         +- *(3) Filter isnotnull(i#50)
            +- *(3) ColumnarToRow
               +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
But the shuffle can be eliminated by coalescing the bucket count of one side: since t1's 8 buckets are a multiple of t2's 4 buckets, t1 can be read as 4 coalesced buckets, making both sides partitioned identically on the join key.
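The applicability check behind this idea can be sketched as follows. This is a minimal, hypothetical standalone sketch (the object and method names are illustrative, not Spark's actual internal API): coalescing applies only when the larger bucket count is divisible by the smaller one, so each coalesced bucket is the union of a whole number of original buckets hashed on the same key.

```scala
// Hypothetical sketch of the bucket-coalescing applicability check.
object BucketCoalescing {
  // Returns the common (smaller) bucket count both sides can use,
  // or None when coalescing does not apply.
  def coalescedBuckets(left: Int, right: Int): Option[Int] = {
    val big = math.max(left, right)
    val small = math.min(left, right)
    // Each coalesced bucket must absorb exactly big / small original
    // buckets, so the larger count must be divisible by the smaller.
    if (big % small == 0) Some(small) else None
  }
}

// t1 has 8 buckets and t2 has 4: 8 % 4 == 0, so t1 can be read as
// 4 coalesced buckets and the join needs no Exchange on either side.
println(BucketCoalescing.coalescedBuckets(8, 4)) // Some(4)
println(BucketCoalescing.coalescedBuckets(8, 3)) // None
```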
Issue Links
- is duplicated by SPARK-29693 "Bucket map join if the one's bucket number is the multiple of the other" (Resolved)