diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java index ab3e0bf..d857421 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java @@ -28,10 +28,8 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; -import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum.GenericUDAFSumDouble.SumDoubleAgg; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; @@ -164,26 +162,12 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { BoundaryDef start = wFrmDef.getStart(); BoundaryDef end = wFrmDef.getEnd(); - /* - * Currently we are not handling dynamic sized windows implied by range based windows. - */ - if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { - return null; - } - - /* - * Windows that are unbounded following don't benefit from Streaming. - */ - if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { - return null; - } - - return new GenericUDAFStreamingEnhancer(this, + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer(this, start.getAmt(), end.getAmt()) { @Override protected DoubleWritable getNextResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; Double r = myagg.count == 0 ? null : myagg.sum; @@ -201,7 +185,7 @@ protected DoubleWritable getNextResult( @Override protected Object[] getCurrentIntermediateResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; return myagg.count == 0 ? null : new Object[] { @@ -306,20 +290,12 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { BoundaryDef start = wFrmDef.getStart(); BoundaryDef end = wFrmDef.getEnd(); - if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { - return null; - } - - if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { - return null; - } - - return new GenericUDAFStreamingEnhancer( + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer( this, start.getAmt(), end.getAmt()) { @Override protected HiveDecimalWritable getNextResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; HiveDecimal r = myagg.count == 0 ? 
null : myagg.sum; @@ -338,7 +314,7 @@ protected HiveDecimalWritable getNextResult( @Override protected Object[] getCurrentIntermediateResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; return myagg.count == 0 ? null : new Object[] { myagg.sum, diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLag.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLag.java index d265acc..fa5047d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLag.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLag.java @@ -25,6 +25,12 @@ import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead.GenericUDAFLeadEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead.GenericUDAFLeadEvaluatorStreaming; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead.LeadBuffer; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLeadLag.GenericUDAFLeadLagEvaluator; @WindowFunctionDescription ( @@ -53,10 +59,27 @@ protected GenericUDAFLeadLagEvaluator createLLEvaluator() { public static class GenericUDAFLagEvaluator extends GenericUDAFLeadLagEvaluator { + public GenericUDAFLagEvaluator() { + } + + /* + * used to initialize Streaming Evaluator. + */ + protected GenericUDAFLagEvaluator(GenericUDAFLeadLagEvaluator src) { + super(src); + } + @Override protected LeadLagBuffer getNewLLBuffer() throws HiveException { return new LagBuffer(); } + + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + + return new GenericUDAFLagEvaluatorStreaming(this); + } + } static class LagBuffer implements LeadLagBuffer { @@ -88,6 +111,7 @@ public Object terminate() { * the entire partition is in lagValues. */ if ( values.size() < lagAmt ) { + values = lagValues; return lagValues; } @@ -99,4 +123,42 @@ public Object terminate() { return values; } } + + /* + * StreamingEval: wrap regular eval. on getNext remove first row from values + * and return it. 
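+   * (Illustrative, not part of this patch: for lag(x, 1) the result for
+   * row i is already known when row i arrives, so rows can be emitted
+   * immediately; contrast with lead, which must wait for future rows.)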
+ */ + static class GenericUDAFLagEvaluatorStreaming extends GenericUDAFLagEvaluator + implements ISupportStreamingModeForWindowing { + + protected GenericUDAFLagEvaluatorStreaming(GenericUDAFLeadLagEvaluator src) { + super(src); + } + + @Override + public Object getNextResult(AggregationBuffer agg) throws HiveException { + LagBuffer lb = (LagBuffer) agg; + + if (!lb.lagValues.isEmpty()) { + Object res = lb.lagValues.remove(0); + if (res == null) { + return ISupportStreamingModeForWindowing.NULL_RESULT; + } + return res; + } else if (!lb.values.isEmpty()) { + Object res = lb.values.remove(0); + if (res == null) { + return ISupportStreamingModeForWindowing.NULL_RESULT; + } + return res; + } + return null; + } + + @Override + public int getRowsRemainingAfterTerminate() throws HiveException { + return getAmt(); + } + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLead.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLead.java index d7c18e5..6a27325 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLead.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLead.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; @WindowFunctionDescription ( @@ -53,10 +55,26 @@ protected GenericUDAFLeadLagEvaluator createLLEvaluator() { public static class GenericUDAFLeadEvaluator extends GenericUDAFLeadLagEvaluator { + public GenericUDAFLeadEvaluator() { + } + + /* + * used to initialize Streaming Evaluator. + */ + protected GenericUDAFLeadEvaluator(GenericUDAFLeadLagEvaluator src) { + super(src); + } + @Override protected LeadLagBuffer getNewLLBuffer() throws HiveException { return new LeadBuffer(); } + + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + + return new GenericUDAFLeadEvaluatorStreaming(this); + } } @@ -103,4 +121,34 @@ public Object terminate() { } + /* + * StreamingEval: wrap regular eval. on getNext remove first row from values + * and return it. 
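+   * (Illustrative, not part of this patch: for lead(x, 2) the result for
+   * row i is only known once row i+2 arrives, so when the partition ends
+   * the last rows are still owed; getRowsRemainingAfterTerminate()
+   * reports that lead amount.)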
+ */ + static class GenericUDAFLeadEvaluatorStreaming extends + GenericUDAFLeadEvaluator implements ISupportStreamingModeForWindowing { + + protected GenericUDAFLeadEvaluatorStreaming(GenericUDAFLeadLagEvaluator src) { + super(src); + } + + @Override + public Object getNextResult(AggregationBuffer agg) throws HiveException { + LeadBuffer lb = (LeadBuffer) agg; + if (!lb.values.isEmpty()) { + Object res = lb.values.remove(0); + if (res == null) { + return ISupportStreamingModeForWindowing.NULL_RESULT; + } + return res; + } + return null; + } + + @Override + public int getRowsRemainingAfterTerminate() throws HiveException { + return getAmt(); + } + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java index 295cd2e..79abc0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java @@ -89,6 +89,20 @@ public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo parameters) String fnName; private transient Converter defaultValueConverter; + public GenericUDAFLeadLagEvaluator() { + } + + /* + * used to initialize Streaming Evaluator. + */ + protected GenericUDAFLeadLagEvaluator(GenericUDAFLeadLagEvaluator src) { + this.inputOI = src.inputOI; + this.amt = src.amt; + this.fnName = src.fnName; + this.defaultValueConverter = src.defaultValueConverter; + this.mode = src.mode; + } + @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java index d6e9db4..a153818 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java @@ -17,13 +17,21 @@ */ package org.apache.hadoop.hive.ql.udf.generic; +import java.util.ArrayDeque; +import java.util.Deque; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; @@ -120,6 +128,177 @@ public Object terminate(AggregationBuffer agg) throws HiveException { return myagg.o; } + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + return new MaxStreamingFixedWindow(this, start.getAmt(), end.getAmt()); + } + + } + + /* + * Based on the Paper by Daniel Lemire: Streaming Max-Min filter using no more + * than 3 comparisons per elem. + * + * 1. 
His algorithm works on fixed size windows up to the current row. For row
+ * 'i' and window 'w' it computes the min/max for window (i-w, i).
+ * 2. The core idea is to keep a queue of (max, idx) tuples. A tuple in the
+ * queue represents the max value in the range (prev tuple.idx, idx). Using
+ * the queue data structure and the following 2 operations, it is easy to
+ * see that maxes can be computed:
+ * - on receiving the ith row, drain from the back of the queue any entries
+ *   whose value is less than the ith value; then add the ith value to the
+ *   queue as the tuple (i-val, i).
+ * - on the ith step, check whether the element at the front of the queue
+ *   has reached its max range of influence, i.e. frontTuple.idx + w > i.
+ *   If yes, remove it from the queue.
+ * - on the ith step, output the front of the queue as the max for the ith
+ *   entry.
+ *
+ * Here we modify the algorithm:
+ * 1. to handle windows of the form (i-p, i+f), where p is numPreceding and
+ *    f is numFollowing:
+ *    - we start outputting rows only after receiving f rows.
+ *    - the formula for the 'influence range' of an idx accounts for the
+ *      following rows.
+ * 2. to optimize for the case when numPreceding is Unbounded; in this case
+ *    only 1 max needs to be tracked at any given time.
+ */
+  static class MaxStreamingFixedWindow extends
+      GenericUDAFStreamingEvaluator<Object> {
+
+    class State extends GenericUDAFStreamingEvaluator<Object>.StreamingState {
+      private final Deque<Object[]> maxChain;
+
+      public State(int numPreceding, int numFollowing, AggregationBuffer buf) {
+        super(numPreceding, numFollowing, buf);
+        maxChain = new ArrayDeque<Object[]>(numPreceding + numFollowing + 1);
+      }
+
+      @Override
+      public int estimate() {
+        if (!(wrappedBuf instanceof AbstractAggregationBuffer)) {
+          return -1;
+        }
+        int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate();
+        if (underlying == -1) {
+          return -1;
+        }
+        if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) {
+          return -1;
+        }
+        /*
+         * sz Estimate = sz needed by underlying AggBuffer
+         *             + sz for results + sz for maxChain
+         *             + 3 * JavaDataModel.PRIMITIVES1
+         * sz of results = sz of underlying * wdwSz
+         * sz of maxChain = sz of underlying * wdwSz
+         */
+        int wdwSz = numPreceding + numFollowing + 1;
+        return underlying + (underlying * wdwSz) + (underlying * wdwSz)
+            + (3 * JavaDataModel.PRIMITIVES1);
+      }
+
+      protected void reset() {
+        maxChain.clear();
+        super.reset();
+      }
+    }
+
+    public MaxStreamingFixedWindow(GenericUDAFEvaluator wrappedEval,
+        int numPreceding, int numFollowing) {
+      super(wrappedEval, numPreceding, numFollowing);
+    }
+
+    @Override
+    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+      AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
+      return new State(numPreceding, numFollowing, underlying);
+    }
+
+    protected ObjectInspector inputOI() {
+      return ((GenericUDAFMaxEvaluator) wrappedEval).inputOI;
+    }
+
+    protected ObjectInspector outputOI() {
+      return ((GenericUDAFMaxEvaluator) wrappedEval).outputOI;
+    }
+
+    @Override
+    public void iterate(AggregationBuffer agg, Object[] parameters)
+        throws HiveException {
+
+      State s = (State) agg;
+      Object o = parameters[0];
+
+      while (!s.maxChain.isEmpty()) {
+        if (!removeLast(o, s.maxChain.getLast()[0])) {
+          break;
+        } else {
+          s.maxChain.removeLast();
+        }
+      }
+
+      /*
+       * Add row to chain, except in the case of UNB preceding:
+       * - only 1 max needs to be tracked.
+       * - the current max will never fall out of range; it can
+       *   only be replaced by a larger max.
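+       *
+       * (Hypothetical trace: with numPreceding=2, numFollowing=0 and input
+       * 3, 1, 4 the chain evolves as [(3,0)], then [(3,0),(1,1)], then
+       * [(4,2)]; the 4 drains both smaller entries before being appended.)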
+ */ + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + || s.maxChain.isEmpty()) { + o = o == null ? null : ObjectInspectorUtils.copyToStandardObject(o, + inputOI(), ObjectInspectorCopyOption.JAVA); + s.maxChain.addLast(new Object[] { o, s.numRows }); + } + + if (s.numRows >= (s.numFollowing)) { + s.results.add(s.maxChain.getFirst()[0]); + } + s.numRows++; + + int fIdx = (Integer) s.maxChain.getFirst()[1]; + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > fIdx + s.numPreceding + s.numFollowing) { + s.maxChain.removeFirst(); + } + } + + protected boolean removeLast(Object in, Object last) { + return isGreater(in, last); + } + + private boolean isGreater(Object in, Object last) { + if (in == null) { + return false; + } + if (last == null) { + return true; + } + return ObjectInspectorUtils.compare(in, inputOI(), last, outputOI()) > 0; + } + + @Override + public Object terminate(AggregationBuffer agg) throws HiveException { + + State s = (State) agg; + Object[] r = s.maxChain.getFirst(); + + for (int i = 0; i < s.numFollowing; i++) { + s.results.add(r[0]); + s.numRows++; + int fIdx = (Integer) r[1]; + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows - s.numFollowing + i > fIdx + s.numPreceding + && !s.maxChain.isEmpty()) { + s.maxChain.removeFirst(); + r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : r; + } + } + + return null; + } + + @Override + public int getRowsRemainingAfterTerminate() throws HiveException { + throw new UnsupportedOperationException(); + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java index 3dc9900..d931d52 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java @@ -23,7 +23,10 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax.MaxStreamingFixedWindow; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; @@ -120,6 +123,44 @@ public Object terminate(AggregationBuffer agg) throws HiveException { return myagg.o; } + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + return new MinStreamingFixedWindow(this, start.getAmt(), end.getAmt()); + } + + } + + static class MinStreamingFixedWindow extends MaxStreamingFixedWindow { + + public MinStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, + int numPreceding, int numFollowing) { + super(wrappedEval, numPreceding, numFollowing); + } + + protected ObjectInspector inputOI() { + return ((GenericUDAFMinEvaluator) wrappedEval).inputOI; + } + + protected ObjectInspector outputOI() { + return ((GenericUDAFMinEvaluator) wrappedEval).outputOI; + } + + protected boolean removeLast(Object in, Object last) { + return isLess(in, last); + } + + private boolean isLess(Object in, Object last) { + if (in == null) { + return false; + } + if 
(last == null) { + return true; + } + return ObjectInspectorUtils.compare(in, inputOI(), last, outputOI()) < 0; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java index 775d874..3eea6b2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java @@ -214,6 +214,11 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { return this; } + @Override + public int getRowsRemainingAfterTerminate() + throws HiveException { + return 0; + } } public static int compare(Object[] o1, ObjectInspector[] oi1, Object[] o2, diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java deleted file mode 100644 index f899ae0..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.udf.generic; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; -import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; - -@SuppressWarnings({ "deprecation", "unchecked" }) -public abstract class GenericUDAFStreamingEnhancer extends - GenericUDAFEvaluator implements ISupportStreamingModeForWindowing { - - private final GenericUDAFEvaluator wrappedEval; - private final int numPreceding; - private final int numFollowing; - - public GenericUDAFStreamingEnhancer(GenericUDAFEvaluator wrappedEval, - int numPreceding, int numFollowing) { - this.wrappedEval = wrappedEval; - this.numPreceding = numPreceding; - this.numFollowing = numFollowing; - this.mode = wrappedEval.mode; - } - - class StreamingState extends AbstractAggregationBuffer { - final AggregationBuffer wrappedBuf; - final int numPreceding; - final int numFollowing; - final List results; - final List intermediateVals; - int numRows; - - StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) { - this.wrappedBuf = buf; - this.numPreceding = numPreceding; - this.numFollowing = numFollowing; - results = new ArrayList(); - intermediateVals = new ArrayList(); - numRows = 0; - } - - @Override - public int estimate() { - if (!(wrappedBuf instanceof AbstractAggregationBuffer)) { - return -1; - } - int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate(); - if (underlying == -1) { - return -1; - } - if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) { - return -1; - } - /* - * sz Estimate = sz needed by underlying AggBuffer + - * sz for results + - * sz for intermediates + - * 3 * JavaDataModel.PRIMITIVES1 - * sz of results = sz of underlying * wdwSz - * sz of intermediates = sz of underlying * wdwSz - */ - - int wdwSz = numPreceding + numFollowing + 1; - return underlying + - (underlying * wdwSz) + - (underlying * wdwSz) + - (3 * JavaDataModel.PRIMITIVES1); - } - } - - @Override - public ObjectInspector init(Mode m, ObjectInspector[] parameters) - throws HiveException { - throw new HiveException(getClass().getSimpleName() + ": init not supported"); - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); - return new StreamingState(numPreceding, numFollowing, underlying); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - StreamingState ss = (StreamingState) agg; - wrappedEval.reset(ss.wrappedBuf); - ss.results.clear(); - ss.intermediateVals.clear(); - ss.numRows = 0; - } - - @Override - public void iterate(AggregationBuffer agg, Object[] parameters) - throws HiveException { - StreamingState ss = (StreamingState) agg; - - wrappedEval.iterate(ss.wrappedBuf, parameters); - - if (ss.numRows >= ss.numFollowing) { - ss.results.add(getNextResult(ss)); - } - if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT) { - ss.intermediateVals.add(getCurrentIntermediateResult(ss)); - } - - ss.numRows++; - } - - @Override - public Object terminate(AggregationBuffer agg) throws HiveException { - StreamingState ss = (StreamingState) agg; - Object o = wrappedEval.terminate(ss.wrappedBuf); - - for (int i = 0; i < ss.numFollowing; i++) { - ss.results.add(getNextResult(ss)); 
- } - return o; - } - - @Override - public Object terminatePartial(AggregationBuffer agg) throws HiveException { - throw new HiveException(getClass().getSimpleName() - + ": terminatePartial not supported"); - } - - @Override - public void merge(AggregationBuffer agg, Object partial) throws HiveException { - throw new HiveException(getClass().getSimpleName() - + ": merge not supported"); - } - - @Override - public Object getNextResult(AggregationBuffer agg) throws HiveException { - StreamingState ss = (StreamingState) agg; - if (!ss.results.isEmpty()) { - T1 res = ss.results.remove(0); - if (res == null) { - return ISupportStreamingModeForWindowing.NULL_RESULT; - } - return res; - } - return null; - } - - protected abstract T1 getNextResult(StreamingState ss) throws HiveException; - - protected abstract T2 getCurrentIntermediateResult(StreamingState ss) - throws HiveException; -} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java new file mode 100644 index 0000000..0a437e9 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.util.JavaDataModel; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +@SuppressWarnings({ "deprecation", "unchecked" }) +public abstract class GenericUDAFStreamingEvaluator extends + GenericUDAFEvaluator implements ISupportStreamingModeForWindowing { + + protected final GenericUDAFEvaluator wrappedEval; + protected final int numPreceding; + protected final int numFollowing; + + public GenericUDAFStreamingEvaluator(GenericUDAFEvaluator wrappedEval, + int numPreceding, int numFollowing) { + this.wrappedEval = wrappedEval; + this.numPreceding = numPreceding; + this.numFollowing = numFollowing; + this.mode = wrappedEval.mode; + } + + class StreamingState extends AbstractAggregationBuffer { + final AggregationBuffer wrappedBuf; + final int numPreceding; + final int numFollowing; + final List results; + int numRows; + + StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) { + this.wrappedBuf = buf; + this.numPreceding = numPreceding; + this.numFollowing = numFollowing; + results = new ArrayList(); + numRows = 0; + } + + protected void reset() { + results.clear(); + numRows = 0; + } + } + + @Override + public ObjectInspector init(Mode m, ObjectInspector[] parameters) + throws HiveException { + throw new HiveException(getClass().getSimpleName() + ": init not supported"); + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + StreamingState ss = (StreamingState) agg; + wrappedEval.reset(ss.wrappedBuf); + ss.reset(); + } + + @Override + public Object terminatePartial(AggregationBuffer agg) throws HiveException { + throw new HiveException(getClass().getSimpleName() + + ": terminatePartial not supported"); + } + + @Override + public void merge(AggregationBuffer agg, Object partial) throws HiveException { + throw new HiveException(getClass().getSimpleName() + + ": merge not supported"); + } + + @Override + public Object getNextResult(AggregationBuffer agg) throws HiveException { + StreamingState ss = (StreamingState) agg; + if (!ss.results.isEmpty()) { + T1 res = ss.results.remove(0); + if (res == null) { + return ISupportStreamingModeForWindowing.NULL_RESULT; + } + return res; + } + return null; + } + + public static abstract class SumAvgEnhancer extends + GenericUDAFStreamingEvaluator { + + public SumAvgEnhancer(GenericUDAFEvaluator wrappedEval, int numPreceding, + int numFollowing) { + super(wrappedEval, numPreceding, numFollowing); + } + + class SumAvgStreamingState extends StreamingState { + + final List intermediateVals; + + SumAvgStreamingState(int numPreceding, int numFollowing, + AggregationBuffer buf) { + super(numPreceding, numFollowing, buf); + intermediateVals = new ArrayList(); + } + + @Override + public int estimate() { + if (!(wrappedBuf instanceof AbstractAggregationBuffer)) { + return -1; + } + int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate(); + if (underlying == -1) { + return -1; + } + if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) { + return -1; + } + /* + * sz Estimate = sz needed by underlying AggBuffer + sz for results + sz + * for intermediates + 3 * JavaDataModel.PRIMITIVES1 sz of results = sz + * of underlying * wdwSz sz of 
intermediates = sz of underlying * wdwSz + */ + + int wdwSz = numPreceding + numFollowing + 1; + return underlying + (underlying * wdwSz) + (underlying * wdwSz) + + (3 * JavaDataModel.PRIMITIVES1); + } + + protected void reset() { + intermediateVals.clear(); + super.reset(); + } + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); + return new SumAvgStreamingState(numPreceding, numFollowing, underlying); + } + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) + throws HiveException { + SumAvgStreamingState ss = (SumAvgStreamingState) agg; + + wrappedEval.iterate(ss.wrappedBuf, parameters); + + if (ss.numRows >= ss.numFollowing) { + ss.results.add(getNextResult(ss)); + } + if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT) { + ss.intermediateVals.add(getCurrentIntermediateResult(ss)); + } + + ss.numRows++; + } + + @Override + public Object terminate(AggregationBuffer agg) throws HiveException { + SumAvgStreamingState ss = (SumAvgStreamingState) agg; + Object o = wrappedEval.terminate(ss.wrappedBuf); + + for (int i = 0; i < ss.numFollowing; i++) { + ss.results.add(getNextResult(ss)); + } + return o; + } + + @Override + public int getRowsRemainingAfterTerminate() throws HiveException { + throw new UnsupportedOperationException(); + } + + protected abstract T1 getNextResult(SumAvgStreamingState ss) + throws HiveException; + + protected abstract T2 getCurrentIntermediateResult(SumAvgStreamingState ss) + throws HiveException; + + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java index 2b82550..d1118f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java @@ -26,9 +26,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; -import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; @@ -189,20 +187,12 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { BoundaryDef start = wFrmDef.getStart(); BoundaryDef end = wFrmDef.getEnd(); - if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { - return null; - } - - if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { - return null; - } - - return new GenericUDAFStreamingEnhancer( + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer( this, start.getAmt(), end.getAmt()) { @Override protected HiveDecimalWritable getNextResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf; HiveDecimal r = myagg.empty ? 
null : myagg.sum; @@ -218,7 +208,7 @@ protected HiveDecimalWritable getNextResult( @Override protected HiveDecimal getCurrentIntermediateResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf; return myagg.empty ? null : myagg.sum; @@ -313,24 +303,15 @@ public Object terminate(AggregationBuffer agg) throws HiveException { @Override public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { - BoundaryDef start = wFrmDef.getStart(); BoundaryDef end = wFrmDef.getEnd(); - if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { - return null; - } - - if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { - return null; - } - - return new GenericUDAFStreamingEnhancer(this, + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer(this, start.getAmt(), end.getAmt()) { @Override protected DoubleWritable getNextResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf; Double r = myagg.empty ? null : myagg.sum; @@ -346,7 +327,7 @@ protected DoubleWritable getNextResult( @Override protected Double getCurrentIntermediateResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf; return myagg.empty ? null : new Double(myagg.sum); @@ -443,20 +424,12 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { BoundaryDef start = wFrmDef.getStart(); BoundaryDef end = wFrmDef.getEnd(); - if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { - return null; - } - - if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { - return null; - } - - return new GenericUDAFStreamingEnhancer(this, + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer(this, start.getAmt(), end.getAmt()) { @Override protected LongWritable getNextResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf; Long r = myagg.empty ? null : myagg.sum; @@ -472,7 +445,7 @@ protected LongWritable getNextResult( @Override protected Long getCurrentIntermediateResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf; return myagg.empty ? 
null : new Long(myagg.sum); diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java index cf2035c..327a732 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java @@ -33,6 +33,14 @@ public interface ISupportStreamingModeForWindowing { Object getNextResult(AggregationBuffer agg) throws HiveException; + + /* + * for functions that don't support a Window, this provides the rows remaining to be + * added to output. Functions that return a Window can throw a UnsupportedException, + * this method shouldn't be called. For Ranking fns return 0; lead/lag fns return the + * lead/lag amt. + */ + int getRowsRemainingAfterTerminate() throws HiveException; public static Object NULL_RESULT = new Object(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java index 0a67fea..88482d3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java @@ -28,8 +28,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.PTFOperator; import org.apache.hadoop.hive.ql.exec.PTFPartition; +import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo; import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator; import org.apache.hadoop.hive.ql.exec.PTFRollingPartition; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -142,6 +144,49 @@ private boolean processWindow(WindowFunctionDef wFn) { return true; } + private boolean streamingPossible(Configuration cfg, WindowFunctionDef wFnDef) { + WindowFrameDef wdwFrame = wFnDef.getWindowFrame(); + WindowFunctionInfo wFnInfo = FunctionRegistry.getWindowFunctionInfo(wFnDef + .getName()); + + if (!wFnInfo.isSupportsWindow()) { + return true; + } + + BoundaryDef start = wdwFrame.getStart(); + BoundaryDef end = wdwFrame.getEnd(); + + /* + * Currently we are not handling dynamic sized windows implied by range + * based windows. + */ + if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { + return false; + } + + /* + * Windows that are unbounded following don't benefit from Streaming. + */ + if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { + return false; + } + + /* + * let function decide if it can handle this special case. + */ + if (start.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { + return true; + } + + int windowLimit = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE); + + if (windowLimit < (start.getAmt() + end.getAmt() + 1)) { + return false; + } + + return true; + } + /* * (non-Javadoc) * @@ -155,6 +200,7 @@ private boolean processWindow(WindowFunctionDef wFn) { * ISupportStreamingModeForWindowing. 3. Is an invocation on a 'fixed' window. * So no Unbounded Preceding or Following. 
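 * (Illustrative: a frame such as 'rows between 2 preceding and 3 following'
 * qualifies, while 'range' frames and 'unbounded following' frames fall
 * back to non-streaming evaluation via the streamingPossible() check above.)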
*/ + @SuppressWarnings("resource") private int[] setCanAcceptInputAsStream(Configuration cfg) { canAcceptInputAsStream = false; @@ -171,8 +217,9 @@ private boolean processWindow(WindowFunctionDef wFn) { WindowFunctionDef wFnDef = tabDef.getWindowFunctions().get(i); WindowFrameDef wdwFrame = wFnDef.getWindowFrame(); GenericUDAFEvaluator fnEval = wFnDef.getWFnEval(); - GenericUDAFEvaluator streamingEval = fnEval - .getWindowingEvaluator(wdwFrame); + boolean streamingPossible = streamingPossible(cfg, wFnDef); + GenericUDAFEvaluator streamingEval = streamingPossible ? fnEval + .getWindowingEvaluator(wdwFrame) : null; if (streamingEval != null && streamingEval instanceof ISupportStreamingModeForWindowing) { continue; @@ -343,6 +390,14 @@ public void startPartition() throws HiveException { int numRowsRemaining = wFn.getWindowFrame().getEnd().getAmt(); if (fnEval instanceof ISupportStreamingModeForWindowing) { fnEval.terminate(streamingState.aggBuffers[i]); + + WindowFunctionInfo wFnInfo = FunctionRegistry.getWindowFunctionInfo(wFn + .getName()); + if (!wFnInfo.isSupportsWindow()) { + numRowsRemaining = ((ISupportStreamingModeForWindowing) fnEval) + .getRowsRemainingAfterTerminate(); + } + if (numRowsRemaining != BoundarySpec.UNBOUNDED_AMOUNT) { while (numRowsRemaining > 0) { Object out = ((ISupportStreamingModeForWindowing) fnEval) @@ -411,13 +466,20 @@ public boolean canIterateOutput() { } else if (wFn.isPivotResult()) { GenericUDAFEvaluator streamingEval = wFn.getWFnEval().getWindowingEvaluator(wFn.getWindowFrame()); if ( streamingEval != null && streamingEval instanceof ISupportStreamingModeForWindowing ) { - wFn.setWFnEval(streamingEval); - if ( wFn.getOI() instanceof ListObjectInspector ) { - ListObjectInspector listOI = (ListObjectInspector) wFn.getOI(); - wFn.setOI(listOI.getListElementObjectInspector()); + ISupportStreamingModeForWindowing strEval = (ISupportStreamingModeForWindowing) streamingEval; + if ( strEval.getRowsRemainingAfterTerminate() == 0 ) { + wFn.setWFnEval(streamingEval); + if ( wFn.getOI() instanceof ListObjectInspector ) { + ListObjectInspector listOI = (ListObjectInspector) wFn.getOI(); + wFn.setOI(listOI.getListElementObjectInspector()); + } + output.add(null); + wFnsWithWindows.add(i); + } else { + outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, + pItr); + output.add(null); } - output.add(null); - wFnsWithWindows.add(i); } else { outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr); output.add(null); diff --git ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java new file mode 100644 index 0000000..635c285 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udaf; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.udaf.TestStreamingSum.TypeHandler; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; + +public class TestStreamingMax { + + public void maxLong(Iterator inVals, int inSz, int numPreceding, + int numFollowing, Iterator outVals) throws HiveException { + + GenericUDAFMax fnR = new GenericUDAFMax(); + TypeInfo[] inputTypes = { TypeInfoFactory.longTypeInfo }; + ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableLongObjectInspector }; + + LongWritable[] in = new LongWritable[1]; + in[0] = new LongWritable(); + + TestStreamingSum._agg(fnR, inputTypes, inVals, TypeHandler.LongHandler, in, + inputOIs, inSz, numPreceding, numFollowing, outVals); + + } + + @Test + public void testLong_3_4() throws HiveException { + + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(5L, 6L, 7L, 8L, 9L, 10L, 10L, 10L, 10L, + 10L); + maxLong(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testLongr_3_4() throws HiveException { + + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(10L, 10L, 10L, 10L, 9L, 8L, 7L, 6L, 5L, + 4L); + maxLong(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testLongr_1_4() throws HiveException { + + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays + .asList(10L, 10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L); + maxLong(inVals.iterator(), 10, 1, 4, outVals.iterator()); + } + + @Test + public void testLong_3_0() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + maxLong(inVals.iterator(), 10, 3, 0, outVals.iterator()); + } + + @Test + public void testLong_0_5() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(6L, 7L, 8L, 9L, 10L, 10L, 10L, 10L, 10L, + 10L); + maxLong(inVals.iterator(), 10, 0, 5, outVals.iterator()); + } + + @Test + public void testLong_7_2() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 10L, + 10L); + maxLong(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testLongr_7_2() throws HiveException { + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(10L, 10L, 10L, 10L, 10L, 10L, 10L, 10L, + 9L, 8L); + maxLong(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testLong_15_15() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(10L, 10L, 
10L, 10L, 10L, 10L, 10L, 10L, + 10L, 10L); + maxLong(inVals.iterator(), 10, 15, 15, outVals.iterator()); + } + + @Test + public void testLong_unb_0() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + maxLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0, + outVals.iterator()); + } + + @Test + public void testLong_unb_5() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(6L, 7L, 8L, 9L, 10L, 10L, 10L, 10L, 10L, + 10L); + maxLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5, + outVals.iterator()); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java new file mode 100644 index 0000000..bc30022 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.udaf; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.udaf.TestStreamingSum.TypeHandler; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; + +public class TestStreamingMin { + + public void minLong(Iterator inVals, int inSz, int numPreceding, + int numFollowing, Iterator outVals) throws HiveException { + + GenericUDAFMin fnR = new GenericUDAFMin(); + TypeInfo[] inputTypes = { TypeInfoFactory.longTypeInfo }; + ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableLongObjectInspector }; + + LongWritable[] in = new LongWritable[1]; + in[0] = new LongWritable(); + + TestStreamingSum._agg(fnR, inputTypes, inVals, TypeHandler.LongHandler, in, + inputOIs, inSz, numPreceding, numFollowing, outVals); + + } + + @Test + public void testLong_3_4() throws HiveException { + + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 1L, 1L, 1L, 2L, 3L, 4L, 5L, 6L, 7L); + minLong(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testLongr_3_4() throws HiveException { + + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(6L, 5L, 4L, 3L, 2L, 1L, 1L, 1L, 1L, 1L); + minLong(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testLongr_1_4() throws HiveException { + + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(6L, 5L, 4L, 3L, 2L, 1L, 1L, 1L, 1L, 1L); + minLong(inVals.iterator(), 10, 1, 4, outVals.iterator()); + } + + @Test + public void testLong_3_0() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 1L, 1L, 1L, 2L, 3L, 4L, 5L, 6L, 7L); + minLong(inVals.iterator(), 10, 3, 0, outVals.iterator()); + } + + @Test + public void testLong_0_5() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + minLong(inVals.iterator(), 10, 0, 5, outVals.iterator()); + } + + @Test + public void testLong_7_2() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 2L, 3L); + minLong(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testLongr_7_2() throws HiveException { + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L, 1L, 1L); + minLong(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testLong_15_15() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L); + minLong(inVals.iterator(), 10, 15, 15, 
outVals.iterator());
+  }
+
+  @Test
+  public void testLong_unb_0() throws HiveException {
+    List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+    List<Long> outVals = Arrays.asList(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L);
+    minLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0,
+        outVals.iterator());
+  }
+
+  @Test
+  public void testLongr_unb_5() throws HiveException {
+    List<Long> inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L);
+    List<Long> outVals = Arrays.asList(5L, 4L, 3L, 2L, 1L, 1L, 1L, 1L, 1L, 1L);
+    minLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5,
+        outVals.iterator());
+  }
+
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java
index ef6a4b5..a331e66 100644
--- ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java
+++ ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java
@@ -174,8 +174,16 @@ public HiveDecimal get(HiveDecimalWritable iw) {
       fn.aggregate(agg, in);
       Object out = oS.getNextResult(agg);
       if (out != null) {
-        out = out == ISupportStreamingModeForWindowing.NULL_RESULT ? null
-            : typeHandler.get((TW) out);
+        if (out == ISupportStreamingModeForWindowing.NULL_RESULT) {
+          out = null;
+        } else {
+          try {
+            out = typeHandler.get((TW) out);
+          } catch (ClassCastException ce) {
+            // The min/max streaming evaluators emit plain Java objects,
+            // not Writables; in that case use the value as-is.
+          }
+        }
         Assert.assertEquals(out, outVals.next());
         outSz++;
       }
@@ -185,8 +193,16 @@ public HiveDecimal get(HiveDecimalWritable iw) {
     while (outSz < inSz) {
       Object out = oS.getNextResult(agg);
-      out = out == ISupportStreamingModeForWindowing.NULL_RESULT ? null
-          : typeHandler.get((TW) out);
+      if (out == ISupportStreamingModeForWindowing.NULL_RESULT) {
+        out = null;
+      } else {
+        try {
+          out = typeHandler.get((TW) out);
+        } catch (ClassCastException ce) {
+          // Same as above: fall back to the raw object result.
+        }
+      }
       Assert.assertEquals(out, outVals.next());
       outSz++;
     }
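For reference, a self-contained sketch of the monotonic-deque technique that MaxStreamingFixedWindow implements above, specialized to primitive longs and fixed (non-unbounded) frames. The names SlidingWindowMax and maxes are illustrative only and are not part of this patch:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SlidingWindowMax {

  // Max over the window (i - numPreceding, i + numFollowing) for each row i.
  public static List<Long> maxes(long[] in, int numPreceding, int numFollowing) {
    List<Long> out = new ArrayList<Long>();
    // Deque of {value, index}: values strictly decrease from front to back.
    Deque<long[]> chain = new ArrayDeque<long[]>();
    int numRows = 0;
    for (long v : in) {
      // Drain smaller (or equal) entries from the back; v supersedes them.
      while (!chain.isEmpty() && chain.getLast()[0] <= v) {
        chain.removeLast();
      }
      chain.addLast(new long[] { v, numRows });
      // A result is available once numFollowing further rows have been seen.
      if (numRows >= numFollowing) {
        out.add(chain.getFirst()[0]);
      }
      numRows++;
      // Retire the front entry once it leaves the window's influence.
      if (numRows > chain.getFirst()[1] + numPreceding + numFollowing) {
        chain.removeFirst();
      }
    }
    // Flush results for the last numFollowing rows as the window shrinks.
    for (int i = 0; i < numFollowing && !chain.isEmpty(); i++) {
      out.add(chain.getFirst()[0]);
      numRows++;
      if (numRows - numFollowing + i > chain.getFirst()[1] + numPreceding
          && chain.size() > 1) {
        chain.removeFirst();
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Mirrors TestStreamingMax.testLong_3_4.
    long[] in = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    System.out.println(maxes(in, 3, 4));
  }
}

Run against the TestStreamingMax.testLong_3_4 data, maxes(in, 3, 4) prints [5, 6, 7, 8, 9, 10, 10, 10, 10, 10], matching the expected output of that test.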