SPARK-38034

Optimize time complexity and extend applicable cases for TransposeWindow


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.2.0
    • Fix Version/s: 3.3.1, 3.2.3, 3.4.0
    • Component/s: SQL
    • Labels: None

    Description

      The TransposeWindow rule tries to eliminate unnecessary shuffles by reordering adjacent Window operators:

      /**
       * Transpose Adjacent Window Expressions.
       * - If the partition spec of the parent Window expression is compatible with the partition spec
       *   of the child window expression, transpose them.
       */
      object TransposeWindow extends Rule[LogicalPlan] {
        private def compatiblePartitions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = {
          ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall {
            case (l, r) => l.semanticEquals(r)
          })
        }
      
        private def windowsCompatible(w1: Window, w2: Window): Boolean = {
          w1.references.intersect(w2.windowOutputSet).isEmpty &&
            w1.expressions.forall(_.deterministic) &&
            w2.expressions.forall(_.deterministic) &&
            compatiblePartitions(w1.partitionSpec, w2.partitionSpec)
        }
      
        def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
          _.containsPattern(WINDOW), ruleId) {
          case w1 @ Window(_, _, _, w2 @ Window(_, _, _, grandChild))
            if windowsCompatible(w1, w2) =>
            Project(w1.output, w2.copy(child = w1.copy(child = grandChild)))
      
          case w1 @ Window(_, _, _, Project(pl, w2 @ Window(_, _, _, grandChild)))
            if windowsCompatible(w1, w2) && w1.references.subsetOf(grandChild.outputSet) =>
            Project(
              pl ++ w1.windowOutputSet,
              w2.copy(child = w1.copy(child = grandChild)))
        }
      } 

      However, compatiblePartitions only compares ps1 against the first ps1.length elements of ps2, so the rule does not fire when ps1 matches a subset of ps2 that is not a prefix, as in the case below:

      val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d") 
      df.selectExpr(
          "sum(`d`) OVER(PARTITION BY `b`,`a`) as e", 
          "sum(`c`) OVER(PARTITION BY `a`) as f"
        ).explain
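      Here the two partition specs are [a] and [b, a]: ps2.take(1) yields [b], whose only permutation is [b], and that is not semantically equal to [a], even though a does appear in ps2. A minimal plain-Scala sketch of the failing check (column names stand in for expressions, and == stands in for semanticEquals):

      val ps1 = Seq("a")       // partition spec of the outer window
      val ps2 = Seq("b", "a")  // partition spec of the inner window

      // The existing check only permutes the first ps1.length elements of ps2.
      val prefixCompatible = ps1.length < ps2.length &&
        ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall { case (l, r) => l == r })
      // prefixCompatible == false, although every column of ps1 occurs in ps2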
      
      

      Current plan:

      == Physical Plan ==
      *(5) Project [e#10L, f#11L]
      +- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L]
         +- *(4) Sort [a#2L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(a#2L, 200), true, [id=#41]
               +- *(3) Project [a#2L, c#4L, e#10L]
                  +- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L]
                     +- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=#33]
                           +- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L]
                              +- *(1) Range (0, 10, step=1, splits=10) 

      Expected plan, with a single exchange (hash partitioning by a alone already clusters rows that share (b, a)):

      == Physical Plan ==
      *(4) Project [e#924L, f#925L]
      +- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L]
         +- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0
            +- *(3) Project [d#43L, b#41L, a#40L, f#925L]
               +- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L]
                  +- *(2) Sort [a#40L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(a#40L, 200), true, [id=#282]
                        +- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L]
                           +- *(1) Range (0, 10, step=1, splits=10) 

      In addition, the permutations call gives compatiblePartitions O(n!) time complexity, which is very expensive when there are many partition columns; this could be optimized as well, as sketched below.
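      One possible direction, a sketch rather than the exact merged change: replace compatiblePartitions in TransposeWindow with a check that treats ps1 as compatible whenever every expression of ps1 has a semantically equal counterpart somewhere in ps2. This is order-insensitive, so it also covers the [a] vs [b, a] case above, and it runs in O(|ps1| * |ps2|) instead of O(n!):

      import org.apache.spark.sql.catalyst.expressions.Expression

      // Subset check: each expression in ps1 must semantically match some
      // expression in ps2, at any position. Requiring strictly fewer columns
      // in ps1 keeps the rewrite meaningful (there is something to transpose).
      private def compatiblePartitions(ps1: Seq[Expression], ps2: Seq[Expression]): Boolean = {
        ps1.length < ps2.length && ps1.forall { expr1 =>
          ps2.exists(expr1.semanticEquals)
        }
      }

      With an order-insensitive check along these lines, the query above should produce the expected plan with a single exchange.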


          People

            Assignee: zhou xiang (constzhou)
            Reporter: zhou xiang (constzhou)
