Spark / SPARK-23544

Remove redundant ShuffleExchange in the planner


Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      Currently, when we execute the following SQL statement:

      select MDTTemp.* from (select * from distinctAgg where a > 2 distribute by a, b, c) MDTTemp
      left join (select * from testData5 where f > 1) ProjData
      on MDTTemp.b = ProjData.g and
         MDTTemp.c = ProjData.h and
         MDTTemp.d < (ProjData.j - 3) and
         MDTTemp.d >= (ProjData.j + 3)

      the physical plan from explain looks like:
      == Physical Plan ==
      *Project [a#203, b#204, c#205, d#206, e#207]
      +- SortMergeJoin [b#204, c#205], [g#222, h#223], LeftOuter, ((d#206 < (j#224 - 3)) && (d#206 >= (j#224 + 3)))
         :- *Sort [b#204 ASC, c#205 ASC], false, 0
         :  +- Exchange hashpartitioning(b#204, c#205, 5)
         :     +- Exchange hashpartitioning(a#203, b#204, c#205, 5)
         :        +- *Filter (a#203 > 2)
         :           +- Scan ExistingRDD[a#203,b#204,c#205,d#206,e#207]
         +- *Sort [g#222 ASC, h#223 ASC], false, 0
            +- Exchange hashpartitioning(g#222, h#223, 5)
               +- *Project [g#222, h#223, j#224]
                  +- *Filter (f#221 > 1)
                     +- Scan ExistingRDD[f#221,g#222,h#223,j#224,k#225]
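
      For reference, the same double shuffle can be reproduced with the DataFrame API. This is a hypothetical repro sketch, assuming the two test tables above are registered and a spark session is in scope; repartition plays the role of distribute by, and the non-equi join conditions are dropped for brevity:

      import org.apache.spark.sql.functions.col

      // distribute by a, b, c  ->  repartition(col("a"), col("b"), col("c"))
      val mdtTemp = spark.table("distinctAgg").filter(col("a") > 2)
        .repartition(col("a"), col("b"), col("c"))
      val projData = spark.table("testData5").filter(col("f") > 1)

      // The planner stacks hashpartitioning(b, c) on top of hashpartitioning(a, b, c).
      mdtTemp.join(projData, col("b") === col("g") && col("c") === col("h"), "left_outer")
        .explain()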

      There is a redundant ShuffleExchange here: the inner Exchange hashpartitioning(a#203, b#204, c#205, 5) comes from the distribute by a, b, c clause, and its output is immediately re-shuffled by the Exchange hashpartitioning(b#204, c#205, 5) that the join requires, so the inner shuffle does no useful work. This PR provides a rule to remove such redundant ShuffleExchanges in the planner (a sketch of such a rule follows the plan below). With the rule applied, the explain output looks like:

      == Physical Plan ==
      *Project [a#203, b#204, c#205, d#206, e#207]
      +- SortMergeJoin [b#204, c#205], [g#222, h#223], LeftOuter, ((d#206 < (j#224 - 3)) && (d#206 >= (j#224 + 3)))
         :- *Sort [b#204 ASC, c#205 ASC], false, 0
         :  +- Exchange hashpartitioning(b#204, c#205, 5)
         :     +- *Filter (a#203 > 2)
         :        +- Scan ExistingRDD[a#203,b#204,c#205,d#206,e#207]
         +- *Sort [g#222 ASC, h#223 ASC], false, 0
            +- Exchange hashpartitioning(g#222, h#223, 5)
               +- *Project [g#222, h#223, j#224]
                  +- *Filter (f#221 > 1)
                     +- Scan ExistingRDD[f#221,g#222,h#223,j#224,k#225]
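
      The actual rule lives in the PR; purely as an illustration, a minimal sketch of a planner rule that collapses back-to-back shuffles might look like this (the object name RemoveRedundantShuffleExchange is hypothetical, and it assumes the ShuffleExchangeExec operator of Spark 2.3+):

      import org.apache.spark.sql.catalyst.rules.Rule
      import org.apache.spark.sql.execution.SparkPlan
      import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

      // Sketch only, not the PR's implementation: a shuffle whose child is
      // itself a shuffle makes the child's repartitioning dead work, because
      // the parent redistributes every row regardless of how its input is
      // partitioned. So the child shuffle can be bypassed.
      object RemoveRedundantShuffleExchange extends Rule[SparkPlan] {
        override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
          case parent: ShuffleExchangeExec if parent.child.isInstanceOf[ShuffleExchangeExec] =>
            val innerShuffle = parent.child.asInstanceOf[ShuffleExchangeExec]
            // Wire the parent shuffle directly to the inner shuffle's input.
            parent.withNewChildren(innerShuffle.child :: Nil)
        }
      }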

       

      I have also added a benchmark case:

      // Runs inside a benchmark suite that provides runJoinBenchmark and
      // sparkSession; needs org.apache.spark.sql.functions.col and
      // org.apache.spark.sql.execution.joins.SortMergeJoinExec in scope.
      val N = 2 << 20
      runJoinBenchmark("sort merge join", N) {
        val df1 = sparkSession.range(N)
          .selectExpr(s"(id * 15485863) % ${N * 10} as k1")
        val df2 = sparkSession.range(N)
          .selectExpr(s"(id * 15485867) % ${N * 10} as k2")
        val df = df1.join(df2.repartition(20), col("k1") === col("k2"))
        // Sanity check: the plan actually uses a sort merge join.
        assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
        df.count()
      }

       

      The benchmark results are as follows:

      Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
      Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
      sort merge join:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      -----------------------------------------------------------------------------------------
      sort merge join Repartition off        3520 / 4364          0.6        1678.5       1.0X
      sort merge join Repartition on         1946 / 2203          1.1         927.9       1.8X


          People

            Assignee: Unassigned
            Reporter: caoxuewen (heary-cao)
            Votes: 0
            Watchers: 4
