Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Incomplete
- Affects Version/s: 2.3.0, 2.3.1
- Fix Version/s: None
./bin/spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_162
Branch master
Compiled by user sameera on 2018-02-22T19:24:29Z
Revision a0d7949896e70f427e7f3942ff340c9484ff0aab
Url git@github.com:sameeragarwal/spark.git
Type --help for more information.
Description
While exploring bucketing, I found that the following join of a non-bucketed table and a bucketed table ends up with two exchanges and two sorts in the physical plan for the non-bucketed join side.
// Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
// Disable auto broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val bucketedTableName = "bucketed_4_id"
val large = spark.range(1000000)
large.write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode("overwrite")
  .saveAsTable(bucketedTableName)

// Describe the table and include bucketing spec only
val descSQL = sql(s"DESC FORMATTED $bucketedTableName")
  .filter($"col_name".contains("Bucket") || $"col_name" === "Sort Columns")

scala> descSQL.show(truncate = false)
+--------------+---------+-------+
|col_name      |data_type|comment|
+--------------+---------+-------+
|Num Buckets   |4        |       |
|Bucket Columns|[`id`]   |       |
|Sort Columns  |[`id`]   |       |
+--------------+---------+-------+

val bucketedTable = spark.table(bucketedTableName)
val t1 = spark.range(4)
  .repartition(2, $"id")      // Use just 2 partitions
  .sortWithinPartitions("id") // sort partitions

val q = t1.join(bucketedTable, "id")

// Note two exchanges and sorts
scala> q.explain
== Physical Plan ==
*(5) Project [id#79L]
+- *(5) SortMergeJoin [id#79L], [id#77L], Inner
   :- *(3) Sort [id#79L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#79L, 4)
   :     +- *(2) Sort [id#79L ASC NULLS FIRST], false, 0
   :        +- Exchange hashpartitioning(id#79L, 2)
   :           +- *(1) Range (0, 4, step=1, splits=8)
   +- *(4) Sort [id#77L ASC NULLS FIRST], false, 0
      +- *(4) Project [id#77L]
         +- *(4) Filter isnotnull(id#77L)
            +- *(4) FileScan parquet default.bucketed_4_id[id#77L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

q.foreach(_ => ())
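The duplicated operators can also be checked programmatically instead of by reading the plan. The following is a minimal sketch, not part of Spark: `PlanCheck`, `countOperator`, and `samplePlan` are hypothetical names introduced here for illustration, and the sample text is the physical plan from this report with the FileScan details trimmed. It counts plan lines by operator name after stripping the tree-drawing prefix and the optional whole-stage codegen marker `*(n)`.

```scala
// Hypothetical helper (not part of Spark): counts operators in the text of a
// physical plan as printed by explain.
object PlanCheck {
  // Matches the tree-drawing prefix (spaces, ':', '+', '-') and, if present,
  // the whole-stage codegen marker such as "*(3) ".
  private val Prefix = """^[\s:+\-]*(\*\(\d+\)\s*)?""".r

  // Number of plan lines whose operator name is exactly `name`.
  // "SortMergeJoin" is not counted as "Sort" because the name must be
  // followed by a space or end the line.
  def countOperator(plan: String, name: String): Int =
    plan.split("\n").count { line =>
      val op = Prefix.replaceFirstIn(line, "")
      op == name || op.startsWith(name + " ")
    }

  // The physical plan from this report (FileScan details trimmed).
  val samplePlan: String =
    """|== Physical Plan ==
       |*(5) Project [id#79L]
       |+- *(5) SortMergeJoin [id#79L], [id#77L], Inner
       |   :- *(3) Sort [id#79L ASC NULLS FIRST], false, 0
       |   :  +- Exchange hashpartitioning(id#79L, 4)
       |   :     +- *(2) Sort [id#79L ASC NULLS FIRST], false, 0
       |   :        +- Exchange hashpartitioning(id#79L, 2)
       |   :           +- *(1) Range (0, 4, step=1, splits=8)
       |   +- *(4) Sort [id#77L ASC NULLS FIRST], false, 0
       |      +- *(4) Project [id#77L]
       |         +- *(4) Filter isnotnull(id#77L)
       |            +- *(4) FileScan parquet default.bucketed_4_id[id#77L]
       |""".stripMargin
}
```

In a spark-shell session, the same helper could presumably be applied to the live query with `PlanCheck.countOperator(q.queryExecution.executedPlan.toString, "Exchange")`. For the plan in this report it finds two Exchange operators and three Sort operators, two of which are on the non-bucketed side.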
Attachments
Issue Links
- is related to SPARK-17570 Avoid Hash and Exchange in Sort Merge join if bucketing factor is multiple for tables (Resolved)