
Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.5.0
    • Fix Version/s: 2.0.0
    • Component/s: SQL
    • Labels: None

    Description

      Rationale
      The window operator currently uses Hive UDAFs for all aggregation operations. This is fine in terms of performance and functionality. However, Hive UDAFs limit extensibility, and they are quite opaque in terms of processing and memory usage. The latter blocks advanced optimizations such as code generation and Tungsten-style (advanced) memory management.

      Requirements
      We want to address this by replacing the Hive UDAFs with native Spark SQL UDAFs. A redesign of the Spark UDAFs is currently underway, see SPARK-4366. The new window UDAFs should use this new standard, in order to make them as future-proof as possible. Although we are replacing the standard Hive UDAFs, other existing Hive UDAFs should still be supported.

      The new window UDAFs should, at least, cover all existing Hive standard window UDAFs:

      1. FIRST_VALUE
      2. LAST_VALUE
      3. LEAD
      4. LAG
      5. ROW_NUMBER
      6. RANK
      7. DENSE_RANK
      8. PERCENT_RANK
      9. NTILE
      10. CUME_DIST

      All of these functions imply a row order; to use them properly, an ORDER BY clause must be defined.

      The first and last value UDAFs are already present in Spark SQL. The only thing that needs to be added is skip-NULL functionality.
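
      The skip-NULL behaviour can be illustrated with a minimal sketch. It is not the Spark implementation: NULL is modelled as None, the frame is a plain Seq, and the names FirstLastSketch, first, last and ignoreNulls are hypothetical.

```scala
// Illustrative only: models a window frame as Seq[Option[A]], with None standing in for NULL.
// All names here are hypothetical and not part of Spark.
object FirstLastSketch {
  // First value of the frame; when ignoreNulls is set, NULLs are skipped.
  def first[A](frame: Seq[Option[A]], ignoreNulls: Boolean): Option[A] =
    if (ignoreNulls) frame.collectFirst { case Some(v) => v }
    else frame.headOption.flatten

  // Last value of the frame; when ignoreNulls is set, NULLs are skipped.
  def last[A](frame: Seq[Option[A]], ignoreNulls: Boolean): Option[A] =
    if (ignoreNulls) frame.reverseIterator.collectFirst { case Some(v) => v }
    else frame.lastOption.flatten
}
```

      Without the flag, a frame that starts (or ends) with a NULL yields NULL; with it, the nearest non-NULL value is returned.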

      LEAD and LAG are not aggregates. These expressions return the value of an expression a number of rows before (LAG) or ahead (LEAD) of the current row. These expressions put a constraint on the window frame in which they are executed: it can only be a row frame whose lower and upper offsets are equal, so that the frame contains exactly one row.
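
      The offset semantics can be sketched over an ordered, in-memory partition. This is an illustration under simplifying assumptions (one partition held in an IndexedSeq, a constant default value); OffsetSketch, lead and lag are hypothetical names, not Spark classes.

```scala
// Illustrative only: an ordered partition is modelled as an IndexedSeq.
// The names below are hypothetical and not part of Spark.
object OffsetSketch {
  // Value `offset` rows after each row, or `default` when that row lies outside the partition.
  def lead[A](rows: IndexedSeq[A], offset: Int, default: A): IndexedSeq[A] =
    rows.indices.map(i => rows.lift(i + offset).getOrElse(default))

  // Value `offset` rows before each row, or `default` when that row lies outside the partition.
  def lag[A](rows: IndexedSeq[A], offset: Int, default: A): IndexedSeq[A] =
    rows.indices.map(i => rows.lift(i - offset).getOrElse(default))
}
```

      Each output row reads exactly one input row, which is why a single-row frame at a fixed offset is sufficient.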

      The ROW_NUMBER() function can be seen as a count in a running row frame (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
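
      As a quick check of this equivalence, the sketch below computes a running count over one ordered partition. RowNumberSketch and runningCount are hypothetical names used only for illustration.

```scala
// Illustrative only: COUNT over ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
// The names below are hypothetical and not part of Spark.
object RowNumberSketch {
  // For the i-th row (0-based) the frame holds rows 0..i, so the running count is i + 1.
  def runningCount[A](partition: Seq[A]): Seq[Int] =
    partition.scanLeft(0)((count, _) => count + 1).tail
}
```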

      RANK(), DENSE_RANK(), PERCENT_RANK(), NTILE(..) & CUME_DIST() depend on the actual values of the expressions in the ORDER BY clause. The ORDER BY expression(s) must be made available before these functions are evaluated. All these functions will have a fixed frame, but which frame will depend on the implementation (probably a running row frame).
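
      To show why the ORDER BY values are needed, here is a minimal sketch that derives RANK() and DENSE_RANK() in a single running pass over the sorted order keys of one partition. It is an illustration, not the proposed implementation; RankSketch, State, step and ranks are hypothetical names.

```scala
// Illustrative only: computes (RANK, DENSE_RANK) row by row from already-sorted ORDER BY keys.
// The names below are hypothetical and not part of Spark.
object RankSketch {
  // Running state after consuming a row.
  final case class State[K](prev: Option[K], rowNumber: Int, rank: Int, denseRank: Int)

  def step[K](s: State[K], key: K): State[K] = {
    val n = s.rowNumber + 1
    if (s.prev.contains(key)) s.copy(rowNumber = n) // peer row: both ranks stay the same
    else State(Some(key), n, n, s.denseRank + 1)    // new peer group: rank jumps to the row number
  }

  def ranks[K](sortedKeys: Seq[K]): Seq[(Int, Int)] =
    sortedKeys
      .scanLeft(State[K](None, 0, 0, 0))((s, k) => step(s, k))
      .tail
      .map(s => (s.rank, s.denseRank))
}
```

      The state only changes when the order key changes, so the function cannot be evaluated without access to the ORDER BY expression(s) of the current and previous row.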

      PERCENT_RANK(), NTILE(..) & CUME_DIST() also depend on the size of the partition being evaluated. The partition size must either be made available during evaluation (this is perfectly feasible in the current implementation), or the expression must be split into two window expressions and a merging expression. For instance, PERCENT_RANK() would look like this:

      (RANK() OVER (PARTITION BY x ORDER BY y) - 1) / (COUNT(*) OVER (PARTITION BY x) - 1)
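
      The dependence on partition size can be sketched as follows, for one ordered partition held in memory. The names PartitionSizeSketch, percentRank and cumeDist are hypothetical. Returning 0.0 for a single-row partition is an assumption of this sketch; the expression above leaves that case undefined because the denominator becomes zero.

```scala
// Illustrative only: functions that need the partition size in addition to the row order.
// The names below are hypothetical and not part of Spark.
object PartitionSizeSketch {
  // (RANK - 1) / (COUNT - 1); `ranks` holds the RANK() value of every row in the partition.
  // Assumption: a single-row partition yields 0.0 instead of dividing by zero.
  def percentRank(ranks: Seq[Int]): Seq[Double] = {
    val size = ranks.size
    ranks.map(r => if (size > 1) (r - 1).toDouble / (size - 1) else 0.0)
  }

  // Fraction of rows in the partition whose order key is less than or equal to the current key.
  def cumeDist[K](sortedKeys: Seq[K])(implicit ord: Ordering[K]): Seq[Double] = {
    val size = sortedKeys.size
    sortedKeys.map(k => sortedKeys.count(other => ord.lteq(other, k)).toDouble / size)
  }
}
```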
      

      Design
      The old WindowFunction interface will be replaced by the following (initial/very early) design (including sub-classes):

      /**
       * A window function is a function that can only be evaluated in the context of a window operator.
       */
      trait WindowFunction {
        self: Expression =>
      
        /**
         * Define the frame in which the window operator must be executed.
         */
        def frame: WindowFrame = UnspecifiedFrame
      }
      
      /**
       * Base class for LEAD/LAG offset window functions.
       *
       * These are ordinary expressions, the idea is that the Window operator will process these in a
       * separate (specialized) window frame.
       */
      abstract class OffsetWindowFunction(val child: Expression, val offset: Int, val default: Expression) {
        override def deterministic: Boolean = false
        ...
      }
      
      case class Lead(child: Expression, offset: Int, default: Expression) extends OffsetWindowFunction(child, offset, default) {
        // LEAD reads the row `offset` rows after the current row.
        override val frame = SpecifiedWindowFrame(RowFrame, ValueFollowing(offset), ValueFollowing(offset))
      
        ...
      }
      
      case class Lag(child: Expression, offset: Int, default: Expression) extends OffsetWindowFunction(child, offset, default) {
        // LAG reads the row `offset` rows before the current row.
        override val frame = SpecifiedWindowFrame(RowFrame, ValuePreceding(offset), ValuePreceding(offset))
      
        ...
      }
      
      case class RowNumber() extends AlgebraicAggregate with WindowFunction {
        override def deterministic: Boolean = false
        override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
        ...
      }
      
      abstract class RankLike(val order: Seq[Expression] = Nil) extends AlgebraicAggregate with WindowFunction {
        override def deterministic: Boolean = true
      
        // This can be injected by either the Planner or the Window operator.
        def withOrderSpec(orderSpec: Seq[Expression]): AggregateWindowFunction
      
        // This will be injected by the Window operator.
        // Only needed by: PERCENT_RANK(), NTILE(..) & CUME_DIST(). Maybe put this in a subclass.
        def withPartitionSize(size: MutableLiteral): AggregateWindowFunction
      
        // We can do this as long as partition size is available before execution...
        override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
        ...
      }
      
      case class Rank(order: Seq[Expression] = Nil) extends RankLike(order) {
        ...
      }
      
      case class DenseRank(order: Seq[Expression] = Nil) extends RankLike(order) {
        ...
      }
      
      case class PercentRank(order: Seq[Expression] = Nil) extends RankLike(order) {
        ...
      }
      
      case class NTile(order: Seq[Expression] = Nil, buckets: Int) extends RankLike(order) {
        override def deterministic: Boolean = false
        ...
      }
      
      case class CumeDist(order: Seq[Expression] = Nil) extends RankLike(order) {
        ...
      }
      

      This change will have an impact on quite a few other classes as well:

      • org.apache.spark.sql.catalyst.expressions.WindowExpression
      • org.apache.spark.sql.catalyst.analysis.FunctionRegistry (Add Functions)
      • org.apache.spark.sql.execution.Window (Add another window frame processor, Add support for new UDAFs)
      • org.apache.spark.sql.expressions.Window (Remove Hive-Only stuff)
      • org.apache.spark.sql.hive.HiveQl (Use regular UnresolvedFunction)
      • org.apache.spark.sql.hive.HiveWindowFunction (Remove Most of this)
      • org.apache.spark.sql.hive.ResolveHiveWindowFunction (Remove Most of this)

      Unknowns & Challenges
      There are still a few unknowns and challenges, mainly because the work on SPARK-4366 is still in full swing:

      • How will we retain Hive UDAF functionality?
      • What will a WindowExpression containing an AggregateFunction look like? Will there be an intermediate AggregateExpression2? Or is this only present when distinct values and/or a non-Complete processing mode is requested?
      • The new implementation moves the responsibility of distinct processing to the operator. It also adds two aggregate evaluation paths: AggregateFunction2 & AlgebraicAggregate (it is assumed that the current AggregateFunction doesn't require a third). Are there possibilities for code reuse? Or do we have to implement everything from scratch?

          People

            hvanhovell Herman van Hövell
            hvanhovell Herman van Hövell
            Yin Huai Yin Huai