Description
Consider the following example:
w1 = Window.partitionBy("sno")
w2 = Window.partitionBy("sno", "pno")

supply \
    .select('sno', 'pno', 'qty', F.sum('qty').over(w2).alias('sum_qty_2')) \
    .select('sno', 'pno', 'qty', F.col('sum_qty_2'), F.sum('qty').over(w1).alias('sum_qty_1')) \
    .explain()

== Optimized Logical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1112L], [sno#980]
+- Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1105L], [sno#980, pno#981]
   +- Relation[sno#980,pno#981,qty#982L] parquet

== Physical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1112L], [sno#980]
+- *Sort [sno#980 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(sno#980, 200)
      +- Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1105L], [sno#980, pno#981]
         +- *Sort [sno#980 ASC NULLS FIRST, pno#981 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(sno#980, pno#981, 200)
               +- *FileScan parquet [sno#980,pno#981,qty#982L] ...
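The plan above shuffles the data twice: once on (sno, pno) and again on sno. A more efficient query plan can be achieved by flipping the Window expressions. Once the data is hash-partitioned by sno, all rows sharing the same (sno, pno) already sit in the same partition, so the second Window needs only a local sort and the unnecessary shuffle is eliminated, as follows: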
== Optimized Logical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1087L], [sno#980, pno#981]
+- Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1085L], [sno#980]
   +- Relation[sno#980,pno#981,qty#982L] parquet

== Physical Plan ==
Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1087L], [sno#980, pno#981]
+- *Sort [sno#980 ASC NULLS FIRST, pno#981 ASC NULLS FIRST], false, 0
   +- Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1085L], [sno#980]
      +- *Sort [sno#980 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(sno#980, 200)
            +- *FileScan parquet [sno#980,pno#981,qty#982L] ...