diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java index ab3e0bf..bce239f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java @@ -178,12 +178,12 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { return null; } - return new GenericUDAFStreamingEnhancer(this, + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer(this, start.getAmt(), end.getAmt()) { @Override protected DoubleWritable getNextResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; Double r = myagg.count == 0 ? null : myagg.sum; @@ -201,7 +201,7 @@ protected DoubleWritable getNextResult( @Override protected Object[] getCurrentIntermediateResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; return myagg.count == 0 ? null : new Object[] { @@ -314,12 +314,12 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { return null; } - return new GenericUDAFStreamingEnhancer( + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer( this, start.getAmt(), end.getAmt()) { @Override protected HiveDecimalWritable getNextResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; HiveDecimal r = myagg.count == 0 ? null : myagg.sum; @@ -338,7 +338,7 @@ protected HiveDecimalWritable getNextResult( @Override protected Object[] getCurrentIntermediateResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; return myagg.count == 0 ? 
null : new Object[] { myagg.sum, diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java index d6e9db4..557e75d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java @@ -17,19 +17,35 @@ */ package org.apache.hadoop.hive.ql.udf.generic; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import com.google.common.collect.Queues; + @Description(name = "max", value = "_FUNC_(expr) - Returns the maximum value of expr") public class GenericUDAFMax extends AbstractGenericUDAFResolver { @@ -120,6 +136,184 @@ public Object terminate(AggregationBuffer agg) throws HiveException { return myagg.o; } + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + + if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { + return null; + } + + if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { + return null; + } + + return new MaxStreamingFixedWindow(this, start.getAmt(), end.getAmt()); + } + + } + + /* + * Based on the paper by Daniel Lemire: + * Streaming Max-Min filter using no more than 3 comparisons per element. + * + * 1. His algorithm works on fixed size windows up to the current row. For row + * 'i' and window 'w' it computes the min/max for window (i-w, i). + * 2. The core idea is to keep a queue of (max, idx) tuples. A tuple in the queue + * represents the max value in the range (prev tuple.idx, idx). Using the queue data + * structure and following 2 operations it is easy to see that maxes can be computed: + * - on receiving the ith row; drain the queue from the back of any entries whose value + * is less than the ith entry; add the ith value as a tuple in the queue (i-val, i) + * - on the ith step, check if the element at the front of the queue has reached its max + * range of influence; i.e. frontTuple.idx + w <= i. If yes we can remove it from the queue. + * - on the ith step o/p the front of the queue as the max for the ith entry.
+ * + * Here we modify the algorithm: + * 1. to handle windows that are of the form (i-p, i+f), where p = numPreceding, f = numFollowing + * - we start outputting rows only after receiving f rows. + * - the formula for 'influence range' of an idx accounts for the following rows. + * 2. optimize for the case when numPreceding is Unbounded. In this case only 1 max needs to be + * tracked at any given time. + */ + static class MaxStreamingFixedWindow extends + GenericUDAFStreamingEvaluator { + + class State extends GenericUDAFStreamingEvaluator.StreamingState { + private final Deque maxChain; + + public State(int numPreceding, int numFollowing, AggregationBuffer buf) { + super(numPreceding, numFollowing, buf); + maxChain = new ArrayDeque(numPreceding + numFollowing + 1); + } + + @Override + public int estimate() { + if (!(wrappedBuf instanceof AbstractAggregationBuffer)) { + return -1; + } + int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate(); + if (underlying == -1) { + return -1; + } + if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) { + return -1; + } + /* + * sz Estimate = sz needed by underlying AggBuffer + + * sz for results + + * sz for maxChain + + * 3 * JavaDataModel.PRIMITIVES1 + * sz of results = sz of underlying * wdwSz + * sz of maxChain = sz of underlying * wdwSz + */ + + int wdwSz = numPreceding + numFollowing + 1; + return underlying + (underlying * wdwSz) + (underlying * wdwSz) + + (3 * JavaDataModel.PRIMITIVES1); + } + + protected void reset() { + maxChain.clear(); + super.reset(); + } + + } + + public MaxStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, + int numPreceding, int numFollowing) { + super(wrappedEval, numPreceding, numFollowing); + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); + return new State(numPreceding, numFollowing, underlying); + } + + protected ObjectInspector inputOI() { + return ((GenericUDAFMaxEvaluator) wrappedEval).inputOI; + } + + protected ObjectInspector outputOI() { + return ((GenericUDAFMaxEvaluator) wrappedEval).outputOI; + } + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) + throws HiveException { + + State s = (State) agg; + Object o = parameters[0]; + + while (!s.maxChain.isEmpty()) { + if (!removeLast(o, s.maxChain.getLast()[0])) { + break; + } else { + s.maxChain.removeLast(); + } + } + + /* + * add row to chain. except in case of UNB preceding: - only 1 max needs + * to be tracked. - current max will never become out of range. It can + * only be replaced by a larger max. + */ + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + || s.maxChain.isEmpty()) { + o = o == null ?
null : ObjectInspectorUtils.copyToStandardObject(o, + inputOI(), ObjectInspectorCopyOption.JAVA); + s.maxChain.addLast(new Object[] { o, s.numRows }); + } + + if (s.numRows >= (s.numFollowing)) { + s.results.add(s.maxChain.getFirst()[0]); + } + s.numRows++; + + int fIdx = (Integer) s.maxChain.getFirst()[1]; + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > fIdx + s.numPreceding + s.numFollowing) { + s.maxChain.removeFirst(); + } + } + + protected boolean removeLast(Object in, Object last) { + return isGreater(in, last); + } + + private boolean isGreater(Object in, Object last) { + if (in == null) { + return false; + } + if (last == null) { + return true; + } + return ObjectInspectorUtils.compare(in, inputOI(), last, outputOI()) > 0; + } + + @Override + public Object terminate(AggregationBuffer agg) throws HiveException { + + State s = (State) agg; + Object[] r = s.maxChain.getFirst(); + + for (int i = 0; i < s.numFollowing; i++) { + s.results.add(r[0]); + s.numRows++; + int fIdx = (Integer) r[1]; + if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows - s.numFollowing + i > fIdx + s.numPreceding + && !s.maxChain.isEmpty()) { + s.maxChain.removeFirst(); + r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : r; + } + } + + return null; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java index 3dc9900..990a714 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java @@ -23,7 +23,12 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax.MaxStreamingFixedWindow; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; @@ -120,6 +125,54 @@ public Object terminate(AggregationBuffer agg) throws HiveException { return myagg.o; } + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + + if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { + return null; + } + + if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { + return null; + } + + return new MinStreamingFixedWindow(this, start.getAmt(), end.getAmt()); + } + + } + + static class MinStreamingFixedWindow extends MaxStreamingFixedWindow { + + public MinStreamingFixedWindow(GenericUDAFEvaluator wrappedEval, + int numPreceding, int numFollowing) { + super(wrappedEval, numPreceding, numFollowing); + } + + protected ObjectInspector inputOI() { + return ((GenericUDAFMinEvaluator) wrappedEval).inputOI; + } + + protected ObjectInspector outputOI() { + return ((GenericUDAFMinEvaluator) wrappedEval).outputOI; + } + + protected boolean removeLast(Object in, Object last) { + return isLess(in, last); + } + + private boolean 
isLess(Object in, Object last) { + if (in == null) { + return false; + } + if (last == null) { + return true; + } + return ObjectInspectorUtils.compare(in, inputOI(), last, outputOI()) < 0; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java deleted file mode 100644 index f899ae0..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.udf.generic; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; -import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; - -@SuppressWarnings({ "deprecation", "unchecked" }) -public abstract class GenericUDAFStreamingEnhancer extends - GenericUDAFEvaluator implements ISupportStreamingModeForWindowing { - - private final GenericUDAFEvaluator wrappedEval; - private final int numPreceding; - private final int numFollowing; - - public GenericUDAFStreamingEnhancer(GenericUDAFEvaluator wrappedEval, - int numPreceding, int numFollowing) { - this.wrappedEval = wrappedEval; - this.numPreceding = numPreceding; - this.numFollowing = numFollowing; - this.mode = wrappedEval.mode; - } - - class StreamingState extends AbstractAggregationBuffer { - final AggregationBuffer wrappedBuf; - final int numPreceding; - final int numFollowing; - final List results; - final List intermediateVals; - int numRows; - - StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) { - this.wrappedBuf = buf; - this.numPreceding = numPreceding; - this.numFollowing = numFollowing; - results = new ArrayList(); - intermediateVals = new ArrayList(); - numRows = 0; - } - - @Override - public int estimate() { - if (!(wrappedBuf instanceof AbstractAggregationBuffer)) { - return -1; - } - int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate(); - if (underlying == -1) { - return -1; - } - if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) { - return -1; - } - /* - * sz Estimate = sz needed by underlying AggBuffer + - * sz for results + - * sz for intermediates + - * 3 * JavaDataModel.PRIMITIVES1 - * sz of results = sz of underlying * wdwSz - * sz of intermediates = sz of underlying * wdwSz - */ - - int wdwSz = numPreceding + numFollowing + 1; - return underlying + - (underlying * wdwSz) + - (underlying * wdwSz) + - (3 * JavaDataModel.PRIMITIVES1); - } 
- } - - @Override - public ObjectInspector init(Mode m, ObjectInspector[] parameters) - throws HiveException { - throw new HiveException(getClass().getSimpleName() + ": init not supported"); - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); - return new StreamingState(numPreceding, numFollowing, underlying); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - StreamingState ss = (StreamingState) agg; - wrappedEval.reset(ss.wrappedBuf); - ss.results.clear(); - ss.intermediateVals.clear(); - ss.numRows = 0; - } - - @Override - public void iterate(AggregationBuffer agg, Object[] parameters) - throws HiveException { - StreamingState ss = (StreamingState) agg; - - wrappedEval.iterate(ss.wrappedBuf, parameters); - - if (ss.numRows >= ss.numFollowing) { - ss.results.add(getNextResult(ss)); - } - if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT) { - ss.intermediateVals.add(getCurrentIntermediateResult(ss)); - } - - ss.numRows++; - } - - @Override - public Object terminate(AggregationBuffer agg) throws HiveException { - StreamingState ss = (StreamingState) agg; - Object o = wrappedEval.terminate(ss.wrappedBuf); - - for (int i = 0; i < ss.numFollowing; i++) { - ss.results.add(getNextResult(ss)); - } - return o; - } - - @Override - public Object terminatePartial(AggregationBuffer agg) throws HiveException { - throw new HiveException(getClass().getSimpleName() - + ": terminatePartial not supported"); - } - - @Override - public void merge(AggregationBuffer agg, Object partial) throws HiveException { - throw new HiveException(getClass().getSimpleName() - + ": merge not supported"); - } - - @Override - public Object getNextResult(AggregationBuffer agg) throws HiveException { - StreamingState ss = (StreamingState) agg; - if (!ss.results.isEmpty()) { - T1 res = ss.results.remove(0); - if (res == null) { - return ISupportStreamingModeForWindowing.NULL_RESULT; - } - return res; - } - return null; - } - - protected abstract T1 getNextResult(StreamingState ss) throws HiveException; - - protected abstract T2 getCurrentIntermediateResult(StreamingState ss) - throws HiveException; -} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java new file mode 100644 index 0000000..25c8915 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.util.JavaDataModel; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +@SuppressWarnings({ "deprecation", "unchecked" }) +public abstract class GenericUDAFStreamingEvaluator extends + GenericUDAFEvaluator implements ISupportStreamingModeForWindowing { + + protected final GenericUDAFEvaluator wrappedEval; + protected final int numPreceding; + protected final int numFollowing; + + public GenericUDAFStreamingEvaluator(GenericUDAFEvaluator wrappedEval, + int numPreceding, int numFollowing) { + this.wrappedEval = wrappedEval; + this.numPreceding = numPreceding; + this.numFollowing = numFollowing; + this.mode = wrappedEval.mode; + } + + class StreamingState extends AbstractAggregationBuffer { + final AggregationBuffer wrappedBuf; + final int numPreceding; + final int numFollowing; + final List results; + int numRows; + + StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) { + this.wrappedBuf = buf; + this.numPreceding = numPreceding; + this.numFollowing = numFollowing; + results = new ArrayList(); + numRows = 0; + } + + protected void reset() { + results.clear(); + numRows = 0; + } + } + + @Override + public ObjectInspector init(Mode m, ObjectInspector[] parameters) + throws HiveException { + throw new HiveException(getClass().getSimpleName() + ": init not supported"); + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + StreamingState ss = (StreamingState) agg; + wrappedEval.reset(ss.wrappedBuf); + ss.reset(); + } + + @Override + public Object terminatePartial(AggregationBuffer agg) throws HiveException { + throw new HiveException(getClass().getSimpleName() + + ": terminatePartial not supported"); + } + + @Override + public void merge(AggregationBuffer agg, Object partial) throws HiveException { + throw new HiveException(getClass().getSimpleName() + + ": merge not supported"); + } + + @Override + public Object getNextResult(AggregationBuffer agg) throws HiveException { + StreamingState ss = (StreamingState) agg; + if (!ss.results.isEmpty()) { + T1 res = ss.results.remove(0); + if (res == null) { + return ISupportStreamingModeForWindowing.NULL_RESULT; + } + return res; + } + return null; + } + + public static abstract class SumAvgEnhancer extends GenericUDAFStreamingEvaluator { + + public SumAvgEnhancer(GenericUDAFEvaluator wrappedEval, int numPreceding, + int numFollowing) { + super(wrappedEval, numPreceding, numFollowing); + } + + class SumAvgStreamingState extends StreamingState { + + final List intermediateVals; + + SumAvgStreamingState(int numPreceding, int numFollowing, + AggregationBuffer buf) { + super(numPreceding, numFollowing, buf); + intermediateVals = new ArrayList(); + } + + @Override + public int estimate() { + if (!(wrappedBuf instanceof AbstractAggregationBuffer)) { + return -1; + } + int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate(); + if (underlying == -1) { + return -1; + } + if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) { + return -1; + } + /* + * sz Estimate = sz needed by underlying AggBuffer + + * sz for results + + * sz for intermediates + + * 3 * JavaDataModel.PRIMITIVES1 + * sz of results = sz of underlying * wdwSz + 
* sz of intermediates = sz of underlying * wdwSz + */ + + int wdwSz = numPreceding + numFollowing + 1; + return underlying + + (underlying * wdwSz) + + (underlying * wdwSz) + + (3 * JavaDataModel.PRIMITIVES1); + } + + protected void reset() { + intermediateVals.clear(); + super.reset(); + } + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); + return new SumAvgStreamingState(numPreceding, numFollowing, underlying); + } + + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) + throws HiveException { + SumAvgStreamingState ss = (SumAvgStreamingState) agg; + + wrappedEval.iterate(ss.wrappedBuf, parameters); + + if (ss.numRows >= ss.numFollowing) { + ss.results.add(getNextResult(ss)); + } + if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT) { + ss.intermediateVals.add(getCurrentIntermediateResult(ss)); + } + + ss.numRows++; + } + + @Override + public Object terminate(AggregationBuffer agg) throws HiveException { + SumAvgStreamingState ss = (SumAvgStreamingState) agg; + Object o = wrappedEval.terminate(ss.wrappedBuf); + + for (int i = 0; i < ss.numFollowing; i++) { + ss.results.add(getNextResult(ss)); + } + return o; + } + + protected abstract T1 getNextResult(SumAvgStreamingState ss) throws HiveException; + + protected abstract T2 getCurrentIntermediateResult(SumAvgStreamingState ss) + throws HiveException; + + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java index 2b82550..a00928a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java @@ -197,12 +197,12 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { return null; } - return new GenericUDAFStreamingEnhancer( + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer( this, start.getAmt(), end.getAmt()) { @Override protected HiveDecimalWritable getNextResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf; HiveDecimal r = myagg.empty ? null : myagg.sum; @@ -218,7 +218,7 @@ protected HiveDecimalWritable getNextResult( @Override protected HiveDecimal getCurrentIntermediateResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf; return myagg.empty ? null : myagg.sum; @@ -325,12 +325,12 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { return null; } - return new GenericUDAFStreamingEnhancer(this, + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer(this, start.getAmt(), end.getAmt()) { @Override protected DoubleWritable getNextResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf; Double r = myagg.empty ? 
null : myagg.sum; @@ -346,7 +346,7 @@ protected DoubleWritable getNextResult( @Override protected Double getCurrentIntermediateResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf; return myagg.empty ? null : new Double(myagg.sum); @@ -451,12 +451,12 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { return null; } - return new GenericUDAFStreamingEnhancer(this, + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer(this, start.getAmt(), end.getAmt()) { @Override protected LongWritable getNextResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf; Long r = myagg.empty ? null : myagg.sum; @@ -472,7 +472,7 @@ protected LongWritable getNextResult( @Override protected Long getCurrentIntermediateResult( - org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer.SumAvgStreamingState ss) throws HiveException { SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf; return myagg.empty ? null : new Long(myagg.sum); diff --git ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java new file mode 100644 index 0000000..b969285 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java @@ -0,0 +1,119 @@ +package org.apache.hadoop.hive.ql.udaf; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.udaf.TestStreamingSum.TypeHandler; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; + +public class TestStreamingMax { + + public void maxLong(Iterator inVals, int inSz, int numPreceding, + int numFollowing, Iterator outVals) throws HiveException { + + GenericUDAFMax fnR = new GenericUDAFMax(); + TypeInfo[] inputTypes = { TypeInfoFactory.longTypeInfo }; + ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableLongObjectInspector }; + + LongWritable[] in = new LongWritable[1]; + in[0] = new LongWritable(); + + TestStreamingSum._agg(fnR, inputTypes, inVals, TypeHandler.LongHandler, in, inputOIs, inSz, + numPreceding, numFollowing, outVals); + + } + + @Test + public void testLong_3_4() throws HiveException { + + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(5L, 6L, 7L, 8L, 9L, 10L, 10L, 10L, + 10L, 10L); + maxLong(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testLongr_3_4() throws HiveException { + + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = 
Arrays.asList(10L, 10L, 10L, 10L, 9L, 8L, 7L, 6L, + 5L, 4L); + maxLong(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testLongr_1_4() throws HiveException { + + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(10L, 10L, 9L, 8L, 7L, 6L, 5L, 4L, + 3L, 2L); + maxLong(inVals.iterator(), 10, 1, 4, outVals.iterator()); + } + + @Test + public void testLong_3_0() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, + 9L, 10L); + maxLong(inVals.iterator(), 10, 3, 0, outVals.iterator()); + } + + @Test + public void testLong_0_5() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(6L, 7L, 8L, 9L, 10L, 10L, 10L, 10L, + 10L, 10L); + maxLong(inVals.iterator(), 10, 0, 5, outVals.iterator()); + } + + @Test + public void testLong_7_2() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, + 10L, 10L); + maxLong(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testLongr_7_2() throws HiveException { + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(10L, 10L, 10L, 10L, 10L, 10L, 10L, 10L, + 9L, 8L); + maxLong(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testLong_15_15() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(10L, 10L, 10L, 10L, 10L, 10L, 10L, 10L, + 10L, 10L); + maxLong(inVals.iterator(), 10, 15, 15, outVals.iterator()); + } + + @Test + public void testLong_unb_0() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, + 9L, 10L); + maxLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0, + outVals.iterator()); + } + + @Test + public void testLong_unb_5() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(6L, 7L, 8L, 9L, 10L, 10L, 10L, 10L, + 10L, 10L); + maxLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5, + outVals.iterator()); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java new file mode 100644 index 0000000..8182ece --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java @@ -0,0 +1,121 @@ +package org.apache.hadoop.hive.ql.udaf; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.udaf.TestStreamingSum.TypeHandler; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; + +public class TestStreamingMin { + + public void 
minLong(Iterator inVals, int inSz, int numPreceding, + int numFollowing, Iterator outVals) throws HiveException { + + GenericUDAFMin fnR = new GenericUDAFMin(); + TypeInfo[] inputTypes = { TypeInfoFactory.longTypeInfo }; + ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableLongObjectInspector }; + + LongWritable[] in = new LongWritable[1]; + in[0] = new LongWritable(); + + TestStreamingSum._agg(fnR, inputTypes, inVals, TypeHandler.LongHandler, in, inputOIs, inSz, + numPreceding, numFollowing, outVals); + + } + + @Test + public void testLong_3_4() throws HiveException { + + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 1L, 1L, 1L, 2L, 3L, 4L, 5L, + 6L, 7L); + minLong(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testLongr_3_4() throws HiveException { + + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(6L, 5L, 4L, 3L, 2L, 1L, 1L, 1L, + 1L, 1L); + minLong(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testLongr_1_4() throws HiveException { + + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(6L, 5L, 4L, 3L, 2L, 1L, 1L, 1L, + 1L, 1L); + minLong(inVals.iterator(), 10, 1, 4, outVals.iterator()); + } + + @Test + public void testLong_3_0() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 1L, 1L, 1L, 2L, 3L, 4L, 5L, + 6L, 7L); + minLong(inVals.iterator(), 10, 3, 0, outVals.iterator()); + } + + @Test + public void testLong_0_5() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, + 9L, 10L); + minLong(inVals.iterator(), 10, 0, 5, outVals.iterator()); + } + + @Test + public void testLong_7_2() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, + 2L, 3L); + minLong(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testLongr_7_2() throws HiveException { + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L, + 1L, 1L); + minLong(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testLong_15_15() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, + 1L, 1L); + minLong(inVals.iterator(), 10, 15, 15, outVals.iterator()); + } + + @Test + public void testLong_unb_0() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, + 1L, 1L); + minLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0, + outVals.iterator()); + } + + @Test + public void testLongr_unb_5() throws HiveException { + List inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L); + List outVals = Arrays.asList(5L, 4L, 3L, 2L, 1L, 1L, 1L, 1L, + 1L, 1L); + minLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5, + outVals.iterator()); + } + +} diff --git ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java index ef6a4b5..a331e66 100644 --- 
ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java +++ ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java @@ -174,8 +174,14 @@ public HiveDecimal get(HiveDecimalWritable iw) { fn.aggregate(agg, in); Object out = oS.getNextResult(agg); if (out != null) { - out = out == ISupportStreamingModeForWindowing.NULL_RESULT ? null - : typeHandler.get((TW) out); + if (out == ISupportStreamingModeForWindowing.NULL_RESULT) { + out = null; + } else { + try { + out = typeHandler.get((TW) out); + } catch (ClassCastException ce) { // not a writable: the Max/Min streaming evaluators emit plain Java values, so compare out as-is + } + } Assert.assertEquals(out, outVals.next()); outSz++; } @@ -185,8 +191,14 @@ public HiveDecimal get(HiveDecimalWritable iw) { while (outSz < inSz) { Object out = oS.getNextResult(agg); - out = out == ISupportStreamingModeForWindowing.NULL_RESULT ? null - : typeHandler.get((TW) out); + if (out == ISupportStreamingModeForWindowing.NULL_RESULT) { + out = null; + } else { + try { + out = typeHandler.get((TW) out); + } catch (ClassCastException ce) { // not a writable: the Max/Min streaming evaluators emit plain Java values, so compare out as-is + } + } Assert.assertEquals(out, outVals.next()); outSz++; }
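
Note on the streaming window semantics added above: MaxStreamingFixedWindow keeps a deque of (value, rowIndex) pairs in decreasing value order and, for each row i, emits the max over rows (i - numPreceding) .. (i + numFollowing); rows near the end of a partition, which see fewer than numFollowing successors, are flushed in terminate(). The sketch below is a minimal, standalone illustration of that deque discipline on plain longs; the class and method names are invented for illustration and are not part of this patch, and only finite PRECEDING/FOLLOWING bounds are handled (no UNBOUNDED case). Its output can be checked against the expected values in TestStreamingMax.

import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch only; names below are not part of the patch.
public class SlidingWindowMaxSketch {

  // Max over rows (i - numPreceding) .. (i + numFollowing) for every row i.
  // Finite window bounds only; UNBOUNDED PRECEDING/FOLLOWING is not handled here.
  static long[] windowMax(long[] in, int numPreceding, int numFollowing) {
    long[] out = new long[in.length];
    // Entries are {value, rowIndex}; values decrease from front to back.
    Deque<long[]> maxChain = new ArrayDeque<long[]>();
    int emitted = 0;

    for (int i = 0; i < in.length; i++) {
      // Drain smaller values from the back; they can never be a window max again.
      while (!maxChain.isEmpty() && maxChain.peekLast()[0] < in[i]) {
        maxChain.removeLast();
      }
      maxChain.addLast(new long[] { in[i], i });

      // Row (i - numFollowing) has now seen its whole window; its max is at the front.
      if (i >= numFollowing) {
        out[emitted++] = maxChain.peekFirst()[0];
      }

      // Drop the front once it can no longer influence the next row to be emitted.
      if (i + 1 > maxChain.peekFirst()[1] + numPreceding + numFollowing) {
        maxChain.removeFirst();
      }
    }

    // Trailing rows see fewer than numFollowing successors (the terminate() case).
    for (int r = emitted; r < in.length; r++) {
      while (maxChain.size() > 1 && maxChain.peekFirst()[1] + numPreceding < r) {
        maxChain.removeFirst();
      }
      out[r] = maxChain.peekFirst()[0];
    }
    return out;
  }

  public static void main(String[] args) {
    long[] in = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    // Window of 3 preceding / 4 following, as in TestStreamingMax.testLong_3_4.
    System.out.println(java.util.Arrays.toString(windowMax(in, 3, 4)));
    // Prints: [5, 6, 7, 8, 9, 10, 10, 10, 10, 10]
  }
}

Running main prints [5, 6, 7, 8, 9, 10, 10, 10, 10, 10], matching testLong_3_4. Flipping the value comparison in the drain step yields the min variant, mirroring how MinStreamingFixedWindow only overrides removeLast() with an isLess() check.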