diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java index d3800c2..e917cdf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java @@ -83,6 +83,7 @@ protected void initializeOp(Configuration jobConf) throws HiveException { setupKeysWrapper(inputObjInspectors[0]); ptfInvocation = setupChain(); + ptfInvocation.initializeStreaming(jobConf, isMapOperator); firstMapRow = true; super.initializeOp(jobConf); @@ -282,6 +283,19 @@ boolean isStreaming() { return tabFn.canAcceptInputAsStream(); } + void initializeStreaming(Configuration cfg, boolean isMapSide) throws HiveException { + PartitionedTableFunctionDef tabDef = tabFn.getTableDef(); + PTFInputDef inputDef = tabDef.getInput(); + ObjectInspector inputOI = conf.getStartOfChain() == tabDef ? + inputObjInspectors[0] : inputDef.getOutputShape().getOI(); + + tabFn.initializeStreaming(cfg, (StructObjectInspector) inputOI, isMapSide); + + if ( next != null ) { + next.initializeStreaming(cfg, isMapSide); + } + } + void startPartition() throws HiveException { if ( isStreaming() ) { tabFn.startPartition(); @@ -301,15 +315,6 @@ void startPartition() throws HiveException { void processRow(Object row) throws HiveException { if ( isStreaming() ) { - if ( prev == null ) { - /* - * this is needed because during Translation we are still assuming that rows - * are collected into a PTFPartition. - * @Todo make translation handle the case when the first PTF is Streaming. - */ - row = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[0], - ObjectInspectorCopyOption.WRITABLE); - } handleOutputRows(tabFn.processRow(row)); } else { inputPart.append(row); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java index b5adb11..21d85f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java @@ -51,13 +51,25 @@ protected PTFPartition(Configuration cfg, SerDe serDe, StructObjectInspector inputOI, StructObjectInspector outputOI) throws HiveException { + this(cfg, serDe, inputOI, outputOI, true); + } + + protected PTFPartition(Configuration cfg, + SerDe serDe, StructObjectInspector inputOI, + StructObjectInspector outputOI, + boolean createElemContainer) + throws HiveException { this.serDe = serDe; this.inputOI = inputOI; this.outputOI = outputOI; - int containerNumRows = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE); - elems = new PTFRowContainer>(containerNumRows, cfg, null); - elems.setSerDe(serDe, outputOI); - elems.setTableDesc(PTFRowContainer.createTableDesc(inputOI)); + if ( createElemContainer ) { + int containerNumRows = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE); + elems = new PTFRowContainer>(containerNumRows, cfg, null); + elems.setSerDe(serDe, outputOI); + elems.setTableDesc(PTFRowContainer.createTableDesc(inputOI)); + } else { + elems = null; + } } public void reset() throws HiveException { @@ -233,6 +245,16 @@ public static PTFPartition create(Configuration cfg, throws HiveException { return new PTFPartition(cfg, serDe, inputOI, outputOI); } + + public static PTFRollingPartition createRolling(Configuration cfg, + SerDe serDe, + StructObjectInspector inputOI, + StructObjectInspector outputOI, + int precedingSpan, + int followingSpan) + throws HiveException { + return new PTFRollingPartition(cfg, serDe, inputOI, outputOI, 
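+      /*
+       * Note (illustrative, based on the constructor above): the rolling variant is
+       * built with createElemContainer=false, so no PTFRowContainer (whose size is
+       * driven by hive.join.cache.size) is allocated; a PTFRollingPartition only
+       * keeps the current window span of rows in an in-memory list instead of
+       * buffering the whole partition.
+       */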
precedingSpan, followingSpan); + } public static StructObjectInspector setupPartitionOutputOI(SerDe serDe, StructObjectInspector tblFnOI) throws SerDeException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java new file mode 100644 index 0000000..e195c0a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/PTFRollingPartition.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + +public class PTFRollingPartition extends PTFPartition { + + /* + * num rows whose output is evaluated. + */ + int numRowsProcessed; + + /* + * number rows to maintain before nextRowToProcess + */ + int precedingSpan; + + /* + * number rows to maintain after nextRowToProcess + */ + int followingSpan; + + /* + * number of rows received. + */ + int numRowsReceived; + + /* + * State of the Rolling Partition + * + * x0 x1 x2 x3 x4 x5 x6 x7 x8 x9 x10 x11 x12 x13 x14 x15 x16 x17 + * ^ ^ ^ + * | | | + * |--preceding span--numRowsProcessed---followingSpan --numRowsRecived + * + * a. index x7 : represents the last output row + * b. so preceding span rows before that are still held on for subsequent rows processing. + * c. The #of rows beyond numRowsProcessed = followingSpan + */ + + /* + * cache of rows; guaranteed to contain precedingSpan rows before + * nextRowToProcess. 
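+   * e.g. with precedingSpan=2 and followingSpan=3, after rows x0..x9 have been
+   * received and x0..x4 have been output, currWindow holds x3..x9: the 2 rows
+   * preceding the next row to process (x5), plus x5 and everything received
+   * after it. getAt(i) maps a partition index to this cache via
+   * currWindow.get(i - (numRowsReceived - currWindow.size())).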
+ */ + List currWindow; + + protected PTFRollingPartition(Configuration cfg, SerDe serDe, + StructObjectInspector inputOI, StructObjectInspector outputOI, + int precedingSpan, int succeedingSpan) throws HiveException { + super(cfg, serDe, inputOI, outputOI, false); + this.precedingSpan = precedingSpan; + this.followingSpan = succeedingSpan; + currWindow = new ArrayList(precedingSpan + followingSpan); + } + + public void reset() throws HiveException { + currWindow.clear(); + numRowsProcessed = 0; + numRowsReceived = 0; + } + + public Object getAt(int i) throws HiveException { + int rangeStart = numRowsReceived - currWindow.size(); + return currWindow.get(i - rangeStart); + } + + public void append(Object o) throws HiveException { + @SuppressWarnings("unchecked") + List l = (List) ObjectInspectorUtils.copyToStandardObject( + o, inputOI, ObjectInspectorCopyOption.WRITABLE); + currWindow.add(l); + numRowsReceived++; + } + + public Object nextOutputRow() throws HiveException { + Object row = getAt(numRowsProcessed); + numRowsProcessed++; + if (numRowsProcessed > precedingSpan) { + currWindow.remove(0); + } + return row; + } + + public boolean processedAllRows() { + return numRowsProcessed >= numRowsReceived; + } + + public int rowToProcess(WindowFunctionDef wFn) { + int rowToProcess = numRowsReceived - wFn.getWindowFrame().getEnd().getAmt() + - 1; + return rowToProcess >= 0 ? rowToProcess : -1; + } + + public int size() { + return numRowsReceived; + } + + public PTFPartitionIterator iterator() throws HiveException { + return new RollingPItr(); + } + + public void close() { + } + + class RollingPItr implements PTFPartitionIterator { + + @Override + public boolean hasNext() { + throw new UnsupportedOperationException(); + } + + @Override + public Object next() { + throw new UnsupportedOperationException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public int getIndex() { + return PTFRollingPartition.this.numRowsProcessed; + } + + @Override + public Object lead(int amt) throws HiveException { + int i = PTFRollingPartition.this.numRowsProcessed + amt; + i = i >= PTFRollingPartition.this.numRowsReceived ? PTFRollingPartition.this.numRowsReceived - 1 + : i; + return PTFRollingPartition.this.getAt(i); + } + + @Override + public Object lag(int amt) throws HiveException { + int i = PTFRollingPartition.this.numRowsProcessed - amt; + int start = PTFRollingPartition.this.numRowsReceived + - PTFRollingPartition.this.currWindow.size(); + + i = i < start ? 
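+      /* lead/lag are clamped to what is still cached: lead() past the last row
+         received returns the last row, and lag() reaching back before the oldest
+         retained row returns that oldest row. */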
start : i; + return PTFRollingPartition.this.getAt(i); + } + + @Override + public Object resetToIndex(int idx) throws HiveException { + return PTFRollingPartition.this.getAt(idx); + } + + @Override + public PTFPartition getPartition() { + return PTFRollingPartition.this; + } + + @Override + public void reset() throws HiveException { + } + + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java index 814ae37..ab3e0bf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java @@ -26,7 +26,12 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum.GenericUDAFSumDouble.SumDoubleAgg; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; @@ -152,6 +157,59 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException { reset(result); return result; } + + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + + /* + * Currently we are not handling dynamic sized windows implied by range based windows. + */ + if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { + return null; + } + + /* + * Windows that are unbounded following don't benefit from Streaming. + */ + if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { + return null; + } + + return new GenericUDAFStreamingEnhancer(this, + start.getAmt(), end.getAmt()) { + + @Override + protected DoubleWritable getNextResult( + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + throws HiveException { + AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; + Double r = myagg.count == 0 ? null : myagg.sum; + long cnt = myagg.count; + if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) { + Object[] o = ss.intermediateVals.remove(0); + Double d = o == null ? 0.0 : (Double) o[0]; + r = r == null ? null : r - d; + cnt = cnt - ((Long) o[1]); + } + + return r == null ? null : new DoubleWritable(r / cnt); + } + + @Override + protected Object[] getCurrentIntermediateResult( + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + throws HiveException { + AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; + return myagg.count == 0 ? 
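+        /* The intermediate value remembered for each input row is the running
+           (sum, count) pair at that point. Once the window is full, getNextResult
+           subtracts the pair recorded just before the window's first row from the
+           current running totals, recovering the windowed average without
+           rescanning the window. */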
null : new Object[] { + new Double(myagg.sum), myagg.count }; + } + + }; + } } public static class GenericUDAFAverageEvaluatorDecimal extends AbstractGenericUDAFAverageEvaluator { @@ -241,6 +299,54 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException { reset(result); return result; } + + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + + if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { + return null; + } + + if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { + return null; + } + + return new GenericUDAFStreamingEnhancer( + this, start.getAmt(), end.getAmt()) { + + @Override + protected HiveDecimalWritable getNextResult( + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + throws HiveException { + AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; + HiveDecimal r = myagg.count == 0 ? null : myagg.sum; + long cnt = myagg.count; + if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) { + Object[] o = ss.intermediateVals.remove(0); + HiveDecimal d = o == null ? HiveDecimal.ZERO : (HiveDecimal) o[0]; + r = r == null ? null : r.subtract(d); + cnt = cnt - ((Long) o[1]); + } + + return r == null ? null : new HiveDecimalWritable( + r.divide(HiveDecimal.create(cnt))); + } + + @Override + protected Object[] getCurrentIntermediateResult( + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + throws HiveException { + AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf; + return myagg.count == 0 ? null : new Object[] { myagg.sum, + myagg.count }; + } + + }; + } } private static class AverageAggregationBuffer implements AggregationBuffer { diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java index 18c8c8d..fbadb91 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCumeDist.java @@ -53,12 +53,12 @@ static final Log LOG = LogFactory.getLog(GenericUDAFCumeDist.class.getName()); @Override - protected GenericUDAFRankEvaluator createEvaluator() + protected GenericUDAFAbstractRankEvaluator createEvaluator() { return new GenericUDAFCumeDistEvaluator(); } - public static class GenericUDAFCumeDistEvaluator extends GenericUDAFRankEvaluator + public static class GenericUDAFCumeDistEvaluator extends GenericUDAFAbstractRankEvaluator { @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java index c1d43d8..8856fb7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFDenseRank.java @@ -43,7 +43,7 @@ static final Log LOG = LogFactory.getLog(GenericUDAFDenseRank.class.getName()); @Override - protected GenericUDAFRankEvaluator createEvaluator() + protected GenericUDAFAbstractRankEvaluator createEvaluator() { return new GenericUDAFDenseRankEvaluator(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java 
ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java index 5668a3b..3bd97b0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -233,4 +234,24 @@ public Object evaluate(AggregationBuffer agg) throws HiveException { */ public abstract Object terminate(AggregationBuffer agg) throws HiveException; + /** + * When evaluating an aggregates over a fixed Window, the naive way to compute + * results is to compute the aggregate for each row. But often there is a way + * to compute results in a more efficient manner. This method enables the + * basic evaluator to provide a function object that does the job in a more + * efficient manner. + *
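+   * For example, sum over a fixed window such as
+   * "ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING" can be maintained incrementally:
+   * keep a running total and, once enough rows have been seen, subtract the
+   * prefix total that slid out of the window, so each output row is produced at
+   * constant cost instead of rescanning its window.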

+ * This method is called after this Evaluator is initialized. The returned + * Function must be initialized. It is passed the 'window' of aggregation for + * each row. + * + * @param wFrmDef + * the Window definition in play for this evaluation. + * @return null implies that this fn cannot be processed in Streaming mode. So + * each row is evaluated independently. + */ + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + return null; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java index aab1922..1cca03e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentRank.java @@ -49,12 +49,12 @@ static final Log LOG = LogFactory.getLog(GenericUDAFPercentRank.class.getName()); @Override - protected GenericUDAFRankEvaluator createEvaluator() + protected GenericUDAFAbstractRankEvaluator createEvaluator() { return new GenericUDAFPercentRankEvaluator(); } - public static class GenericUDAFPercentRankEvaluator extends GenericUDAFRankEvaluator + public static class GenericUDAFPercentRankEvaluator extends GenericUDAFAbstractRankEvaluator { @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java index 5c8f1e0..775d874 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -71,7 +72,7 @@ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticE return createEvaluator(); } - protected GenericUDAFRankEvaluator createEvaluator() + protected GenericUDAFAbstractRankEvaluator createEvaluator() { return new GenericUDAFRankEvaluator(); } @@ -83,10 +84,12 @@ protected GenericUDAFRankEvaluator createEvaluator() Object[] currVal; int currentRank; int numParams; + boolean supportsStreaming; - RankBuffer(int numParams) + RankBuffer(int numParams, boolean supportsStreaming) { this.numParams = numParams; + this.supportsStreaming = supportsStreaming; init(); } @@ -96,20 +99,33 @@ void init() currentRowNum = 0; currentRank = 0; currVal = new Object[numParams]; + if ( supportsStreaming ) { + /* initialize rowNums to have 1 row */ + rowNums.add(null); + } } - + void incrRowNum() { currentRowNum++; } void addRank() { - rowNums.add(new IntWritable(currentRank)); + if ( supportsStreaming ) { + rowNums.set(0, new IntWritable(currentRank)); + } else { + rowNums.add(new IntWritable(currentRank)); + } } } - public static class GenericUDAFRankEvaluator extends GenericUDAFEvaluator + public static abstract class GenericUDAFAbstractRankEvaluator extends GenericUDAFEvaluator { ObjectInspector[] inputOI; ObjectInspector[] outputOI; + boolean isStreamingMode = false; + + protected boolean 
isStreaming() { + return isStreamingMode; + } @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException @@ -132,7 +148,7 @@ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveExc @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new RankBuffer(inputOI.length); + return new RankBuffer(inputOI.length, isStreamingMode); } @Override @@ -183,6 +199,23 @@ public Object terminate(AggregationBuffer agg) throws HiveException } + public static class GenericUDAFRankEvaluator extends + GenericUDAFAbstractRankEvaluator implements + ISupportStreamingModeForWindowing { + + @Override + public Object getNextResult(AggregationBuffer agg) throws HiveException { + return ((RankBuffer) agg).rowNums.get(0); + } + + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + isStreamingMode = true; + return this; + } + + } + public static int compare(Object[] o1, ObjectInspector[] oi1, Object[] o2, ObjectInspector[] oi2) { diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java new file mode 100644 index 0000000..f899ae0 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.util.JavaDataModel; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +@SuppressWarnings({ "deprecation", "unchecked" }) +public abstract class GenericUDAFStreamingEnhancer extends + GenericUDAFEvaluator implements ISupportStreamingModeForWindowing { + + private final GenericUDAFEvaluator wrappedEval; + private final int numPreceding; + private final int numFollowing; + + public GenericUDAFStreamingEnhancer(GenericUDAFEvaluator wrappedEval, + int numPreceding, int numFollowing) { + this.wrappedEval = wrappedEval; + this.numPreceding = numPreceding; + this.numFollowing = numFollowing; + this.mode = wrappedEval.mode; + } + + class StreamingState extends AbstractAggregationBuffer { + final AggregationBuffer wrappedBuf; + final int numPreceding; + final int numFollowing; + final List results; + final List intermediateVals; + int numRows; + + StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) { + this.wrappedBuf = buf; + this.numPreceding = numPreceding; + this.numFollowing = numFollowing; + results = new ArrayList(); + intermediateVals = new ArrayList(); + numRows = 0; + } + + @Override + public int estimate() { + if (!(wrappedBuf instanceof AbstractAggregationBuffer)) { + return -1; + } + int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate(); + if (underlying == -1) { + return -1; + } + if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) { + return -1; + } + /* + * sz Estimate = sz needed by underlying AggBuffer + + * sz for results + + * sz for intermediates + + * 3 * JavaDataModel.PRIMITIVES1 + * sz of results = sz of underlying * wdwSz + * sz of intermediates = sz of underlying * wdwSz + */ + + int wdwSz = numPreceding + numFollowing + 1; + return underlying + + (underlying * wdwSz) + + (underlying * wdwSz) + + (3 * JavaDataModel.PRIMITIVES1); + } + } + + @Override + public ObjectInspector init(Mode m, ObjectInspector[] parameters) + throws HiveException { + throw new HiveException(getClass().getSimpleName() + ": init not supported"); + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); + return new StreamingState(numPreceding, numFollowing, underlying); + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + StreamingState ss = (StreamingState) agg; + wrappedEval.reset(ss.wrappedBuf); + ss.results.clear(); + ss.intermediateVals.clear(); + ss.numRows = 0; + } + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) + throws HiveException { + StreamingState ss = (StreamingState) agg; + + wrappedEval.iterate(ss.wrappedBuf, parameters); + + if (ss.numRows >= ss.numFollowing) { + ss.results.add(getNextResult(ss)); + } + if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT) { + ss.intermediateVals.add(getCurrentIntermediateResult(ss)); + } + + ss.numRows++; + } + + @Override + public Object terminate(AggregationBuffer agg) throws HiveException { + StreamingState ss = (StreamingState) agg; + Object o = wrappedEval.terminate(ss.wrappedBuf); + + for (int i = 0; i < ss.numFollowing; i++) { + ss.results.add(getNextResult(ss)); 
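+      /* The last numFollowing input rows still owe an output: their windows extend
+         past the end of the partition, so their results only become available once
+         the wrapped evaluator has seen all input. They are drained here, after the
+         wrapped terminate(). */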
+ } + return o; + } + + @Override + public Object terminatePartial(AggregationBuffer agg) throws HiveException { + throw new HiveException(getClass().getSimpleName() + + ": terminatePartial not supported"); + } + + @Override + public void merge(AggregationBuffer agg, Object partial) throws HiveException { + throw new HiveException(getClass().getSimpleName() + + ": merge not supported"); + } + + @Override + public Object getNextResult(AggregationBuffer agg) throws HiveException { + StreamingState ss = (StreamingState) agg; + if (!ss.results.isEmpty()) { + T1 res = ss.results.remove(0); + if (res == null) { + return ISupportStreamingModeForWindowing.NULL_RESULT; + } + return res; + } + return null; + } + + protected abstract T1 getNextResult(StreamingState ss) throws HiveException; + + protected abstract T2 getCurrentIntermediateResult(StreamingState ss) + throws HiveException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java index 8508ffb..2b82550 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java @@ -24,6 +24,10 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -179,6 +183,49 @@ public Object terminate(AggregationBuffer agg) throws HiveException { return result; } + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + + if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { + return null; + } + + if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { + return null; + } + + return new GenericUDAFStreamingEnhancer( + this, start.getAmt(), end.getAmt()) { + + @Override + protected HiveDecimalWritable getNextResult( + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + throws HiveException { + SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf; + HiveDecimal r = myagg.empty ? null : myagg.sum; + if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) { + HiveDecimal d = (HiveDecimal) ss.intermediateVals.remove(0); + d = d == null ? HiveDecimal.ZERO : d; + r = r == null ? null : r.subtract(d); + } + + return r == null ? null : new HiveDecimalWritable(r); + } + + @Override + protected HiveDecimal getCurrentIntermediateResult( + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + throws HiveException { + SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf; + return myagg.empty ? 
null : myagg.sum; + } + + }; + } } /** @@ -264,6 +311,50 @@ public Object terminate(AggregationBuffer agg) throws HiveException { return result; } + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + + if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { + return null; + } + + if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { + return null; + } + + return new GenericUDAFStreamingEnhancer(this, + start.getAmt(), end.getAmt()) { + + @Override + protected DoubleWritable getNextResult( + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + throws HiveException { + SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf; + Double r = myagg.empty ? null : myagg.sum; + if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) { + Double d = (Double) ss.intermediateVals.remove(0); + d = d == null ? 0.0 : d; + r = r == null ? null : r - d; + } + + return r == null ? null : new DoubleWritable(r); + } + + @Override + protected Double getCurrentIntermediateResult( + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + throws HiveException { + SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf; + return myagg.empty ? null : new Double(myagg.sum); + } + + }; + } + } /** @@ -346,6 +437,49 @@ public Object terminate(AggregationBuffer agg) throws HiveException { return result; } + @Override + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + + BoundaryDef start = wFrmDef.getStart(); + BoundaryDef end = wFrmDef.getEnd(); + + if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) { + return null; + } + + if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) { + return null; + } + + return new GenericUDAFStreamingEnhancer(this, + start.getAmt(), end.getAmt()) { + + @Override + protected LongWritable getNextResult( + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + throws HiveException { + SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf; + Long r = myagg.empty ? null : myagg.sum; + if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) { + Long d = (Long) ss.intermediateVals.remove(0); + d = d == null ? 0 : d; + r = r == null ? null : r - d; + } + + return r == null ? null : new LongWritable(r); + } + + @Override + protected Long getCurrentIntermediateResult( + org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer.StreamingState ss) + throws HiveException { + SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf; + return myagg.empty ? null : new Long(myagg.sum); + } + + }; + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java new file mode 100644 index 0000000..cf2035c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction; + +/** + * A GenericUDAF mode that provides it results as a List to the + * {@link WindowingTableFunction} (so it is a + * {@link WindowFunctionInfo#isPivotResult()} return true) may support this + * interface. If it does then the WindowingTableFunction will ask it for the + * next Result after every aggregate call. + */ +public interface ISupportStreamingModeForWindowing { + + Object getNextResult(AggregationBuffer agg) throws HiveException; + + public static Object NULL_RESULT = new Object(); +} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java index d50a542..41e0102 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopStreaming.java @@ -21,37 +21,47 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.PTFDesc; import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; public class NoopStreaming extends Noop { - + List rows; - + StructObjectInspector inputOI; + NoopStreaming() { rows = new ArrayList(); } - - public boolean canAcceptInputAsStream() { - return true; - } - + + public void initializeStreaming(Configuration cfg, + StructObjectInspector inputOI, boolean isMapSide) throws HiveException { + this.inputOI = inputOI; + canAcceptInputAsStream = true; + } + public List processRow(Object row) throws HiveException { - if (!canAcceptInputAsStream() ) { + if (!canAcceptInputAsStream()) { throw new HiveException(String.format( - "Internal error: PTF %s, doesn't support Streaming", - getClass().getName())); + "Internal error: PTF %s, doesn't support Streaming", getClass() + .getName())); } rows.clear(); + row = ObjectInspectorUtils.copyToStandardObject(row, inputOI, + ObjectInspectorCopyOption.WRITABLE); rows.add(row); return rows; } - + public static class NoopStreamingResolver extends NoopResolver { @Override - protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef) { + protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, + PartitionedTableFunctionDef tDef) { return new NoopStreaming(); } } diff --git 
ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java index be1f9ab..5d322d3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/NoopWithMapStreaming.java @@ -21,36 +21,46 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.PTFDesc; import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; public class NoopWithMapStreaming extends NoopWithMap { List rows; - + StructObjectInspector inputOI; + NoopWithMapStreaming() { rows = new ArrayList(); } - - public boolean canAcceptInputAsStream() { - return true; - } - + + public void initializeStreaming(Configuration cfg, + StructObjectInspector inputOI, boolean isMapSide) throws HiveException { + this.inputOI = inputOI; + canAcceptInputAsStream = true; + } + public List processRow(Object row) throws HiveException { - if (!canAcceptInputAsStream() ) { + if (!canAcceptInputAsStream()) { throw new HiveException(String.format( - "Internal error: PTF %s, doesn't support Streaming", - getClass().getName())); + "Internal error: PTF %s, doesn't support Streaming", getClass() + .getName())); } rows.clear(); + row = ObjectInspectorUtils.copyToStandardObject(row, inputOI, + ObjectInspectorCopyOption.WRITABLE); rows.add(row); return rows; } - + public static class NoopWithMapStreamingResolver extends NoopWithMapResolver { @Override - protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, PartitionedTableFunctionDef tDef) { + protected TableFunctionEvaluator createEvaluator(PTFDesc ptfDesc, + PartitionedTableFunctionDef tDef) { return new NoopStreaming(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java index 8a1e085..b8b819e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.PTFOperator; import org.apache.hadoop.hive.ql.exec.PTFPartition; import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator; @@ -29,6 +30,7 @@ import org.apache.hadoop.hive.ql.plan.PTFDesc; import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; /* @@ -91,6 +93,7 @@ protected PTFDesc ptfDesc; boolean transformsRawInput; transient protected PTFPartition outputPartition; + transient protected boolean canAcceptInputAsStream; static { PTFUtils.makeTransient(TableFunctionEvaluator.class, "outputOI", "rawInputOI"); @@ -215,9 +218,14 @@ public boolean canIterateOutput() { * remaining o/p rows. 
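   * A streaming evaluator is set up via initializeStreaming(cfg, inputOI, isMapSide),
   * which PTFOperator invokes once at operator initialization; only if that call
   * turns streaming on does the operator drive the startPartition()/processRow()/
   * finishPartition() cycle instead of collecting the whole partition first.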
*/ public boolean canAcceptInputAsStream() { - return false; + return canAcceptInputAsStream; } - + + public void initializeStreaming(Configuration cfg, + StructObjectInspector inputOI, boolean isMapSide) throws HiveException { + canAcceptInputAsStream = false; + } + public void startPartition() throws HiveException { if (!canAcceptInputAsStream() ) { throw new HiveException(String.format( diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java index cdb5624..0a67fea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java @@ -25,9 +25,13 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.PTFOperator; import org.apache.hadoop.hive.ql.exec.PTFPartition; import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator; +import org.apache.hadoop.hive.ql.exec.PTFRollingPartition; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -43,6 +47,9 @@ import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; @@ -52,6 +59,8 @@ @SuppressWarnings("deprecation") public class WindowingTableFunction extends TableFunctionEvaluator { + StreamingState streamingState; + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void execute(PTFPartitionIterator pItr, PTFPartition outP) throws HiveException { @@ -133,6 +142,251 @@ private boolean processWindow(WindowFunctionDef wFn) { return true; } + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#canAcceptInputAsStream + * () + * + * WindowTableFunction supports streaming if all functions meet one of these + * conditions: 1. The Function implements ISupportStreamingModeForWindowing 2. + * Or returns a non null Object for the getWindowingEvaluator, that implements + * ISupportStreamingModeForWindowing. 3. Is an invocation on a 'fixed' window. + * So no Unbounded Preceding or Following. 
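+   * For example "ROWS BETWEEN 3 PRECEDING AND 2 FOLLOWING" qualifies and
+   * contributes precedingSpan=3 / followingSpan=2, while a RANGE (value based)
+   * boundary or an UNBOUNDED FOLLOWING end forces the non-streaming path.
+   * Streaming is also skipped when the widest window would not fit within
+   * hive.join.cache.size rows.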
+ */ + private int[] setCanAcceptInputAsStream(Configuration cfg) { + + canAcceptInputAsStream = false; + + if (ptfDesc.getLlInfo().getLeadLagExprs() != null) { + return null; + } + + WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef(); + int precedingSpan = 0; + int followingSpan = 0; + + for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) { + WindowFunctionDef wFnDef = tabDef.getWindowFunctions().get(i); + WindowFrameDef wdwFrame = wFnDef.getWindowFrame(); + GenericUDAFEvaluator fnEval = wFnDef.getWFnEval(); + GenericUDAFEvaluator streamingEval = fnEval + .getWindowingEvaluator(wdwFrame); + if (streamingEval != null + && streamingEval instanceof ISupportStreamingModeForWindowing) { + continue; + } + BoundaryDef start = wdwFrame.getStart(); + BoundaryDef end = wdwFrame.getEnd(); + if (!(end instanceof ValueBoundaryDef) + && !(start instanceof ValueBoundaryDef)) { + if (end.getAmt() != BoundarySpec.UNBOUNDED_AMOUNT + && start.getAmt() != BoundarySpec.UNBOUNDED_AMOUNT + && end.getDirection() != Direction.PRECEDING + && start.getDirection() != Direction.FOLLOWING) { + + int amt = wdwFrame.getStart().getAmt(); + if (amt > precedingSpan) { + precedingSpan = amt; + } + + amt = wdwFrame.getEnd().getAmt(); + if (amt > followingSpan) { + followingSpan = amt; + } + continue; + } + } + return null; + } + + int windowLimit = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE); + + if (windowLimit < (followingSpan + precedingSpan + 1)) { + return null; + } + + canAcceptInputAsStream = true; + return new int[] {precedingSpan, followingSpan}; + } + + @Override + public void initializeStreaming(Configuration cfg, + StructObjectInspector inputOI, boolean isMapSide) throws HiveException { + + int[] span = setCanAcceptInputAsStream(cfg); + if (!canAcceptInputAsStream) { + return; + } + + WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef(); + + for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) { + WindowFunctionDef wFnDef = tabDef.getWindowFunctions().get(i); + WindowFrameDef wdwFrame = wFnDef.getWindowFrame(); + GenericUDAFEvaluator fnEval = wFnDef.getWFnEval(); + GenericUDAFEvaluator streamingEval = fnEval + .getWindowingEvaluator(wdwFrame); + if (streamingEval != null) { + wFnDef.setWFnEval(streamingEval); + if (wFnDef.isPivotResult()) { + ListObjectInspector listOI = (ListObjectInspector) wFnDef.getOI(); + wFnDef.setOI(listOI.getListElementObjectInspector()); + } + } + } + streamingState = new StreamingState(cfg, inputOI, isMapSide, tabDef, + span[0], span[1]); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#startPartition() + */ + @Override + public void startPartition() throws HiveException { + WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef(); + streamingState.reset(tabDef); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#processRow(java + * .lang.Object) + * + * - hand row to each Function, provided there are enough rows for Function's + * window. - call getNextObject on each Function. 
- output as many rows as + * possible, based on minimum sz of Output List + */ + @Override + public List processRow(Object row) throws HiveException { + + streamingState.rollingPart.append(row); + row = streamingState.rollingPart + .getAt(streamingState.rollingPart.size() - 1); + + WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef(); + + for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) { + WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i); + GenericUDAFEvaluator fnEval = wFn.getWFnEval(); + + int a = 0; + if (wFn.getArgs() != null) { + for (PTFExpressionDef arg : wFn.getArgs()) { + streamingState.funcArgs[i][a++] = arg.getExprEvaluator().evaluate(row); + } + } + + if (fnEval instanceof ISupportStreamingModeForWindowing) { + fnEval.aggregate(streamingState.aggBuffers[i], streamingState.funcArgs[i]); + Object out = ((ISupportStreamingModeForWindowing) fnEval) + .getNextResult(streamingState.aggBuffers[i]); + if (out != null) { + streamingState.fnOutputs[i] + .add(out == ISupportStreamingModeForWindowing.NULL_RESULT ? null + : out); + } + } else { + int rowToProcess = streamingState.rollingPart.rowToProcess(wFn); + if (rowToProcess >= 0) { + Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart, + streamingState.order); + PTFPartitionIterator rItr = rng.iterator(); + PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr); + Object out = evaluateWindowFunction(wFn, rItr); + streamingState.fnOutputs[i].add(out); + } + } + } + + List oRows = new ArrayList(); + while (true) { + boolean hasRow = streamingState.hasOutputRow(); + + if (!hasRow) { + break; + } + + oRows.add(streamingState.nextOutputRow()); + } + + return oRows.size() == 0 ? null : oRows; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator#finishPartition() + * + * for fns that are not ISupportStreamingModeForWindowing give them the + * remaining rows (rows whose span went beyond the end of the partition) for + * rest of the functions invoke terminate. + * + * while numOutputRows < numInputRows for each Fn that doesn't have enough o/p + * invoke getNextObj if there is no O/p then flag this as an error. + */ + @Override + public List finishPartition() throws HiveException { + + WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef(); + for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) { + WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i); + GenericUDAFEvaluator fnEval = wFn.getWFnEval(); + + int numRowsRemaining = wFn.getWindowFrame().getEnd().getAmt(); + if (fnEval instanceof ISupportStreamingModeForWindowing) { + fnEval.terminate(streamingState.aggBuffers[i]); + if (numRowsRemaining != BoundarySpec.UNBOUNDED_AMOUNT) { + while (numRowsRemaining > 0) { + Object out = ((ISupportStreamingModeForWindowing) fnEval) + .getNextResult(streamingState.aggBuffers[i]); + if (out != null) { + streamingState.fnOutputs[i] + .add(out == ISupportStreamingModeForWindowing.NULL_RESULT ? 
null + : out); + } + numRowsRemaining--; + } + } + } else { + while (numRowsRemaining > 0) { + int rowToProcess = streamingState.rollingPart.size() + - numRowsRemaining; + Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart, + streamingState.order); + PTFPartitionIterator rItr = rng.iterator(); + PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr); + Object out = evaluateWindowFunction(wFn, rItr); + streamingState.fnOutputs[i].add(out); + numRowsRemaining--; + } + } + + } + + List oRows = new ArrayList(); + + while (!streamingState.rollingPart.processedAllRows()) { + boolean hasRow = streamingState.hasOutputRow(); + ; + + if (!hasRow) { + throw new HiveException( + "Internal Error: cannot generate all output rows for a Partition"); + } + oRows.add(streamingState.nextOutputRow()); + } + + return oRows.size() == 0 ? null : oRows; + } + @Override public boolean canIterateOutput() { return true; @@ -155,16 +409,19 @@ public boolean canIterateOutput() { Object out = evaluateWindowFunction(wFn, pItr); output.add(out); } else if (wFn.isPivotResult()) { - /* - * for functions that currently return the output as a List, - * for e.g. the ranking functions, lead/lag, ntile, cume_dist - * - for now continue to execute them here. The functions need to provide a way to get - * each output row as we are iterating through the input. This is relative - * easy to do for ranking functions; not possible for lead, ntile, cume_dist. - * - */ - outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr); - output.add(null); + GenericUDAFEvaluator streamingEval = wFn.getWFnEval().getWindowingEvaluator(wFn.getWindowFrame()); + if ( streamingEval != null && streamingEval instanceof ISupportStreamingModeForWindowing ) { + wFn.setWFnEval(streamingEval); + if ( wFn.getOI() instanceof ListObjectInspector ) { + ListObjectInspector listOI = (ListObjectInspector) wFn.getOI(); + wFn.setOI(listOI.getListElementObjectInspector()); + } + output.add(null); + wFnsWithWindows.add(i); + } else { + outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr); + output.add(null); + } } else { output.add(null); wFnsWithWindows.add(i); @@ -884,8 +1141,10 @@ public int size() { Order order; PTFDesc ptfDesc; StructObjectInspector inputOI; + AggregationBuffer[] aggBuffers; + Object[][] args; - WindowingIterator(PTFPartition iPart, ArrayList output, + WindowingIterator(PTFPartition iPart, ArrayList output, List[] outputFromPivotFunctions, int[] wFnsToProcess) { this.iPart = iPart; this.output = output; @@ -896,6 +1155,18 @@ public int size() { order = wTFnDef.getOrder().getExpressions().get(0).getOrder(); ptfDesc = getQueryDef(); inputOI = iPart.getOutputOI(); + + aggBuffers = new AggregationBuffer[wTFnDef.getWindowFunctions().size()]; + args = new Object[wTFnDef.getWindowFunctions().size()][]; + try { + for (int j : wFnsToProcess) { + WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j); + aggBuffers[j] = wFn.getWFnEval().getNewAggregationBuffer(); + args[j] = new Object[wFn.getArgs() == null ? 
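+      /* One AggregationBuffer and argument array per window function: next() feeds
+         each ISupportStreamingModeForWindowing evaluator one row at a time and asks
+         it for its next result, instead of re-evaluating a Range of rows for every
+         output row. */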
0 : wFn.getArgs().size()]; + } + } catch (HiveException he) { + throw new RuntimeException(he); + } } @Override @@ -915,10 +1186,25 @@ public Object next() { try { for (int j : wFnsToProcess) { WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j); - Range rng = getRange(wFn, currIdx, iPart, order); - PTFPartitionIterator rItr = rng.iterator(); - PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr); - output.set(j, evaluateWindowFunction(wFn, rItr)); + if (wFn.getWFnEval() instanceof ISupportStreamingModeForWindowing) { + Object iRow = iPart.getAt(currIdx); + int a = 0; + if (wFn.getArgs() != null) { + for (PTFExpressionDef arg : wFn.getArgs()) { + args[j][a++] = arg.getExprEvaluator().evaluate(iRow); + } + } + wFn.getWFnEval().aggregate(aggBuffers[j], args[j]); + Object out = ((ISupportStreamingModeForWindowing) wFn.getWFnEval()) + .getNextResult(aggBuffers[j]); + out = ObjectInspectorUtils.copyToStandardObject(out, wFn.getOI()); + output.set(j, out); + } else { + Range rng = getRange(wFn, currIdx, iPart, order); + PTFPartitionIterator rItr = rng.iterator(); + PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr); + output.set(j, evaluateWindowFunction(wFn, rItr)); + } } Object iRow = iPart.getAt(currIdx); @@ -942,4 +1228,72 @@ public void remove() { } + class StreamingState { + PTFRollingPartition rollingPart; + List[] fnOutputs; + AggregationBuffer[] aggBuffers; + Object[][] funcArgs; + Order order; + + @SuppressWarnings("unchecked") + StreamingState(Configuration cfg, StructObjectInspector inputOI, + boolean isMapSide, WindowTableFunctionDef tabDef, int precedingSpan, + int followingSpan) throws HiveException { + SerDe serde = isMapSide ? tabDef.getInput().getOutputShape().getSerde() + : tabDef.getRawInputShape().getSerde(); + StructObjectInspector outputOI = isMapSide ? tabDef.getInput() + .getOutputShape().getOI() : tabDef.getRawInputShape().getOI(); + rollingPart = PTFPartition.createRolling(cfg, serde, inputOI, outputOI, + precedingSpan, followingSpan); + + order = tabDef.getOrder().getExpressions().get(0).getOrder(); + + int numFns = tabDef.getWindowFunctions().size(); + fnOutputs = new ArrayList[numFns]; + + aggBuffers = new AggregationBuffer[numFns]; + funcArgs = new Object[numFns][]; + for (int i = 0; i < numFns; i++) { + fnOutputs[i] = new ArrayList(); + WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i); + funcArgs[i] = new Object[wFn.getArgs() == null ? 
0 : wFn.getArgs().size()]; + } + } + + void reset(WindowTableFunctionDef tabDef) throws HiveException { + int numFns = tabDef.getWindowFunctions().size(); + rollingPart.reset(); + for (int i = 0; i < fnOutputs.length; i++) { + fnOutputs[i].clear(); + } + + for (int i = 0; i < numFns; i++) { + WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i); + aggBuffers[i] = wFn.getWFnEval().getNewAggregationBuffer(); + } + } + + boolean hasOutputRow() { + for (int i = 0; i < fnOutputs.length; i++) { + if (fnOutputs[i].size() == 0) { + return false; + } + } + return true; + } + + List nextOutputRow() throws HiveException { + List oRow = new ArrayList(); + Object iRow = rollingPart.nextOutputRow(); + int i = 0; + for (; i < fnOutputs.length; i++) { + oRow.add(fnOutputs[i].remove(0)); + } + for (StructField f : rollingPart.getOutputOI().getAllStructFieldRefs()) { + oRow.add(rollingPart.getOutputOI().getStructFieldData(iRow, f)); + } + return oRow; + } + } + } diff --git ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java new file mode 100644 index 0000000..61ca4c3 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingAvg.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.udaf; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.udaf.TestStreamingSum.TypeHandler; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.Test; + +public class TestStreamingAvg { + + public void avgDouble(Iterator inVals, int inSz, int numPreceding, + int numFollowing, Iterator outVals) throws HiveException { + + GenericUDAFAverage fnR = new GenericUDAFAverage(); + TypeInfo[] inputTypes = { TypeInfoFactory.doubleTypeInfo }; + ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableDoubleObjectInspector }; + + DoubleWritable[] in = new DoubleWritable[1]; + in[0] = new DoubleWritable(); + + TestStreamingSum._agg(fnR, inputTypes, inVals, TypeHandler.DoubleHandler, + in, inputOIs, inSz, numPreceding, numFollowing, outVals); + + } + + public void avgHiveDecimal(Iterator inVals, int inSz, + int numPreceding, int numFollowing, Iterator outVals) + throws HiveException { + + GenericUDAFAverage fnR = new GenericUDAFAverage(); + TypeInfo[] inputTypes = { TypeInfoFactory.decimalTypeInfo }; + ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector }; + + HiveDecimalWritable[] in = new HiveDecimalWritable[1]; + in[0] = new HiveDecimalWritable(); + + TestStreamingSum._agg(fnR, inputTypes, inVals, + TypeHandler.HiveDecimalHandler, in, inputOIs, inSz, numPreceding, + numFollowing, outVals); + + } + + @Test + public void testDouble_3_4() throws HiveException { + + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(15.0 / 5, 21.0 / 6, 28.0 / 7, + 36.0 / 8, 44.0 / 8, 52.0 / 8, 49.0 / 7, 45.0 / 6, 40.0 / 5, 34.0 / 4); + avgDouble(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testHiveDecimal_3_4() throws HiveException { + + List inVals = Arrays + .asList(HiveDecimal.create(1L), HiveDecimal.create(2L), + HiveDecimal.create(3L), HiveDecimal.create(4L), + HiveDecimal.create(5L), HiveDecimal.create(6L), + HiveDecimal.create(7L), HiveDecimal.create(8L), + HiveDecimal.create(9L), HiveDecimal.create(10L)); + List outVals = Arrays.asList( + HiveDecimal.create(new BigDecimal(15.0 / 5)), + HiveDecimal.create(new BigDecimal(21.0 / 6)), + HiveDecimal.create(new BigDecimal(28.0 / 7)), + HiveDecimal.create(new BigDecimal(36.0 / 8)), + HiveDecimal.create(new BigDecimal(44.0 / 8)), + HiveDecimal.create(new BigDecimal(52.0 / 8)), + HiveDecimal.create(new BigDecimal(49.0 / 7)), + HiveDecimal.create(new BigDecimal(45.0 / 6)), + HiveDecimal.create(new BigDecimal(40.0 / 5)), + HiveDecimal.create(new BigDecimal(34.0 / 4))); + avgHiveDecimal(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testDouble_3_0() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 
10.0); + List outVals = Arrays.asList(1.0 / 1, 3.0 / 2, 6.0 / 3, 10.0 / 4, + 14.0 / 4, 18.0 / 4, 22.0 / 4, 26.0 / 4, 30.0 / 4, 34.0 / 4); + avgDouble(inVals.iterator(), 10, 3, 0, outVals.iterator()); + } + + @Test + public void testDouble_unb_0() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(1.0 / 1, 3.0 / 2, 6.0 / 3, 10.0 / 4, + 15.0 / 5, 21.0 / 6, 28.0 / 7, 36.0 / 8, 45.0 / 9, 55.0 / 10); + avgDouble(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0, + outVals.iterator()); + } + + @Test + public void testDouble_0_5() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(21.0 / 6, 27.0 / 6, 33.0 / 6, + 39.0 / 6, 45.0 / 6, 40.0 / 5, 34.0 / 4, 27.0 / 3, 19.0 / 2, 10.0 / 1); + avgDouble(inVals.iterator(), 10, 0, 5, outVals.iterator()); + } + + @Test + public void testDouble_unb_5() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(21.0 / 6, 28.0 / 7, 36.0 / 8, + 45.0 / 9, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, + 55.0 / 10); + avgDouble(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5, + outVals.iterator()); + } + + @Test + public void testDouble_7_2() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(6.0 / 3, 10.0 / 4, 15.0 / 5, 21.0 / 6, + 28.0 / 7, 36.0 / 8, 45.0 / 9, 55.0 / 10, 54.0 / 9, 52.0 / 8); + avgDouble(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testDouble_15_15() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(55.0 / 10, 55.0 / 10, 55.0 / 10, + 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, 55.0 / 10, + 55.0 / 10); + avgDouble(inVals.iterator(), 10, 15, 15, outVals.iterator()); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java new file mode 100644 index 0000000..ef6a4b5 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.udaf; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction; +import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.CurrentRowDef; +import org.apache.hadoop.hive.ql.plan.ptf.RangeBoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum; +import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; + +public class TestStreamingSum { + + public static WindowFrameDef wdwFrame(int p, int f) { + WindowFrameDef wFrmDef = new WindowFrameDef(); + BoundaryDef start, end; + if (p == 0) { + start = new CurrentRowDef(); + } else { + RangeBoundaryDef startR = new RangeBoundaryDef(); + startR.setDirection(Direction.PRECEDING); + startR.setAmt(p); + start = startR; + } + + if (f == 0) { + end = new CurrentRowDef(); + } else { + RangeBoundaryDef endR = new RangeBoundaryDef(); + endR.setDirection(Direction.FOLLOWING); + endR.setAmt(f); + end = endR; + } + wFrmDef.setStart(start); + wFrmDef.setEnd(end); + return wFrmDef; + } + + public void sumDouble(Iterator inVals, int inSz, int numPreceding, + int numFollowing, Iterator outVals) throws HiveException { + + GenericUDAFSum fnR = new GenericUDAFSum(); + TypeInfo[] inputTypes = { TypeInfoFactory.doubleTypeInfo }; + ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableDoubleObjectInspector }; + + DoubleWritable[] in = new DoubleWritable[1]; + in[0] = new DoubleWritable(); + + _agg(fnR, inputTypes, inVals, TypeHandler.DoubleHandler, in, inputOIs, + inSz, numPreceding, numFollowing, outVals); + + } + + public void sumLong(Iterator inVals, int inSz, int numPreceding, + int numFollowing, Iterator outVals) throws HiveException { + + GenericUDAFSum fnR = new GenericUDAFSum(); + TypeInfo[] inputTypes = { TypeInfoFactory.longTypeInfo }; + ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableLongObjectInspector }; + + LongWritable[] in = new LongWritable[1]; + in[0] = new LongWritable(); + + _agg(fnR, inputTypes, inVals, TypeHandler.LongHandler, in, inputOIs, inSz, + numPreceding, numFollowing, outVals); + + } + + public void sumHiveDecimal(Iterator inVals, int inSz, + int numPreceding, int numFollowing, Iterator outVals) + throws HiveException { + + GenericUDAFSum fnR = new GenericUDAFSum(); + TypeInfo[] inputTypes = { TypeInfoFactory.decimalTypeInfo }; + ObjectInspector[] inputOIs = { 
PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector }; + + HiveDecimalWritable[] in = new HiveDecimalWritable[1]; + in[0] = new HiveDecimalWritable(); + + _agg(fnR, inputTypes, inVals, TypeHandler.HiveDecimalHandler, in, inputOIs, + inSz, numPreceding, numFollowing, outVals); + + } + + static interface TypeHandler { + public void set(T i, TW iw); + + public T get(TW iw); + + TypeHandler DoubleHandler = new TypeHandler() { + public void set(Double d, DoubleWritable iw) { + iw.set(d); + } + + public Double get(DoubleWritable iw) { + return iw.get(); + } + }; + + TypeHandler LongHandler = new TypeHandler() { + public void set(Long d, LongWritable iw) { + iw.set(d); + } + + public Long get(LongWritable iw) { + return iw.get(); + } + }; + + TypeHandler HiveDecimalHandler = new TypeHandler() { + public void set(HiveDecimal d, HiveDecimalWritable iw) { + iw.set(d); + } + + public HiveDecimal get(HiveDecimalWritable iw) { + return iw.getHiveDecimal(); + } + }; + } + + public static void _agg(GenericUDAFResolver fnR, + TypeInfo[] inputTypes, Iterator inVals, + TypeHandler typeHandler, TW[] in, ObjectInspector[] inputOIs, + int inSz, int numPreceding, int numFollowing, Iterator outVals) + throws HiveException { + + GenericUDAFEvaluator fn = fnR.getEvaluator(inputTypes); + fn.init(Mode.COMPLETE, inputOIs); + fn = fn.getWindowingEvaluator(wdwFrame(numPreceding, numFollowing)); + AggregationBuffer agg = fn.getNewAggregationBuffer(); + ISupportStreamingModeForWindowing oS = (ISupportStreamingModeForWindowing) fn; + + int outSz = 0; + while (inVals.hasNext()) { + typeHandler.set(inVals.next(), in[0]); + fn.aggregate(agg, in); + Object out = oS.getNextResult(agg); + if (out != null) { + out = out == ISupportStreamingModeForWindowing.NULL_RESULT ? null + : typeHandler.get((TW) out); + Assert.assertEquals(out, outVals.next()); + outSz++; + } + } + + fn.terminate(agg); + + while (outSz < inSz) { + Object out = oS.getNextResult(agg); + out = out == ISupportStreamingModeForWindowing.NULL_RESULT ? 
null + : typeHandler.get((TW) out); + Assert.assertEquals(out, outVals.next()); + outSz++; + } + + } + + @Test + public void testDouble_3_4() throws HiveException { + + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(15.0, 21.0, 28.0, 36.0, 44.0, 52.0, + 49.0, 45.0, 40.0, 34.0); + sumDouble(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testDouble_3_0() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(1.0, 3.0, 6.0, 10.0, 14.0, 18.0, 22.0, + 26.0, 30.0, 34.0); + sumDouble(inVals.iterator(), 10, 3, 0, outVals.iterator()); + } + + @Test + public void testDouble_unb_0() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(1.0, 3.0, 6.0, 10.0, 15.0, 21.0, 28.0, + 36.0, 45.0, 55.0); + sumDouble(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0, + outVals.iterator()); + } + + @Test + public void testDouble_0_5() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(21.0, 27.0, 33.0, 39.0, 45.0, 40.0, + 34.0, 27.0, 19.0, 10.0); + sumDouble(inVals.iterator(), 10, 0, 5, outVals.iterator()); + } + + @Test + public void testDouble_unb_5() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(21.0, 28.0, 36.0, 45.0, 55.0, 55.0, + 55.0, 55.0, 55.0, 55.0); + sumDouble(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5, + outVals.iterator()); + } + + @Test + public void testDouble_7_2() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(6.0, 10.0, 15.0, 21.0, 28.0, 36.0, + 45.0, 55.0, 54.0, 52.0); + sumDouble(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testDouble_15_15() throws HiveException { + List inVals = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, + 9.0, 10.0); + List outVals = Arrays.asList(55.0, 55.0, 55.0, 55.0, 55.0, 55.0, + 55.0, 55.0, 55.0, 55.0); + sumDouble(inVals.iterator(), 10, 15, 15, outVals.iterator()); + } + + @Test + public void testLong_3_4() throws HiveException { + + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(15L, 21L, 28L, 36L, 44L, 52L, 49L, 45L, + 40L, 34L); + sumLong(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testLong_3_0() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 3L, 6L, 10L, 14L, 18L, 22L, 26L, + 30L, 34L); + sumLong(inVals.iterator(), 10, 3, 0, outVals.iterator()); + } + + @Test + public void testLong_unb_0() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(1L, 3L, 6L, 10L, 15L, 21L, 28L, 36L, + 45L, 55L); + sumLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0, + outVals.iterator()); + } + + @Test + public void testLong_0_5() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(21L, 27L, 33L, 39L, 45L, 40L, 34L, 27L, + 19L, 10L); + sumLong(inVals.iterator(), 10, 0, 5, outVals.iterator()); + } + + @Test + public void testLong_unb_5() throws HiveException { + List inVals 
= Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(21L, 28L, 36L, 45L, 55L, 55L, 55L, 55L, + 55L, 55L); + sumLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5, + outVals.iterator()); + } + + @Test + public void testLong_7_2() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(6L, 10L, 15L, 21L, 28L, 36L, 45L, 55L, + 54L, 52L); + sumLong(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testLong_15_15() throws HiveException { + List inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); + List outVals = Arrays.asList(55L, 55L, 55L, 55L, 55L, 55L, 55L, 55L, + 55L, 55L); + sumLong(inVals.iterator(), 10, 15, 15, outVals.iterator()); + } + + @Test + public void testHiveDecimal_3_4() throws HiveException { + + List inVals = Arrays + .asList(HiveDecimal.create(1L), HiveDecimal.create(2L), + HiveDecimal.create(3L), HiveDecimal.create(4L), + HiveDecimal.create(5L), HiveDecimal.create(6L), + HiveDecimal.create(7L), HiveDecimal.create(8L), + HiveDecimal.create(9L), HiveDecimal.create(10L)); + List outVals = Arrays.asList(HiveDecimal.create(15L), + HiveDecimal.create(21L), HiveDecimal.create(28L), + HiveDecimal.create(36L), HiveDecimal.create(44L), + HiveDecimal.create(52L), HiveDecimal.create(49L), + HiveDecimal.create(45L), HiveDecimal.create(40L), + HiveDecimal.create(34L)); + sumHiveDecimal(inVals.iterator(), 10, 3, 4, outVals.iterator()); + } + + @Test + public void testHiveDecimal_3_0() throws HiveException { + List inVals = Arrays + .asList(HiveDecimal.create(1L), HiveDecimal.create(2L), + HiveDecimal.create(3L), HiveDecimal.create(4L), + HiveDecimal.create(5L), HiveDecimal.create(6L), + HiveDecimal.create(7L), HiveDecimal.create(8L), + HiveDecimal.create(9L), HiveDecimal.create(10L)); + List outVals = Arrays.asList(HiveDecimal.create(1L), + HiveDecimal.create(3L), HiveDecimal.create(6L), + HiveDecimal.create(10L), HiveDecimal.create(14L), + HiveDecimal.create(18L), HiveDecimal.create(22L), + HiveDecimal.create(26L), HiveDecimal.create(30L), + HiveDecimal.create(34L)); + sumHiveDecimal(inVals.iterator(), 10, 3, 0, outVals.iterator()); + } + + @Test + public void testHiveDecimal_unb_0() throws HiveException { + List inVals = Arrays + .asList(HiveDecimal.create(1L), HiveDecimal.create(2L), + HiveDecimal.create(3L), HiveDecimal.create(4L), + HiveDecimal.create(5L), HiveDecimal.create(6L), + HiveDecimal.create(7L), HiveDecimal.create(8L), + HiveDecimal.create(9L), HiveDecimal.create(10L)); + List outVals = Arrays.asList(HiveDecimal.create(1L), + HiveDecimal.create(3L), HiveDecimal.create(6L), + HiveDecimal.create(10L), HiveDecimal.create(15L), + HiveDecimal.create(21L), HiveDecimal.create(28L), + HiveDecimal.create(36L), HiveDecimal.create(45L), + HiveDecimal.create(55L)); + sumHiveDecimal(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0, + outVals.iterator()); + } + + @Test + public void testHiveDecimal_0_5() throws HiveException { + List inVals = Arrays + .asList(HiveDecimal.create(1L), HiveDecimal.create(2L), + HiveDecimal.create(3L), HiveDecimal.create(4L), + HiveDecimal.create(5L), HiveDecimal.create(6L), + HiveDecimal.create(7L), HiveDecimal.create(8L), + HiveDecimal.create(9L), HiveDecimal.create(10L)); + List outVals = Arrays.asList(HiveDecimal.create(21L), + HiveDecimal.create(27L), HiveDecimal.create(33L), + HiveDecimal.create(39L), HiveDecimal.create(45L), + HiveDecimal.create(40L), HiveDecimal.create(34L), + 
HiveDecimal.create(27L), HiveDecimal.create(19L), + HiveDecimal.create(10L)); + sumHiveDecimal(inVals.iterator(), 10, 0, 5, outVals.iterator()); + } + + @Test + public void testHiveDecimal_unb_5() throws HiveException { + List inVals = Arrays + .asList(HiveDecimal.create(1L), HiveDecimal.create(2L), + HiveDecimal.create(3L), HiveDecimal.create(4L), + HiveDecimal.create(5L), HiveDecimal.create(6L), + HiveDecimal.create(7L), HiveDecimal.create(8L), + HiveDecimal.create(9L), HiveDecimal.create(10L)); + List outVals = Arrays.asList(HiveDecimal.create(21L), + HiveDecimal.create(28L), HiveDecimal.create(36L), + HiveDecimal.create(45L), HiveDecimal.create(55L), + HiveDecimal.create(55L), HiveDecimal.create(55L), + HiveDecimal.create(55L), HiveDecimal.create(55L), + HiveDecimal.create(55L)); + sumHiveDecimal(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5, + outVals.iterator()); + } + + @Test + public void testHiveDecimal_7_2() throws HiveException { + List inVals = Arrays + .asList(HiveDecimal.create(1L), HiveDecimal.create(2L), + HiveDecimal.create(3L), HiveDecimal.create(4L), + HiveDecimal.create(5L), HiveDecimal.create(6L), + HiveDecimal.create(7L), HiveDecimal.create(8L), + HiveDecimal.create(9L), HiveDecimal.create(10L)); + List outVals = Arrays.asList(HiveDecimal.create(6L), + HiveDecimal.create(10L), HiveDecimal.create(15L), + HiveDecimal.create(21L), HiveDecimal.create(28L), + HiveDecimal.create(36L), HiveDecimal.create(45L), + HiveDecimal.create(55L), HiveDecimal.create(54L), + HiveDecimal.create(52L)); + sumHiveDecimal(inVals.iterator(), 10, 7, 2, outVals.iterator()); + } + + @Test + public void testHiveDecimal_15_15() throws HiveException { + List inVals = Arrays + .asList(HiveDecimal.create(1L), HiveDecimal.create(2L), + HiveDecimal.create(3L), HiveDecimal.create(4L), + HiveDecimal.create(5L), HiveDecimal.create(6L), + HiveDecimal.create(7L), HiveDecimal.create(8L), + HiveDecimal.create(9L), HiveDecimal.create(10L)); + List outVals = Arrays.asList(HiveDecimal.create(55L), + HiveDecimal.create(55L), HiveDecimal.create(55L), + HiveDecimal.create(55L), HiveDecimal.create(55L), + HiveDecimal.create(55L), HiveDecimal.create(55L), + HiveDecimal.create(55L), HiveDecimal.create(55L), + HiveDecimal.create(55L)); + sumHiveDecimal(inVals.iterator(), 10, 15, 15, outVals.iterator()); + } + +} diff --git ql/src/test/results/clientpositive/ptf.q.out ql/src/test/results/clientpositive/ptf.q.out index eb4997d..34d2ef0 100644 --- ql/src/test/results/clientpositive/ptf.q.out +++ ql/src/test/results/clientpositive/ptf.q.out @@ -806,7 +806,7 @@ Manufacturer#1 Brand#15 1602.59 8749.73 Manufacturer#2 Brand#22 3491.38 3491.38 Manufacturer#2 Brand#23 2031.98 5523.360000000001 Manufacturer#2 Brand#24 1698.66 7222.02 -Manufacturer#2 Brand#25 1701.6 5432.24 +Manufacturer#2 Brand#25 1701.6 5432.240000000001 Manufacturer#3 Brand#31 1671.68 1671.68 Manufacturer#3 Brand#32 3333.37 5005.05 Manufacturer#3 Brand#34 1337.29 6342.34 diff --git ql/src/test/results/clientpositive/windowing.q.out ql/src/test/results/clientpositive/windowing.q.out index 7e23497..20c6c49 100644 --- ql/src/test/results/clientpositive/windowing.q.out +++ ql/src/test/results/clientpositive/windowing.q.out @@ -845,14 +845,14 @@ POSTHOOK: Input: default@part Manufacturer#1 almond antique burnished rose metallic 2 4100.06 1173.15 1753.76 1366.6866666666667 Manufacturer#1 almond antique burnished rose metallic 2 5702.650000000001 1173.15 1753.76 1425.6625000000001 Manufacturer#1 almond antique chartreuse lavender yellow 34 7117.070000000001 
1173.15 1753.76 1423.4140000000002 -Manufacturer#1 almond antique salmon chartreuse burlywood 6 7576.58 1173.15 1753.76 1515.316 -Manufacturer#1 almond aquamarine burnished black steel 28 6403.43 1414.42 1753.76 1600.8575 -Manufacturer#1 almond aquamarine pink moccasin thistle 42 4649.67 1414.42 1632.66 1549.89 +Manufacturer#1 almond antique salmon chartreuse burlywood 6 7576.580000000002 1173.15 1753.76 1515.3160000000003 +Manufacturer#1 almond aquamarine burnished black steel 28 6403.430000000001 1414.42 1753.76 1600.8575000000003 +Manufacturer#1 almond aquamarine pink moccasin thistle 42 4649.670000000001 1414.42 1632.66 1549.8900000000003 Manufacturer#2 almond antique violet chocolate turquoise 14 5523.360000000001 1690.68 2031.98 1841.1200000000001 Manufacturer#2 almond antique violet turquoise frosted 40 7222.02 1690.68 2031.98 1805.505 Manufacturer#2 almond aquamarine midnight light salmon 2 8923.62 1690.68 2031.98 1784.7240000000002 Manufacturer#2 almond aquamarine rose maroon antique 25 7232.9400000000005 1698.66 2031.98 1808.2350000000001 -Manufacturer#2 almond aquamarine sandy cyan gainsboro 18 5432.24 1698.66 2031.98 1810.7466666666667 +Manufacturer#2 almond aquamarine sandy cyan gainsboro 18 5432.240000000001 1698.66 2031.98 1810.746666666667 Manufacturer#3 almond antique chartreuse khaki white 17 4272.34 1190.27 1671.68 1424.1133333333335 Manufacturer#3 almond antique forest lavender goldenrod 14 6195.32 1190.27 1922.98 1548.83 Manufacturer#3 almond antique metallic orange dim 19 7532.61 1190.27 1922.98 1506.522 @@ -866,7 +866,7 @@ Manufacturer#4 almond azure aquamarine papaya violet 12 4341.530000000001 1206.2 Manufacturer#5 almond antique blue firebrick mint 31 5190.08 1611.66 1789.69 1730.0266666666666 Manufacturer#5 almond antique medium spring khaki 6 6208.18 1018.1 1789.69 1552.045 Manufacturer#5 almond antique sky peru orange 2 7672.66 1018.1 1789.69 1534.532 -Manufacturer#5 almond aquamarine dodger light gainsboro 46 5882.970000000001 1018.1 1788.73 1470.7425000000003 +Manufacturer#5 almond aquamarine dodger light gainsboro 46 5882.969999999999 1018.1 1788.73 1470.7424999999998 Manufacturer#5 almond azure blanched chiffon midnight 23 4271.3099999999995 1018.1 1788.73 1423.7699999999998 PREHOOK: query: -- 19. 
testUDAFsWithGBY select p_mfgr,p_name, p_size, p_retailprice, @@ -901,7 +901,7 @@ Manufacturer#2 almond antique violet chocolate turquoise 14 1690.68 5523.3600000 Manufacturer#2 almond antique violet turquoise frosted 40 1800.7 7222.02 1800.7 1800.7 1805.505 Manufacturer#2 almond aquamarine midnight light salmon 2 2031.98 8923.62 2031.98 2031.98 1784.7240000000002 Manufacturer#2 almond aquamarine rose maroon antique 25 1698.66 7232.9400000000005 1698.66 1698.66 1808.2350000000001 -Manufacturer#2 almond aquamarine sandy cyan gainsboro 18 1701.6 5432.24 1701.6 1701.6 1810.7466666666667 +Manufacturer#2 almond aquamarine sandy cyan gainsboro 18 1701.6 5432.240000000001 1701.6 1701.6 1810.746666666667 Manufacturer#3 almond antique chartreuse khaki white 17 1671.68 4272.34 1671.68 1671.68 1424.1133333333335 Manufacturer#3 almond antique forest lavender goldenrod 14 1190.27 6195.32 1190.27 1190.27 1548.83 Manufacturer#3 almond antique metallic orange dim 19 1410.39 7532.61 1410.39 1410.39 1506.522 @@ -915,7 +915,7 @@ Manufacturer#4 almond azure aquamarine papaya violet 12 1290.35 4341.53000000000 Manufacturer#5 almond antique blue firebrick mint 31 1789.69 5190.08 1789.69 1789.69 1730.0266666666666 Manufacturer#5 almond antique medium spring khaki 6 1611.66 6208.18 1611.66 1611.66 1552.045 Manufacturer#5 almond antique sky peru orange 2 1788.73 7672.66 1788.73 1788.73 1534.532 -Manufacturer#5 almond aquamarine dodger light gainsboro 46 1018.1 5882.970000000001 1018.1 1018.1 1470.7425000000003 +Manufacturer#5 almond aquamarine dodger light gainsboro 46 1018.1 5882.969999999999 1018.1 1018.1 1470.7424999999998 Manufacturer#5 almond azure blanched chiffon midnight 23 1464.48 4271.3099999999995 1464.48 1464.48 1423.7699999999998 PREHOOK: query: -- 20. testSTATs select p_mfgr,p_name, p_size, @@ -1136,12 +1136,12 @@ Manufacturer#1 Brand#14 2346.3 Manufacturer#1 Brand#12 4100.06 Manufacturer#1 Brand#15 4529.5 Manufacturer#1 Brand#12 4770.77 -Manufacturer#1 Brand#12 4649.67 +Manufacturer#1 Brand#12 4649.670000000001 Manufacturer#2 Brand#22 1690.68 Manufacturer#2 Brand#22 3491.38 Manufacturer#2 Brand#23 5523.360000000001 Manufacturer#2 Brand#24 5531.34 -Manufacturer#2 Brand#25 5432.24 +Manufacturer#2 Brand#25 5432.240000000001 Manufacturer#3 Brand#31 1671.68 Manufacturer#3 Brand#35 2861.95 Manufacturer#3 Brand#32 4272.34 @@ -1155,7 +1155,7 @@ Manufacturer#4 Brand#41 4341.530000000001 Manufacturer#5 Brand#52 1789.69 Manufacturer#5 Brand#51 3401.3500000000004 Manufacturer#5 Brand#53 5190.08 -Manufacturer#5 Brand#53 4418.490000000001 +Manufacturer#5 Brand#53 4418.49 Manufacturer#5 Brand#52 4271.3099999999995 PREHOOK: query: -- 24. 
testLateralViews select p_mfgr, p_name, diff --git ql/src/test/results/clientpositive/windowing_windowspec.q.out ql/src/test/results/clientpositive/windowing_windowspec.q.out index 6ea068c..9d18c67 100644 --- ql/src/test/results/clientpositive/windowing_windowspec.q.out +++ ql/src/test/results/clientpositive/windowing_windowspec.q.out @@ -489,86 +489,86 @@ alice falkner 27.742499999999996 alice king 26.706666666666663 alice king 26.306999999999995 alice xylophone 24.458181818181814 -bob ellison 25.029090909090904 +bob ellison 25.029090909090908 bob falkner 24.216363636363635 -bob ichabod 20.17363636363637 -bob johnson 16.431818181818183 -bob polk 16.64090909090909 -bob underhill 15.266363636363637 -bob underhill 18.288181818181815 +bob ichabod 20.173636363636362 +bob johnson 16.431818181818176 +bob polk 16.640909090909087 +bob underhill 15.266363636363632 +bob underhill 18.288181818181812 bob van buren 18.405454545454543 -calvin ichabod 20.903636363636362 -calvin white 22.44818181818182 -david carson 24.329090909090908 -david falkner 25.01181818181818 -david garcia 22.984545454545454 -david hernandez 22.922727272727272 -ethan steinbeck 24.026363636363637 -ethan underhill 25.189090909090908 -fred ellison 27.16 -gabriella brown 25.664545454545454 -holly nixon 25.705454545454543 -holly polk 24.118181818181824 -holly steinbeck 24.490909090909096 +calvin ichabod 20.90363636363636 +calvin white 22.448181818181812 +david carson 24.329090909090898 +david falkner 25.01181818181817 +david garcia 22.984545454545444 +david hernandez 22.92272727272726 +ethan steinbeck 24.026363636363627 +ethan underhill 25.189090909090904 +fred ellison 27.159999999999993 +gabriella brown 25.66454545454545 +holly nixon 25.70545454545454 +holly polk 24.11818181818182 +holly steinbeck 24.49090909090909 holly thompson 23.376363636363635 holly underhill 19.453636363636363 -irene ellison 20.37818181818182 -irene underhill 23.51 -irene young 25.371818181818185 -jessica johnson 24.426363636363636 -jessica king 26.38 -jessica miller 23.99545454545455 -jessica white 26.86636363636364 -katie ichabod 28.520909090909093 -luke garcia 26.110909090909093 -luke ichabod 27.41909090909091 -luke king 28.713636363636365 -luke young 30.591818181818187 +irene ellison 20.378181818181826 +irene underhill 23.510000000000012 +irene young 25.371818181818195 +jessica johnson 24.42636363636365 +jessica king 26.380000000000017 +jessica miller 23.99545454545456 +jessica white 26.866363636363655 +katie ichabod 28.520909090909115 +luke garcia 26.110909090909114 +luke ichabod 27.41909090909093 +luke king 28.713636363636375 +luke young 30.59181818181818 mike allen 27.91545454545455 -mike king 25.526363636363637 -mike polk 24.774545454545454 -mike white 25.183636363636364 -mike xylophone 27.508181818181814 -nick nixon 26.225454545454543 -nick robinson 24.344545454545457 -oscar davidson 26.719090909090912 -oscar garcia 27.19636363636364 -oscar johnson 27.082727272727276 -oscar johnson 25.16454545454545 -oscar miller 28.05909090909091 -priscilla laertes 31.737272727272725 -priscilla quirinius 30.353636363636358 -priscilla zipper 27.96181818181818 -quinn ellison 29.406363636363633 -quinn polk 27.267272727272726 -rachel davidson 25.41545454545454 +mike king 25.526363636363644 +mike polk 24.774545454545464 +mike white 25.18363636363637 +mike xylophone 27.50818181818182 +nick nixon 26.225454545454546 +nick robinson 24.34454545454545 +oscar davidson 26.719090909090916 +oscar garcia 27.196363636363643 +oscar johnson 27.08272727272728 +oscar johnson 
25.164545454545472 +oscar miller 28.059090909090916 +priscilla laertes 31.73727272727274 +priscilla quirinius 30.353636363636372 +priscilla zipper 27.961818181818195 +quinn ellison 29.40636363636366 +quinn polk 27.267272727272754 +rachel davidson 25.415454545454562 rachel thompson 23.608181818181823 -sarah miller 21.499090909090913 +sarah miller 21.49909090909091 sarah robinson 23.40454545454546 -sarah xylophone 26.95727272727273 +sarah xylophone 26.957272727272724 sarah zipper 24.83545454545455 -tom hernandez 21.274545454545457 +tom hernandez 21.274545454545454 tom hernandez 20.315454545454546 -tom polk 21.901818181818182 +tom polk 21.90181818181819 tom steinbeck 20.772727272727273 -ulysses carson 21.64727272727273 -ulysses ellison 22.960909090909094 -ulysses quirinius 23.025454545454547 -ulysses robinson 23.762727272727275 +ulysses carson 21.647272727272718 +ulysses ellison 22.960909090909084 +ulysses quirinius 23.025454545454544 +ulysses robinson 23.762727272727282 ulysses steinbeck 21.08909090909091 -victor allen 16.62818181818182 +victor allen 16.628181818181826 victor hernandez 15.74909090909091 -victor robinson 18.19363636363636 -victor thompson 20.811818181818186 -victor xylophone 20.37272727272727 -wendy quirinius 20.816363636363636 +victor robinson 18.193636363636355 +victor thompson 20.81181818181817 +victor xylophone 20.372727272727243 +wendy quirinius 20.81636363636362 wendy robinson 19.936363636363634 wendy xylophone 20.270909090909093 xavier garcia 19.874000000000002 -xavier ovid 19.976666666666667 -yuri xylophone 21.896250000000002 -zach thompson 25.021428571428572 -zach young 27.776666666666667 +xavier ovid 19.976666666666663 +yuri xylophone 21.89625000000001 +zach thompson 25.021428571428583 +zach young 27.77666666666668 alice carson 18.785 alice nixon 17.58142857142857 alice underhill 17.072499999999998 @@ -576,13 +576,13 @@ alice underhill 19.146666666666665 alice xylophone 20.556 bob falkner 19.116363636363637 bob king 21.04 -bob ovid 20.85454545454546 -bob van buren 21.988181818181822 -bob xylophone 24.36454545454546 -calvin xylophone 26.912727272727274 -david falkner 27.310000000000002 -david laertes 28.004545454545454 -david miller 28.400909090909092 +bob ovid 20.854545454545452 +bob van buren 21.988181818181815 +bob xylophone 24.364545454545453 +calvin xylophone 26.91272727272727 +david falkner 27.31 +david laertes 28.00454545454545 +david miller 28.40090909090909 PREHOOK: query: select s, sum(i) over(partition by ts order by s) from over10k limit 100 PREHOOK: type: QUERY PREHOOK: Input: default@over10k
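
For context on the expected values hard-coded in TestStreamingSum and TestStreamingAvg above: each output row is the aggregate over a ROWS frame spanning `numPreceding` rows before and `numFollowing` rows after the current row, clipped at the partition boundaries (BoundarySpec.UNBOUNDED_AMOUNT makes the preceding side unbounded). The sketch below is a naive, standalone cross-check of those expectations; it is not the streaming evaluator this patch adds, and the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Naive O(n * window) reference for a ROWS BETWEEN p PRECEDING AND f FOLLOWING
 * frame over a single partition: for row i the frame covers indexes
 * [max(0, i - p), min(n - 1, i + f)].
 */
public class WindowFrameReference {

  static List<Double> slidingSum(double[] in, int p, int f) {
    List<Double> out = new ArrayList<Double>();
    for (int i = 0; i < in.length; i++) {
      int start = Math.max(0, i - p);
      int end = Math.min(in.length - 1, i + f);
      double sum = 0.0;
      for (int j = start; j <= end; j++) {
        sum += in[j];
      }
      out.add(sum);
    }
    return out;
  }

  public static void main(String[] args) {
    double[] in = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    // Matches testDouble_3_4 above: 15, 21, 28, 36, 44, 52, 49, 45, 40, 34.
    System.out.println(slidingSum(in, 3, 4));
  }
}
```

The streaming evaluator exercised by `_agg` produces the same values incrementally, emitting each result as soon as its frame is complete (or during the terminate/drain loop for the trailing rows), which is what the tests assert.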
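
The remaining hunks only touch golden-file outputs (ptf.q.out, windowing.q.out, windowing_windowspec.q.out), and every change is a difference in the last few digits of a double-valued aggregate. That pattern is consistent with the streaming evaluator adding the same doubles in a different order: floating-point addition is not associative, so reordering an accumulation can shift the low-order bits. A small standalone illustration, using retail prices that appear in the windowing.q.out hunk above:

```java
public class SummationOrderDemo {
  public static void main(String[] args) {
    // p_retailprice values for Manufacturer#2 from the golden output above.
    double[] prices = {1690.68, 1800.7, 2031.98, 1698.66, 1701.6};

    // Accumulate left to right.
    double forward = 0.0;
    for (double p : prices) {
      forward += p;
    }

    // Accumulate right to left.
    double backward = 0.0;
    for (int i = prices.length - 1; i >= 0; i--) {
      backward += prices[i];
    }

    // The two sums are mathematically equal but may differ in the last
    // decimal places -- the same kind of drift the .q.out diffs show.
    System.out.println(forward);
    System.out.println(backward);
    System.out.println(forward == backward);
  }
}
```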