SPARK-31350

Coalesce bucketed tables for join if applicable

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.0
    • Fix Version/s: 3.1.0
    • Component/s: SQL
    • Labels:
      None

      Description

      Joining two tables that are bucketed on the same column but with different bucket counts (8 and 4 below) introduces a full shuffle on both sides:

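      // Keep these small tables from being broadcast, so the join is planned as a sort-merge join.
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
      // Needed for toDF outside spark-shell, where it is imported automatically.
      import spark.implicits._
      val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
      val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
      // Bucket both tables on the join key "i", with different bucket counts (8 vs. 4).
      df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
      df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
      val t1 = spark.table("t1")
      val t2 = spark.table("t2")
      val joined = t1.join(t2, t1("i") === t2("i"))
      joined.explain(true)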
      
      
      == Physical Plan ==
      *(5) SortMergeJoin [i#44], [i#50], Inner
      :- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
      :     +- *(1) Project [i#44, j#45, k#46]
      :        +- *(1) Filter isnotnull(i#44)
      :           +- *(1) ColumnarToRow
      :              +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
      +- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(i#50, 200), true, [id=#115]
            +- *(3) Project [i#50, j#51, k#52]
               +- *(3) Filter isnotnull(i#50)
                  +- *(3) ColumnarToRow
                     +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
      

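      However, one side can be coalesced to eliminate the shuffle: because 8 is a multiple of 4, the 8 buckets of t1 can be read as 4 partitions (buckets b and b + 4 together), which matches the bucketing of t2, so neither Exchange is needed.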
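
      The reasoning behind coalescing can be checked with a small standalone sketch. It assumes a bucket id is the non-negative modulo of a hash of the bucket column, and uses a plain Int in place of the real hash value. The object and function names (BucketCoalescingSketch, pmod, bucketId, coalescedId) are hypothetical and only illustrate the arithmetic; this is not the code of the eventual fix.

```scala
// Illustrative sketch only: models bucket assignment as a non-negative
// modulo of an integer hash. A plain Int stands in for the hash of the
// bucket column; no Spark dependency is needed to run this.
object BucketCoalescingSketch {
  // Non-negative modulo, so negative hashes still map to a valid bucket id.
  def pmod(hash: Int, numBuckets: Int): Int = {
    val r = hash % numBuckets
    if (r < 0) r + numBuckets else r
  }

  // Bucket id a row gets when its table is written with `numBuckets` buckets.
  def bucketId(hash: Int, numBuckets: Int): Int = pmod(hash, numBuckets)

  // Partition that an existing bucket is read into after coalescing down to
  // `numCoalesced` partitions. Assumes numCoalesced divides the original
  // bucket count.
  def coalescedId(bucket: Int, numCoalesced: Int): Int = bucket % numCoalesced

  def main(args: Array[String]): Unit = {
    val hashes = -1000 to 1000
    // For every hash, the coalesced partition on the 8-bucket side equals
    // the bucket the same row would have in a 4-bucket table.
    val consistent = hashes.forall { h =>
      coalescedId(bucketId(h, 8), 4) == bucketId(h, 4)
    }
    println(s"coalescing 8 -> 4 keeps matching keys together: $consistent")

    // Show which of the 8 original buckets feed each coalesced partition.
    (0 until 8).groupBy(coalescedId(_, 4)).toSeq.sortBy(_._1).foreach {
      case (partition, buckets) =>
        println(s"partition $partition <- buckets ${buckets.mkString(", ")}")
    }
  }
}
```

      Under that assumption, rows with equal join keys end up in the same partition index on both sides once t1 is read as 4 partitions, which is why the shuffle becomes unnecessary. The same argument fails when the smaller bucket count does not divide the larger one.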

              People

              • Assignee: Terry Kim (imback82)
              • Reporter: Terry Kim (imback82)