Description
Enabling adaptive execution should not add extra ShuffleExchange nodes. In the reproduction below, turning adaptive execution on introduces a shuffle over the bucketed side of the join, even though the table is already bucketed by the join key. How to reproduce:
import org.apache.spark.sql.SaveMode

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.shuffle.partitions", 4)

// Create a table bucketed by the join key; the bucket count matches
// spark.sql.shuffle.partitions.
val bucketedTableName = "bucketed_table"
spark.range(10).write.bucketBy(4, "id").sortBy("id").mode(SaveMode.Overwrite).saveAsTable(bucketedTableName)
val bucketedTable = spark.table(bucketedTableName)

val df = spark.range(4)
df.join(bucketedTable, "id").explain()

// Enable adaptive execution and explain the same join again.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 5)
df.join(bucketedTable, "id").explain()
Output (with adaptive execution enabled):
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [id#5L]
   +- SortMergeJoin [id#5L], [id#3L], Inner
      :- Sort [id#5L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#5L, 5), true, [id=#92]
      :     +- Range (0, 4, step=1, splits=16)
      +- Sort [id#3L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#3L, 5), true, [id=#93]
            +- Project [id#3L]
               +- Filter isnotnull(id#3L)
                  +- FileScan parquet default.bucketed_table[id#3L] Batched: true, DataFilters: [isnotnull(id#3L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-preview-bin-hadoop3.2/spark-warehouse/bucketed_table], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4
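For illustration only (not part of the original report), the snippet below reuses df and bucketedTable from the repro and counts Exchange operators in the plan's tree string with adaptive execution off and on. The countShuffles helper is hypothetical; the expected counts follow from the repro setup (4 buckets on id matching spark.sql.shuffle.partitions = 4).

// Hypothetical helper: count shuffle exchanges in a plan tree string.
def countShuffles(plan: String): Int =
  "Exchange hashpartitioning".r.findAllIn(plan).length

spark.conf.set("spark.sql.adaptive.enabled", false)
val planWithoutAqe = df.join(bucketedTable, "id").queryExecution.executedPlan.toString
// Only the small `df` side should need a shuffle here, since the bucketed table
// already has 4 buckets on `id` and spark.sql.shuffle.partitions is 4.
println(countShuffles(planWithoutAqe))

spark.conf.set("spark.sql.adaptive.enabled", true)
val planWithAqe = df.join(bucketedTable, "id").queryExecution.executedPlan.toString
// With the bug described above, this prints one more Exchange than the previous
// count, because the bucketed side is shuffled again under adaptive execution.
println(countShuffles(planWithAqe))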