Description
Currently, ReusedExchange would rewrite output partitioning if child's partitioning is HashPartitioning, but it does not do the same when child's partitioning is RangePartitioning, sometimes, it could introduce extra shuffle, see:
val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j") val df1 = df.as("t1") val df2 = df.as("t2") val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right") t.cache.orderBy($"t2.j").explain
Before fix:
== Physical Plan == *(1) Sort [j#14 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200) +- InMemoryTableScan [i#5, j#6, i#13, j#14] +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder... +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as... : +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0 : +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200) : +- LocalTableScan [i#5, j#6] +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0 +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
Better plan should avoid "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)", like:
== Physical Plan == *(1) Sort [j#14 ASC NULLS FIRST], true, 0 +- InMemoryTableScan [i#5, j#6, i#13, j#14] +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder... +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) : +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0 : +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200) : +- LocalTableScan [i#5, j#6] +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0 +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
Attachments
Issue Links
- links to