diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index d6efd49..ff874e2 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -128,6 +128,7 @@ minitez.query.files.shared=alter_merge_2_orc.q,\ union8.q,\ union9.q,\ vector_cast_constant.q,\ + vector_count_empty.q\ vector_data_types.q,\ vector_decimal_aggregate.q,\ vector_left_outer_join.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index acee377..de33830 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountMerge; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFSumDecimal; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgDouble; @@ -1898,7 +1899,7 @@ static String getUndecoratedName(String hiveTypeName) { add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class)); add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class)); add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFSumLong.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class)); add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java index 5aa4d4c..4e43905 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java @@ -46,15 +46,7 @@ private static final long serialVersionUID = 1L; - transient private long value; - transient private boolean isNull; - - public void initIfNull() { - if (isNull) { - isNull = false; - value = 0; - } - } + transient private long count; @Override public int getVariableSize() { @@ -63,8 +55,7 @@ public int getVariableSize() { @Override public void reset() { - isNull = true; - value = 0L; + count = 0L; } } @@ -131,8 +122,7 @@ private void iterateNoNullsWithAggregationSelection( aggregationBufferSets, aggregateIndex, i); - myagg.initIfNull(); - myagg.value++; + myagg.count++; } } @@ -148,8 +138,7 @@ private void iterateHasNullsWithAggregationSelection( aggregationBufferSets, aggregateIndex, i); - myagg.initIfNull(); - myagg.value++; + myagg.count++; } } } @@ -168,8 +157,7 @@ private void iterateHasNullsSelectionWithAggregationSelection( aggregationBufferSets, aggregateIndex, j); - myagg.initIfNull(); - myagg.value++; + myagg.count++; } } } @@ -191,17 +179,15 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) Aggregation myagg = (Aggregation)agg; - myagg.initIfNull(); - if (inputVector.isRepeating) { if (inputVector.noNulls || !inputVector.isNull[0]) { - myagg.value += batchSize; + myagg.count += batchSize; } return; } if (inputVector.noNulls) { - myagg.value += batchSize; + myagg.count += batchSize; return; } else if (!batch.selectedInUse) { @@ -221,7 +207,7 @@ private void iterateSelectionHasNulls( for (int j=0; j< batchSize; ++j) { int i = selected[j]; if (!isNull[i]) { - myagg.value += 1; + myagg.count += 1; } } } @@ -233,7 +219,7 @@ private void iterateNoSelectionHasNulls( for (int i=0; i< batchSize; ++i) { if (!isNull[i]) { - myagg.value += 1; + myagg.count += 1; } } } @@ -251,14 +237,9 @@ public void reset(AggregationBuffer agg) throws HiveException { @Override public Object evaluateOutput(AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - result.set (myagg.value); + Aggregation myagg = (Aggregation) agg; + result.set (myagg.count); return result; - } } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java new file mode 100644 index 0000000..7dabbd8 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java @@ -0,0 +1,400 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.util.JavaDataModel; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.LongWritable; + + +/** + * VectorUDAFCountMerge. Vectorized implementation for COUNT aggregate on reduce-side (merge). + */ +@Description(name = "count", value = "_FUNC_(expr) - Returns the merged sum value of expr (vectorized, type: long)") + +public class VectorUDAFCountMerge extends VectorAggregateExpression { + + private static final long serialVersionUID = 1L; + + /** + * class for storing the current aggregate value. + */ + static class Aggregation implements AggregationBuffer { + + private static final long serialVersionUID = 1L; + + transient private long value; + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + value = 0L; + } + } + + private VectorExpression inputExpression = null; + transient private final LongWritable result; + + public VectorUDAFCountMerge(VectorExpression inputExpression) { + this(); + this.inputExpression = inputExpression; + } + + public VectorUDAFCountMerge() { + super(); + result = new LongWritable(0); + } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + long[] vector = inputVector.vector; + + if (inputVector.noNulls) { + if (inputVector.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector[0], batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batchSize); + } + } + } else { + if (inputVector.isRepeating) { + if (batch.selectedInUse) { + iterateHasNullsRepeatingSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector[0], batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector[0], batchSize, inputVector.isNull); + } + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batchSize, inputVector.isNull); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.value += value; + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.value += values[selection[i]]; + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.value += values[i]; + } + } + + private void iterateHasNullsRepeatingSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[selection[i]]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.value += value; + } + } + + } + + private void iterateHasNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.value += value; + } + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + myagg.value += values[i]; + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.value += values[i]; + } + } + } + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + Aggregation myagg = (Aggregation)agg; + + long[] vector = inputVector.vector; + + if (inputVector.isRepeating) { + if (inputVector.noNulls) { + myagg.value += vector[0]*batchSize; + } + return; + } + + if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNulls(myagg, vector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); + } + else { + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); + } + } + + private void iterateSelectionHasNulls( + Aggregation myagg, + long[] vector, + int batchSize, + boolean[] isNull, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + if (!isNull[i]) { + myagg.value += vector[i]; + } + } + } + + private void iterateSelectionNoNulls( + Aggregation myagg, + long[] vector, + int batchSize, + int[] selected) { + + for (int i=0; i< batchSize; ++i) { + myagg.value += vector[selected[i]]; + } + } + + private void iterateNoSelectionHasNulls( + Aggregation myagg, + long[] vector, + int batchSize, + boolean[] isNull) { + + for(int i=0;i