The window operator currently uses Hive UDAFs for all aggregation operations. These work well enough in terms of performance and functionality, but they limit extensibility and are quite opaque in terms of processing and memory usage. The latter blocks advanced optimizations such as code generation and Tungsten-style memory management.
We want to address this by replacing the Hive UDAFs with native Spark SQL UDAFs. A redesign of the Spark UDAFs is currently underway, see
SPARK-4366. The new window UDAFs should use this new standard in order to make them as future proof as possible. Although we are replacing the standard Hive UDAFs, other existing Hive UDAFs should still be supported.
The new window UDAFs should, at least, cover all existing Hive standard window UDAFs:
All these functions imply a row order; this means that in order to use them properly an
ORDER BY clause must be defined.
The first and last value UDAFs are already present in Spark SQL. The only thing that needs to be added is skip-NULL functionality.
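The skip-NULL semantics to be added can be sketched in plain Scala; the helper names and signatures below are illustrative assumptions, not the actual Spark SQL API:

```scala
// Illustrative skip-NULL semantics for first/last value over a window
// frame, modelling SQL NULL as Option. Hypothetical helpers, not the
// real Spark SQL implementation.
def firstValue[T](frame: Seq[Option[T]], ignoreNulls: Boolean): Option[T] =
  if (ignoreNulls) frame.flatten.headOption else frame.headOption.flatten

def lastValue[T](frame: Seq[Option[T]], ignoreNulls: Boolean): Option[T] =
  if (ignoreNulls) frame.flatten.lastOption else frame.lastOption.flatten

val frame = Seq(None, Some(1), Some(2), None)
firstValue(frame, ignoreNulls = true)  // Some(1): skips the leading NULL
firstValue(frame, ignoreNulls = false) // None: the frame starts with NULL
```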
LEAD and LAG are not aggregates. These expressions return the value of an expression a number of rows before (LAG) or ahead (LEAD) of the current row. These expressions put a constraint on the window frame in which they are executed: this can only be a row frame with equal offsets.
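The frame constraint follows directly from the semantics: LEAD and LAG are just fixed row offsets within the ordered partition. A minimal sketch, with hypothetical helper names:

```scala
// LAG(expr, offset): the value `offset` rows before the current row;
// LEAD(expr, offset): the value `offset` rows after it. When the offset
// falls outside the partition the result is NULL (None here).
// Illustrative only, not the actual operator code.
def lag[T](partition: IndexedSeq[T], current: Int, offset: Int): Option[T] =
  partition.lift(current - offset)

def lead[T](partition: IndexedSeq[T], current: Int, offset: Int): Option[T] =
  partition.lift(current + offset)

val part = IndexedSeq("a", "b", "c")
lag(part, 2, 1)  // Some("b")
lead(part, 2, 1) // None: one past the end of the partition
```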
The ROW_NUMBER() function can be seen as a count in a running row frame (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
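This equivalence can be shown directly: a running count over the ordered rows yields exactly ROW_NUMBER(). A sketch:

```scala
// ROW_NUMBER() expressed as COUNT(*) over ROWS BETWEEN UNBOUNDED
// PRECEDING AND CURRENT ROW: each row's value is the number of rows
// seen so far in the partition.
def rowNumber[T](partition: Seq[T]): Seq[Long] =
  partition.scanLeft(0L)((count, _) => count + 1).tail

rowNumber(Seq("a", "b", "c")) // Seq(1L, 2L, 3L)
```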
RANK(), DENSE_RANK(), PERCENT_RANK(), NTILE(..) & CUME_DIST() depend on the actual values of the ORDER BY expression(s). The ORDER BY expression(s) must therefore be made available before these functions are evaluated. All these functions will have a fixed frame, but which frame depends on the implementation (probably a running row frame).
PERCENT_RANK(), NTILE(..) & CUME_DIST() also depend on the size of the partition being evaluated. The partition size must either be made available during evaluation (this is perfectly feasible in the current implementation), or the expression must be split over two windows and a merging expression; for instance PERCENT_RANK() would combine a rank over a running row frame with a count over the entire partition.
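As a sketch of that split, PERCENT_RANK() can be derived from a rank (computable in a running row frame), the partition size (a count over the whole partition), and the standard merging formula (rank - 1) / (partition size - 1). The helpers below are a hypothetical decomposition, not the proposed class design:

```scala
// Hypothetical decomposition of PERCENT_RANK() into two window results
// plus a merging expression. `sorted` is the partition, already ordered
// by the ORDER BY expression(s).
def rank[T](sorted: IndexedSeq[T]): IndexedSeq[Int] =
  // Rank of row i: position of the first row with the same ORDER BY
  // value, plus one; ties share the same rank.
  sorted.indices.map(i => sorted.indexOf(sorted(i)) + 1)

def percentRank[T](sorted: IndexedSeq[T]): IndexedSeq[Double] = {
  val size = sorted.length // the count over the entire partition
  rank(sorted).map(r => if (size > 1) (r - 1).toDouble / (size - 1) else 0.0)
}

percentRank(IndexedSeq(10, 20, 20, 30)) // Vector(0.0, 1.0/3, 1.0/3, 1.0)
```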
The old WindowFunction interface will be replaced by the following (initial/very early) design (including sub-classes):
This change will have impact on quite a few other classes as well:
- org.apache.spark.sql.catalyst.analysis.FunctionRegistry (Add Functions)
- org.apache.spark.sql.execution.Window (Add another window frame processor, Add support for new UDAFs)
- org.apache.spark.sql.expressions.Window (Remove Hive-Only stuff)
- org.apache.spark.sql.hive.HiveQl (Use regular UnresolvedFunction)
- org.apache.spark.sql.hive.HiveWindowFunction (Remove Most of this)
- org.apache.spark.sql.hive.ResolveHiveWindowFunction (Remove Most of this)
Unknowns & Challenges
There are still a few unknowns and challenges, mainly because the work on
SPARK-4366 is still in full swing:
- How will we retain Hive UDAF functionality?
- What will a WindowExpression containing an AggregateFunction look like? Will there be an intermediate AggregateExpression2? Or is this only present when distinct values and/or a non-Complete processing mode is requested?
- The new implementation moves the responsibility of distinct processing to the operator. It also adds two aggregate evaluation paths: AggregateFunction2 & AlgebraicAggregate (it is assumed that the current AggregateFunction doesn't require a third). Are there possibilities for code reuse, or do we have to implement everything from scratch?