SPARK-19122: Unnecessary shuffle+sort added if join predicate ordering differs from bucketing and sorting order


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.2, 2.1.0
    • Fix Version/s: 2.3.0
    • Component/s: SQL
    • Labels: None

    Description

      `table1` and `table2` are bucketed and sorted on columns `j` and `k` (in that order).

      This is how they are generated:

      // 16 rows, coalesced to a single partition so each bucket ends up in a single file;
      // both tables are bucketed into 8 buckets on (j, k) and sorted within each bucket by (j, k)
      val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
      df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table1")
      df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table2")
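
      As a quick sanity check (assuming `hc` is the same Spark session used below), the table metadata reported by DESCRIBE FORMATTED should show 8 buckets on `j`, `k` and the same sort columns:

      scala> hc.sql("DESCRIBE FORMATTED table1").show(100, truncate = false)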
      

      Now, if the join predicates are specified in the query in the same order as the bucketing and sort columns, there is no shuffle or sort:

      scala> hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1")
      scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.j=b.j AND a.k=b.k").explain(true)
      
      == Physical Plan ==
      *SortMergeJoin [j#61, k#62], [j#100, k#101], Inner
      :- *Project [i#60, j#61, k#62]
      :  +- *Filter (isnotnull(k#62) && isnotnull(j#61))
      :     +- *FileScan orc default.table1[i#60,j#61,k#62] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
      +- *Project [i#99, j#100, k#101]
         +- *Filter (isnotnull(j#100) && isnotnull(k#101))
            +- *FileScan orc default.table2[i#99,j#100,k#101] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
      

      The same query with the join predicates in a different order from the bucketing and sort order introduces an extra shuffle and sort on both sides:

      scala> hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1")
      scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.k=b.k AND a.j=b.j ").explain(true)
      
      == Physical Plan ==
      *SortMergeJoin [k#62, j#61], [k#101, j#100], Inner
      :- *Sort [k#62 ASC NULLS FIRST, j#61 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(k#62, j#61, 200)
      :     +- *Project [i#60, j#61, k#62]
      :        +- *Filter (isnotnull(k#62) && isnotnull(j#61))
      :           +- *FileScan orc default.table1[i#60,j#61,k#62] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
      +- *Sort [k#101 ASC NULLS FIRST, j#100 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(k#101, j#100, 200)
            +- *Project [i#99, j#100, k#101]
               +- *Filter (isnotnull(j#100) && isnotnull(k#101))
                  +- *FileScan orc default.table2[i#99,j#100,k#101] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
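
      The plans above suggest that the join's required partitioning and sort order simply follow the textual order of the equi-join keys, so (k, j) is not recognized as equivalent to the tables' (j, k) bucketing/sort layout. Below is a minimal, self-contained sketch of the key-reordering idea (illustrative only, not Spark's planner code; the object and method names here are made up):

      // Hypothetical sketch: reorder (leftKey, rightKey) pairs so the left keys follow the
      // order of the tables' bucket/sort columns. If every key pair is covered, the existing
      // bucketing and per-bucket sort order could satisfy the join's requirements without an
      // extra Exchange/Sort.
      object ReorderJoinKeysSketch {
        def reorder(
            leftKeys: Seq[String],
            rightKeys: Seq[String],
            bucketColumns: Seq[String]): Option[(Seq[String], Seq[String])] = {
          val pairs = leftKeys.zip(rightKeys)
          val reordered = bucketColumns.flatMap(c => pairs.find(_._1 == c))
          // Only usable if the reordering covers all of the original key pairs
          if (reordered.size == pairs.size) Some(reordered.unzip) else None
        }

        def main(args: Array[String]): Unit = {
          // Join written as a.k = b.k AND a.j = b.j, tables bucketed/sorted by (j, k):
          println(reorder(Seq("k", "j"), Seq("k", "j"), Seq("j", "k")))
          // Some((List(j, k),List(j, k))) -- the keys now line up with the bucketing order
        }
      }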
      

          People

            Assignee: Tejas Patil (tejasp)
            Reporter: Tejas Patil (tejasp)
            Votes: 0
            Watchers: 9
