Description
TransposeWindow rule will try to eliminate unnecessary shuffle:
/** * Transpose Adjacent Window Expressions. * - If the partition spec of the parent Window expression is compatible with the partition spec * of the child window expression, transpose them. */ object TransposeWindow extends Rule[LogicalPlan] { private def compatiblePartitions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = { ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall { case (l, r) => l.semanticEquals(r) }) } private def windowsCompatible(w1: Window, w2: Window): Boolean = { w1.references.intersect(w2.windowOutputSet).isEmpty && w1.expressions.forall(_.deterministic) && w2.expressions.forall(_.deterministic) && compatiblePartitions(w1.partitionSpec, w2.partitionSpec) } def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsPattern(WINDOW), ruleId) { case w1 @ Window(_, _, _, w2 @ Window(_, _, _, grandChild)) if windowsCompatible(w1, w2) => Project(w1.output, w2.copy(child = w1.copy(child = grandChild))) case w1 @ Window(_, _, _, Project(pl, w2 @ Window(_, _, _, grandChild))) if windowsCompatible(w1, w2) && w1.references.subsetOf(grandChild.outputSet) => Project( pl ++ w1.windowOutputSet, w2.copy(child = w1.copy(child = grandChild))) } }
but the function compatiblePartitions will only take the first n elements of the ps2 sequence, for some cases, this will not take effect, like the case below:
val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d") df.selectExpr( "sum(`d`) OVER(PARTITION BY `b`,`a`) as e", "sum(`c`) OVER(PARTITION BY `a`) as f" ).explain
Current plan
== Physical Plan == *(5) Project [e#10L, f#11L] +- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L] +- *(4) Sort [a#2L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#2L, 200), true, [id=#41] +- *(3) Project [a#2L, c#4L, e#10L] +- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L] +- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=#33] +- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L] +- *(1) Range (0, 10, step=1, splits=10)
Expected plan:
== Physical Plan == *(4) Project [e#924L, f#925L] +- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L] +- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0 +- *(3) Project [d#43L, b#41L, a#40L, f#925L] +- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L] +- *(2) Sort [a#40L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#40L, 200), true, [id=#282] +- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L] +- *(1) Range (0, 10, step=1, splits=10)
Also the permutations method has a O(n!) time complexity, which is very expensive when there are many partition columns, we could try to optimize it.