Details
Type: Improvement
Status: In Progress
Priority: Major
Resolution: Unresolved
Version: 3.1.0
Description
Currently, when we execute the following SQL statement:
select MDTTemp.* from (select * from distinctAgg where a > 2 distribute by a, b, c) MDTTemp
left join (select * from testData5 where f > 1) ProjData
on MDTTemp.b = ProjData.g and
MDTTemp.c = ProjData.h and
MDTTemp.d < (ProjData.j - 3) and
MDTTemp.d >= (ProjData.j + 3)
the physical plan shown by EXPLAIN is:
== Physical Plan ==
*Project [a#203, b#204, c#205, d#206, e#207]
+- SortMergeJoin [b#204, c#205], [g#222, h#223], LeftOuter, ((d#206 < (j#224 - 3)) && (d#206 >= (j#224 + 3)))
   :- *Sort [b#204 ASC, c#205 ASC], false, 0
   :  +- Exchange hashpartitioning(b#204, c#205, 5)
   :     +- Exchange hashpartitioning(a#203, b#204, c#205, 5)
   :        +- *Filter (a#203 > 2)
   :           +- Scan ExistingRDD[a#203,b#204,c#205,d#206,e#207]
   +- *Sort [g#222 ASC, h#223 ASC], false, 0
      +- Exchange hashpartitioning(g#222, h#223, 5)
         +- *Project [g#222, h#223, j#224]
            +- *Filter (f#221 > 1)
               +- Scan ExistingRDD[f#221,g#222,h#223,j#224,k#225]
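Why does the planner stack a second exchange on the left side? Roughly, a child's hash partitioning can be reused for the join only if rows that agree on the join keys (b, c) are guaranteed to land in the same partition, which holds when every hashed column is one of the join keys. The exact condition Spark applies has varied between versions (some versions require the hash expressions to match the join keys exactly); the sketch below models the looser subset condition, and since hashpartitioning(a, b, c) fails it, it also fails any stricter one. `HashPartitioning` and `ClusteredDistribution` here are hypothetical toy case classes over column names, not Spark's Catalyst classes.

```scala
object PartitioningCheck {
  // Hypothetical toy stand-ins for Catalyst's classes; columns are plain names.
  final case class HashPartitioning(columns: Seq[String], numPartitions: Int)
  final case class ClusteredDistribution(clustering: Seq[String])

  // Simplified check: if the hashed columns are a subset of the clustering keys,
  // rows that agree on all keys also agree on all hashed columns, so they hash
  // to the same partition. numPartitions does not affect this check.
  def satisfies(p: HashPartitioning, d: ClusteredDistribution): Boolean =
    p.columns.forall(c => d.clustering.contains(c))

  def main(args: Array[String]): Unit = {
    val joinKeys = ClusteredDistribution(Seq("b", "c"))
    // DISTRIBUTE BY a, b, c: `a` is not a join key, so a new exchange is needed
    println(satisfies(HashPartitioning(Seq("a", "b", "c"), 5), joinKeys)) // false
    // hashpartitioning(b, c) already matches the join keys
    println(satisfies(HashPartitioning(Seq("b", "c"), 5), joinKeys)) // true
  }
}
```

Under this model the planner must insert Exchange hashpartitioning(b, c) on top of the user-requested one, which is exactly the double exchange in the plan above.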
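The left side is shuffled twice: the Exchange on hashpartitioning(a, b, c) introduced by DISTRIBUTE BY is immediately followed by an Exchange on hashpartitioning(b, c) required by the join, which repartitions every row again, so the first ShuffleExchange is redundant. This PR adds a planner rule that removes such redundant ShuffleExchange nodes. With the rule, EXPLAIN shows: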
== Physical Plan ==
*Project [a#203, b#204, c#205, d#206, e#207]
+- SortMergeJoin [b#204, c#205], [g#222, h#223], LeftOuter, ((d#206 < (j#224 - 3)) && (d#206 >= (j#224 + 3)))
   :- *Sort [b#204 ASC, c#205 ASC], false, 0
   :  +- Exchange hashpartitioning(b#204, c#205, 5)
   :     +- *Filter (a#203 > 2)
   :        +- Scan ExistingRDD[a#203,b#204,c#205,d#206,e#207]
   +- *Sort [g#222 ASC, h#223 ASC], false, 0
      +- Exchange hashpartitioning(g#222, h#223, 5)
         +- *Project [g#222, h#223, j#224]
            +- *Filter (f#221 > 1)
               +- Scan ExistingRDD[f#221,g#222,h#223,j#224,k#225]
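The core idea of the rewrite can be modeled on a toy plan tree: when an exchange sits directly on top of another exchange, the upper one repartitions every row anyway, so the lower one's output partitioning is discarded and the lower exchange can be dropped. This is a minimal sketch, not the PR's implementation: `Plan`, `Scan`, `Filter`, `Exchange` and `RemoveRedundantExchange` are hypothetical names, not Spark's `SparkPlan` or `ShuffleExchangeExec` API, and only directly stacked exchanges are handled.

```scala
object RemoveRedundantExchange {
  // Hypothetical, minimal physical-plan tree (not Spark's classes).
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class Exchange(partitionColumns: Seq[String], numPartitions: Int, child: Plan) extends Plan

  // Bottom-up rewrite: children are rewritten first, then an Exchange whose
  // (rewritten) child is also an Exchange skips that child, because it will
  // repartition every row regardless of the child's partitioning.
  def apply(plan: Plan): Plan = plan match {
    case Exchange(cols, n, child) =>
      apply(child) match {
        case Exchange(_, _, grandChild) => Exchange(cols, n, grandChild)
        case newChild                   => Exchange(cols, n, newChild)
      }
    case Filter(cond, child) => Filter(cond, apply(child))
    case leaf: Scan          => leaf
  }

  def main(args: Array[String]): Unit = {
    // Left side of the original plan: Exchange(b, c) over Exchange(a, b, c)
    val left = Exchange(Seq("b", "c"), 5,
      Exchange(Seq("a", "b", "c"), 5, Filter("a > 2", Scan("distinctAgg"))))
    println(apply(left))
  }
}
```

Applied to the left side of the original plan, this turns Exchange(b, c) over Exchange(a, b, c) over Filter over Scan into Exchange(b, c) over Filter over Scan, matching the rewritten plan above. Because the rewrite runs bottom-up, a chain of three or more stacked exchanges also collapses to the topmost one.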
I have also added a benchmark case:
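import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions.col

val N = 2 << 20 // 2,097,152 rows on each side
runJoinBenchmark("sort merge join", N) {
  // Pseudo-random join keys in [0, N * 10), derived from the row id
  val df1 = sparkSession.range(N)
    .selectExpr(s"(id * 15485863) % ${N * 10} as k1")
  val df2 = sparkSession.range(N)
    .selectExpr(s"(id * 15485867) % ${N * 10} as k2")
  // repartition(20) shuffles the right side once; the join then needs it
  // hash-partitioned on k2, so without the new rule it is shuffled twice
  val df = df1.join(df2.repartition(20), col("k1") === col("k2"))
  assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
  df.count()
}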
The performance results are as follows:
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
sort merge join:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort merge join Repartition off             3520 / 4364          0.6        1678.5       1.0X
sort merge join Repartition on              1946 / 2203          1.1         927.9       1.8X
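As a sanity check, the derived columns are consistent with the best time and the row count N = 2 << 20 = 2,097,152: Per Row(ns) = best time in ns / N, Rate(M/s) = N / best time in seconds / 10^6, and Relative = baseline best time / best time. The small sketch below recomputes them; `BenchmarkColumns` and its function names are illustrative, not part of Spark's benchmark utilities.

```scala
object BenchmarkColumns {
  val N: Long = 2L << 20 // 2,097,152 rows, as in the benchmark above

  // Nanoseconds spent per row, given the best time in milliseconds.
  def perRowNs(bestMs: Double): Double = bestMs * 1e6 / N

  // Millions of rows processed per second.
  def rateMps(bestMs: Double): Double = N / (bestMs / 1000.0) / 1e6

  // Speed-up relative to a baseline run.
  def relative(baselineMs: Double, bestMs: Double): Double = baselineMs / bestMs

  def main(args: Array[String]): Unit =
    for ((label, best) <- Seq("Repartition off" -> 3520.0, "Repartition on" -> 1946.0))
      println(f"$label%-16s ${rateMps(best)}%5.1f M/s ${perRowNs(best)}%8.1f ns/row ${relative(3520.0, best)}%4.1fX")
}
```

Rounded to one decimal, these give 0.6 M/s, 1678.5 ns and 1.0X for the first run and 1.1 M/s, 927.9 ns and 1.8X for the second, matching the table.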