Spark / SPARK-20636

Eliminate unnecessary shuffle with adjacent Window expressions


    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.1
    • Fix Version/s: 3.0.0
    • Component/s: Optimizer, SQL
    • Labels: None

      Description

      Consider the following example:

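      from pyspark.sql import Window
      from pyspark.sql import functions as F

      # supply: an existing DataFrame with columns sno, pno, qty, read from Parquet
      w1 = Window.partitionBy("sno")
      w2 = Window.partitionBy("sno", "pno")

      supply \
          .select('sno', 'pno', 'qty', F.sum('qty').over(w2).alias('sum_qty_2')) \
          .select('sno', 'pno', 'qty', F.col('sum_qty_2'), F.sum('qty').over(w1).alias('sum_qty_1')) \
          .explain(True)  # extended output also prints the optimized logical plan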
      
      == Optimized Logical Plan ==
      Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1112L], [sno#980]
      +- Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1105L], [sno#980, pno#981]
         +- Relation[sno#980,pno#981,qty#982L] parquet
      
      == Physical Plan ==
      Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1112L], [sno#980]
      +- *Sort [sno#980 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(sno#980, 200)
            +- Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1105L], [sno#980, pno#981]
               +- *Sort [sno#980 ASC NULLS FIRST, pno#981 ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(sno#980, pno#981, 200)
                     +- *FileScan parquet [sno#980,pno#981,qty#982L] ...
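The two Exchange operators above follow from the distribution each Window requires: its input must be clustered on the Window's partition keys. The sketch below is a simplified, hypothetical model of how a planner could decide where to insert an Exchange and a Sort beneath a chain of Window operators that have no ORDER BY. `hash_satisfies_clustering` and `plan_windows` are illustrative names, not Spark APIs. The model assumes that hash partitioning on some keys satisfies a clustering requirement when every partitioning key is among the required keys, and that a shuffle discards any existing sort order.

```python
# Hypothetical toy model, not Spark's planner code.

def hash_satisfies_clustering(partition_keys, required_keys):
    """Assumed rule: hash partitioning on partition_keys co-locates all rows
    that agree on required_keys when there is at least one partitioning key
    and every partitioning key is one of the required keys."""
    return bool(partition_keys) and set(partition_keys) <= set(required_keys)


def plan_windows(window_keys):
    """Return physical operators, bottom-up, for Window operators whose
    partition keys are listed bottom-up in window_keys. Each Window has no
    ORDER BY, so it needs input clustered and sorted by its partition keys."""
    partitioning = ()  # keys the data is currently hash-partitioned on
    ordering = ()      # keys the data is currently sorted on, in order
    ops = ["FileScan"]
    for keys in window_keys:
        keys = tuple(keys)
        if not hash_satisfies_clustering(partitioning, keys):
            ops.append("Exchange hashpartitioning(%s)" % ", ".join(keys))
            partitioning = keys
            ordering = ()  # a shuffle destroys any existing sort order
        # The required ordering must be a prefix of the current ordering.
        if ordering[:len(keys)] != keys:
            ops.append("Sort [%s]" % ", ".join(keys))
            ordering = keys
        # A Window keeps its child's partitioning and ordering.
        ops.append("Window [%s]" % ", ".join(keys))
    return ops


if __name__ == "__main__":
    # Print top-down, the way Spark prints a physical plan.
    for op in reversed(plan_windows([("sno", "pno"), ("sno",)])):
        print(op)
```

With the windows in the original order, bottom-up `[("sno", "pno"), ("sno",)]`, the model yields two Exchanges and two Sorts, matching the physical plan above. Reversing the order yields one Exchange and two Sorts.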
      

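      A more efficient plan can be obtained by swapping the two Window operators so that the one partitioned by sno alone runs first. Data that is hash-partitioned on sno already places all rows with the same (sno, pno) in the same partition, so the Window partitioned by (sno, pno) needs only a local Sort instead of a second Exchange: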

      == Optimized Logical Plan ==
      Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1087L], [sno#980, pno#981]
      +- Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1085L], [sno#980]
         +- Relation[sno#980,pno#981,qty#982L] parquet
      
      == Physical Plan ==
      Window [sum(qty#982L) windowspecdefinition(sno#980, pno#981, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_2#1087L], [sno#980, pno#981]
      +- *Sort [sno#980 ASC NULLS FIRST, pno#981 ASC NULLS FIRST], false, 0
         +- Window [sum(qty#982L) windowspecdefinition(sno#980, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum_qty_1#1085L], [sno#980]
            +- *Sort [sno#980 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(sno#980, 200)
                  +- *FileScan parquet [sno#980,pno#981,qty#982L] ...
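The reordering can be sketched as a rewrite over adjacent Window operators. This is a hypothetical toy model, not Spark's optimizer code: `WindowOp`, `can_transpose` and `transpose_windows` are illustrative names, and the swap condition (the upper Window does not read the lower Window's output, and its partition keys are a proper subset of the lower Window's) is a simplified assumption that may differ from what the actual fix checks.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class WindowOp:
    """Hypothetical stand-in for a logical Window operator."""
    output: str                      # column the window expression produces
    partition_keys: Tuple[str, ...]  # PARTITION BY keys
    references: Tuple[str, ...]      # input columns the expression reads


def can_transpose(upper: WindowOp, lower: WindowOp) -> bool:
    """Assumed swap condition: upper must not depend on lower's output, and
    upper must be partitioned on a proper subset of lower's keys."""
    independent = lower.output not in upper.references
    coarser = set(upper.partition_keys) < set(lower.partition_keys)
    return independent and coarser


def transpose_windows(windows: List[WindowOp]) -> List[WindowOp]:
    """Given windows listed bottom-up, repeatedly swap adjacent pairs so that
    windows with coarser partition keys run first (closer to the scan).
    Each swap removes one out-of-order pair, so the loop terminates."""
    result = list(windows)
    changed = True
    while changed:
        changed = False
        for i in range(len(result) - 1):
            lower, upper = result[i], result[i + 1]
            if can_transpose(upper, lower):
                result[i], result[i + 1] = upper, lower
                changed = True
    return result


if __name__ == "__main__":
    sum_qty_2 = WindowOp("sum_qty_2", ("sno", "pno"), ("qty",))
    sum_qty_1 = WindowOp("sum_qty_1", ("sno",), ("qty",))
    for op in transpose_windows([sum_qty_2, sum_qty_1]):
        print(op.output, op.partition_keys)
```

Applied to the example, the bottom-up order `[sum_qty_2, sum_qty_1]` becomes `[sum_qty_1, sum_qty_2]`. A real rewrite would also have to restore the original output column order, because each Window appends its result column after its input columns.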
      

            People

            • Assignee: ptkool Michael Styles
            • Reporter: ptkool Michael Styles
            • Votes: 0
            • Watchers: 5
