Description
What I did
I ran the following code:
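from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("Bucketing")
    .master("local[4]")
    # Let a join between bucketed tables with different bucket counts
    # coalesce the side with more buckets instead of shuffling
    .config("spark.sql.bucketing.coalesceBucketsInJoin.enabled", True)
    # Disable broadcast joins so the join is planned as a sort-merge join
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .getOrCreate()
)
# t1: ids 0..99 in 4 buckets; t2: even ids 0..98 in 2 buckets
df1 = spark.range(0, 100)
df2 = spark.range(0, 100, 2)
df1.write.bucketBy(4, "id").mode("overwrite").saveAsTable("t1")
df2.write.bucketBy(2, "id").mode("overwrite").saveAsTable("t2")
t1 = spark.table("t1")
t2 = spark.table("t2")
# Print the physical plan of the join on the bucket column
t2.join(t1, "id").explain()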
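The two writes above place every row into one of numBuckets buckets by hashing id. The pure-Python sketch below models that layout so the bucket structure of t1 and t2 is easy to inspect. It is an illustration under assumptions, not Spark code: stand_in_hash is a hypothetical MD5-based placeholder, whereas Spark computes pmod(murmur3(id), numBuckets) with Murmur3 seed 42, so the real bucket contents differ and only the modulo structure carries over.

```python
import hashlib
from collections import defaultdict


def stand_in_hash(key: int) -> int:
    # HYPOTHETICAL stand-in for Spark's Murmur3 hash (seed 42).
    # Returns a signed 32-bit value, like Spark's hash, so pmod matters.
    digest = hashlib.md5(key.to_bytes(8, "little", signed=True)).digest()
    return int.from_bytes(digest[:4], "little", signed=True)


def bucket_id(key: int, num_buckets: int) -> int:
    # Spark uses pmod(hash, numBuckets). Python's % with a positive divisor
    # always returns a value in [0, num_buckets), which matches pmod.
    return stand_in_hash(key) % num_buckets


def bucket_layout(keys, num_buckets: int) -> dict:
    """Group keys by the bucket they would be written to."""
    layout = defaultdict(list)
    for key in keys:
        layout[bucket_id(key, num_buckets)].append(key)
    return dict(layout)


t1_layout = bucket_layout(range(0, 100), 4)     # like df1 with bucketBy(4, "id")
t2_layout = bucket_layout(range(0, 100, 2), 2)  # like df2 with bucketBy(2, "id")

for name, layout in (("t1", t1_layout), ("t2", t2_layout)):
    for b in sorted(layout):
        print(name, "bucket", b, "->", len(layout[b]), "keys")
```

Because both tables are bucketed on the join key, a given id always lands in a predictable bucket on each side, which is the property that allows a bucketed join to avoid a shuffle when the bucket counts can be lined up.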
What happened
The physical plan contains an Exchange (shuffle) node for the join.
What is expected
The plan should not contain any Exchange (shuffle) nodes. t1 has 4 buckets and t2 has 2, so the bucket ratio is 2, which does not exceed spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio (default 4), and 4 is a multiple of 2. CoalesceBucketsInJoin should therefore coalesce t1's 4 buckets into 2 so that both sides of the join line up without a shuffle.
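The eligibility argument above can be written as a small pure-Python check. This is a simplified model, not Spark's implementation: it captures only the bucket-count conditions (different counts, the larger a multiple of the smaller, ratio at most maxBucketRatio), while Spark's rule has further conditions (for example, the join keys must match the bucket columns) that are not modeled here. The names can_coalesce and coalesced_bucket are hypothetical.

```python
def can_coalesce(num_buckets_a: int, num_buckets_b: int,
                 max_bucket_ratio: int = 4) -> bool:
    """Simplified model: can the side with more buckets be coalesced?"""
    small, large = sorted((num_buckets_a, num_buckets_b))
    if small == large:
        return False  # counts already match, nothing to coalesce
    # Bucket ids are hash % num_buckets, so coalescing is only sound when
    # the smaller count divides the larger one.
    if large % small != 0:
        return False
    return large // small <= max_bucket_ratio


def coalesced_bucket(bucket: int, target_buckets: int) -> int:
    # When large % small == 0, (h % large) % small == h % small for every
    # hash h, so bucket b of the larger table maps to bucket b % small.
    return bucket % target_buckets


print(can_coalesce(4, 2))   # True: t1 vs t2 in this report (ratio 2)
print(can_coalesce(8, 2))   # True: ratio 4 is still within the default limit
print(can_coalesce(16, 2))  # False: ratio 8 exceeds the default limit
print(can_coalesce(6, 4))   # False: 6 is not a multiple of 4

# Check the modulo identity for t1 (4 buckets) coalesced to 2 buckets
assert all(coalesced_bucket(h % 4, 2) == h % 2 for h in range(-1000, 1000))
```

By this model the t1/t2 pair qualifies, which is why the Exchange node in the reported plan is unexpected.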
Issue Links
- is duplicated by: SPARK-43087 Support coalesce buckets in join in AQE (Resolved)
- links to