diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
index f418a7f..237ec07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.LeadLagInfo;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
@@ -217,9 +218,9 @@ private PTFInvocation setupChain() {
     return first;
   }
 
-  public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
+  public static void connectLeadLagFunctionsToPartition(LeadLagInfo leadLagInfo,
       PTFPartitionIterator<Object> pItr) throws HiveException {
-    List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
+    List<ExprNodeGenericFuncDesc> llFnDescs = leadLagInfo.getLeadLagExprs();
     if (llFnDescs == null) {
       return;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
index 4fc7089..20d70d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
@@ -22,11 +22,15 @@
 import java.io.IOException;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
+import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.ql.udf.ptf.BasePartitionEvaluator;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hive.common.util.AnnotationUtils;
@@ -258,4 +262,42 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
     return null;
   }
 
+  protected BasePartitionEvaluator partitionEvaluator;
+
+  /**
+   * When evaluating aggregates over a fixed window, streaming is not always
+   * possible, especially for the RANGE window type. In that case the whole
+   * partition needs to be collected before the aggregates can be evaluated.
+   * The naive approach is to compute a row range for each row and aggregate
+   * over that range; for some functions a better implementation can reduce
+   * the computation.
+   * @param winFrame the window definition in play for this evaluation
+   * @param partition the partition data
+   * @param parameters the list of the expressions in the function
+   * @param outputOI the output object inspector
+   * @return the evaluator; defaults to {@link BasePartitionEvaluator}, which
+   *         implements the naive approach
+   */
+  public final BasePartitionEvaluator getPartitionWindowingEvaluator(
+      WindowFrameDef winFrame,
+      PTFPartition partition,
+      List<PTFExpressionDef> parameters,
+      ObjectInspector outputOI) {
+    if (partitionEvaluator == null) {
+      partitionEvaluator = createPartitionEvaluator(winFrame, partition, parameters, outputOI);
+    }
+
+    return partitionEvaluator;
+  }
+
+  /**
+   * This method should be overridden by the child class to supply a
+   * function-specific evaluator.
+ */ + protected BasePartitionEvaluator createPartitionEvaluator( + WindowFrameDef winFrame, + PTFPartition partition, + List parameters, + ObjectInspector outputOI) { + return new BasePartitionEvaluator(this, winFrame, partition, parameters, outputOI); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java index e2cd213..fb63d46 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java @@ -18,33 +18,36 @@ package org.apache.hadoop.hive.ql.udf.generic; import java.util.HashSet; +import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.PTFPartition; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.ql.udf.ptf.BasePartitionEvaluator; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * GenericUDAFSum. 
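For illustration, a minimal sketch of how an evaluator plugs into the hook added to GenericUDAFEvaluator above (the subclass MySumLongEvaluator is hypothetical and not part of this patch; the types and the SumPartitionLongEvaluator it returns are the ones the patch introduces, and the imports are those added in the hunks above). An evaluator that does not override createPartitionEvaluator keeps the naive BasePartitionEvaluator behavior; the next hunks show GenericUDAFSum doing exactly this kind of override for its decimal, double, and long variants:

  // Hypothetical subclass, for illustration only; GenericUDAFSumLong is the
  // concrete long-sum evaluator defined later in this file.
  public class MySumLongEvaluator extends GenericUDAFSum.GenericUDAFSumLong {
    @Override
    protected BasePartitionEvaluator createPartitionEvaluator(
        WindowFrameDef winFrame, PTFPartition partition,
        List<PTFExpressionDef> parameters, ObjectInspector outputOI) {
      // Opt into the incremental windowed-sum evaluator; dropping this
      // override falls back to the naive per-row range scan.
      return new BasePartitionEvaluator.SumPartitionLongEvaluator(
          this, winFrame, partition, parameters, outputOI);
    }
  }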
@@ -335,6 +338,15 @@ protected HiveDecimal getCurrentIntermediateResult( }; } + + @Override + protected BasePartitionEvaluator createPartitionEvaluator( + WindowFrameDef winFrame, + PTFPartition partition, + List parameters, + ObjectInspector outputOI) { + return new BasePartitionEvaluator.SumPartitionHiveDecimalEvaluator(this, winFrame, partition, parameters, outputOI); + } } /** @@ -455,6 +467,14 @@ protected Double getCurrentIntermediateResult( }; } + @Override + protected BasePartitionEvaluator createPartitionEvaluator( + WindowFrameDef winFrame, + PTFPartition partition, + List parameters, + ObjectInspector outputOI) { + return new BasePartitionEvaluator.SumPartitionDoubleEvaluator(this, winFrame, partition, parameters, outputOI); + } } /** @@ -570,5 +590,14 @@ protected Long getCurrentIntermediateResult( } }; } + + @Override + protected BasePartitionEvaluator createPartitionEvaluator( + WindowFrameDef winFrame, + PTFPartition partition, + List parameters, + ObjectInspector outputOI) { + return new BasePartitionEvaluator.SumPartitionLongEvaluator(this, winFrame, partition, parameters, outputOI); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java new file mode 100644 index 0000000..5fedf17 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java @@ -0,0 +1,347 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.udf.ptf;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.PTFPartition;
+import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.LeadLagInfo;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType;
+import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class is mostly used to optimize evaluation for the RANGE windowing
+ * type, since the ROWS windowing type can already be processed in streaming
+ * fashion.
+ */
+public class BasePartitionEvaluator {
+  protected final GenericUDAFEvaluator wrappedEvaluator;
+  protected final WindowFrameDef winFrame;
+  protected final PTFPartition partition;
+  protected final List<PTFExpressionDef> parameters;
+  protected final ObjectInspector outputOI;
+
+  protected Object partitionAgg;
+
+  /**
+   * Internal class representing a window range within a partition, found by
+   * searching relative to the current row's position (ROWS) or value (RANGE)
+   */
+  protected static class Range
+  {
+    int start;
+    int end;
+    PTFPartition p;
+
+    public Range(int start, int end, PTFPartition p)
+    {
+      this.start = start;
+      this.end = end;
+      this.p = p;
+    }
+
+    public PTFPartitionIterator<Object> iterator()
+    {
+      return p.range(start, end);
+    }
+
+    public int getDiff(Range prevRange) {
+      return this.start - prevRange.start + this.end - prevRange.end;
+    }
+
+    public int getSize() {
+      return end - start;
+    }
+  }
+
+  public BasePartitionEvaluator(
+      GenericUDAFEvaluator wrappedEvaluator,
+      WindowFrameDef winFrame,
+      PTFPartition partition,
+      List<PTFExpressionDef> parameters,
+      ObjectInspector outputOI) {
+    this.wrappedEvaluator = wrappedEvaluator;
+    this.winFrame = winFrame;
+    this.partition = partition;
+    this.parameters = parameters;
+    this.outputOI = outputOI;
+  }
+
+  /**
+   * Get the aggregation for the whole partition. Used in the case where the
+   * window is unbounded, so all the rows in the partition get the same value.
+   * @return the aggregation result for the whole partition
+   * @throws HiveException
+   */
+  public Object getPartitionAgg() throws HiveException {
+    if (partitionAgg != null) {
+      return partitionAgg == ISupportStreamingModeForWindowing.NULL_RESULT ? null : partitionAgg;
+    }
+
+    Object result = calcFunctionValue(partition.iterator());
+    partitionAgg = (result == null ?
+        ISupportStreamingModeForWindowing.NULL_RESULT : result);
+    return result;
+  }
+
+  /**
+   * Given the current row, compute the aggregation for its window
+   *
+   * @throws HiveException
+   */
+  public Object iterate(int currentRow, LeadLagInfo leadLagInfo) throws HiveException {
+    Range rng = getRange(winFrame, currentRow, partition);
+    PTFPartitionIterator<Object> pItr = rng.iterator();
+    PTFOperator.connectLeadLagFunctionsToPartition(leadLagInfo, pItr);
+    return calcFunctionValue(pItr);
+  }
+
+  /**
+   * Given a partition iterator, calculate the function value
+   * @param pItr the partition iterator
+   * @return the function value
+   * @throws HiveException
+   */
+  protected Object calcFunctionValue(PTFPartitionIterator<Object> pItr)
+      throws HiveException {
+    AggregationBuffer aggBuffer = wrappedEvaluator.getNewAggregationBuffer();
+    Object[] argValues = new Object[parameters == null ? 0 : parameters.size()];
+    while (pItr.hasNext()) {
+      Object row = pItr.next();
+      int i = 0;
+      if ( parameters != null ) {
+        for (PTFExpressionDef param : parameters) {
+          argValues[i++] = param.getExprEvaluator().evaluate(row);
+        }
+      }
+      wrappedEvaluator.aggregate(aggBuffer, argValues);
+    }
+
+    // The returned object is reused across evaluations, so make a copy here
+    return ObjectInspectorUtils.copyToStandardObject(wrappedEvaluator.evaluate(aggBuffer), outputOI);
+  }
+
+  protected static Range getRange(WindowFrameDef winFrame, int currRow, PTFPartition p)
+      throws HiveException {
+    BoundaryDef startB = winFrame.getStart();
+    BoundaryDef endB = winFrame.getEnd();
+
+    int start, end;
+    if (winFrame.getWindowType() == WindowType.ROWS) {
+      start = getRowBoundaryStart(startB, currRow);
+      end = getRowBoundaryEnd(endB, currRow, p);
+    } else {
+      ValueBoundaryScanner vbs = ValueBoundaryScanner.getScanner(winFrame);
+      start = vbs.computeStart(currRow, p);
+      end = vbs.computeEnd(currRow, p);
+    }
+    start = start < 0 ? 0 : start;
+    end = end > p.size() ? p.size() : end;
+    return new Range(start, end, p);
+  }
+
+  private static int getRowBoundaryStart(BoundaryDef b, int currRow) throws HiveException {
+    Direction d = b.getDirection();
+    int amt = b.getAmt();
+    switch(d) {
+    case PRECEDING:
+      if (amt == BoundarySpec.UNBOUNDED_AMOUNT) {
+        return 0;
+      }
+      else {
+        return currRow - amt;
+      }
+    case CURRENT:
+      return currRow;
+    case FOLLOWING:
+      return currRow + amt;
+    }
+    throw new HiveException("Unknown Start Boundary Direction: " + d);
+  }
+
+  private static int getRowBoundaryEnd(BoundaryDef b, int currRow, PTFPartition p) throws HiveException {
+    Direction d = b.getDirection();
+    int amt = b.getAmt();
+    switch(d) {
+    case PRECEDING:
+      if ( amt == 0 ) {
+        return currRow + 1;
+      }
+      return currRow - amt + 1;
+    case CURRENT:
+      return currRow + 1;
+    case FOLLOWING:
+      if (amt == BoundarySpec.UNBOUNDED_AMOUNT) {
+        return p.size();
+      }
+      else {
+        return currRow + amt + 1;
+      }
+    }
+    throw new HiveException("Unknown End Boundary Direction: " + d);
+  }
+
+  /**
+   * The base type for the sum operator evaluator, used when the partition data
+   * is available and streaming evaluation is not possible. Some optimization
+   * can be applied in such cases.
+ * + */ + public static abstract class SumPartitionEvaluator extends BasePartitionEvaluator { + protected final WindowSumAgg sumAgg; + + public SumPartitionEvaluator( + GenericUDAFEvaluator wrappedEvaluator, + WindowFrameDef winFrame, + PTFPartition partition, + List parameters, + ObjectInspector outputOI) { + super(wrappedEvaluator, winFrame, partition, parameters, outputOI); + sumAgg = new WindowSumAgg(); + } + + static class WindowSumAgg extends AbstractAggregationBuffer { + Range prevRange; + ResultType prevSum; + boolean empty; + } + + public abstract ResultType add(ResultType t1, ResultType t2); + public abstract ResultType minus(ResultType t1, ResultType t2); + + @Override + public Object iterate(int currentRow, LeadLagInfo leadLagInfo) throws HiveException { + Range currentRange = getRange(winFrame, currentRow, partition); + PTFOperator.connectLeadLagFunctionsToPartition(leadLagInfo, currentRange.iterator()); // TODO investigate how to call this + ResultType result; + if (currentRow == 0 || // Reset for the new partition + sumAgg.prevRange == null || + currentRange.getSize() <= currentRange.getDiff(sumAgg.prevRange)) { + result = (ResultType)calcFunctionValue(currentRange.iterator()); + sumAgg.prevRange = currentRange; + sumAgg.empty = false; + sumAgg.prevSum = result; + } else { + // Given the previous range and the current range, calculate the new sum + // from the previous sum and the difference to save the computation. + Range r1 = new Range(sumAgg.prevRange.start, currentRange.start, partition); + Range r2 = new Range(sumAgg.prevRange.end, currentRange.end, partition); + ResultType sum1 = (ResultType)calcFunctionValue(r1.iterator()); + ResultType sum2 = (ResultType)calcFunctionValue(r2.iterator()); + result = add(minus(sumAgg.prevSum, sum1), sum2); + sumAgg.prevRange = currentRange; + sumAgg.prevSum = result; + } + + return result; + } + } + + public static class SumPartitionDoubleEvaluator extends SumPartitionEvaluator { + public SumPartitionDoubleEvaluator(GenericUDAFEvaluator wrappedEvaluator, + WindowFrameDef winFrame, PTFPartition partition, + List parameters, ObjectInspector outputOI) { + super(wrappedEvaluator, winFrame, partition, parameters, outputOI); + } + + @Override + public DoubleWritable add(DoubleWritable t1, DoubleWritable t2) { + if (t1 == null && t2 == null) return null; + return new DoubleWritable((t1 == null ? 0 : t1.get()) + (t2 == null ? 0 : t2.get())); + } + + @Override + public DoubleWritable minus(DoubleWritable t1, DoubleWritable t2) { + if (t1 == null && t2 == null) return null; + return new DoubleWritable((t1 == null ? 0 : t1.get()) - (t2 == null ? 0 : t2.get())); + } + } + + public static class SumPartitionLongEvaluator extends SumPartitionEvaluator { + public SumPartitionLongEvaluator(GenericUDAFEvaluator wrappedEvaluator, + WindowFrameDef winFrame, PTFPartition partition, + List parameters, ObjectInspector outputOI) { + super(wrappedEvaluator, winFrame, partition, parameters, outputOI); + } + + @Override + public LongWritable add(LongWritable t1, LongWritable t2) { + if (t1 == null && t2 == null) return null; + return new LongWritable((t1 == null ? 0 : t1.get()) + (t2 == null ? 0 : t2.get())); + } + + @Override + public LongWritable minus(LongWritable t1, LongWritable t2) { + if (t1 == null && t2 == null) return null; + return new LongWritable((t1 == null ? 0 : t1.get()) - (t2 == null ? 
0 : t2.get())); + } + } + + public static class SumPartitionHiveDecimalEvaluator extends SumPartitionEvaluator { + public SumPartitionHiveDecimalEvaluator(GenericUDAFEvaluator wrappedEvaluator, + WindowFrameDef winFrame, PTFPartition partition, + List parameters, ObjectInspector outputOI) { + super(wrappedEvaluator, winFrame, partition, parameters, outputOI); + } + + @Override + public HiveDecimalWritable add(HiveDecimalWritable t1, HiveDecimalWritable t2) { + if (t1 == null && t2 == null) return null; + if (t1 == null) { + return t2; + } else { + if (t2 != null) { + t1.mutateAdd(t2); + } + return t1; + } + } + + @Override + public HiveDecimalWritable minus(HiveDecimalWritable t1, HiveDecimalWritable t2) { + if (t1 == null && t2 == null) return null; + if (t1 == null) { + t2.mutateNegate(); + return t2; + } else { + if (t2 != null) { + t1.mutateSubtract(t2); + } + return t1; + } + } + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java index c76118b..7b30838 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java @@ -140,7 +140,7 @@ public PTFPartition execute(PTFPartition iPart) return transformRawInput(iPart); } PTFPartitionIterator pItr = iPart.iterator(); - PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, pItr); + PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc.getLlInfo(), pItr); if ( outputPartition == null ) { outputPartition = PTFPartition.create(ptfDesc.getCfg(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java new file mode 100644 index 0000000..88837dc --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java @@ -0,0 +1,729 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.udf.ptf; + +import java.util.Date; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.PTFPartition; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; +import org.apache.hadoop.hive.ql.plan.ptf.OrderDef; +import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; + +public abstract class ValueBoundaryScanner { + BoundaryDef start, end; + + public ValueBoundaryScanner(BoundaryDef start, BoundaryDef end) { + this.start = start; + this.end = end; + } + + public abstract int computeStart(int rowIdx, PTFPartition p) throws HiveException; + + public abstract int computeEnd(int rowIdx, PTFPartition p) throws HiveException; + + public static ValueBoundaryScanner getScanner(WindowFrameDef winFrameDef) + throws HiveException { + OrderDef orderDef = winFrameDef.getOrderDef(); + int numOrders = orderDef.getExpressions().size(); + if (numOrders != 1) { + return new MultiValueBoundaryScanner(winFrameDef.getStart(), winFrameDef.getEnd(), orderDef); + } else { + return SingleValueBoundaryScanner.getScanner(winFrameDef.getStart(), winFrameDef.getEnd(), orderDef); + } + } +} + +/* + * - starting from the given rowIdx scan in the given direction until a row's expr + * evaluates to an amt that crosses the 'amt' threshold specified in the BoundaryDef. + */ +abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner { + OrderExpressionDef expressionDef; + + public SingleValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) { + super(start, end); + this.expressionDef = expressionDef; + } + + /* +| Use | Boundary1.type | Boundary1. amt | Sort Key | Order | Behavior | +| Case | | | | | | +|------+----------------+----------------+----------+-------+-----------------------------------| +| 1. | PRECEDING | UNB | ANY | ANY | start = 0 | +| 2. | PRECEDING | unsigned int | NULL | ASC | start = 0 | +| 3. | | | | DESC | scan backwards to row R2 | +| | | | | | such that R2.sk is not null | +| | | | | | start = R2.idx + 1 | +| 4. | PRECEDING | unsigned int | not NULL | DESC | scan backwards until row R2 | +| | | | | | such that R2.sk - R.sk > amt | +| | | | | | start = R2.idx + 1 | +| 5. | PRECEDING | unsigned int | not NULL | ASC | scan backward until row R2 | +| | | | | | such that R.sk - R2.sk > bnd1.amt | +| | | | | | start = R2.idx + 1 | +| 6. | CURRENT ROW | | NULL | ANY | scan backwards until row R2 | +| | | | | | such that R2.sk is not null | +| | | | | | start = R2.idx + 1 | +| 7. | CURRENT ROW | | not NULL | ANY | scan backwards until row R2 | +| | | | | | such R2.sk != R.sk | +| | | | | | start = R2.idx + 1 | +| 8. | FOLLOWING | UNB | ANY | ANY | Error | +| 9. | FOLLOWING | unsigned int | NULL | DESC | start = partition.size | +| 10. | | | | ASC | scan forward until R2 | +| | | | | | such that R2.sk is not null | +| | | | | | start = R2.idx | +| 11. 
| FOLLOWING | unsigned int | not NULL | DESC | scan forward until row R2 | +| | | | | | such that R.sk - R2.sk > amt | +| | | | | | start = R2.idx | +| 12. | | | | ASC | scan forward until row R2 | +| | | | | | such that R2.sk - R.sk > amt | +|------+----------------+----------------+----------+-------+-----------------------------------| + */ + @Override + public int computeStart(int rowIdx, PTFPartition p) throws HiveException { + switch(start.getDirection()) { + case PRECEDING: + return computeStartPreceding(rowIdx, p); + case CURRENT: + return computeStartCurrentRow(rowIdx, p); + case FOLLOWING: + default: + return computeStartFollowing(rowIdx, p); + } + } + + protected int computeStartPreceding(int rowIdx, PTFPartition p) throws HiveException { + int amt = start.getAmt(); + // Use Case 1. + if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) { + return 0; + } + Object sortKey = computeValue(p.getAt(rowIdx)); + + if ( sortKey == null ) { + // Use Case 2. + if ( expressionDef.getOrder() == Order.ASC ) { + return 0; + } + else { // Use Case 3. + while ( sortKey == null && rowIdx >= 0 ) { + --rowIdx; + if ( rowIdx >= 0 ) { + sortKey = computeValue(p.getAt(rowIdx)); + } + } + return rowIdx+1; + } + } + + Object rowVal = sortKey; + int r = rowIdx; + + // Use Case 4. + if ( expressionDef.getOrder() == Order.DESC ) { + while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) { + r--; + if ( r >= 0 ) { + rowVal = computeValue(p.getAt(r)); + } + } + return r + 1; + } + else { // Use Case 5. + while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) { + r--; + if ( r >= 0 ) { + rowVal = computeValue(p.getAt(r)); + } + } + return r + 1; + } + } + + protected int computeStartCurrentRow(int rowIdx, PTFPartition p) throws HiveException { + Object sortKey = computeValue(p.getAt(rowIdx)); + + // Use Case 6. + if ( sortKey == null ) { + while ( sortKey == null && rowIdx >= 0 ) { + --rowIdx; + if ( rowIdx >= 0 ) { + sortKey = computeValue(p.getAt(rowIdx)); + } + } + return rowIdx+1; + } + + Object rowVal = sortKey; + int r = rowIdx; + + // Use Case 7. + while (r >= 0 && isEqual(rowVal, sortKey) ) { + r--; + if ( r >= 0 ) { + rowVal = computeValue(p.getAt(r)); + } + } + return r + 1; + } + + protected int computeStartFollowing(int rowIdx, PTFPartition p) throws HiveException { + int amt = start.getAmt(); + Object sortKey = computeValue(p.getAt(rowIdx)); + + Object rowVal = sortKey; + int r = rowIdx; + + if ( sortKey == null ) { + // Use Case 9. + if ( expressionDef.getOrder() == Order.DESC) { + return p.size(); + } + else { // Use Case 10. + while (r < p.size() && rowVal == null ) { + r++; + if ( r < p.size() ) { + rowVal = computeValue(p.getAt(r)); + } + } + return r; + } + } + + // Use Case 11. + if ( expressionDef.getOrder() == Order.DESC) { + while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) { + r++; + if ( r < p.size() ) { + rowVal = computeValue(p.getAt(r)); + } + } + return r; + } + else { // Use Case 12. + while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) { + r++; + if ( r < p.size() ) { + rowVal = computeValue(p.getAt(r)); + } + } + return r; + } + } + + /* +| Use | Boundary2.type | Boundary2.amt | Sort Key | Order | Behavior | +| Case | | | | | | +|------+----------------+---------------+----------+-------+-----------------------------------| +| 1. | PRECEDING | UNB | ANY | ANY | Error | +| 2. | PRECEDING | unsigned int | NULL | DESC | end = partition.size() | +| 3. | | | | ASC | end = 0 | +| 4. 
| PRECEDING      | unsigned int  | not null | DESC  | scan backward until row R2        |
+|      |                |               |          |       | such that R2.sk - R.sk > bnd.amt  |
+|      |                |               |          |       | end = R2.idx + 1                  |
+| 5.   | PRECEDING      | unsigned int  | not null | ASC   | scan backward until row R2        |
+|      |                |               |          |       | such that R.sk - R2.sk > bnd.amt  |
+|      |                |               |          |       | end = R2.idx + 1                  |
+| 6.   | CURRENT ROW    |               | NULL     | ANY   | scan forward until row R2         |
+|      |                |               |          |       | such that R2.sk is not null       |
+|      |                |               |          |       | end = R2.idx                      |
+| 7.   | CURRENT ROW    |               | not null | ANY   | scan forward until row R2         |
+|      |                |               |          |       | such that R2.sk != R.sk           |
+|      |                |               |          |       | end = R2.idx                      |
+| 8.   | FOLLOWING      | UNB           | ANY      | ANY   | end = partition.size()            |
+| 9.   | FOLLOWING      | unsigned int  | NULL     | DESC  | end = partition.size()            |
+| 10.  |                |               |          | ASC   | scan forward until row R2         |
+|      |                |               |          |       | such that R2.sk is not null       |
+|      |                |               |          |       | end = R2.idx                      |
+| 11.  | FOLLOWING      | unsigned int  | not NULL | DESC  | scan forward until row R2         |
+|      |                |               |          |       | such that R.sk - R2.sk > bnd.amt  |
+|      |                |               |          |       | end = R2.idx                      |
+| 12.  |                |               |          | ASC   | scan forward until row R2         |
+|      |                |               |          |       | such that R2.sk - R.sk > bnd.amt  |
+|      |                |               |          |       | end = R2.idx                      |
+|------+----------------+---------------+----------+-------+-----------------------------------|
+   */
+  @Override
+  public int computeEnd(int rowIdx, PTFPartition p) throws HiveException {
+    switch(end.getDirection()) {
+    case PRECEDING:
+      return computeEndPreceding(rowIdx, p);
+    case CURRENT:
+      return computeEndCurrentRow(rowIdx, p);
+    case FOLLOWING:
+    default:
+      return computeEndFollowing(rowIdx, p);
+    }
+  }
+
+  protected int computeEndPreceding(int rowIdx, PTFPartition p) throws HiveException {
+    int amt = end.getAmt();
+    // Use Case 1.
+    // amt == UNBOUNDED, is caught during translation
+
+    Object sortKey = computeValue(p.getAt(rowIdx));
+
+    if ( sortKey == null ) {
+      // Use Case 2.
+      if ( expressionDef.getOrder() == Order.DESC ) {
+        return p.size();
+      }
+      else { // Use Case 3.
+        return 0;
+      }
+    }
+
+    Object rowVal = sortKey;
+    int r = rowIdx;
+
+    // Use Case 4.
+    if ( expressionDef.getOrder() == Order.DESC ) {
+      while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) {
+        r--;
+        if ( r >= 0 ) {
+          rowVal = computeValue(p.getAt(r));
+        }
+      }
+      return r + 1;
+    }
+    else { // Use Case 5.
+      while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) {
+        r--;
+        if ( r >= 0 ) {
+          rowVal = computeValue(p.getAt(r));
+        }
+      }
+      return r + 1;
+    }
+  }
+
+  protected int computeEndCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
+    Object sortKey = computeValue(p.getAt(rowIdx));
+
+    // Use Case 6.
+    if ( sortKey == null ) {
+      while ( sortKey == null && rowIdx < p.size() ) {
+        ++rowIdx;
+        if ( rowIdx < p.size() ) {
+          sortKey = computeValue(p.getAt(rowIdx));
+        }
+      }
+      return rowIdx;
+    }
+
+    Object rowVal = sortKey;
+    int r = rowIdx;
+
+    // Use Case 7.
+    while (r < p.size() && isEqual(sortKey, rowVal) ) {
+      r++;
+      if ( r < p.size() ) {
+        rowVal = computeValue(p.getAt(r));
+      }
+    }
+    return r;
+  }
+
+  protected int computeEndFollowing(int rowIdx, PTFPartition p) throws HiveException {
+    int amt = end.getAmt();
+
+    // Use Case 8.
+    if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
+      return p.size();
+    }
+    Object sortKey = computeValue(p.getAt(rowIdx));
+
+    Object rowVal = sortKey;
+    int r = rowIdx;
+
+    if ( sortKey == null ) {
+      // Use Case 9.
+      if ( expressionDef.getOrder() == Order.DESC) {
+        return p.size();
+      }
+      else { // Use Case 10.
+        while (r < p.size() && rowVal == null ) {
+          r++;
+          if ( r < p.size() ) {
+            rowVal = computeValue(p.getAt(r));
+          }
+        }
+        return r;
+      }
+    }
+
+    // Use Case 11.
+ if ( expressionDef.getOrder() == Order.DESC) { + while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) { + r++; + if ( r < p.size() ) { + rowVal = computeValue(p.getAt(r)); + } + } + return r; + } + else { // Use Case 12. + while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) { + r++; + if ( r < p.size() ) { + rowVal = computeValue(p.getAt(r)); + } + } + return r; + } + } + + public Object computeValue(Object row) throws HiveException { + Object o = expressionDef.getExprEvaluator().evaluate(row); + return ObjectInspectorUtils.copyToStandardObject(o, expressionDef.getOI()); + } + + /** + * Checks if the distance of v2 to v1 is greater than the given amt. + * @return True if the value of v1 - v2 is greater than amt or either value is null. + */ + public abstract boolean isDistanceGreater(Object v1, Object v2, int amt); + + /** + * Checks if the values of v1 or v2 are the same. + * @return True if both values are the same or both are nulls. + */ + public abstract boolean isEqual(Object v1, Object v2); + + + @SuppressWarnings("incomplete-switch") + public static SingleValueBoundaryScanner getScanner(BoundaryDef start, BoundaryDef end, OrderDef orderDef) + throws HiveException { + if (orderDef.getExpressions().size() != 1) { + throw new HiveException("Internal error: initializing SingleValueBoundaryScanner with" + + " multiple expression for sorting"); + } + OrderExpressionDef exprDef = orderDef.getExpressions().get(0); + PrimitiveObjectInspector pOI = (PrimitiveObjectInspector) exprDef.getOI(); + switch(pOI.getPrimitiveCategory()) { + case BYTE: + case INT: + case LONG: + case SHORT: + case TIMESTAMP: + return new LongValueBoundaryScanner(start, end, exprDef); + case DOUBLE: + case FLOAT: + return new DoubleValueBoundaryScanner(start, end, exprDef); + case DECIMAL: + return new HiveDecimalValueBoundaryScanner(start, end, exprDef); + case DATE: + return new DateValueBoundaryScanner(start, end, exprDef); + case STRING: + return new StringValueBoundaryScanner(start, end, exprDef); + } + throw new HiveException( + String.format("Internal Error: attempt to setup a Window for datatype %s", + pOI.getPrimitiveCategory())); + } +} + +class LongValueBoundaryScanner extends SingleValueBoundaryScanner { + public LongValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) { + super(start, end,expressionDef); + } + + @Override + public boolean isDistanceGreater(Object v1, Object v2, int amt) { + if (v1 != null && v2 != null) { + long l1 = PrimitiveObjectInspectorUtils.getLong(v1, + (PrimitiveObjectInspector) expressionDef.getOI()); + long l2 = PrimitiveObjectInspectorUtils.getLong(v2, + (PrimitiveObjectInspector) expressionDef.getOI()); + return (l1 -l2) > amt; + } + + return v1 != null || v2 != null; // True if only one value is null + } + + @Override + public boolean isEqual(Object v1, Object v2) { + if (v1 != null && v2 != null) { + long l1 = PrimitiveObjectInspectorUtils.getLong(v1, + (PrimitiveObjectInspector) expressionDef.getOI()); + long l2 = PrimitiveObjectInspectorUtils.getLong(v2, + (PrimitiveObjectInspector) expressionDef.getOI()); + return l1 == l2; + } + + return v1 == null && v2 == null; // True if both are null + } +} + +class DoubleValueBoundaryScanner extends SingleValueBoundaryScanner { + public DoubleValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) { + super(start, end,expressionDef); + } + + @Override + public boolean isDistanceGreater(Object v1, Object v2, int amt) { + if (v1 != null 
&& v2 != null) {
+      double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
+          (PrimitiveObjectInspector) expressionDef.getOI());
+      double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
+          (PrimitiveObjectInspector) expressionDef.getOI());
+      return (d1 - d2) > amt;
+    }
+
+    return v1 != null || v2 != null; // True if only one value is null
+  }
+
+  @Override
+  public boolean isEqual(Object v1, Object v2) {
+    if (v1 != null && v2 != null) {
+      double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
+          (PrimitiveObjectInspector) expressionDef.getOI());
+      double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
+          (PrimitiveObjectInspector) expressionDef.getOI());
+      return d1 == d2;
+    }
+
+    return v1 == null && v2 == null; // True if both are null
+  }
+}
+
+class HiveDecimalValueBoundaryScanner extends SingleValueBoundaryScanner {
+  public HiveDecimalValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
+    super(start, end, expressionDef);
+  }
+
+  @Override
+  public boolean isDistanceGreater(Object v1, Object v2, int amt) {
+    HiveDecimal d1 = PrimitiveObjectInspectorUtils.getHiveDecimal(v1,
+        (PrimitiveObjectInspector) expressionDef.getOI());
+    HiveDecimal d2 = PrimitiveObjectInspectorUtils.getHiveDecimal(v2,
+        (PrimitiveObjectInspector) expressionDef.getOI());
+    if ( d1 != null && d2 != null ) {
+      return d1.subtract(d2).intValue() > amt;  // TODO: lossy conversion!
+    }
+
+    return d1 != null || d2 != null; // True if only one value is null
+  }
+
+  @Override
+  public boolean isEqual(Object v1, Object v2) {
+    HiveDecimal d1 = PrimitiveObjectInspectorUtils.getHiveDecimal(v1,
+        (PrimitiveObjectInspector) expressionDef.getOI());
+    HiveDecimal d2 = PrimitiveObjectInspectorUtils.getHiveDecimal(v2,
+        (PrimitiveObjectInspector) expressionDef.getOI());
+    if ( d1 != null && d2 != null ) {
+      return d1.equals(d2);
+    }
+
+    return d1 == null && d2 == null; // True if both are null
+  }
+}
+
+class DateValueBoundaryScanner extends SingleValueBoundaryScanner {
+  public DateValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
+    super(start, end, expressionDef);
+  }
+
+  @Override
+  public boolean isDistanceGreater(Object v1, Object v2, int amt) {
+    Date l1 = PrimitiveObjectInspectorUtils.getDate(v1,
+        (PrimitiveObjectInspector) expressionDef.getOI());
+    Date l2 = PrimitiveObjectInspectorUtils.getDate(v2,
+        (PrimitiveObjectInspector) expressionDef.getOI());
+    if (l1 != null && l2 != null) {
+      // Compares the difference in seconds against amt days converted to seconds
+      return (double)(l1.getTime() - l2.getTime())/1000 > (long)amt * 24 * 3600;
+    }
+    return l1 != l2; // True if only one date is null
+  }
+
+  @Override
+  public boolean isEqual(Object v1, Object v2) {
+    Date l1 = PrimitiveObjectInspectorUtils.getDate(v1,
+        (PrimitiveObjectInspector) expressionDef.getOI());
+    Date l2 = PrimitiveObjectInspectorUtils.getDate(v2,
+        (PrimitiveObjectInspector) expressionDef.getOI());
+    return (l1 == null && l2 == null) || (l1 != null && l1.equals(l2));
+  }
+}
+
+class StringValueBoundaryScanner extends SingleValueBoundaryScanner {
+  public StringValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
+    super(start, end, expressionDef);
+  }
+
+  @Override
+  public boolean isDistanceGreater(Object v1, Object v2, int amt) {
+    String s1 = PrimitiveObjectInspectorUtils.getString(v1,
+        (PrimitiveObjectInspector) expressionDef.getOI());
+    String s2 = PrimitiveObjectInspectorUtils.getString(v2,
+        (PrimitiveObjectInspector) expressionDef.getOI());
+    return s1 != null && s2
!= null && s1.compareTo(s2) > 0; + } + + @Override + public boolean isEqual(Object v1, Object v2) { + String s1 = PrimitiveObjectInspectorUtils.getString(v1, + (PrimitiveObjectInspector) expressionDef.getOI()); + String s2 = PrimitiveObjectInspectorUtils.getString(v2, + (PrimitiveObjectInspector) expressionDef.getOI()); + return (s1 == null && s2 == null) || (s1 != null && s1.equals(s2)); + } +} + +/* + */ + class MultiValueBoundaryScanner extends ValueBoundaryScanner { + OrderDef orderDef; + + public MultiValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderDef orderDef) { + super(start, end); + this.orderDef = orderDef; + } + + /* +|------+----------------+----------------+----------+-------+-----------------------------------| +| Use | Boundary1.type | Boundary1. amt | Sort Key | Order | Behavior | +| Case | | | | | | +|------+----------------+----------------+----------+-------+-----------------------------------| +| 1. | PRECEDING | UNB | ANY | ANY | start = 0 | +| 2. | CURRENT ROW | | ANY | ANY | scan backwards until row R2 | +| | | | | | such R2.sk != R.sk | +| | | | | | start = R2.idx + 1 | +|------+----------------+----------------+----------+-------+-----------------------------------| + */ + @Override + public int computeStart(int rowIdx, PTFPartition p) throws HiveException { + switch(start.getDirection()) { + case PRECEDING: + return computeStartPreceding(rowIdx, p); + case CURRENT: + return computeStartCurrentRow(rowIdx, p); + case FOLLOWING: + default: + throw new HiveException( + "FOLLOWING not allowed for starting RANGE with multiple expressions in ORDER BY"); + } + } + + protected int computeStartPreceding(int rowIdx, PTFPartition p) throws HiveException { + int amt = start.getAmt(); + if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) { + return 0; + } + throw new HiveException( + "PRECEDING needs UNBOUNDED for RANGE with multiple expressions in ORDER BY"); + } + + protected int computeStartCurrentRow(int rowIdx, PTFPartition p) throws HiveException { + Object[] sortKey = computeValues(p.getAt(rowIdx)); + Object[] rowVal = sortKey; + int r = rowIdx; + + while (r >= 0 && isEqual(rowVal, sortKey) ) { + r--; + if ( r >= 0 ) { + rowVal = computeValues(p.getAt(r)); + } + } + return r + 1; + } + + /* +|------+----------------+---------------+----------+-------+-----------------------------------| +| Use | Boundary2.type | Boundary2.amt | Sort Key | Order | Behavior | +| Case | | | | | | +|------+----------------+---------------+----------+-------+-----------------------------------| +| 1. | CURRENT ROW | | ANY | ANY | scan forward until row R2 | +| | | | | | such that R2.sk != R.sk | +| | | | | | end = R2.idx | +| 2. 
| FOLLOWING | UNB | ANY | ANY | end = partition.size() | +|------+----------------+---------------+----------+-------+-----------------------------------| + */ + @Override + public int computeEnd(int rowIdx, PTFPartition p) throws HiveException { + switch(end.getDirection()) { + case PRECEDING: + throw new HiveException( + "PRECEDING not allowed for finishing RANGE with multiple expressions in ORDER BY"); + case CURRENT: + return computeEndCurrentRow(rowIdx, p); + case FOLLOWING: + default: + return computeEndFollowing(rowIdx, p); + } + } + + protected int computeEndCurrentRow(int rowIdx, PTFPartition p) throws HiveException { + Object[] sortKey = computeValues(p.getAt(rowIdx)); + Object[] rowVal = sortKey; + int r = rowIdx; + + while (r < p.size() && isEqual(sortKey, rowVal) ) { + r++; + if ( r < p.size() ) { + rowVal = computeValues(p.getAt(r)); + } + } + return r; + } + + protected int computeEndFollowing(int rowIdx, PTFPartition p) throws HiveException { + int amt = end.getAmt(); + if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) { + return p.size(); + } + throw new HiveException( + "FOLLOWING needs UNBOUNDED for RANGE with multiple expressions in ORDER BY"); + } + + public Object[] computeValues(Object row) throws HiveException { + Object[] objs = new Object[orderDef.getExpressions().size()]; + for (int i = 0; i < objs.length; i++) { + Object o = orderDef.getExpressions().get(i).getExprEvaluator().evaluate(row); + objs[i] = ObjectInspectorUtils.copyToStandardObject(o, orderDef.getExpressions().get(i).getOI()); + } + return objs; + } + + public boolean isEqual(Object[] v1, Object[] v2) { + assert v1.length == v2.length; + for (int i = 0; i < v1.length; i++) { + if (v1[i] == null && v2[i] == null) { + continue; + } + if (v1[i] == null || v2[i] == null) { + return false; + } + if (ObjectInspectorUtils.compare( + v1[i], orderDef.getExpressions().get(i).getOI(), + v2[i], orderDef.getExpressions().get(i).getOI()) != 0) { + return false; + } + } + return true; + } +} + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java index 2fdb492..a823acd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java @@ -20,7 +20,6 @@ import java.util.AbstractList; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -28,25 +27,19 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; -import org.apache.hadoop.hive.ql.exec.PTFOperator; import org.apache.hadoop.hive.ql.exec.PTFPartition; import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator; import org.apache.hadoop.hive.ql.exec.PTFRollingPartition; import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType; import 
org.apache.hadoop.hive.ql.plan.PTFDesc; import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; -import org.apache.hadoop.hive.ql.plan.ptf.OrderDef; -import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; @@ -61,7 +54,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,16 +104,16 @@ public void execute(PTFPartitionIterator pItr, PTFPartition outP) throws WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef(); for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) { - boolean processWindow = processWindow(wFn); + boolean processWindow = processWindow(wFn.getWindowFrame()); pItr.reset(); if ( !processWindow ) { - Object out = evaluateWindowFunction(wFn, pItr); + Object out = evaluateFunctionOnPartition(wFn, iPart); if ( !wFn.isPivotResult()) { out = new SameList(iPart.size(), out); } oColumns.add((List)out); } else { - oColumns.add(executeFnwithWindow(getQueryDef(), wFn, iPart)); + oColumns.add(executeFnwithWindow(wFn, iPart)); } } @@ -147,30 +139,36 @@ public void execute(PTFPartitionIterator pItr, PTFPartition outP) throws } } - Object evaluateWindowFunction(WindowFunctionDef wFn, - PTFPartitionIterator pItr) throws HiveException { - GenericUDAFEvaluator fEval = wFn.getWFnEval(); - Object[] args = new Object[wFn.getArgs() == null ? 
0 : wFn.getArgs().size()]; - AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer(); - while(pItr.hasNext()) - { - Object row = pItr.next(); - int i =0; - if ( wFn.getArgs() != null ) { - for(PTFExpressionDef arg : wFn.getArgs()) - { - args[i++] = arg.getExprEvaluator().evaluate(row); - } - } - fEval.aggregate(aggBuffer, args); + // Evaluate the result given a partition and the row number to process + private Object evaluateWindowFunction(WindowFunctionDef wFn, int rowToProcess, PTFPartition partition) + throws HiveException { + BasePartitionEvaluator partitionEval = wFn.getWFnEval() + .getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI()); + return partitionEval.iterate(rowToProcess, ptfDesc.getLlInfo()); + } + + // Evaluate the result given a partition + private Object evaluateFunctionOnPartition(WindowFunctionDef wFn, + PTFPartition partition) throws HiveException { + BasePartitionEvaluator partitionEval = wFn.getWFnEval() + .getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI()); + return partitionEval.getPartitionAgg(); + } + + // Evaluate the function result for each row in the partition + ArrayList executeFnwithWindow( + WindowFunctionDef wFnDef, + PTFPartition iPart) + throws HiveException { + ArrayList vals = new ArrayList(); + for(int i=0; i < iPart.size(); i++) { + Object out = evaluateWindowFunction(wFnDef, i, iPart); + vals.add(out); } - Object out = fEval.evaluate(aggBuffer); - out = ObjectInspectorUtils.copyToStandardObject(out, wFn.getOI()); - return out; + return vals; } - private boolean processWindow(WindowFunctionDef wFn) { - WindowFrameDef frame = wFn.getWindowFrame(); + private static boolean processWindow(WindowFrameDef frame) { if ( frame == null ) { return false; } @@ -391,7 +389,7 @@ public void startPartition() throws HiveException { streamingState.rollingPart.append(row); - WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef(); + WindowTableFunctionDef tabDef = (WindowTableFunctionDef) tableDef; for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) { WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i); @@ -417,10 +415,7 @@ public void startPartition() throws HiveException { } else { int rowToProcess = streamingState.rollingPart.rowToProcess(wFn.getWindowFrame()); if (rowToProcess >= 0) { - Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart); - PTFPartitionIterator rItr = rng.iterator(); - PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr); - Object out = evaluateWindowFunction(wFn, rItr); + Object out = evaluateWindowFunction(wFn, rowToProcess, streamingState.rollingPart); streamingState.fnOutputs[i].add(out); } } @@ -495,10 +490,7 @@ public void startPartition() throws HiveException { while (numRowsRemaining > 0) { int rowToProcess = streamingState.rollingPart.size() - numRowsRemaining; if (rowToProcess >= 0) { - Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart); - PTFPartitionIterator rItr = rng.iterator(); - PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr); - Object out = evaluateWindowFunction(wFn, rItr); + Object out = evaluateWindowFunction(wFn, rowToProcess, streamingState.rollingPart); streamingState.fnOutputs[i].add(out); } numRowsRemaining--; @@ -540,10 +532,10 @@ public boolean canIterateOutput() { int i=0; for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) { - boolean processWindow = processWindow(wFn); + boolean processWindow = processWindow(wFn.getWindowFrame()); 
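        // Illustrative sketch, not part of this patch: both branches below now
        // route through BasePartitionEvaluator. An unbounded window is computed
        // once per partition:
        //   Object out = wFn.getWFnEval().getPartitionWindowingEvaluator(
        //       wFn.getWindowFrame(), iPart, wFn.getArgs(), wFn.getOI())
        //       .getPartitionAgg();
        // while a bounded window is evaluated per row i via
        //   ...getPartitionWindowingEvaluator(...).iterate(i, ptfDesc.getLlInfo());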
pItr.reset(); if ( !processWindow && !wFn.isPivotResult() ) { - Object out = evaluateWindowFunction(wFn, pItr); + Object out = evaluateFunctionOnPartition(wFn, iPart); output.add(out); } else if (wFn.isPivotResult()) { GenericUDAFEvaluator streamingEval = wFn.getWFnEval().getWindowingEvaluator(wFn.getWindowFrame()); @@ -558,12 +550,11 @@ public boolean canIterateOutput() { output.add(null); wFnsWithWindows.add(i); } else { - outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, - pItr); + outputFromPivotFunctions[i] = (List) evaluateFunctionOnPartition(wFn, iPart); // TODO output.add(null); } } else { - outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr); + outputFromPivotFunctions[i] = (List) evaluateFunctionOnPartition(wFn, iPart); // TODO investigate what is pivot output.add(null); } } else { @@ -652,797 +643,6 @@ public boolean carryForwardNames() { } - ArrayList executeFnwithWindow(PTFDesc ptfDesc, - WindowFunctionDef wFnDef, - PTFPartition iPart) - throws HiveException { - ArrayList vals = new ArrayList(); - for(int i=0; i < iPart.size(); i++) { - Range rng = getRange(wFnDef, i, iPart); - PTFPartitionIterator rItr = rng.iterator(); - PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr); - Object out = evaluateWindowFunction(wFnDef, rItr); - vals.add(out); - } - return vals; - } - - private Range getRange(WindowFunctionDef wFnDef, int currRow, PTFPartition p) throws HiveException - { - WindowFrameDef winFrame = wFnDef.getWindowFrame(); - BoundaryDef startB = winFrame.getStart(); - BoundaryDef endB = winFrame.getEnd(); - - int start, end; - if (winFrame.getWindowType() == WindowType.ROWS) { - start = getRowBoundaryStart(startB, currRow); - end = getRowBoundaryEnd(endB, currRow, p); - } else { - ValueBoundaryScanner vbs = ValueBoundaryScanner.getScanner(winFrame); - start = vbs.computeStart(currRow, p); - end = vbs.computeEnd(currRow, p); - } - start = start < 0 ? 0 : start; - end = end > p.size() ? 
p.size() : end; - return new Range(start, end, p); - } - - private int getRowBoundaryStart(BoundaryDef b, int currRow) throws HiveException { - Direction d = b.getDirection(); - int amt = b.getAmt(); - switch(d) { - case PRECEDING: - if (amt == BoundarySpec.UNBOUNDED_AMOUNT) { - return 0; - } - else { - return currRow - amt; - } - case CURRENT: - return currRow; - case FOLLOWING: - return currRow + amt; - } - throw new HiveException("Unknown Start Boundary Direction: " + d); - } - - private int getRowBoundaryEnd(BoundaryDef b, int currRow, PTFPartition p) throws HiveException { - Direction d = b.getDirection(); - int amt = b.getAmt(); - switch(d) { - case PRECEDING: - if ( amt == 0 ) { - return currRow + 1; - } - return currRow - amt + 1; - case CURRENT: - return currRow + 1; - case FOLLOWING: - if (amt == BoundarySpec.UNBOUNDED_AMOUNT) { - return p.size(); - } - else { - return currRow + amt + 1; - } - } - throw new HiveException("Unknown End Boundary Direction: " + d); - } - - static class Range - { - int start; - int end; - PTFPartition p; - - public Range(int start, int end, PTFPartition p) - { - super(); - this.start = start; - this.end = end; - this.p = p; - } - - public PTFPartitionIterator iterator() - { - return p.range(start, end); - } - } - - - static abstract class ValueBoundaryScanner { - BoundaryDef start, end; - - public ValueBoundaryScanner(BoundaryDef start, BoundaryDef end) { - this.start = start; - this.end = end; - } - - protected abstract int computeStart(int rowIdx, PTFPartition p) throws HiveException; - - protected abstract int computeEnd(int rowIdx, PTFPartition p) throws HiveException; - - public static ValueBoundaryScanner getScanner(WindowFrameDef winFrameDef) - throws HiveException { - OrderDef orderDef = winFrameDef.getOrderDef(); - int numOrders = orderDef.getExpressions().size(); - if (numOrders != 1) { - return new MultiValueBoundaryScanner(winFrameDef.getStart(), winFrameDef.getEnd(), orderDef); - } else { - return SingleValueBoundaryScanner.getScanner(winFrameDef.getStart(), winFrameDef.getEnd(), orderDef); - } - } - } - - /* - * - starting from the given rowIdx scan in the given direction until a row's expr - * evaluates to an amt that crosses the 'amt' threshold specified in the BoundaryDef. - */ - static abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner { - OrderExpressionDef expressionDef; - - public SingleValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) { - super(start, end); - this.expressionDef = expressionDef; - } - - /* -| Use | Boundary1.type | Boundary1. amt | Sort Key | Order | Behavior | -| Case | | | | | | -|------+----------------+----------------+----------+-------+-----------------------------------| -| 1. | PRECEDING | UNB | ANY | ANY | start = 0 | -| 2. | PRECEDING | unsigned int | NULL | ASC | start = 0 | -| 3. | | | | DESC | scan backwards to row R2 | -| | | | | | such that R2.sk is not null | -| | | | | | start = R2.idx + 1 | -| 4. | PRECEDING | unsigned int | not NULL | DESC | scan backwards until row R2 | -| | | | | | such that R2.sk - R.sk > amt | -| | | | | | start = R2.idx + 1 | -| 5. | PRECEDING | unsigned int | not NULL | ASC | scan backward until row R2 | -| | | | | | such that R.sk - R2.sk > bnd1.amt | -| | | | | | start = R2.idx + 1 | -| 6. | CURRENT ROW | | NULL | ANY | scan backwards until row R2 | -| | | | | | such that R2.sk is not null | -| | | | | | start = R2.idx + 1 | -| 7. 
| CURRENT ROW | | not NULL | ANY | scan backwards until row R2 | -| | | | | | such R2.sk != R.sk | -| | | | | | start = R2.idx + 1 | -| 8. | FOLLOWING | UNB | ANY | ANY | Error | -| 9. | FOLLOWING | unsigned int | NULL | DESC | start = partition.size | -| 10. | | | | ASC | scan forward until R2 | -| | | | | | such that R2.sk is not null | -| | | | | | start = R2.idx | -| 11. | FOLLOWING | unsigned int | not NULL | DESC | scan forward until row R2 | -| | | | | | such that R.sk - R2.sk > amt | -| | | | | | start = R2.idx | -| 12. | | | | ASC | scan forward until row R2 | -| | | | | | such that R2.sk - R.sk > amt | -|------+----------------+----------------+----------+-------+-----------------------------------| - */ - @Override - protected int computeStart(int rowIdx, PTFPartition p) throws HiveException { - switch(start.getDirection()) { - case PRECEDING: - return computeStartPreceding(rowIdx, p); - case CURRENT: - return computeStartCurrentRow(rowIdx, p); - case FOLLOWING: - default: - return computeStartFollowing(rowIdx, p); - } - } - - protected int computeStartPreceding(int rowIdx, PTFPartition p) throws HiveException { - int amt = start.getAmt(); - // Use Case 1. - if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) { - return 0; - } - Object sortKey = computeValue(p.getAt(rowIdx)); - - if ( sortKey == null ) { - // Use Case 2. - if ( expressionDef.getOrder() == Order.ASC ) { - return 0; - } - else { // Use Case 3. - while ( sortKey == null && rowIdx >= 0 ) { - --rowIdx; - if ( rowIdx >= 0 ) { - sortKey = computeValue(p.getAt(rowIdx)); - } - } - return rowIdx+1; - } - } - - Object rowVal = sortKey; - int r = rowIdx; - - // Use Case 4. - if ( expressionDef.getOrder() == Order.DESC ) { - while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) { - r--; - if ( r >= 0 ) { - rowVal = computeValue(p.getAt(r)); - } - } - return r + 1; - } - else { // Use Case 5. - while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) { - r--; - if ( r >= 0 ) { - rowVal = computeValue(p.getAt(r)); - } - } - return r + 1; - } - } - - protected int computeStartCurrentRow(int rowIdx, PTFPartition p) throws HiveException { - Object sortKey = computeValue(p.getAt(rowIdx)); - - // Use Case 6. - if ( sortKey == null ) { - while ( sortKey == null && rowIdx >= 0 ) { - --rowIdx; - if ( rowIdx >= 0 ) { - sortKey = computeValue(p.getAt(rowIdx)); - } - } - return rowIdx+1; - } - - Object rowVal = sortKey; - int r = rowIdx; - - // Use Case 7. - while (r >= 0 && isEqual(rowVal, sortKey) ) { - r--; - if ( r >= 0 ) { - rowVal = computeValue(p.getAt(r)); - } - } - return r + 1; - } - - protected int computeStartFollowing(int rowIdx, PTFPartition p) throws HiveException { - int amt = start.getAmt(); - Object sortKey = computeValue(p.getAt(rowIdx)); - - Object rowVal = sortKey; - int r = rowIdx; - - if ( sortKey == null ) { - // Use Case 9. - if ( expressionDef.getOrder() == Order.DESC) { - return p.size(); - } - else { // Use Case 10. - while (r < p.size() && rowVal == null ) { - r++; - if ( r < p.size() ) { - rowVal = computeValue(p.getAt(r)); - } - } - return r; - } - } - - // Use Case 11. - if ( expressionDef.getOrder() == Order.DESC) { - while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) { - r++; - if ( r < p.size() ) { - rowVal = computeValue(p.getAt(r)); - } - } - return r; - } - else { // Use Case 12. 
-    /*
-| Use  | Boundary2.type | Boundary2.amt | Sort Key | Order | Behavior                          |
-| Case |                |               |          |       |                                   |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-| 1.   | PRECEDING      | UNB           | ANY      | ANY   | Error                             |
-| 2.   | PRECEDING      | unsigned int  | NULL     | DESC  | end = partition.size()            |
-| 3.   |                |               |          | ASC   | end = 0                           |
-| 4.   | PRECEDING      | unsigned int  | not null | DESC  | scan backwards until row R2       |
-|      |                |               |          |       | such that R2.sk - R.sk > bnd.amt  |
-|      |                |               |          |       | end = R2.idx + 1                  |
-| 5.   | PRECEDING      | unsigned int  | not null | ASC   | scan backwards until row R2       |
-|      |                |               |          |       | such that R.sk - R2.sk > bnd.amt  |
-|      |                |               |          |       | end = R2.idx + 1                  |
-| 6.   | CURRENT ROW    |               | NULL     | ANY   | scan forward until row R2         |
-|      |                |               |          |       | such that R2.sk is not null       |
-|      |                |               |          |       | end = R2.idx                      |
-| 7.   | CURRENT ROW    |               | not null | ANY   | scan forward until row R2         |
-|      |                |               |          |       | such that R2.sk != R.sk           |
-|      |                |               |          |       | end = R2.idx                      |
-| 8.   | FOLLOWING      | UNB           | ANY      | ANY   | end = partition.size()            |
-| 9.   | FOLLOWING      | unsigned int  | NULL     | DESC  | end = partition.size()            |
-| 10.  |                |               |          | ASC   | scan forward until row R2         |
-|      |                |               |          |       | such that R2.sk is not null       |
-|      |                |               |          |       | end = R2.idx                      |
-| 11.  | FOLLOWING      | unsigned int  | not NULL | DESC  | scan forward until row R2         |
-|      |                |               |          |       | such that R.sk - R2.sk > bnd.amt  |
-|      |                |               |          |       | end = R2.idx                      |
-| 12.  |                |               |          | ASC   | scan forward until row R2         |
-|      |                |               |          |       | such that R2.sk - R.sk > bnd.amt  |
-|      |                |               |          |       | end = R2.idx                      |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-     */
-    @Override
-    protected int computeEnd(int rowIdx, PTFPartition p) throws HiveException {
-      switch(end.getDirection()) {
-      case PRECEDING:
-        return computeEndPreceding(rowIdx, p);
-      case CURRENT:
-        return computeEndCurrentRow(rowIdx, p);
-      case FOLLOWING:
-      default:
-        return computeEndFollowing(rowIdx, p);
-      }
-    }
-
-    protected int computeEndPreceding(int rowIdx, PTFPartition p) throws HiveException {
-      int amt = end.getAmt();
-      // Use Case 1: amt == UNBOUNDED is caught during translation.
-
-      Object sortKey = computeValue(p.getAt(rowIdx));
-
-      if ( sortKey == null ) {
-        // Use Case 2.
-        if ( expressionDef.getOrder() == Order.DESC ) {
-          return p.size();
-        }
-        else { // Use Case 3.
-          return 0;
-        }
-      }
-
-      Object rowVal = sortKey;
-      int r = rowIdx;
-
-      // Use Case 4.
-      if ( expressionDef.getOrder() == Order.DESC ) {
-        while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) {
-          r--;
-          if ( r >= 0 ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r + 1;
-      }
-      else { // Use Case 5.
-        while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) {
-          r--;
-          if ( r >= 0 ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r + 1;
-      }
-    }
-
-    protected int computeEndCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
-      Object sortKey = computeValue(p.getAt(rowIdx));
-
-      // Use Case 6.
-      if ( sortKey == null ) {
-        while ( sortKey == null && rowIdx < p.size() ) {
-          ++rowIdx;
-          if ( rowIdx < p.size() ) {
-            sortKey = computeValue(p.getAt(rowIdx));
-          }
-        }
-        return rowIdx;
-      }
-
-      Object rowVal = sortKey;
-      int r = rowIdx;
-
-      // Use Case 7.
-      while (r < p.size() && isEqual(sortKey, rowVal) ) {
-        r++;
-        if ( r < p.size() ) {
-          rowVal = computeValue(p.getAt(r));
-        }
-      }
-      return r;
-    }
-
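Note that every end boundary is exclusive: computeEndCurrentRow returns the index of the first row that is no longer a peer of the current row, matching Range's half-open [start, end) convention. A standalone sketch of that peer-group scan, again assuming a plain long sort key:

  // Sketch: exclusive end for "CURRENT ROW" under RANGE — scan forward past
  // every row whose sort key equals the current row's key.
  static int rangeEndCurrentRow(long[] keys, int rowIdx) {
    long currKey = keys[rowIdx];
    int r = rowIdx;
    while (r < keys.length && keys[r] == currKey) {
      r++;
    }
    return r; // index of the first non-peer, i.e. the exclusive end
  }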
-    protected int computeEndFollowing(int rowIdx, PTFPartition p) throws HiveException {
-      int amt = end.getAmt();
-
-      // Use Case 8.
-      if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
-        return p.size();
-      }
-      Object sortKey = computeValue(p.getAt(rowIdx));
-
-      Object rowVal = sortKey;
-      int r = rowIdx;
-
-      if ( sortKey == null ) {
-        // Use Case 9.
-        if ( expressionDef.getOrder() == Order.DESC) {
-          return p.size();
-        }
-        else { // Use Case 10.
-          while (r < p.size() && rowVal == null ) {
-            r++;
-            if ( r < p.size() ) {
-              rowVal = computeValue(p.getAt(r));
-            }
-          }
-          return r;
-        }
-      }
-
-      // Use Case 11.
-      if ( expressionDef.getOrder() == Order.DESC) {
-        while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) {
-          r++;
-          if ( r < p.size() ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r;
-      }
-      else { // Use Case 12.
-        while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) {
-          r++;
-          if ( r < p.size() ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r;
-      }
-    }
-
-    public Object computeValue(Object row) throws HiveException {
-      Object o = expressionDef.getExprEvaluator().evaluate(row);
-      return ObjectInspectorUtils.copyToStandardObject(o, expressionDef.getOI());
-    }
-
-    /**
-     * Checks if the distance from v2 to v1 (i.e. v1 - v2) is greater than the given amt.
-     * @return True if v1 - v2 is greater than amt, or if exactly one value is null.
-     */
-    public abstract boolean isDistanceGreater(Object v1, Object v2, int amt);
-
-    /**
-     * Checks if the values of v1 and v2 are the same.
-     * @return True if both values are the same or both are nulls.
-     */
-    public abstract boolean isEqual(Object v1, Object v2);
-
-    @SuppressWarnings("incomplete-switch")
-    public static SingleValueBoundaryScanner getScanner(BoundaryDef start, BoundaryDef end, OrderDef orderDef)
-        throws HiveException {
-      if (orderDef.getExpressions().size() != 1) {
-        throw new HiveException("Internal error: initializing SingleValueBoundaryScanner with"
-            + " multiple expressions for sorting");
-      }
-      OrderExpressionDef exprDef = orderDef.getExpressions().get(0);
-      PrimitiveObjectInspector pOI = (PrimitiveObjectInspector) exprDef.getOI();
-      switch(pOI.getPrimitiveCategory()) {
-      case BYTE:
-      case INT:
-      case LONG:
-      case SHORT:
-      case TIMESTAMP:
-        return new LongValueBoundaryScanner(start, end, exprDef);
-      case DOUBLE:
-      case FLOAT:
-        return new DoubleValueBoundaryScanner(start, end, exprDef);
-      case DECIMAL:
-        return new HiveDecimalValueBoundaryScanner(start, end, exprDef);
-      case DATE:
-        return new DateValueBoundaryScanner(start, end, exprDef);
-      case STRING:
-        return new StringValueBoundaryScanner(start, end, exprDef);
-      }
-      throw new HiveException(
-          String.format("Internal Error: attempt to setup a Window for datatype %s",
-              pOI.getPrimitiveCategory()));
-    }
-  }
-
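A quick worked check of the subtraction semantics the type-specific scanners implement below (illustrative, runnable as-is): with an ascending long sort key, "RANGE 10 PRECEDING" keeps every row whose key k satisfies currKey - k <= 10, i.e. every row for which isDistanceGreater(currKey, k, 10) is false.

  // Illustrative demo of the RANGE distance test for long keys.
  public class RangeDistanceDemo {
    public static void main(String[] args) {
      long currKey = 100L;
      long[] scannedKeys = { 85L, 89L, 90L, 95L, 100L };
      for (long k : scannedKeys) {
        boolean inFrame = (currKey - k) <= 10;
        System.out.println(k + " in frame: " + inFrame); // 85, 89 -> false; 90, 95, 100 -> true
      }
    }
  }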
-  public static class LongValueBoundaryScanner extends SingleValueBoundaryScanner {
-    public LongValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end, expressionDef);
-    }
-
-    @Override
-    public boolean isDistanceGreater(Object v1, Object v2, int amt) {
-      if (v1 != null && v2 != null) {
-        long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        return (l1 - l2) > amt;
-      }
-
-      return v1 != null || v2 != null; // True if only one value is null
-    }
-
-    @Override
-    public boolean isEqual(Object v1, Object v2) {
-      if (v1 != null && v2 != null) {
-        long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        return l1 == l2;
-      }
-
-      return v1 == null && v2 == null; // True if both are null
-    }
-  }
-
-  public static class DoubleValueBoundaryScanner extends SingleValueBoundaryScanner {
-    public DoubleValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end, expressionDef);
-    }
-
-    @Override
-    public boolean isDistanceGreater(Object v1, Object v2, int amt) {
-      if (v1 != null && v2 != null) {
-        double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        return (d1 - d2) > amt;
-      }
-
-      return v1 != null || v2 != null; // True if only one value is null
-    }
-
-    @Override
-    public boolean isEqual(Object v1, Object v2) {
-      if (v1 != null && v2 != null) {
-        double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        return d1 == d2;
-      }
-
-      return v1 == null && v2 == null; // True if both are null
-    }
-  }
-
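The decimal scanner below flags its own defect: HiveDecimal.subtract(...).intValue() narrows the difference to an int, so a difference that overflows int can compare incorrectly. One overflow-safe alternative, sketched here with java.math.BigDecimal rather than Hive's HiveDecimal, is to compare the difference against amt instead of narrowing it:

  import java.math.BigDecimal;

  // Sketch: overflow-safe "distance greater than amt" for decimal keys.
  // compareTo avoids the lossy intValue() narrowing noted in the TODO below.
  static boolean isDistanceGreater(BigDecimal d1, BigDecimal d2, int amt) {
    if (d1 != null && d2 != null) {
      return d1.subtract(d2).compareTo(BigDecimal.valueOf(amt)) > 0;
    }
    return d1 != null || d2 != null; // true iff exactly one value is null
  }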
-  public static class HiveDecimalValueBoundaryScanner extends SingleValueBoundaryScanner {
-    public HiveDecimalValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end, expressionDef);
-    }
-
-    @Override
-    public boolean isDistanceGreater(Object v1, Object v2, int amt) {
-      HiveDecimal d1 = PrimitiveObjectInspectorUtils.getHiveDecimal(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      HiveDecimal d2 = PrimitiveObjectInspectorUtils.getHiveDecimal(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      if ( d1 != null && d2 != null ) {
-        return d1.subtract(d2).intValue() > amt; // TODO: lossy conversion!
-      }
-
-      return d1 != null || d2 != null; // True if only one value is null
-    }
-
-    @Override
-    public boolean isEqual(Object v1, Object v2) {
-      HiveDecimal d1 = PrimitiveObjectInspectorUtils.getHiveDecimal(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      HiveDecimal d2 = PrimitiveObjectInspectorUtils.getHiveDecimal(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      if ( d1 != null && d2 != null ) {
-        return d1.equals(d2);
-      }
-
-      return d1 == null && d2 == null; // True if both are null
-    }
-  }
-
-  public static class DateValueBoundaryScanner extends SingleValueBoundaryScanner {
-    public DateValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end, expressionDef);
-    }
-
-    @Override
-    public boolean isDistanceGreater(Object v1, Object v2, int amt) {
-      Date l1 = PrimitiveObjectInspectorUtils.getDate(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      Date l2 = PrimitiveObjectInspectorUtils.getDate(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      if (l1 != null && l2 != null) {
-        return (double)(l1.getTime() - l2.getTime())/1000 > (long)amt * 24 * 3600; // Converts amt days to seconds
-      }
-      return l1 != l2; // True if only one date is null
-    }
-
-    @Override
-    public boolean isEqual(Object v1, Object v2) {
-      Date l1 = PrimitiveObjectInspectorUtils.getDate(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      Date l2 = PrimitiveObjectInspectorUtils.getDate(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      return (l1 == null && l2 == null) || (l1 != null && l1.equals(l2));
-    }
-  }
-
-  public static class StringValueBoundaryScanner extends SingleValueBoundaryScanner {
-    public StringValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end, expressionDef);
-    }
-
-    @Override
-    public boolean isDistanceGreater(Object v1, Object v2, int amt) {
-      String s1 = PrimitiveObjectInspectorUtils.getString(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      String s2 = PrimitiveObjectInspectorUtils.getString(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      return s1 != null && s2 != null && s1.compareTo(s2) > 0;
-    }
-
-    @Override
-    public boolean isEqual(Object v1, Object v2) {
-      String s1 = PrimitiveObjectInspectorUtils.getString(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      String s2 = PrimitiveObjectInspectorUtils.getString(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      return (s1 == null && s2 == null) || (s1 != null && s1.equals(s2));
-    }
-  }
-
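Note that the string scanner disregards amt entirely: any lexicographic difference in the ordering direction counts as crossing the boundary, so a RANGE frame over a string key degenerates to the current row's peer group. An illustrative, runnable check:

  // Illustrative: for string keys, "distance greater" reduces to ordering alone.
  public class StringRangeDemo {
    public static void main(String[] args) {
      String currKey = "banana";
      String scannedKey = "apple";
      int amt = 1000; // has no effect for string keys
      boolean crossed = currKey.compareTo(scannedKey) > 0;
      System.out.println(crossed); // true, whatever amt was given
    }
  }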
-  /*
-   */
-  static class MultiValueBoundaryScanner extends ValueBoundaryScanner {
-    OrderDef orderDef;
-
-    public MultiValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderDef orderDef) {
-      super(start, end);
-      this.orderDef = orderDef;
-    }
-
-    /*
-|------+----------------+---------------+----------+-------+-----------------------------------|
-| Use  | Boundary1.type | Boundary1.amt | Sort Key | Order | Behavior                          |
-| Case |                |               |          |       |                                   |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-| 1.   | PRECEDING      | UNB           | ANY      | ANY   | start = 0                         |
-| 2.   | CURRENT ROW    |               | ANY      | ANY   | scan backwards until row R2       |
-|      |                |               |          |       | such that R2.sk != R.sk           |
-|      |                |               |          |       | start = R2.idx + 1                |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-     */
-    @Override
-    protected int computeStart(int rowIdx, PTFPartition p) throws HiveException {
-      switch(start.getDirection()) {
-      case PRECEDING:
-        return computeStartPreceding(rowIdx, p);
-      case CURRENT:
-        return computeStartCurrentRow(rowIdx, p);
-      case FOLLOWING:
-      default:
-        throw new HiveException(
-            "FOLLOWING not allowed for starting RANGE with multiple expressions in ORDER BY");
-      }
-    }
-
-    protected int computeStartPreceding(int rowIdx, PTFPartition p) throws HiveException {
-      int amt = start.getAmt();
-      if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
-        return 0;
-      }
-      throw new HiveException(
-          "PRECEDING needs UNBOUNDED for RANGE with multiple expressions in ORDER BY");
-    }
-
-    protected int computeStartCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
-      Object[] sortKey = computeValues(p.getAt(rowIdx));
-      Object[] rowVal = sortKey;
-      int r = rowIdx;
-
-      while (r >= 0 && isEqual(rowVal, sortKey) ) {
-        r--;
-        if ( r >= 0 ) {
-          rowVal = computeValues(p.getAt(r));
-        }
-      }
-      return r + 1;
-    }
-
-    /*
-|------+----------------+---------------+----------+-------+-----------------------------------|
-| Use  | Boundary2.type | Boundary2.amt | Sort Key | Order | Behavior                          |
-| Case |                |               |          |       |                                   |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-| 1.   | CURRENT ROW    |               | ANY      | ANY   | scan forward until row R2         |
-|      |                |               |          |       | such that R2.sk != R.sk           |
-|      |                |               |          |       | end = R2.idx                      |
-| 2.   | FOLLOWING      | UNB           | ANY      | ANY   | end = partition.size()            |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-     */
-    @Override
-    protected int computeEnd(int rowIdx, PTFPartition p) throws HiveException {
-      switch(end.getDirection()) {
-      case PRECEDING:
-        throw new HiveException(
-            "PRECEDING not allowed for finishing RANGE with multiple expressions in ORDER BY");
-      case CURRENT:
-        return computeEndCurrentRow(rowIdx, p);
-      case FOLLOWING:
-      default:
-        return computeEndFollowing(rowIdx, p);
-      }
-    }
-
-    protected int computeEndCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
-      Object[] sortKey = computeValues(p.getAt(rowIdx));
-      Object[] rowVal = sortKey;
-      int r = rowIdx;
-
-      while (r < p.size() && isEqual(sortKey, rowVal) ) {
-        r++;
-        if ( r < p.size() ) {
-          rowVal = computeValues(p.getAt(r));
-        }
-      }
-      return r;
-    }
-
-    protected int computeEndFollowing(int rowIdx, PTFPartition p) throws HiveException {
-      int amt = end.getAmt();
-      if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
-        return p.size();
-      }
-      throw new HiveException(
-          "FOLLOWING needs UNBOUNDED for RANGE with multiple expressions in ORDER BY");
-    }
-
-    public Object[] computeValues(Object row) throws HiveException {
-      Object[] objs = new Object[orderDef.getExpressions().size()];
-      for (int i = 0; i < objs.length; i++) {
-        Object o = orderDef.getExpressions().get(i).getExprEvaluator().evaluate(row);
-        objs[i] = ObjectInspectorUtils.copyToStandardObject(o, orderDef.getExpressions().get(i).getOI());
-      }
-      return objs;
-    }
-
-    public boolean isEqual(Object[] v1, Object[] v2) {
-      assert v1.length == v2.length;
-      for (int i = 0; i < v1.length; i++) {
-        if (v1[i] == null && v2[i] == null) {
-          continue;
-        }
-        if (v1[i] == null || v2[i] == null) {
-          return false;
-        }
-        if (ObjectInspectorUtils.compare(
-            v1[i], orderDef.getExpressions().get(i).getOI(),
-            v2[i], orderDef.getExpressions().get(i).getOI()) != 0) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
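With multiple ORDER BY expressions there is no meaningful numeric distance, so MultiValueBoundaryScanner only ever needs peer-group equality — hence the restriction to UNBOUNDED and CURRENT ROW. A standalone sketch of the same null-tolerant composite-key comparison (ignoring Hive's ObjectInspector-based compare; two nulls are equal, one null is not):

  import java.util.Objects;

  // Sketch: null-tolerant equality over a composite sort key, mirroring the
  // semantics of MultiValueBoundaryScanner.isEqual above.
  static boolean keysEqual(Object[] k1, Object[] k2) {
    if (k1.length != k2.length) {
      return false;
    }
    for (int i = 0; i < k1.length; i++) {
      if (!Objects.equals(k1[i], k2[i])) { // null == null holds; one null does not
        return false;
      }
    }
    return true;
  }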
-
   public static class SameList<E> extends AbstractList<E> {
     int sz;
     E val;
@@ -1518,6 +718,8 @@ public boolean hasNext() {
       return currIdx < iPart.size();
     }
 
+    // Given the data in a partition, evaluate the result for the next row in
+    // both streaming and batch mode
     @Override
     public Object next() {
       int i;
@@ -1550,10 +752,8 @@ public Object next() {
           }
           output.set(j, out);
         } else {
-          Range rng = getRange(wFn, currIdx, iPart);
-          PTFPartitionIterator<Object> rItr = rng.iterator();
-          PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-          output.set(j, evaluateWindowFunction(wFn, rItr));
+          Object out = evaluateWindowFunction(wFn, currIdx, iPart);
+          output.set(j, out);
         }
       }
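The replacement call in the last hunk, evaluateWindowFunction(wFn, currIdx, iPart), hands range computation and aggregation to a function-specific partition evaluator instead of re-iterating each frame here. As a sketch of the kind of saving that enables (illustrative only, not the BasePartitionEvaluator implementation): a sum over any half-open frame can be answered in O(1) per row from a prefix-sum array built once per partition.

  // Sketch: per-function partition evaluation — precompute prefix sums once,
  // then answer sum over any [start, end) frame without rescanning the frame.
  static long[] prefixSums(long[] values) {
    long[] prefix = new long[values.length + 1];
    for (int i = 0; i < values.length; i++) {
      prefix[i + 1] = prefix[i] + values[i];
    }
    return prefix;
  }

  static long frameSum(long[] prefix, int start, int end) {
    return prefix[end] - prefix[start]; // end is exclusive, as in Range
  }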