diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
index 2e6a880..2bdb1d2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
@@ -24,7 +24,6 @@ import java.util.Stack;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
index fe698ef..4c2f42e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
@@ -99,7 +99,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
   private static void splitTask(SparkTask currentTask, ReduceWork reduceWork,
       ParseContext parseContext) throws SemanticException {
     SparkWork currentWork = currentTask.getWork();
-    Set<Operator<? extends OperatorDesc>> reduceSinkSet =
+    Set<Operator<?>> reduceSinkSet =
         SparkMapJoinResolver.getOp(reduceWork, ReduceSinkOperator.class);
     if (currentWork.getChildren(reduceWork).size() == 1 && canSplit(currentWork)
         && reduceSinkSet.size() == 1) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
index 28afc6b..aa60e70 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
@@ -273,7 +273,7 @@ private void validateWindowFrame(WindowSpec wdwSpec) throws SemanticException {
     }
 
     if ( end.getDirection() == Direction.PRECEDING &&
-        start.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT ) {
+        end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT ) {
       throw new SemanticException("End of a WindowFrame cannot be UNBOUNDED PRECEDING");
     }
 
@@ -558,6 +558,18 @@ public String toString()
     public abstract void setDirection(Direction dir);
     public abstract void setAmt(int amt);
     public abstract int getAmt();
+
+    public int getDirectedAmt() {
+      return getDirection() == Direction.PRECEDING ? -getAmt() : getAmt();
+    }
+
+    public int compareTo(BoundarySpec other) {
+      int c = getDirection().compareTo(other.getDirection());
+      if (c == 0) {
+        c = getDirectedAmt() - other.getDirectedAmt();
+      }
+      return c;
+    }
   }
 
   public static class RangeBoundarySpec extends BoundarySpec
@@ -606,17 +618,6 @@ public String toString()
   {
     return String.format("range(%s %s)", (amt == UNBOUNDED_AMOUNT ? "Unbounded" : amt),
         direction);
   }
-
-  public int compareTo(BoundarySpec other)
-  {
-    int c = direction.compareTo(other.getDirection());
-    if (c != 0) {
-      return c;
-    }
-    RangeBoundarySpec rb = (RangeBoundarySpec) other;
-    return amt - rb.amt;
-  }
-
 }
 
 public static class CurrentRowSpec extends BoundarySpec
@@ -636,13 +637,13 @@ public Direction getDirection() {
   }
 
   @Override
-  public void setDirection(Direction dir) {}
+  public void setDirection(Direction dir) {
+    throw new IllegalStateException("Should not set direction for CURRENT row spec");
+  }
+
   @Override
-  public void setAmt(int amt) {}
-
-  public int compareTo(BoundarySpec other)
-  {
-    return getDirection().compareTo(other.getDirection());
+  public void setAmt(int amt) {
+    throw new IllegalStateException("Should not set value for CURRENT row spec");
   }
 
   @Override
@@ -704,17 +705,6 @@ public String toString() {
   {
     return String.format("value(%s %s %s)", expression.toStringTree(), amt, direction);
   }
-
-  public int compareTo(BoundarySpec other)
-  {
-    int c = direction.compareTo(other.getDirection());
-    if (c != 0) {
-      return c;
-    }
-    ValueBoundarySpec vb = (ValueBoundarySpec) other;
-    return amt - vb.amt;
-  }
-
 }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
index e4ea358..a0e2d77 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.parse.WindowingSpec;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 
 public class WindowFunctionDef extends WindowExpressionDef {
@@ -93,4 +94,14 @@ public void setPivotResult(boolean pivotResult) {
     this.pivotResult = pivotResult;
   }
 
+  // TODO: a window frame that does not include the current row cannot be evaluated in streaming mode
+  public boolean windowIncludesCurrent() {
+    return windowFrame == null ||
+        (windowFrame.getStart().getDirection() != windowFrame.getEnd().getDirection());
+  }
+
+  public boolean isUnboundedFollowing() {
+    return windowFrame != null &&
+        windowFrame.getEnd().getAmt() == WindowingSpec.BoundarySpec.UNBOUNDED_AMOUNT;
+  }
 }
\ No newline at end of file
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
index 12a327f..13194f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
@@ -26,8 +26,6 @@
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
-import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
@@ -159,11 +157,8 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
 
   @Override
   public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-    BoundaryDef start = wFrmDef.getStart();
-    BoundaryDef end = wFrmDef.getEnd();
-
     return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Object[]>(this,
-        start.getAmt(), end.getAmt()) {
+        wFrmDef) {
 
       @Override
       protected DoubleWritable getNextResult(
@@ -172,8 +167,7 @@ protected DoubleWritable getNextResult(
          GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Object[]>.SumAvgStreamingState ss)
          throws HiveException {
        AverageAggregationBuffer<Double> myagg = (AverageAggregationBuffer<Double>) ss.wrappedBuf;
        Double r = myagg.count == 0 ? null : myagg.sum;
        long cnt = myagg.count;
-       if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-           && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+       if (ss.isWindowFull()) {
          Object[] o = ss.intermediateVals.remove(0);
          if (o != null) {
            Double d = (Double) o[0];
@@ -289,11 +283,8 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
 
   @Override
   public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-    BoundaryDef start = wFrmDef.getStart();
-    BoundaryDef end = wFrmDef.getEnd();
-
     return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>(
-        this, start.getAmt(), end.getAmt()) {
+        this, wFrmDef) {
 
      @Override
      protected HiveDecimalWritable getNextResult(
@@ -302,8 +293,7 @@ protected HiveDecimalWritable getNextResult(
          GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>.SumAvgStreamingState ss)
          throws HiveException {
        AverageAggregationBuffer<HiveDecimal> myagg = (AverageAggregationBuffer<HiveDecimal>) ss.wrappedBuf;
        HiveDecimal r = myagg.count == 0 ? null : myagg.sum;
        long cnt = myagg.count;
-       if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-           && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
+       if (ss.isWindowFull()) {
          Object[] o = ss.intermediateVals.remove(0);
          if (o != null) {
            HiveDecimal d = (HiveDecimal) o[0];
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
index f679387..aca94f6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
@@ -29,7 +29,6 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
-import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
@@ -154,9 +153,7 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
 
   @Override
   public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-    BoundaryDef start = wFrmDef.getStart();
-    BoundaryDef end = wFrmDef.getEnd();
-    return new FirstValStreamingFixedWindow(this, start.getAmt(), end.getAmt());
+    return new FirstValStreamingFixedWindow(this, wFrmDef);
   }
 }
@@ -178,8 +175,8 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
     private final Deque<ValIndexPair> valueChain;
 
-    public State(int numPreceding, int numFollowing, AggregationBuffer buf) {
-      super(numPreceding, numFollowing, buf);
+    public State(int numPreceding, int numFollowing, int offset, AggregationBuffer buf) {
+      super(numPreceding, numFollowing, offset, buf);
       valueChain = new ArrayDeque<ValIndexPair>(numPreceding + numFollowing + 1);
     }
@@ -212,9 +209,8 @@ protected void reset() {
     }
   }
 
-  public FirstValStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, int numPreceding,
-      int numFollowing) {
-    super(wrappedEval, numPreceding, numFollowing);
+  public FirstValStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, WindowFrameDef wFrmDef) {
+    super(wrappedEval, wFrmDef);
   }
 
   @Override
@@ -225,7 +221,7 @@ public int getRowsRemainingAfterTerminate() throws HiveException {
   @Override
   public AggregationBuffer getNewAggregationBuffer() throws HiveException {
     AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
-    return new State(numPreceding, numFollowing, underlying);
+    return new State(numPreceding, numFollowing, offset, underlying);
   }
 
   protected ObjectInspector inputOI() {
@@ -274,12 +270,8 @@ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
     }
 
     s.numRows++;
-    if (s.valueChain.size() > 0) {
-      int fIdx = (Integer) s.valueChain.getFirst().idx;
-      if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-          && s.numRows > fIdx + s.numPreceding + s.numFollowing) {
-        s.valueChain.removeFirst();
-      }
+    if (!s.valueChain.isEmpty() && s.isWindowFull(s.valueChain.getFirst().idx)) {
+      s.valueChain.removeFirst();
     }
   }
 
@@ -291,17 +283,12 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
     for (int i = 0; i < s.numFollowing; i++) {
       s.results.add(r == null ? null : r.val);
       s.numRows++;
-      if (r != null) {
-        int fIdx = (Integer) r.idx;
-        if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-            && s.numRows > fIdx + s.numPreceding + s.numFollowing
-            && !s.valueChain.isEmpty()) {
-          s.valueChain.removeFirst();
-          r = !s.valueChain.isEmpty() ? s.valueChain.getFirst() : r;
-        }
+      if (r != null && s.isWindowFull(r.idx)) {
+        s.valueChain.removeFirst();
+        r = !s.valueChain.isEmpty() ? s.valueChain.getFirst() : r;
       }
     }
-
+    super.terminate(agg);
     return null;
   }
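[Review note] StreamingState.isWindowFull() is an exact rewrite of the guards it replaces: numRows > numPreceding + numFollowing rearranges to (numRows - numFollowing) >= (numPreceding + 1), so the Average and FirstValue paths above behave identically for ordinary PRECEDING-to-FOLLOWING frames. The difference is that the check now runs against the normalized frame sizes, which also covers the new shifted frames.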
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
index e099154..0f0498c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
@@ -25,8 +25,6 @@
 import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
-import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -141,9 +139,7 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
 
   @Override
   public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-    BoundaryDef start = wFrmDef.getStart();
-    BoundaryDef end = wFrmDef.getEnd();
-    return new LastValStreamingFixedWindow(this, start.getAmt(), end.getAmt());
+    return new LastValStreamingFixedWindow(this, wFrmDef);
   }
 }
@@ -154,8 +150,8 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
     private Object lastValue;
     private int lastIdx;
 
-    public State(int numPreceding, int numFollowing, AggregationBuffer buf) {
-      super(numPreceding, numFollowing, buf);
+    public State(int numPreceding, int numFollowing, int offset, AggregationBuffer buf) {
+      super(numPreceding, numFollowing, offset, buf);
       lastValue = null;
       lastIdx = -1;
     }
@@ -179,9 +175,8 @@ protected void reset() {
     }
   }
 
-  public LastValStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, int numPreceding,
-      int numFollowing) {
-    super(wrappedEval, numPreceding, numFollowing);
+  public LastValStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, WindowFrameDef wFrmDef) {
+    super(wrappedEval, wFrmDef);
   }
 
   @Override
@@ -192,7 +187,7 @@ public int getRowsRemainingAfterTerminate() throws HiveException {
   @Override
   public AggregationBuffer getNewAggregationBuffer() throws HiveException {
     AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
-    return new State(numPreceding, numFollowing, underlying);
+    return new State(numPreceding, numFollowing, offset, underlying);
   }
 
   protected ObjectInspector inputOI() {
@@ -218,12 +213,9 @@ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
     if (!lb.skipNulls || o != null) {
       s.lastValue = o;
       s.lastIdx = s.numRows;
-    } else if (lb.skipNulls && s.lastIdx != -1) {
-      if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-          && s.numRows > s.lastIdx + s.numPreceding + s.numFollowing) {
-        s.lastValue = null;
-        s.lastIdx = -1;
-      }
+    } else if (lb.skipNulls && s.lastIdx != -1 && s.isWindowFull(s.lastIdx)) {
+      s.lastValue = null;
+      s.lastIdx = -1;
     }
 
     if (s.numRows >= (s.numFollowing)) {
@@ -237,17 +229,15 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
     State s = (State) agg;
     LastValueBuffer lb = (LastValueBuffer) s.wrappedBuf;
 
-    if (lb.skipNulls && s.lastIdx != -1) {
-      if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-          && s.numRows > s.lastIdx + s.numPreceding + s.numFollowing) {
-        s.lastValue = null;
-        s.lastIdx = -1;
-      }
+    if (lb.skipNulls && s.lastIdx != -1 && s.isWindowFull(s.lastIdx)) {
+      s.lastValue = null;
+      s.lastIdx = -1;
     }
 
     for (int i = 0; i < s.numFollowing; i++) {
       s.results.add(s.lastValue);
     }
+    super.terminate(agg);
 
     return null;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
index a153818..3a8a616 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
@@ -27,10 +27,8 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
-import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.UDFType;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -130,9 +128,7 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
 
   @Override
   public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-    BoundaryDef start = wFrmDef.getStart();
-    BoundaryDef end = wFrmDef.getEnd();
-    return new MaxStreamingFixedWindow(this, start.getAmt(), end.getAmt());
+    return new MaxStreamingFixedWindow(this, wFrmDef);
   }
 }
@@ -166,8 +162,8 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
   class State extends GenericUDAFStreamingEvaluator<Object>.StreamingState {
     private final Deque<Object[]> maxChain;
 
-    public State(int numPreceding, int numFollowing, AggregationBuffer buf) {
-      super(numPreceding, numFollowing, buf);
+    public State(int numPreceding, int numFollowing, int offset, AggregationBuffer buf) {
+      super(numPreceding, numFollowing, offset, buf);
       maxChain = new ArrayDeque<Object[]>(numPreceding + numFollowing + 1);
     }
@@ -201,15 +197,14 @@ protected void reset() {
 
   }
 
-  public MaxStreamingFixedWindow(GenericUDAFEvaluator wrappedEval,
-      int numPreceding, int numFollowing) {
-    super(wrappedEval, numPreceding, numFollowing);
+  public MaxStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, WindowFrameDef wFrmDef) {
+    super(wrappedEval, wFrmDef);
   }
 
   @Override
   public AggregationBuffer getNewAggregationBuffer() throws HiveException {
     AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
-    return new State(numPreceding, numFollowing, underlying);
+    return new State(numPreceding, numFollowing, offset, underlying);
   }
 
   protected ObjectInspector inputOI() {
@@ -253,8 +248,7 @@ public void iterate(AggregationBuffer agg, Object[] parameters)
 
     s.numRows++;
     int fIdx = (Integer) s.maxChain.getFirst()[1];
-    if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-        && s.numRows > fIdx + s.numPreceding + s.numFollowing) {
+    if (s.isWindowFull(fIdx)) {
       s.maxChain.removeFirst();
     }
   }
@@ -275,23 +269,18 @@ private boolean isGreater(Object in, Object last) {
 
   @Override
   public Object terminate(AggregationBuffer agg) throws HiveException {
-
     State s = (State) agg;
-    Object[] r = s.maxChain.getFirst();
+    Object[] r = s.maxChain.isEmpty() ? null : s.maxChain.getFirst();
 
     for (int i = 0; i < s.numFollowing; i++) {
-      s.results.add(r[0]);
+      s.results.add(r == null ? null : r[0]);
       s.numRows++;
-      int fIdx = (Integer) r[1];
-      if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-          && s.numRows - s.numFollowing + i > fIdx + s.numPreceding
-          && !s.maxChain.isEmpty()) {
+      if (r != null && s.isWindowFull((Integer) r[1])) {
         s.maxChain.removeFirst();
        r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : r;
       }
     }
-
-    return null;
+    return super.terminate(agg);
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java
index d931d52..747661e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java
@@ -23,7 +23,6 @@
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.UDFType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax.MaxStreamingFixedWindow;
@@ -125,18 +124,15 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
 
   @Override
   public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-    BoundaryDef start = wFrmDef.getStart();
-    BoundaryDef end = wFrmDef.getEnd();
-    return new MinStreamingFixedWindow(this, start.getAmt(), end.getAmt());
+    return new MinStreamingFixedWindow(this, wFrmDef);
   }
 
 }
 
 static class MinStreamingFixedWindow extends MaxStreamingFixedWindow {
 
-  public MinStreamingFixedWindow(GenericUDAFEvaluator wrappedEval,
-      int numPreceding, int numFollowing) {
-    super(wrappedEval, numPreceding, numFollowing);
+  public MinStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, WindowFrameDef wFrmDef) {
+    super(wrappedEval, wFrmDef);
   }
 
   protected ObjectInspector inputOI() {
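[Review note] MaxStreamingFixedWindow works because maxChain is a monotonically decreasing deque of {value, rowIdx} pairs, which is what makes evicting on isWindowFull(fIdx) sufficient. A self-contained sketch of that underlying technique (illustrative names, not the patch API):

    import java.util.ArrayDeque;
    import java.util.Deque;

    class SlidingMax {
      // Entries are {value, rowIdx}, kept strictly decreasing by value.
      private final Deque<int[]> chain = new ArrayDeque<>();
      private final int windowSize;

      SlidingMax(int windowSize) { this.windowSize = windowSize; }

      int accept(int value, int rowIdx) {
        while (!chain.isEmpty() && chain.getLast()[0] <= value) {
          chain.removeLast();            // dominated entries can never become the max
        }
        chain.addLast(new int[] {value, rowIdx});
        if (chain.getFirst()[1] <= rowIdx - windowSize) {
          chain.removeFirst();           // head has slid out of the window
        }
        return chain.getFirst()[0];      // max of the last windowSize values
      }
    }

Every row is pushed and popped at most once, so the per-row cost is amortized O(1); MinStreamingFixedWindow reuses the same machinery with the comparison inverted.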
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
index d68c085..5741053 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
@@ -22,8 +22,10 @@ import java.util.List;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
@@ -32,14 +34,30 @@ GenericUDAFEvaluator implements ISupportStreamingModeForWindowing {
 
   protected final GenericUDAFEvaluator wrappedEval;
+
+  protected final int offset;
   protected final int numPreceding;
   protected final int numFollowing;
 
-  public GenericUDAFStreamingEvaluator(GenericUDAFEvaluator wrappedEval,
-      int numPreceding, int numFollowing) {
+  public GenericUDAFStreamingEvaluator(GenericUDAFEvaluator wrappedEval, WindowFrameDef frameDef) {
     this.wrappedEval = wrappedEval;
-    this.numPreceding = numPreceding;
-    this.numFollowing = numFollowing;
+    BoundaryDef start = frameDef.getStart();
+    BoundaryDef end = frameDef.getEnd();
+    if (start.getDirection() == WindowingSpec.Direction.FOLLOWING) {
+      offset = start.getAmt();
+      numPreceding = 0;
+      numFollowing = end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT ?
+          BoundarySpec.UNBOUNDED_AMOUNT : end.getAmt() - start.getAmt();
+    } else if (end.getDirection() == WindowingSpec.Direction.PRECEDING) {
+      offset = -end.getAmt();
+      numPreceding = start.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT ?
+          BoundarySpec.UNBOUNDED_AMOUNT : start.getAmt() - end.getAmt();
+      numFollowing = 0;
+    } else {
+      offset = 0;
+      numPreceding = start.getAmt();
+      numFollowing = end.getAmt();
+    }
     this.mode = wrappedEval.mode;
   }
 
@@ -48,20 +66,50 @@ public GenericUDAFStreamingEvaluator(GenericUDAFEvaluator wrappedEval,
     final int numPreceding;
     final int numFollowing;
     final List<T1> results;
+    final int offset;
     int numRows;
 
-    StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) {
+    StreamingState(int numPreceding, int numFollowing, int offset, AggregationBuffer buf) {
       this.wrappedBuf = buf;
       this.numPreceding = numPreceding;
       this.numFollowing = numFollowing;
-      results = new ArrayList<T1>();
-      numRows = 0;
+      this.results = new ArrayList<T1>();
+      for (int x = offset; x < 0; x++) {
+        results.add(null); // nulls for negative offset (PRECEDING-to-PRECEDING frames)
+      }
+      this.offset = Math.max(0, offset);
     }
 
     protected void reset() {
       results.clear();
       numRows = 0;
     }
+
+    protected Object getNextResult() {
+      if (results.size() > offset) {
+        T1 res = results.remove(offset);
+        if (res == null) {
+          return ISupportStreamingModeForWindowing.NULL_RESULT;
+        }
+        return res;
+      }
+      return null;
+    }
+
+    protected void terminate() {
+      for (int x = offset; x > 0; x--) {
+        results.add(null); // nulls for positive offset (FOLLOWING-to-FOLLOWING frames)
+      }
+    }
+
+    protected boolean isWindowFull() {
+      return isWindowFull(0);
+    }
+
+    protected boolean isWindowFull(int offset) {
+      return numPreceding != BoundarySpec.UNBOUNDED_AMOUNT &&
+          numRows > numPreceding + numFollowing + offset;
+    }
   }
 
   @Override
@@ -78,6 +126,12 @@ public void reset(AggregationBuffer agg) throws HiveException {
   }
 
   @Override
+  public Object terminate(AggregationBuffer agg) throws HiveException {
+    ((StreamingState) agg).terminate();
+    return null;
+  }
+
+  @Override
   public Object terminatePartial(AggregationBuffer agg) throws HiveException {
     throw new HiveException(getClass().getSimpleName()
         + ": terminatePartial not supported");
@@ -91,32 +145,23 @@ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
 
   @Override
   public Object getNextResult(AggregationBuffer agg) throws HiveException {
-    StreamingState ss = (StreamingState) agg;
-    if (!ss.results.isEmpty()) {
-      T1 res = ss.results.remove(0);
-      if (res == null) {
-        return ISupportStreamingModeForWindowing.NULL_RESULT;
-      }
-      return res;
-    }
-    return null;
+    return ((StreamingState) agg).getNextResult();
   }
 
   public static abstract class SumAvgEnhancer<T1, T2> extends GenericUDAFStreamingEvaluator<T1> {
 
-    public SumAvgEnhancer(GenericUDAFEvaluator wrappedEval, int numPreceding,
-        int numFollowing) {
-      super(wrappedEval, numPreceding, numFollowing);
+    public SumAvgEnhancer(GenericUDAFEvaluator wrappedEval, WindowFrameDef frameDef) {
+      super(wrappedEval, frameDef);
     }
 
     class SumAvgStreamingState extends StreamingState {
 
       final List<T2> intermediateVals;
 
-      SumAvgStreamingState(int numPreceding, int numFollowing,
+      SumAvgStreamingState(int numPreceding, int numFollowing, int offset,
           AggregationBuffer buf) {
-        super(numPreceding, numFollowing, buf);
+        super(numPreceding, numFollowing, offset, buf);
         intermediateVals = new ArrayList<T2>();
       }
 
@@ -152,7 +197,7 @@ protected void reset() {
     @Override
     public AggregationBuffer getNewAggregationBuffer() throws HiveException {
       AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
-      return new SumAvgStreamingState(numPreceding, numFollowing, underlying);
+      return new SumAvgStreamingState(numPreceding, numFollowing, offset, underlying);
     }
 
     @Override
@@ -175,13 +220,11 @@ public void iterate(AggregationBuffer agg, Object[] parameters)
     @Override
     public Object terminate(AggregationBuffer agg) throws HiveException {
       SumAvgStreamingState ss = (SumAvgStreamingState) agg;
-      Object o = wrappedEval.terminate(ss.wrappedBuf);
-
       for (int i = 0; i < ss.numFollowing; i++) {
         ss.results.add(getNextResult(ss));
         ss.numRows++;
       }
-      return o;
+      return super.terminate(agg);
     }
 
     @Override
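[Review note] The constructor above is the heart of the patch: every ROWS frame is normalized to a triple (offset, numPreceding, numFollowing). Worked examples of that mapping, following the branches of the new constructor:

    // ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING    (end is PRECEDING)
    //   offset = -1, numPreceding = 2 - 1 = 1, numFollowing = 0
    //   i.e. a two-row "1 preceding to current" window whose result stream is
    //   shifted one row later by the null pre-padding in StreamingState().
    // ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING    (start is FOLLOWING)
    //   offset = 1, numPreceding = 0, numFollowing = 2 - 1 = 1
    //   getNextResult() then skips the first offset buffered results, and
    //   StreamingState.terminate() appends offset trailing nulls.
    // ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING    (mixed directions)
    //   offset = 0, numPreceding = 2, numFollowing = 2 (behaviour unchanged)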
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
index d1118f1..37f7a96 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
@@ -24,8 +24,6 @@
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
-import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -184,11 +182,8 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
 
   @Override
   public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-    BoundaryDef start = wFrmDef.getStart();
-    BoundaryDef end = wFrmDef.getEnd();
-
     return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>(
-        this, start.getAmt(), end.getAmt()) {
+        this, wFrmDef) {
 
       @Override
       protected HiveDecimalWritable getNextResult(
@@ -196,9 +191,8 @@ protected HiveDecimalWritable getNextResult(
          GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>.SumAvgStreamingState ss)
          throws HiveException {
        SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf;
        HiveDecimal r = myagg.empty ? null : myagg.sum;
-       if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-           && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
-         HiveDecimal d = (HiveDecimal) ss.intermediateVals.remove(0);
+       if (ss.isWindowFull()) {
+         HiveDecimal d = ss.intermediateVals.remove(0);
          d = d == null ? HiveDecimal.ZERO : d;
          r = r == null ? null : r.subtract(d);
        }
@@ -303,11 +297,9 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
 
   @Override
   public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-    BoundaryDef start = wFrmDef.getStart();
-    BoundaryDef end = wFrmDef.getEnd();
-    return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Double>(this,
-        start.getAmt(), end.getAmt()) {
+    return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Double>(this,
+        wFrmDef) {
 
       @Override
       protected DoubleWritable getNextResult(
@@ -315,9 +307,8 @@ protected DoubleWritable getNextResult(
          GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Double>.SumAvgStreamingState ss)
          throws HiveException {
        SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf;
        Double r = myagg.empty ? null : myagg.sum;
-       if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-           && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
-         Double d = (Double) ss.intermediateVals.remove(0);
+       if (ss.isWindowFull()) {
+         Double d = ss.intermediateVals.remove(0);
          d = d == null ? 0.0 : d;
          r = r == null ? null : r - d;
        }
@@ -421,11 +412,8 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
 
   @Override
   public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-    BoundaryDef start = wFrmDef.getStart();
-    BoundaryDef end = wFrmDef.getEnd();
-
     return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, Long>(this,
-        start.getAmt(), end.getAmt()) {
+        wFrmDef) {
 
      @Override
      protected LongWritable getNextResult(
@@ -433,9 +421,8 @@ protected LongWritable getNextResult(
          GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, Long>.SumAvgStreamingState ss)
          throws HiveException {
        SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
        Long r = myagg.empty ? null : myagg.sum;
-       if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-           && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
-         Long d = (Long) ss.intermediateVals.remove(0);
+       if (ss.isWindowFull()) {
+         Long d = ss.intermediateVals.remove(0);
          d = d == null ? 0 : d;
          r = r == null ? null : r - d;
        }
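[Review note] All three sum flavours above (and the two avg flavours) share the SumAvgEnhancer pattern: keep one running aggregate, snapshot it each time a result is emitted, and once isWindowFull() fires subtract the snapshot that just fell out of the window. In miniature (illustrative, long case only):

    import java.util.ArrayList;
    import java.util.List;

    class SlidingSum {
      private long running;                                    // total of everything seen so far
      private final List<Long> snapshots = new ArrayList<>();  // running total at each emission
      private final int windowSize;

      SlidingSum(int windowSize) { this.windowSize = windowSize; }

      long accept(long value) {
        running += value;
        snapshots.add(running);
        long expired = snapshots.size() > windowSize ? snapshots.remove(0) : 0L;
        return running - expired;        // sum of the last windowSize values
      }
    }

Restoring the generic parameters on the SumAvgEnhancer instantiations (e.g. <LongWritable, Long>) is also what lets the patch drop the casts on intermediateVals.remove(0).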
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
index 903a9b0..3ad8fc8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
@@ -69,8 +69,7 @@ public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException {
     ArrayList<List<?>> oColumns = new ArrayList<List<?>>();
     PTFPartition iPart = pItr.getPartition();
-    StructObjectInspector inputOI;
-    inputOI = (StructObjectInspector) iPart.getOutputOI();
+    StructObjectInspector inputOI = iPart.getOutputOI();
 
     WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
     Order order = wTFnDef.getOrder().getExpressions().get(0).getOrder();
@@ -668,10 +667,7 @@ int getRowBoundaryEnd(BoundaryDef b, int currRow, PTFPartition p) throws HiveException {
     int amt = b.getAmt();
     switch(d) {
     case PRECEDING:
-      if ( amt == 0 ) {
-        return currRow + 1;
-      }
-      return currRow - amt;
+      return currRow - amt + 1;
     case CURRENT:
       return currRow + 1;
     case FOLLOWING:
@@ -1220,6 +1216,7 @@ public int size() {
 
     ArrayList<Object> output;
     List<?>[] outputFromPivotFunctions;
+    ISupportStreamingModeForWindowing[] windowingEvals;
     int currIdx;
     PTFPartition iPart;
     /*
@@ -1236,7 +1233,7 @@ public int size() {
     RankLimit rnkLimit;
 
     WindowingIterator(PTFPartition iPart, ArrayList<Object> output,
-        List<?>[] outputFromPivotFunctions, int[] wFnsToProcess) {
+        List<?>[] outputFromPivotFunctions, int[] wFnsToProcess) throws HiveException {
       this.iPart = iPart;
       this.output = output;
       this.outputFromPivotFunctions = outputFromPivotFunctions;
@@ -1247,17 +1244,27 @@ public int size() {
       ptfDesc = getQueryDef();
       inputOI = iPart.getOutputOI();
 
-      aggBuffers = new AggregationBuffer[wTFnDef.getWindowFunctions().size()];
-      args = new Object[wTFnDef.getWindowFunctions().size()][];
-      try {
+      List<WindowFunctionDef> windowFunctions = wTFnDef.getWindowFunctions();
+      windowingEvals = new ISupportStreamingModeForWindowing[windowFunctions.size()];
+
+      if (ptfDesc.getLlInfo().getLeadLagExprs() == null) {
+        aggBuffers = new AggregationBuffer[windowFunctions.size()];
+        args = new Object[windowFunctions.size()][];
         for (int j : wFnsToProcess) {
-          WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j);
-          aggBuffers[j] = wFn.getWFnEval().getNewAggregationBuffer();
-          args[j] = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
+          WindowFunctionDef wFn = windowFunctions.get(j);
+          if (!wFn.windowIncludesCurrent()) {
+            // cannot return evaluation of window function for a row immediately
+            continue;
+          }
+          GenericUDAFEvaluator wFnEval = wFn.getWFnEval();
+          if (wFnEval instanceof ISupportStreamingModeForWindowing) {
+            windowingEvals[j] = (ISupportStreamingModeForWindowing) wFnEval;
+            aggBuffers[j] = wFnEval.getNewAggregationBuffer();
+            args[j] = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
+          }
         }
-      } catch (HiveException he) {
-        throw new RuntimeException(he);
       }
+
      if ( WindowingTableFunction.this.rnkLimitDef != null ) {
        rnkLimit = new RankLimit(WindowingTableFunction.this.rnkLimitDef);
      }
@@ -1284,17 +1291,10 @@ public Object next() {
       try {
         for (int j : wFnsToProcess) {
           WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j);
-          if (wFn.getWFnEval() instanceof ISupportStreamingModeForWindowing) {
-            Object iRow = iPart.getAt(currIdx);
-            int a = 0;
-            if (wFn.getArgs() != null) {
-              for (PTFExpressionDef arg : wFn.getArgs()) {
-                args[j][a++] = arg.getExprEvaluator().evaluate(iRow);
-              }
-            }
-            wFn.getWFnEval().aggregate(aggBuffers[j], args[j]);
-            Object out = ((ISupportStreamingModeForWindowing) wFn.getWFnEval())
-                .getNextResult(aggBuffers[j]);
+          if (windowingEvals[j] != null) {
+            makeArgs(args[j], wFn.getArgs());
+            ((GenericUDAFEvaluator) windowingEvals[j]).aggregate(aggBuffers[j], args[j]);
+            Object out = windowingEvals[j].getNextResult(aggBuffers[j]);
             out = ObjectInspectorUtils.copyToStandardObject(out, wFn.getOI());
             output.set(j, out);
           } else {
@@ -1322,6 +1322,17 @@ public Object next() {
       return output;
     }
 
+    private void makeArgs(Object[] args, List<PTFExpressionDef> exprs) throws HiveException {
+      if (args.length == 0) {
+        return;
+      }
+      Object iRow = iPart.getAt(currIdx);
+      int i = 0;
+      for (PTFExpressionDef arg : exprs) {
+        args[i++] = arg.getExprEvaluator().evaluate(iRow);
+      }
+    }
+
     @Override
     public void remove() {
       throw new UnsupportedOperationException();
diff --git ql/src/test/queries/clientpositive/windowing_windowspec.q ql/src/test/queries/clientpositive/windowing_windowspec.q
index 6d8ce67..3879548 100644
--- ql/src/test/queries/clientpositive/windowing_windowspec.q
+++ ql/src/test/queries/clientpositive/windowing_windowspec.q
@@ -34,3 +34,48 @@ select f, sum(f) over (partition by ts order by f range between unbounded preceding
 select s, i, round(avg(d) over (partition by s order by i) / 10.0 , 2) from over10k limit 7;
 
 select s, i, round((avg(d) over w1 + 10.0) - (avg(d) over w1 - 10.0),2) from over10k window w1 as (partition by s order by i) limit 7;
+
+-- preceding + preceding, following + following
+
+-- streaming
+select value,
+    min(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    max(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    sum(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING)
+from src tablesample (10 rows);
+
+-- non-streaming
+select value,
+    min(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    max(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    sum(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    min(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    max(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    sum(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING)
+from src tablesample (10 rows);
\ No newline at end of file
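[Review note] WindowingIterator now decides up front which functions may stream: windowingEvals[j] is populated only when there are no lead/lag expressions, the evaluator implements ISupportStreamingModeForWindowing, and wFn.windowIncludesCurrent() holds, since this iterator must hand back each row's value as the row is consumed; everything else goes through the per-function batch branch of next(). The paired queries in windowing_windowspec.q exercise both sides: the first uses only finite shifted frames that the offset-based evaluators can handle, while the UNBOUNDED FOLLOWING columns in the second cannot produce any result until the whole partition has been seen, forcing the non-streaming path.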
diff --git ql/src/test/results/clientpositive/windowing_windowspec.q.out ql/src/test/results/clientpositive/windowing_windowspec.q.out
index 00af6b8..716451f 100644
--- ql/src/test/results/clientpositive/windowing_windowspec.q.out
+++ ql/src/test/results/clientpositive/windowing_windowspec.q.out
@@ -830,3 +830,121 @@ alice allen 65609 20.0
 alice allen 65662 20.0
 alice allen 65670 20.0
 alice allen 65720 20.0
+PREHOOK: query: -- preceding + preceding, following + following
+
+-- streaming
+select value,
+    min(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    max(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    sum(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING)
+from src tablesample (10 rows)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: -- preceding + preceding, following + following
+
+-- streaming
+select value,
+    min(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    max(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    sum(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING)
+from src tablesample (10 rows)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+val_484 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL 98 278 376 98 278
+val_98 484 484 484 484 484 484 484 484 484 484 255 278 533 278 255
+val_278 98 484 582 484 98 98 484 582 484 98 255 409 664 255 409
+val_255 98 278 376 98 278 98 484 860 484 278 165 409 574 409 165
+val_409 255 278 533 278 255 98 484 1115 484 255 27 165 192 165 27
+val_165 255 409 664 255 409 98 484 1524 484 409 27 311 338 27 311
+val_27 165 409 574 409 165 98 484 1689 484 165 86 311 397 311 86
+val_311 27 165 192 165 27 27 484 1716 484 27 86 238 324 86 238
+val_86 27 311 338 27 311 27 484 2027 484 311 238 238 238 238 238
+val_238 86 311 397 311 86 27 484 2113 484 86 NULL NULL NULL NULL NULL
+PREHOOK: query: -- non-streaming
+select value,
+    min(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    max(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    sum(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    min(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    max(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    sum(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING)
+from src tablesample (10 rows)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: -- non-streaming
+select value,
+    min(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    max(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    sum(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    first_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    last_value(cast(key as int)) over (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
+    min(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    max(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    sum(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING),
+    min(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    max(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    sum(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    first_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING),
+    last_value(cast(key as int)) over (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING)
+from src tablesample (10 rows)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+val_484 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL 98 278 376 98 278 27 409 1867 98 238
+val_98 484 484 484 484 484 484 484 484 484 484 255 278 533 278 255 27 409 1769 278 238
+val_278 98 484 582 484 98 98 484 582 484 98 255 409 664 255 409 27 409 1491 255 238
+val_255 98 278 376 98 278 98 484 860 484 278 165 409 574 409 165 27 409 1236 409 238
+val_409 255 278 533 278 255 98 484 1115 484 255 27 165 192 165 27 27 311 827 165 238
+val_165 255 409 664 255 409 98 484 1524 484 409 27 311 338 27 311 27 311 662 27 238
+val_27 165 409 574 409 165 98 484 1689 484 165 86 311 397 311 86 86 311 635 311 238
+val_311 27 165 192 165 27 27 484 1716 484 27 86 238 324 86 238 86 238 324 86 238
+val_86 27 311 338 27 311 27 484 2027 484 311 238 238 238 238 238 238 238 238 238 238
+val_238 86 311 397 311 86 27 484 2113 484 86 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
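[Review note] Sanity check of the expected output: the keys reach the windowing functions in the order 484, 98, 278, 255, 409, ... For the third row (val_278), ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING covers {484, 98}, giving min 98, max 484, sum 582, first_value 484, last_value 98, exactly columns two through six of that row. The streaming and non-streaming variants also agree on every column they share, confirming that both paths compute the same frames.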