diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java index 748ce55..2ca48b3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.ql.udf.generic; +import java.util.ArrayDeque; +import java.util.Deque; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.Description; @@ -25,7 +28,11 @@ import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; @@ -163,7 +170,165 @@ public Object terminate(AggregationBuffer agg) throws HiveException { return ((FirstValueBuffer) agg).val; } + + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + return new FirstValStreamingFixedWindow(this, start.getAmt(), + end.getAmt()); + } } + + static class ValIndexPair { + Object val; + int idx; + + ValIndexPair(Object val, int idx) { + this.val = val; + this.idx = idx; + } + } + + static class FirstValStreamingFixedWindow extends + GenericUDAFStreamingEvaluator 
<Object> { + + class State extends GenericUDAFStreamingEvaluator<Object>.StreamingState { + private final Deque<ValIndexPair> valueChain; + + public State(int numPreceding, int numFollowing, AggregationBuffer buf) { + super(numPreceding, numFollowing, buf); + valueChain = new ArrayDeque<ValIndexPair>(numPreceding + numFollowing + + 1); + } + + @Override + public int estimate() { + if (!(wrappedBuf instanceof AbstractAggregationBuffer)) { + return -1; + } + int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate(); + if (underlying == -1) { + return -1; + } + if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) { + return -1; + } + /* + * sz Estimate = sz needed by underlying AggBuffer + sz for results + sz + * for maxChain + 3 * JavaDataModel.PRIMITIVES1 sz of results = sz of + * underlying * wdwSz sz of maxChain = sz of underlying * wdwSz + */ + + int wdwSz = numPreceding + numFollowing + 1; + return underlying + (underlying * wdwSz) + (underlying * wdwSz) + + (3 * JavaDataModel.PRIMITIVES1); + } + + protected void reset() { + valueChain.clear(); + super.reset(); + } + } + + public FirstValStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, + int numPreceding, int numFollowing) { + super(wrappedEval, numPreceding, numFollowing); + } + + @Override + public int getRowsRemainingAfterTerminate() throws HiveException { + throw new UnsupportedOperationException(); + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); + return new State(numPreceding, numFollowing, underlying); + } + + protected ObjectInspector inputOI() { + return ((GenericUDAFFirstValueEvaluator) wrappedEval).inputOI; + } + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) + throws HiveException { + + State s = (State) agg; + FirstValueBuffer fb = (FirstValueBuffer) s.wrappedBuf; + + /* + * on firstRow invoke underlying evaluator to initialize skipNulls flag. 
+ */ + if (fb.firstRow) { + wrappedEval.iterate(fb, parameters); + } + + Object o = ObjectInspectorUtils.copyToStandardObject(parameters[0], + inputOI(), ObjectInspectorCopyOption.WRITABLE); + + /* + * add row to chain. except in case of UNB preceding: - only 1 firstVal + * needs to be tracked. + */ + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + || s.valueChain.isEmpty()) { + /* + * add value to chain if it is not null or if skipNulls is false. + */ + if (!fb.skipNulls || o != null) { + s.valueChain.add(new ValIndexPair(o, s.numRows)); + } + } + + if (s.numRows >= (s.numFollowing)) { + /* + * if skipNulls is true and there are no rows in valueChain => all rows + * in partition are null so far; so add null in o/p + */ + if (fb.skipNulls && s.valueChain.size() == 0) { + s.results.add(null); + } else { + s.results.add(s.valueChain.getFirst().val); + } + } + s.numRows++; + + if (s.valueChain.size() > 0) { + int fIdx = (Integer) s.valueChain.getFirst().idx; + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > fIdx + s.numPreceding + s.numFollowing) { + s.valueChain.removeFirst(); + } + } + } + + @Override + public Object terminate(AggregationBuffer agg) throws HiveException { + State s = (State) agg; + FirstValueBuffer fb = (FirstValueBuffer) s.wrappedBuf; + ValIndexPair r = fb.skipNulls && s.valueChain.size() == 0 ? null + : s.valueChain.getFirst(); + + for (int i = 0; i < s.numFollowing; i++) { + s.results.add(r == null ? null : r.val); + s.numRows++; + if (r != null) { + int fIdx = (Integer) r.idx; + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > fIdx + s.numPreceding + s.numFollowing + && !s.valueChain.isEmpty()) { + s.valueChain.removeFirst(); + r = !s.valueChain.isEmpty() ? 
s.valueChain.getFirst() : r; + } + } + } + + return null; + } + + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java index 138c152..99c41b5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java @@ -25,6 +25,9 @@ import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; @@ -129,11 +132,13 @@ public void iterate(AggregationBuffer agg, Object[] parameters) PrimitiveObjectInspectorFactory.writableBooleanObjectInspector); } } + + Object o = ObjectInspectorUtils.copyToStandardObject(parameters[0], + inputOI, ObjectInspectorCopyOption.WRITABLE); - if ( !lb.skipNulls || lb.val != null ) - { - lb.val = parameters[0]; - } + if (!lb.skipNulls || o != null) { + lb.val = o; + } } @Override @@ -154,11 +159,124 @@ public void merge(AggregationBuffer agg, Object partial) public Object terminate(AggregationBuffer agg) throws HiveException { LastValueBuffer lb = (LastValueBuffer) agg; - return ObjectInspectorUtils.copyToStandardObject(lb.val, inputOI, - ObjectInspectorCopyOption.WRITABLE); + return lb.val; } + + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + return new LastValStreamingFixedWindow(this, 
start.getAmt(), end.getAmt()); + } } + static class LastValStreamingFixedWindow extends + GenericUDAFStreamingEvaluator<Object> { + + class State extends GenericUDAFStreamingEvaluator<Object>.StreamingState { + private Object lastValue; + private int lastIdx; + + public State(int numPreceding, int numFollowing, AggregationBuffer buf) { + super(numPreceding, numFollowing, buf); + lastValue = null; + lastIdx = -1; + } + + @Override + public int estimate() { + if (!(wrappedBuf instanceof AbstractAggregationBuffer)) { + return -1; + } + int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate(); + if (underlying == -1) { + return -1; + } + return 2 * underlying; + } + + protected void reset() { + lastValue = null; + lastIdx = -1; + super.reset(); + } + } + + public LastValStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, + int numPreceding, int numFollowing) { + super(wrappedEval, numPreceding, numFollowing); + } + + @Override + public int getRowsRemainingAfterTerminate() throws HiveException { + throw new UnsupportedOperationException(); + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); + return new State(numPreceding, numFollowing, underlying); + } + + protected ObjectInspector inputOI() { + return ((GenericUDAFLastValueEvaluator) wrappedEval).inputOI; + } + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) + throws HiveException { + + State s = (State) agg; + LastValueBuffer lb = (LastValueBuffer) s.wrappedBuf; + + /* + * on firstRow invoke underlying evaluator to initialize skipNulls flag. 
+ */ + if (lb.firstRow) { + wrappedEval.iterate(lb, parameters); + } + + Object o = ObjectInspectorUtils.copyToStandardObject(parameters[0], + inputOI(), ObjectInspectorCopyOption.WRITABLE); + + if (!lb.skipNulls || o != null) { + s.lastValue = o; + s.lastIdx = s.numRows; + } else if (lb.skipNulls && s.lastIdx != -1) { + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > s.lastIdx + s.numPreceding + s.numFollowing) { + s.lastValue = null; + s.lastIdx = -1; + } + } + + if (s.numRows >= (s.numFollowing)) { + s.results.add(s.lastValue); + } + s.numRows++; + } + + @Override + public Object terminate(AggregationBuffer agg) throws HiveException { + State s = (State) agg; + LastValueBuffer lb = (LastValueBuffer) s.wrappedBuf; + + if (lb.skipNulls && s.lastIdx != -1) { + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > s.lastIdx + s.numPreceding + s.numFollowing) { + s.lastValue = null; + s.lastIdx = -1; + } + } + + for (int i = 0; i < s.numFollowing; i++) { + s.results.add(s.lastValue); + } + + return null; + } + + } } diff --git ql/src/test/queries/clientpositive/windowing_navfn.q ql/src/test/queries/clientpositive/windowing_navfn.q index 05da2ba..e275975 100644 --- ql/src/test/queries/clientpositive/windowing_navfn.q +++ ql/src/test/queries/clientpositive/windowing_navfn.q @@ -29,3 +29,46 @@ select s, first_value(s) over (partition by bo order by s) from over10k limit 10 select t, s, i, last_value(i) over (partition by t order by s) from over10k where (s = 'oscar allen' or s = 'oscar carson') and t = 10; + +drop table if exists wtest; +create table wtest as +select a, b +from +( +SELECT explode( + map( + 3, array(1,2,3,4,5), + 1, array(int(null),int(null),int(null), int(null), int(null)), + 2, array(1,null,2, null, 3) + ) + ) as (a,barr) FROM (select * from src limit 1) s + ) s1 lateral view explode(barr) arr as b; + +select a, b, +first_value(b) over (partition by a order by b rows between 1 preceding and 1 following ) , 
+first_value(b, true) over (partition by a order by b rows between 1 preceding and 1 following ) , +first_value(b) over (partition by a order by b rows between unbounded preceding and 1 following ) , +first_value(b, true) over (partition by a order by b rows between unbounded preceding and 1 following ) +from wtest; + + +select a, b, +first_value(b) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +first_value(b, true) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +first_value(b) over (partition by a order by b desc rows between unbounded preceding and 1 following ) , +first_value(b, true) over (partition by a order by b desc rows between unbounded preceding and 1 following ) +from wtest; + +select a, b, +last_value(b) over (partition by a order by b rows between 1 preceding and 1 following ) , +last_value(b, true) over (partition by a order by b rows between 1 preceding and 1 following ) , +last_value(b) over (partition by a order by b rows between unbounded preceding and 1 following ) , +last_value(b, true) over (partition by a order by b rows between unbounded preceding and 1 following ) +from wtest; + +select a, b, +last_value(b) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +last_value(b, true) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +last_value(b) over (partition by a order by b desc rows between unbounded preceding and 1 following ) , +last_value(b, true) over (partition by a order by b desc rows between unbounded preceding and 1 following ) +from wtest; \ No newline at end of file diff --git ql/src/test/results/clientpositive/windowing_navfn.q.out ql/src/test/results/clientpositive/windowing_navfn.q.out index b2038d2..95d7942 100644 --- ql/src/test/results/clientpositive/windowing_navfn.q.out +++ ql/src/test/results/clientpositive/windowing_navfn.q.out @@ -595,3 +595,168 @@ POSTHOOK: Input: default@over10k #### A 
masked pattern was here #### 10 oscar allen 65662 65662 10 oscar carson 65549 65549 +PREHOOK: query: drop table if exists wtest +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists wtest +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table wtest as +select a, b +from +( +SELECT explode( + map( + 3, array(1,2,3,4,5), + 1, array(int(null),int(null),int(null), int(null), int(null)), + 2, array(1,null,2, null, 3) + ) + ) as (a,barr) FROM (select * from src limit 1) s + ) s1 lateral view explode(barr) arr as b +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +POSTHOOK: query: create table wtest as +select a, b +from +( +SELECT explode( + map( + 3, array(1,2,3,4,5), + 1, array(int(null),int(null),int(null), int(null), int(null)), + 2, array(1,null,2, null, 3) + ) + ) as (a,barr) FROM (select * from src limit 1) s + ) s1 lateral view explode(barr) arr as b +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: default@wtest +PREHOOK: query: select a, b, +first_value(b) over (partition by a order by b rows between 1 preceding and 1 following ) , +first_value(b, true) over (partition by a order by b rows between 1 preceding and 1 following ) , +first_value(b) over (partition by a order by b rows between unbounded preceding and 1 following ) , +first_value(b, true) over (partition by a order by b rows between unbounded preceding and 1 following ) +from wtest +PREHOOK: type: QUERY +PREHOOK: Input: default@wtest +#### A masked pattern was here #### +POSTHOOK: query: select a, b, +first_value(b) over (partition by a order by b rows between 1 preceding and 1 following ) , +first_value(b, true) over (partition by a order by b rows between 1 preceding and 1 following ) , +first_value(b) over (partition by a order by b rows between unbounded preceding and 1 following ) , +first_value(b, true) over (partition by a order by b rows between unbounded preceding and 1 following ) +from wtest +POSTHOOK: type: QUERY 
+POSTHOOK: Input: default@wtest +#### A masked pattern was here #### +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +2 NULL NULL NULL NULL NULL +2 NULL NULL 1 NULL 1 +2 1 NULL 1 NULL 1 +2 2 1 1 NULL 1 +2 3 2 2 NULL 1 +3 1 1 1 1 1 +3 2 1 1 1 1 +3 3 2 2 1 1 +3 4 3 3 1 1 +3 5 4 4 1 1 +PREHOOK: query: select a, b, +first_value(b) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +first_value(b, true) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +first_value(b) over (partition by a order by b desc rows between unbounded preceding and 1 following ) , +first_value(b, true) over (partition by a order by b desc rows between unbounded preceding and 1 following ) +from wtest +PREHOOK: type: QUERY +PREHOOK: Input: default@wtest +#### A masked pattern was here #### +POSTHOOK: query: select a, b, +first_value(b) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +first_value(b, true) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +first_value(b) over (partition by a order by b desc rows between unbounded preceding and 1 following ) , +first_value(b, true) over (partition by a order by b desc rows between unbounded preceding and 1 following ) +from wtest +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wtest +#### A masked pattern was here #### +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +2 3 3 3 3 3 +2 2 3 3 3 3 +2 1 2 2 3 3 +2 NULL 1 1 3 3 +2 NULL NULL NULL 3 3 +3 5 5 5 5 5 +3 4 5 5 5 5 +3 3 4 4 5 5 +3 2 3 3 5 5 +3 1 2 2 5 5 +PREHOOK: query: select a, b, +last_value(b) over (partition by a order by b rows between 1 preceding and 1 following ) , +last_value(b, true) over (partition by a order by b rows between 1 preceding and 1 following ) , +last_value(b) over 
(partition by a order by b rows between unbounded preceding and 1 following ) , +last_value(b, true) over (partition by a order by b rows between unbounded preceding and 1 following ) +from wtest +PREHOOK: type: QUERY +PREHOOK: Input: default@wtest +#### A masked pattern was here #### +POSTHOOK: query: select a, b, +last_value(b) over (partition by a order by b rows between 1 preceding and 1 following ) , +last_value(b, true) over (partition by a order by b rows between 1 preceding and 1 following ) , +last_value(b) over (partition by a order by b rows between unbounded preceding and 1 following ) , +last_value(b, true) over (partition by a order by b rows between unbounded preceding and 1 following ) +from wtest +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wtest +#### A masked pattern was here #### +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +2 NULL NULL NULL NULL NULL +2 NULL 1 1 1 1 +2 1 2 2 2 2 +2 2 3 3 3 3 +2 3 3 3 3 3 +3 1 2 2 2 2 +3 2 3 3 3 3 +3 3 4 4 4 4 +3 4 5 5 5 5 +3 5 5 5 5 5 +PREHOOK: query: select a, b, +last_value(b) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +last_value(b, true) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +last_value(b) over (partition by a order by b desc rows between unbounded preceding and 1 following ) , +last_value(b, true) over (partition by a order by b desc rows between unbounded preceding and 1 following ) +from wtest +PREHOOK: type: QUERY +PREHOOK: Input: default@wtest +#### A masked pattern was here #### +POSTHOOK: query: select a, b, +last_value(b) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +last_value(b, true) over (partition by a order by b desc rows between 1 preceding and 1 following ) , +last_value(b) over (partition by a order by b desc rows between unbounded preceding and 1 following ) , +last_value(b, 
true) over (partition by a order by b desc rows between unbounded preceding and 1 following ) +from wtest +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wtest +#### A masked pattern was here #### +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +1 NULL NULL NULL NULL NULL +2 3 2 2 2 2 +2 2 1 1 1 1 +2 1 NULL 1 NULL 1 +2 NULL NULL 1 NULL 1 +2 NULL NULL NULL NULL 1 +3 5 4 4 4 4 +3 4 3 3 3 3 +3 3 2 2 2 2 +3 2 1 1 1 1 +3 1 1 1 1 1