Spark / SPARK-32767

Bucket join should work if spark.sql.shuffle.partitions is larger than the bucket number

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.2, 3.1.0
    • Component/s: SQL
    • Labels:
      None

      Description

      How to reproduce this issue:

          spark.range(1000).write.bucketBy(432, "id").saveAsTable("t1")
          spark.range(1000).write.bucketBy(34, "id").saveAsTable("t2")
          sql("set spark.sql.shuffle.partitions=600")
          sql("set spark.sql.autoBroadcastJoinThreshold=-1")
          sql("select * from t1 join t2 on t1.id = t2.id").explain()

      Actual (both sides are shuffled to 600 partitions):

      == Physical Plan ==
      *(5) SortMergeJoin [id#26L], [id#27L], Inner
      :- *(2) Sort [id#26L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#26L, 600), true, [id=#65]
      :     +- *(1) Filter isnotnull(id#26L)
      :        +- *(1) ColumnarToRow
      :           +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
      +- *(4) Sort [id#27L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#27L, 600), true, [id=#74]
            +- *(3) Filter isnotnull(id#27L)
               +- *(3) ColumnarToRow
                  +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
      
      

      Expected (only t2 is shuffled, to match the 432 buckets of t1):

      == Physical Plan ==
      *(4) SortMergeJoin [id#26L], [id#27L], Inner
      :- *(1) Sort [id#26L ASC NULLS FIRST], false, 0
      :  +- *(1) Filter isnotnull(id#26L)
      :     +- *(1) ColumnarToRow
      :        +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
      +- *(3) Sort [id#27L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#27L, 432), true, [id=#69]
            +- *(2) Filter isnotnull(id#27L)
               +- *(2) ColumnarToRow
                  +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32444/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
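
      The difference between the two plans comes down to the Exchange nodes: the actual plan has two, both with 600 partitions, while the expected plan has one, with 432. A minimal sketch for checking this from explain() text, assuming the plan is available as a string. `ExchangeCount` and `shufflePartitions` are hypothetical names used for illustration and are not part of Spark; the plan strings are abbreviated from the plans above.

```scala
// Hypothetical helper (not a Spark API): extracts the partition count of every
// "Exchange hashpartitioning(<key>, <n>)" node found in explain() output.
object ExchangeCount {
  private val Exchange = """Exchange hashpartitioning\([^,]+, (\d+)\)""".r

  def shufflePartitions(planText: String): List[Int] =
    Exchange.findAllMatchIn(planText).map(_.group(1).toInt).toList

  // Abbreviated from the actual plan in this issue; only the Exchange lines matter here.
  val actualPlan: String =
    """:  +- Exchange hashpartitioning(id#26L, 600), true, [id=#65]
      |   +- Exchange hashpartitioning(id#27L, 600), true, [id=#74]""".stripMargin

  // Abbreviated from the expected plan in this issue.
  val expectedPlan: String =
    "   +- Exchange hashpartitioning(id#27L, 432), true, [id=#69]"

  def main(args: Array[String]): Unit = {
    println(shufflePartitions(actualPlan))   // List(600, 600): both sides shuffled
    println(shufflePartitions(expectedPlan)) // List(432): only t2 shuffled
  }
}
```

      An empty list would mean no shuffle at all. That is not expected for this query, because t2 has 34 buckets and has to be repartitioned to line up with the 432 buckets of t1.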
      
      

            People

            • Assignee: Yuming Wang (yumwang)
            • Reporter: Yuming Wang (yumwang)