SPARK-31869

BroadcastHashJoinExec's outputPartitioning can utilize the build side

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.1.0
    • Fix Version/s: 3.1.0
    • Component/s: SQL
    • Labels:
      None

      Description

      Currently, BroadcastHashJoinExec's outputPartitioning uses only the streamed side's outputPartitioning. As a result, the build side's join keys are not recognized as partitioning keys, so a later join on a build-side key adds an extra shuffle. For example:

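      // Assumes a spark-shell session; elsewhere, `import spark.implicits._` is needed for toDF.
      // Lower the broadcast threshold (in bytes) so that only the small t3 is broadcast.
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")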
      val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
      val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
      val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
      val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")
      
      // join1 is a sort merge join.
      val join1 = t1.join(t2, t1("i1") === t2("i2"))
      
      // join2 is a broadcast join where t3 is broadcasted.
      val join2 = join1.join(t3, join1("i1") === t3("i3"))
      
      // Join on the column from the broadcasted side (i3).
      val join3 = join2.join(t4, join2("i3") === t4("i4"))
      
      join3.explain
      

      This produces an unnecessary Exchange hashpartitioning(i3#29, 200) above the BroadcastHashJoin:

      == Physical Plan ==
      *(6) SortMergeJoin [i3#29], [i4#40], Inner
      :- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(i3#29, 200), true, [id=#55]
      :     +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight
      :        :- *(3) SortMergeJoin [i1#7], [i2#18], Inner
      :        :  :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0
      :        :  :  +- Exchange hashpartitioning(i1#7, 200), true, [id=#28]
      :        :  :     +- LocalTableScan [i1#7, j1#8]
      :        :  +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0
      :        :     +- Exchange hashpartitioning(i2#18, 200), true, [id=#29]
      :        :        +- LocalTableScan [i2#18, j2#19]
      :        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#34]
      :           +- LocalTableScan [i3#29, j3#30]
      +- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(i4#40, 200), true, [id=#39]
            +- LocalTableScan [i4#40, j4#41]
      

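       However, BroadcastHashJoinExec performs only equi-joins, so the streamed-side key (i1) and the build-side key (i3) are equal in every output row. If the streamed side has HashPartitioning on i1, the join output is therefore also hash-partitioned on i3, and BroadcastHashJoinExec can report this in its outputPartitioning to eliminate the exchange.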
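The reasoning above can be checked with a small toy model in plain Scala. This is only a sketch under stated assumptions, not Spark's implementation: the object name `BuildSidePartitioningSketch`, the helper `partitionOf`, and the partition count of 4 are hypothetical, and the modulo-based hash is a simplification of Spark's actual hash partitioning. It partitions the streamed rows by i1, joins each partition against the full (broadcast) build side on i1 == i3, and checks that every output row already sits in the partition that hashing i3 would select.

```scala
// Toy model of hash partitioning; illustrative only, not Spark code.
object BuildSidePartitioningSketch {
  // Hypothetical partition count chosen for the example.
  val numPartitions = 4

  // Simplified stand-in for a hash partitioner (an assumption of this sketch).
  def partitionOf(key: Int): Int = Math.floorMod(key.hashCode, numPartitions)

  // Returns true if the join output is already partitioned by the build-side key i3.
  def alreadyPartitionedByBuildKey(): Boolean = {
    // Streamed side: (i1, j1) rows, placed into partitions by the hash of i1.
    val streamed = (0 until 100).map(i => (i % 5, i % 13))
    val streamedByPartition = streamed.groupBy { case (i1, _) => partitionOf(i1) }

    // Build side: (i3, j3) rows, available in full to every partition (the broadcast).
    val build = (0 until 20).map(i => (i % 7, i % 11))

    // Equi-join inside each partition; streamed rows never change partition.
    val joined = streamedByPartition.map { case (p, rows) =>
      p -> (for {
        (i1, j1) <- rows
        (i3, j3) <- build
        if i1 == i3
      } yield (i1, j1, i3, j3))
    }

    // Because i1 == i3 in every output row, hashing i3 selects the same partition.
    joined.forall { case (p, rows) =>
      rows.forall { case (_, _, i3, _) => partitionOf(i3) == p }
    }
  }

  def main(args: Array[String]): Unit =
    println(s"partitioned by i3 without a shuffle: ${alreadyPartitionedByBuildKey()}")
}
```

The check holds for any deterministic hash function, since equal keys always map to the same partition; that is the property that makes the extra Exchange on i3 redundant.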

            People

            • Assignee: Terry Kim (imback82)
            • Reporter: Terry Kim (imback82)