diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java index 2d621e9..e840938 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java @@ -59,13 +59,11 @@ import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.CurrentRowSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.RangeBoundarySpec; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.ValueBoundarySpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFrameSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -426,38 +424,26 @@ private PartitioningSpec getPSpec(RexWindow window) { private WindowFrameSpec getWindowRange(RexWindow window) { // NOTE: in Hive AST Rows->Range(Physical) & Range -> Values (logical) - - WindowFrameSpec windowFrame = new WindowFrameSpec(); - BoundarySpec start = null; RexWindowBound ub = window.getUpperBound(); if (ub != null) { - start = getWindowBound(ub, window.isRows()); + start = getWindowBound(ub); } BoundarySpec end = null; RexWindowBound lb = window.getLowerBound(); if (lb != null) { - end = getWindowBound(lb, window.isRows()); + end = getWindowBound(lb); } - if (start != null || end != null) { - if (start != null) { - windowFrame.setStart(start); - } - if (end != null) { - windowFrame.setEnd(end); - } - } - - return windowFrame; + return new WindowFrameSpec(window.isRows() ? WindowType.ROWS : WindowType.RANGE, start, end); } - private BoundarySpec getWindowBound(RexWindowBound wb, boolean isRows) { + private BoundarySpec getWindowBound(RexWindowBound wb) { BoundarySpec boundarySpec; if (wb.isCurrentRow()) { - boundarySpec = new CurrentRowSpec(); + boundarySpec = new BoundarySpec(Direction.CURRENT); } else { final Direction direction; final int amt; @@ -471,11 +457,8 @@ private BoundarySpec getWindowBound(RexWindowBound wb, boolean isRows) { } else { amt = RexLiteral.intValue(wb.getOffset()); } - if (isRows) { - boundarySpec = new RangeBoundarySpec(direction, amt); - } else { - boundarySpec = new ValueBoundarySpec(direction, amt); - } + + boundarySpec = new BoundarySpec(direction, amt); } return boundarySpec; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index bf79e95..fdb468d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -214,10 +214,10 @@ import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec; import org.apache.hadoop.hive.ql.parse.QBExpr.Opcode; import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.RangeBoundarySpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -3012,10 +3012,9 @@ private int getWindowSpecIndx(ASTNode wndAST) { WindowSpec wndSpec = ((WindowFunctionSpec) wExpSpec).getWindowSpec(); List partitionKeys = getPartitionKeys(wndSpec.getPartition(), converter, inputRR); List orderKeys = getOrderKeys(wndSpec.getOrder(), converter, inputRR); - RexWindowBound upperBound = getBound(wndSpec.getWindowFrame().start, converter); - RexWindowBound lowerBound = getBound(wndSpec.getWindowFrame().end, converter); - boolean isRows = ((wndSpec.getWindowFrame().start instanceof RangeBoundarySpec) || (wndSpec.getWindowFrame().end instanceof RangeBoundarySpec)) ? true - : false; + RexWindowBound upperBound = getBound(wndSpec.getWindowFrame().getStart(), converter); + RexWindowBound lowerBound = getBound(wndSpec.getWindowFrame().getEnd(), converter); + boolean isRows = wndSpec.getWindowFrame().getWindowType() == WindowType.ROWS; w = cluster.getRexBuilder().makeOver(calciteAggFnRetType, calciteAggFn, calciteAggFnArgs, partitionKeys, ImmutableList. copyOf(orderKeys), lowerBound, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java index 519f10d..f75566d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java @@ -50,19 +50,16 @@ import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionedTableFunctionSpec; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.CurrentRowSpec; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.RangeBoundarySpec; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.ValueBoundarySpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFrameSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.PTFDesc; import org.apache.hadoop.hive.ql.plan.PTFDeserializer; import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; -import org.apache.hadoop.hive.ql.plan.ptf.CurrentRowDef; import org.apache.hadoop.hive.ql.plan.ptf.OrderDef; import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; @@ -70,9 +67,7 @@ import org.apache.hadoop.hive.ql.plan.ptf.PTFQueryInputDef; import org.apache.hadoop.hive.ql.plan.ptf.PartitionDef; import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; -import org.apache.hadoop.hive.ql.plan.ptf.RangeBoundaryDef; import org.apache.hadoop.hive.ql.plan.ptf.ShapeDetails; -import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef; @@ -521,11 +516,12 @@ private WindowFrameDef translate(String wFnName, ShapeDetails inpShape, WindowSp * Since we componentize Windowing, no need to translate * the Partition & Order specs of individual WFns. */ - return translate(inpShape, spec.getWindowFrame()); + return translate(inpShape, spec.getWindowFrame(), spec.getOrder().getExpressions()); } private WindowFrameDef translate(ShapeDetails inpShape, - WindowFrameSpec spec) + WindowFrameSpec spec, + List orderExpressions) throws SemanticException { if (spec == null) { return null; @@ -539,38 +535,32 @@ private WindowFrameDef translate(ShapeDetails inpShape, "Window range invalid, start boundary is greater than end boundary: %s", spec)); } - return new WindowFrameDef(translate(inpShape, s), translate(inpShape, e)); + WindowFrameDef winFrame = new WindowFrameDef( + spec.getWindowType(), + new BoundaryDef(s.direction, s.getAmt()), + new BoundaryDef(e.direction, e.getAmt())); + winFrame.setOrderDef(buildOrderExpressions(inpShape, orderExpressions)); + return winFrame; } - private BoundaryDef translate(ShapeDetails inpShape, BoundarySpec bndSpec) + private OrderDef buildOrderExpressions(ShapeDetails inpShape, List orderExpressions) throws SemanticException { - if (bndSpec instanceof ValueBoundarySpec) { - ValueBoundarySpec vBndSpec = (ValueBoundarySpec) bndSpec; - ValueBoundaryDef vbDef = new ValueBoundaryDef(vBndSpec.getDirection(), vBndSpec.getAmt()); - for (OrderExpression oe : vBndSpec.getOrderExpressions()) { - PTFTranslator.validateNoLeadLagInValueBoundarySpec(oe.getExpression()); - PTFExpressionDef exprDef = null; - try { - exprDef = buildExpressionDef(inpShape, oe.getExpression()); - } catch (HiveException he) { - throw new SemanticException(he); - } - PTFTranslator.validateValueBoundaryExprType(exprDef.getOI()); - OrderExpressionDef orderExprDef = new OrderExpressionDef(exprDef); - orderExprDef.setOrder(oe.getOrder()); - orderExprDef.setNullOrder(oe.getNullOrder()); - vbDef.addOrderExpressionDef(orderExprDef); + OrderDef orderDef = new OrderDef(); + for (OrderExpression oe : orderExpressions) { + PTFTranslator.validateNoLeadLagInValueBoundarySpec(oe.getExpression()); + PTFExpressionDef exprDef = null; + try { + exprDef = buildExpressionDef(inpShape, oe.getExpression()); + } catch (HiveException he) { + throw new SemanticException(he); } - return vbDef; - } - else if (bndSpec instanceof RangeBoundarySpec) { - RangeBoundarySpec rBndSpec = (RangeBoundarySpec) bndSpec; - return new RangeBoundaryDef(rBndSpec.getDirection(), rBndSpec.getAmt()); - } else if (bndSpec instanceof CurrentRowSpec) { - CurrentRowDef cbDef = new CurrentRowDef(); - return cbDef; + PTFTranslator.validateValueBoundaryExprType(exprDef.getOI()); + OrderExpressionDef orderExprDef = new OrderExpressionDef(exprDef); + orderExprDef.setOrder(oe.getOrder()); + orderExprDef.setNullOrder(oe.getNullOrder()); + orderDef.addExpression(orderExprDef); } - throw new SemanticException("Unknown Boundary: " + bndSpec); + return orderDef; } static void setupWdwFnEvaluator(WindowFunctionDef def) throws HiveException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 71d34eb..30987e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -150,14 +150,12 @@ import org.apache.hadoop.hive.ql.parse.QBSubQuery.SubQueryType; import org.apache.hadoop.hive.ql.parse.SubQueryUtils.ISubQueryJoinInfo; import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.CurrentRowSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.RangeBoundarySpec; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.ValueBoundarySpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFrameSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes; @@ -12760,13 +12758,12 @@ private WindowFrameSpec processWindowFrame(ASTNode node) throws SemanticExceptio if ( node.getChildCount() > 1 ) { end = processBoundary(type, (ASTNode) node.getChild(1)); } - - return new WindowFrameSpec(start, end); + // Note: TOK_WINDOWVALUES means RANGE type, TOK_WINDOWRANGE means ROWS type + return new WindowFrameSpec(type == HiveParser.TOK_WINDOWVALUES ? WindowType.RANGE : WindowType.ROWS, start, end); } private BoundarySpec processBoundary(int frameType, ASTNode node) throws SemanticException { - BoundarySpec bs = frameType == HiveParser.TOK_WINDOWRANGE ? - new RangeBoundarySpec() : new ValueBoundarySpec(); + BoundarySpec bs = new BoundarySpec(); int type = node.getType(); boolean hasAmt = true; @@ -12779,7 +12776,7 @@ private BoundarySpec processBoundary(int frameType, ASTNode node) throws Semant bs.setDirection(Direction.FOLLOWING); break; case HiveParser.KW_CURRENT: - bs = new CurrentRowSpec(); + bs.setDirection(Direction.CURRENT); hasAmt = false; break; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java index ef5186a..d5bcbe6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java @@ -20,12 +20,9 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.List; - import org.antlr.runtime.CommonToken; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo; -import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec; @@ -140,13 +137,13 @@ public void validateAndMakeEffective() throws SemanticException { // 3. For missing Wdw Frames or for Frames with only a Start Boundary, completely // specify them by the rules in {@link effectiveWindowFrame} - effectiveWindowFrame(wFn, wdwSpec); + effectiveWindowFrame(wFn); // 4. Validate the effective Window Frames with the rules in {@link validateWindowFrame} validateWindowFrame(wdwSpec); // 5. Add the Partition expressions as the Order if there is no Order and validate Order spec. - setAndValidateOrderSpec(wFn, wdwSpec); + setAndValidateOrderSpec(wFn); } } @@ -199,24 +196,22 @@ private void applyConstantPartition(WindowSpec wdwSpec) { } /* - * - A Window Frame that has only the /start/boundary, then it is interpreted as: - BETWEEN AND CURRENT ROW - * - A Window Specification with an Order Specification and no Window - * Frame is interpreted as: - ROW BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + * - A Window Frame that has only the start boundary, then it is interpreted as: + * BETWEEN AND CURRENT ROW + * - A Window Specification with an Order Specification and no Window Frame is + * interpreted as: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW * - A Window Specification with no Order and no Window Frame is interpreted as: - ROW BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + * ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING */ - private void effectiveWindowFrame(WindowFunctionSpec wFn, WindowSpec wdwSpec) + private void effectiveWindowFrame(WindowFunctionSpec wFn) throws SemanticException { - + WindowSpec wdwSpec = wFn.getWindowSpec(); WindowFunctionInfo wFnInfo = FunctionRegistry.getWindowFunctionInfo(wFn.getName()); boolean supportsWindowing = wFnInfo == null ? true : wFnInfo.isSupportsWindow(); WindowFrameSpec wFrame = wdwSpec.getWindowFrame(); OrderSpec orderSpec = wdwSpec.getOrder(); if ( wFrame == null ) { if (!supportsWindowing ) { - if ( wFn.getName().toLowerCase().equals(FunctionRegistry.LAST_VALUE_FUNC_NAME) && orderSpec != null ) { /* @@ -224,33 +219,37 @@ private void effectiveWindowFrame(WindowFunctionSpec wFn, WindowSpec wdwSpec) * last value among rows with the same Sort Key value. */ wFrame = new WindowFrameSpec( - new CurrentRowSpec(), - new RangeBoundarySpec(Direction.FOLLOWING, 0) + WindowType.ROWS, + new BoundarySpec(Direction.CURRENT), + new BoundarySpec(Direction.FOLLOWING, 0) + ); + } else { + wFrame = new WindowFrameSpec( + WindowType.ROWS, + new BoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT), + new BoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT) ); } - else { + } else { + if ( orderSpec == null ) { wFrame = new WindowFrameSpec( - new RangeBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT), - new RangeBoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT) + WindowType.ROWS, + new BoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT), + new BoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT) ); + } else { + wFrame = new WindowFrameSpec( + WindowType.RANGE, + new BoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT), + new BoundarySpec(Direction.CURRENT) + ); } } - else if ( orderSpec == null ) { - wFrame = new WindowFrameSpec( - new RangeBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT), - new RangeBoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT) - ); - } - else { - wFrame = new WindowFrameSpec( - new ValueBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT), - new CurrentRowSpec() - ); - } + wdwSpec.setWindowFrame(wFrame); } else if ( wFrame.getEnd() == null ) { - wFrame.setEnd(new CurrentRowSpec()); + wFrame.setEnd(new BoundarySpec(Direction.CURRENT)); } } @@ -273,19 +272,19 @@ private void validateWindowFrame(WindowSpec wdwSpec) throws SemanticException { /** * Add default order spec if there is no order and validate order spec for valued based * windowing since only one sort key is allowed. - * @param wdwSpec + * @param wFn Window function spec * @throws SemanticException */ - private void setAndValidateOrderSpec(WindowFunctionSpec wFn, WindowSpec wdwSpec) throws SemanticException { + private void setAndValidateOrderSpec(WindowFunctionSpec wFn) throws SemanticException { + WindowSpec wdwSpec = wFn.getWindowSpec(); wdwSpec.ensureOrderSpec(wFn); - WindowFrameSpec wFrame = wdwSpec.getWindowFrame(); OrderSpec order = wdwSpec.getOrder(); BoundarySpec start = wFrame.getStart(); BoundarySpec end = wFrame.getEnd(); - if (start instanceof ValueBoundarySpec || end instanceof ValueBoundarySpec) { + if (wFrame.getWindowType() == WindowType.RANGE) { if (order == null || order.getExpressions().size() == 0) { throw new SemanticException("Range based Window Frame needs to specify ORDER BY clause"); } @@ -304,13 +303,6 @@ private void setAndValidateOrderSpec(WindowFunctionSpec wFn, WindowSpec wdwSpec) if ( order.getExpressions().size() != 1 && !multiOrderAllowed) { throw new SemanticException("Range value based Window Frame can have only 1 Sort Key"); } - - if (start instanceof ValueBoundarySpec) { - ((ValueBoundarySpec)start).setOrderExpressions(order.getExpressions()); - } - if (end instanceof ValueBoundarySpec) { - ((ValueBoundarySpec)end).setOrderExpressions(order.getExpressions()); - } } } @@ -518,22 +510,20 @@ public String toString() { */ public static class WindowFrameSpec { - BoundarySpec start; - BoundarySpec end; + private WindowType windowType; + private BoundarySpec start; + private BoundarySpec end; - public WindowFrameSpec() { - } - - public WindowFrameSpec(BoundarySpec start, BoundarySpec end) + public WindowFrameSpec(WindowType windowType, BoundarySpec start, BoundarySpec end) { - super(); + this.windowType = windowType; this.start = start; this.end = end; } - public WindowFrameSpec(BoundarySpec start) + public WindowFrameSpec(WindowType windowType, BoundarySpec start) { - this(start, null); + this(windowType, start, null); } public BoundarySpec getStart() @@ -556,10 +546,15 @@ public void setEnd(BoundarySpec end) this.end = end; } + public WindowType getWindowType() { + return this.windowType; + } + @Override public String toString() { - return String.format("window(start=%s, end=%s)", start, end); + return String.format("window(type=%s, start=%s, end=%s)", + this.windowType, start, end); } } @@ -571,6 +566,13 @@ public String toString() FOLLOWING }; + // The types for ROWS BETWEEN or RANGE BETWEEN windowing spec + public static enum WindowType + { + ROWS, + RANGE + }; + /* * A Boundary specifies how many rows back/forward a WindowFrame extends from the * current row. A Boundary is specified as: @@ -580,152 +582,41 @@ public String toString() * - Value Boundary :: which is specified as the amount the value of an Expression must decrease/increase */ - public abstract static class BoundarySpec implements Comparable + public static class BoundarySpec implements Comparable { public static int UNBOUNDED_AMOUNT = Integer.MAX_VALUE; - public abstract Direction getDirection(); - public abstract void setDirection(Direction dir); - public abstract void setAmt(int amt); - public abstract int getAmt(); - } - - public static class RangeBoundarySpec extends BoundarySpec - { - Direction direction; int amt; - public RangeBoundarySpec() { - } - - public RangeBoundarySpec(Direction direction, int amt) - { - super(); - this.direction = direction; - this.amt = amt; - } - - @Override - public Direction getDirection() - { - return direction; - } - - @Override - public void setDirection(Direction direction) - { - this.direction = direction; - } - - @Override - public int getAmt() - { - return amt; - } - - @Override - public void setAmt(int amt) - { - this.amt = amt; - } - - @Override - public String toString() - { - return String.format("range(%s %s)", (amt == UNBOUNDED_AMOUNT ? "Unbounded" : amt), - direction); - } - - public int compareTo(BoundarySpec other) - { - int c = direction.compareTo(other.getDirection()); - if (c != 0) { - return c; - } - - RangeBoundarySpec rb = (RangeBoundarySpec) other; - // Valid range is "range/rows between 10 preceding and 2 preceding" for preceding case - return this.direction == Direction.PRECEDING ? rb.amt - amt : amt - rb.amt; + public BoundarySpec() { } - } - - public static class CurrentRowSpec extends BoundarySpec - { - public CurrentRowSpec() { - } - - @Override - public String toString() - { - return "currentRow"; - } - - @Override - public Direction getDirection() { - return Direction.CURRENT; + public BoundarySpec(Direction direction) { + this(direction, 0); } - @Override - public void setDirection(Direction dir) {} - @Override - public void setAmt(int amt) {} - - public int compareTo(BoundarySpec other) + public BoundarySpec(Direction direction, int amt) { - return getDirection().compareTo(other.getDirection()); - } - - @Override - public int getAmt() {return 0;} - } - - public static class ValueBoundarySpec extends BoundarySpec - { - Direction direction; - int amt; - List orderExpressions; - - public ValueBoundarySpec() { - } - - public ValueBoundarySpec(Direction direction, int amt) - { - super(); this.direction = direction; this.amt = amt; } - @Override public Direction getDirection() { return direction; } - @Override public void setDirection(Direction direction) { this.direction = direction; } - public List getOrderExpressions() - { - return orderExpressions; - } - - public void setOrderExpressions(List orderExpressions) - { - this.orderExpressions = orderExpressions; - } - - @Override public int getAmt() { return amt; } - @Override public void setAmt(int amt) { this.amt = amt; @@ -734,16 +625,12 @@ public void setAmt(int amt) @Override public String toString() { - StringBuilder exprs = new StringBuilder(); - if (orderExpressions != null) { - for (int i=0; i extends AbstractList { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java b/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java index c26de3f..95e3176 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java @@ -28,9 +28,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType; import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; -import org.apache.hadoop.hive.ql.plan.ptf.CurrentRowDef; -import org.apache.hadoop.hive.ql.plan.ptf.RangeBoundaryDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; @@ -52,18 +51,18 @@ public static WindowFrameDef wdwFrame(int p, int f) { BoundaryDef start, end; if (p == 0) { - start = new CurrentRowDef(); + start = new BoundaryDef(Direction.CURRENT, 0); } else { - start = new RangeBoundaryDef(Direction.PRECEDING, p); + start = new BoundaryDef(Direction.PRECEDING, p); } if (f == 0) { - end = new CurrentRowDef(); + end = new BoundaryDef(Direction.CURRENT, 0); } else { - end = new RangeBoundaryDef(Direction.FOLLOWING, f); + end = new BoundaryDef(Direction.FOLLOWING, f); } - return new WindowFrameDef(start, end); + return new WindowFrameDef(WindowType.ROWS, start, end); } public void sumDouble(Iterator inVals, int inSz, int numPreceding,