SPARK-32383 (project: Spark; sub-task of SPARK-32461, Shuffled hash join improvement)

Preserve hash join (BHJ and SHJ) stream side ordering


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Trivial
    • Resolution: Fixed
    • Affects Version/s: 3.1.0
    • Fix Version/s: 3.1.0
    • Component/s: SQL
    • Labels: None

    Description

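      Currently, `BroadcastHashJoinExec` and `ShuffledHashJoinExec` do not preserve their children's output ordering: they inherit the default `SparkPlan.outputOrdering`, which is `Nil`. In complex queries involving multiple joins, this can introduce unnecessary sorts, even though a hash join processes its stream side row by row and could therefore report that side's ordering.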
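Stream-side ordering can be kept because a hash join builds a hash table from one side and then scans the other (stream) side once, in order, probing the table for each row. The following is a minimal Scala sketch of that idea over plain in-memory `Seq`s rather than Spark's `HashedRelation` and row iterators. `ToyHashJoin` and `innerHashJoin` are hypothetical names, and only inner joins are modeled.

```scala
// Toy model (hypothetical, not Spark code): an inner hash join over in-memory Seqs.
object ToyHashJoin {
  // Build a hash table from `build`, then scan `stream` once, front to back.
  // Each stream row's matches are emitted before the next stream row is read,
  // so the output keeps the stream side's relative order.
  def innerHashJoin[S, B, K](stream: Seq[S], build: Seq[B])(
      streamKey: S => K,
      buildKey: B => K): Seq[(S, B)] = {
    // Build phase: the build side's own order is irrelevant.
    val table: Map[K, Seq[B]] = build.groupBy(buildKey)
    // Probe phase: stream rows without a match are dropped (inner join).
    stream.flatMap { s =>
      table.getOrElse(streamKey(s), Seq.empty[B]).map(b => (s, b))
    }
  }

  def main(args: Array[String]): Unit = {
    val stream = Seq(1L, 2L, 2L, 5L, 7L) // sorted on the join key, like the SMJ output on k1
    val build = Seq(7L, 2L, 1L)          // small and unsorted, like the broadcast df3
    val joined = innerHashJoin(stream, build)((k: Long) => k, (k: Long) => k)
    println(joined)
  }
}
```

Running `main` prints `List((1,1), (2,2), (2,2), (7,7))`: the output follows the ascending order of the stream side even though the build side is unsorted. Outer joins that emit unmatched build-side rows at the end would need more care; this sketch does not model them.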

      Example:

       

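      // Assumes a Spark SQL test suite that mixes in SharedSparkSession (via SQLTestUtils),
      // which provides `withSQLConf`; `testImplicits._` brings the $"col" syntax into scope.
      import org.apache.spark.sql.internal.SQLConf
      import testImplicits._
      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
        // With a 50-byte threshold only the 3-row df3 is broadcast;
        // the other two joins are planned as sort merge joins.
        val df1 = spark.range(100).select($"id".as("k1"))
        val df2 = spark.range(100).select($"id".as("k2"))
        val df3 = spark.range(3).select($"id".as("k3"))
        val df4 = spark.range(100).select($"id".as("k4"))
        val plan = df1.join(df2, $"k1" === $"k2")
          .join(df3, $"k1" === $"k3")
          .join(df4, $"k1" === $"k4")
          .queryExecution
          .executedPlan
        println(plan)
      }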
      

       

      Current physical plan (note the extra sort on `k1` before the top sort merge join):

      *(9) SortMergeJoin [k1#220L], [k4#232L], Inner
      :- *(6) Sort [k1#220L ASC NULLS FIRST], false, 0
      :  +- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
      :     :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
      :     :  :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
      :     :  :  +- Exchange hashpartitioning(k1#220L, 5), true, [id=#128]
      :     :  :     +- *(1) Project [id#218L AS k1#220L]
      :     :  :        +- *(1) Range (0, 100, step=1, splits=2)
      :     :  +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
      :     :     +- Exchange hashpartitioning(k2#224L, 5), true, [id=#134]
      :     :        +- *(3) Project [id#222L AS k2#224L]
      :     :           +- *(3) Range (0, 100, step=1, splits=2)
      :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#141]
      :        +- *(5) Project [id#226L AS k3#228L]
      :           +- *(5) Range (0, 3, step=1, splits=2)
      +- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(k4#232L, 5), true, [id=#148]
            +- *(7) Project [id#230L AS k4#232L]
               +- *(7) Range (0, 100, step=1, splits=2)
      

      Ideal physical plan (no extra sort on `k1` before the top sort merge join):

      *(9) SortMergeJoin [k1#220L], [k4#232L], Inner
      :- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
      :  :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
      :  :  :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
      :  :  :  +- Exchange hashpartitioning(k1#220L, 5), true, [id=#127]
      :  :  :     +- *(1) Project [id#218L AS k1#220L]
      :  :  :        +- *(1) Range (0, 100, step=1, splits=2)
      :  :  +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
      :  :     +- Exchange hashpartitioning(k2#224L, 5), true, [id=#133]
      :  :        +- *(3) Project [id#222L AS k2#224L]
      :  :           +- *(3) Range (0, 100, step=1, splits=2)
      :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#140]
      :     +- *(5) Project [id#226L AS k3#228L]
      :        +- *(5) Range (0, 3, step=1, splits=2)
      +- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(k4#232L, 5), true, [id=#146]
            +- *(7) Project [id#230L AS k4#232L]
               +- *(7) Range (0, 100, step=1, splits=2)
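A toy planner can sketch why a `Nil` ordering costs a sort. The names below (`ToyOrdering`, `ensureOrdering`, `countSorts`, `examplePlan`, and the simplified node classes) are hypothetical. They only loosely mirror Spark's `EnsureRequirements` rule, which inserts a `Sort` when a child's `outputOrdering` does not satisfy the parent's required ordering. Orderings are reduced to lists of column names.

```scala
// Toy model (hypothetical, not Spark's classes): orderings are lists of column names.
object ToyOrdering {
  sealed trait Plan { def outputOrdering: Seq[String] }

  // A leaf with no known ordering, e.g. a shuffled scan.
  case class Scan(name: String) extends Plan {
    def outputOrdering: Seq[String] = Nil
  }

  case class Sort(key: String, child: Plan) extends Plan {
    def outputOrdering: Seq[String] = Seq(key)
  }

  // Simplified: an inner sort merge join reports its left key as its output ordering.
  case class SortMergeJoin(leftKey: String, rightKey: String, left: Plan, right: Plan)
      extends Plan {
    def outputOrdering: Seq[String] = Seq(leftKey)
  }

  // preserveStreamOrdering = false models the old behavior (inherited Nil);
  // true models forwarding the stream side's ordering.
  case class BroadcastHashJoin(stream: Plan, build: Plan, preserveStreamOrdering: Boolean)
      extends Plan {
    def outputOrdering: Seq[String] =
      if (preserveStreamOrdering) stream.outputOrdering else Nil
  }

  // Insert a Sort only when the child does not already report the required ordering.
  def ensureOrdering(child: Plan, key: String): Plan =
    if (child.outputOrdering.headOption.contains(key)) child else Sort(key, child)

  // Both sides of a sort merge join must be sorted on their join keys.
  def planSortMergeJoin(leftKey: String, rightKey: String, left: Plan, right: Plan): Plan =
    SortMergeJoin(leftKey, rightKey, ensureOrdering(left, leftKey), ensureOrdering(right, rightKey))

  def countSorts(plan: Plan): Int = plan match {
    case Scan(_)                    => 0
    case Sort(_, child)             => 1 + countSorts(child)
    case SortMergeJoin(_, _, l, r)  => countSorts(l) + countSorts(r)
    case BroadcastHashJoin(s, b, _) => countSorts(s) + countSorts(b)
  }

  // Same shape as the example: ((df1 SMJ df2) BHJ df3) SMJ df4, all joined on k1.
  def examplePlan(preserveStreamOrdering: Boolean): Plan = {
    val smj = planSortMergeJoin("k1", "k2", Scan("df1"), Scan("df2"))
    val bhj = BroadcastHashJoin(smj, Scan("df3"), preserveStreamOrdering)
    planSortMergeJoin("k1", "k4", bhj, Scan("df4"))
  }

  def main(args: Array[String]): Unit = {
    println(countSorts(examplePlan(preserveStreamOrdering = false)))
    println(countSorts(examplePlan(preserveStreamOrdering = true)))
  }
}
```

Running `main` prints 4 and then 3. When the hash join reports `Nil`, a `Sort` on `k1` is added above it. When it forwards its stream side's ordering, that sort disappears. These counts match the four `Sort` nodes in the current plan and the three in the ideal plan.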

       


People

    • Assignee: Cheng Su (chengsu)
    • Reporter: Cheng Su (chengsu)
    • Votes: 0
    • Watchers: 3
