diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index 2fe8192..04d362d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -41,15 +41,16 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.SelectColumnIsTrue; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgDouble; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgLong; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFCountDouble; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFCountLong; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxDouble; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxLong; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxString; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDouble; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinLong; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinString; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDouble; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopLong; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdSampDouble; @@ -941,11 +942,14 @@ private String getOutputColType(String inputType, String method) { static Object[][] aggregatesDefinition = { {"min", "Long", VectorUDAFMinLong.class}, {"min", "Double", VectorUDAFMinDouble.class}, + {"min", "String", VectorUDAFMinString.class}, {"max", "Long", VectorUDAFMaxLong.class}, {"max", "Double", VectorUDAFMaxDouble.class}, + {"max", "String", VectorUDAFMaxString.class}, {"count", null, VectorUDAFCountStar.class}, - {"count", "Long", VectorUDAFCountLong.class}, - {"count", "Double", VectorUDAFCountDouble.class}, + {"count", "Long", VectorUDAFCount.class}, + {"count", "Double", VectorUDAFCount.class}, + {"count", "String", VectorUDAFCount.class}, {"sum", "Long", VectorUDAFSumLong.class}, {"sum", "Double", VectorUDAFSumDouble.class}, {"avg", "Long", VectorUDAFAvgLong.class}, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java new file mode 100644 index 0000000..bb4800e --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.LongWritable; + +/** +* VectorUDAFCount. Vectorized implementation for COUNT aggregates. +*/ +@Description(name = "count", value = "_FUNC_(expr) - Returns the count (vectorized)") +public class VectorUDAFCount extends VectorAggregateExpression { + + /** + * Class for storing the current aggregate value. + */ + static class Aggregation implements AggregationBuffer { + long value; + boolean isNull; + + public void initIfNull() { + if (isNull) { + isNull = false; + value = 0; + } + } + } + + private final VectorExpression inputExpression; + private final LongWritable result; + + public VectorUDAFCount(VectorExpression inputExpression) { + super(); + this.inputExpression = inputExpression; + result = new LongWritable(0); + } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + ColumnVector inputVector = batch.cols[this.inputExpression.getOutputColumn()]; + + if (inputVector.noNulls) { + // if there are no nulls then the iteration is the same on all cases + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, batchSize); + } else if (!batch.selectedInUse) { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + batchSize, inputVector.isNull); + } else if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + batchSize, batch.selected, inputVector.isNull); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.initIfNull(); + myagg.value++; + } + } + + private
void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.initIfNull(); + myagg.value++; + } + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + myagg.initIfNull(); + myagg.value++; + } + } + } + + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { + + inputExpression.evaluate(batch); + + ColumnVector inputVector = batch.cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + Aggregation myagg = (Aggregation)agg; + + myagg.initIfNull(); + + if (inputVector.isRepeating) { + if (inputVector.noNulls || !inputVector.isNull[0]) { + myagg.value += batchSize; + } + return; + } + + if (inputVector.noNulls) { + myagg.value += batchSize; + return; + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull); + } + else { + iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected); + } + } + + private void iterateSelectionHasNulls( + Aggregation myagg, + int batchSize, + boolean[] isNull, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + if (!isNull[i]) { + myagg.value += 1; + } + } + } + + private void iterateNoSelectionHasNulls( + Aggregation myagg, + int batchSize, + boolean[] isNull) { + + for (int i=0; i< batchSize; ++i) { + if (!isNull[i]) { + myagg.value += 1; + } + } + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + return new Aggregation(); + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + Aggregation myAgg = (Aggregation) agg; + myAgg.isNull = true; + } + + @Override + public Object evaluateOutput(AggregationBuffer agg) throws HiveException { + Aggregation myagg = (Aggregation) agg; + if (myagg.isNull) { + return null; + } + else { + result.set (myagg.value); + return result; + } + } + + @Override + public ObjectInspector getOutputObjectInspector() { + return PrimitiveObjectInspectorFactory.writableLongObjectInspector; + } + +} + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java deleted file mode 100644 index 7a0c22b..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen; - -import java.util.ArrayList; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. - VectorAggregateExpression.AggregationBuffer; -import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; - -/** -* VectorUDAFCountDouble. Vectorized implementation for COUNT aggregates. -*/ -@Description(name = "count", value = "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: double)") -public class VectorUDAFCountDouble extends VectorAggregateExpression { - - /** - /* class for storing the current aggregate value. - */ - static class Aggregation implements AggregationBuffer { - long value; - boolean isNull; - - public void initIfNull() { - if (isNull) { - isNull = false; - value = 0; - } - } - } - - private VectorExpression inputExpression; - private LongWritable result; - - public VectorUDAFCountDouble(VectorExpression inputExpression) { - super(); - this.inputExpression = inputExpression; - result = new LongWritable(0); - } - - private Aggregation getCurrentAggregationBuffer( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int row) { - VectorAggregationBufferRow mySet = aggregationBufferSets[row]; - Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); - return myagg; - } - - @Override - public void aggregateInputSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - VectorizedRowBatch batch) throws HiveException { - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - inputExpression.evaluate(batch); - - DoubleColumnVector inputVector = (DoubleColumnVector)batch. 
- cols[this.inputExpression.getOutputColumn()]; - - if (inputVector.noNulls) { - // if there are no nulls then the iteration is the same on all cases - iterateNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, batchSize); - } else if (!batch.selectedInUse) { - iterateHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, - batchSize, inputVector.isNull); - } else if (batch.selectedInUse) { - iterateHasNullsSelectionWithAggregationSelection( - aggregationBufferSets, aggregateIndex, - batchSize, batch.selected, inputVector.isNull); - } - } - - private void iterateNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize) { - - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - myagg.initIfNull(); - myagg.value++; - } - } - - private void iterateHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize, - boolean[] isNull) { - - for (int i=0; i < batchSize; ++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - myagg.initIfNull(); - myagg.value++; - } - } - } - - private void iterateHasNullsSelectionWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize, - int[] selection, - boolean[] isNull) { - - for (int j=0; j < batchSize; ++j) { - int i = selection[j]; - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - j); - myagg.initIfNull(); - myagg.value++; - } - } - } - - - @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) - throws HiveException { - - inputExpression.evaluate(batch); - - DoubleColumnVector inputVector = (DoubleColumnVector)batch. 
- cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - Aggregation myagg = (Aggregation)agg; - - myagg.initIfNull(); - - if (inputVector.isRepeating) { - if (inputVector.noNulls || !inputVector.isNull[0]) { - myagg.value += batchSize; - } - return; - } - - if (inputVector.noNulls) { - myagg.value += batchSize; - return; - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull); - } - else { - iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected); - } - } - - private void iterateSelectionHasNulls( - Aggregation myagg, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - int i = selected[j]; - if (!isNull[i]) { - myagg.value += 1; - } - } - } - - private void iterateNoSelectionHasNulls( - Aggregation myagg, - int batchSize, - boolean[] isNull) { - - for (int i=0; i< batchSize; ++i) { - if (!isNull[i]) { - myagg.value += 1; - } - } - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new Aggregation(); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; - } - - @Override - public Object evaluateOutput(AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - result.set (myagg.value); - return result; - } - } - - @Override - public ObjectInspector getOutputObjectInspector() { - return PrimitiveObjectInspectorFactory.writableLongObjectInspector; - } - -} - diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java deleted file mode 100644 index c63892c..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen; - -import java.util.ArrayList; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. 
- VectorAggregateExpression.AggregationBuffer; -import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; - -/** -* VectorUDAFCountLong. Vectorized implementation for COUNT aggregates. -*/ -@Description(name = "count", value = "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: long)") -public class VectorUDAFCountLong extends VectorAggregateExpression { - - /** - /* class for storing the current aggregate value. - */ - static class Aggregation implements AggregationBuffer { - long value; - boolean isNull; - - public void initIfNull() { - if (isNull) { - isNull = false; - value = 0; - } - } - } - - private VectorExpression inputExpression; - private LongWritable result; - - public VectorUDAFCountLong(VectorExpression inputExpression) { - super(); - this.inputExpression = inputExpression; - result = new LongWritable(0); - } - - private Aggregation getCurrentAggregationBuffer( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int row) { - VectorAggregationBufferRow mySet = aggregationBufferSets[row]; - Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); - return myagg; - } - - @Override - public void aggregateInputSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - VectorizedRowBatch batch) throws HiveException { - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - inputExpression.evaluate(batch); - - LongColumnVector inputVector = (LongColumnVector)batch. 
- cols[this.inputExpression.getOutputColumn()]; - - if (inputVector.noNulls) { - // if there are no nulls then the iteration is the same on all cases - iterateNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, batchSize); - } else if (!batch.selectedInUse) { - iterateHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, - batchSize, inputVector.isNull); - } else if (batch.selectedInUse) { - iterateHasNullsSelectionWithAggregationSelection( - aggregationBufferSets, aggregateIndex, - batchSize, batch.selected, inputVector.isNull); - } - } - - private void iterateNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize) { - - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - myagg.initIfNull(); - myagg.value++; - } - } - - private void iterateHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize, - boolean[] isNull) { - - for (int i=0; i < batchSize; ++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - myagg.initIfNull(); - myagg.value++; - } - } - } - - private void iterateHasNullsSelectionWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize, - int[] selection, - boolean[] isNull) { - - for (int j=0; j < batchSize; ++j) { - int i = selection[j]; - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - j); - myagg.initIfNull(); - myagg.value++; - } - } - } - - - @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) - throws HiveException { - - inputExpression.evaluate(batch); - - LongColumnVector inputVector = (LongColumnVector)batch. 
- cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - Aggregation myagg = (Aggregation)agg; - - myagg.initIfNull(); - - if (inputVector.isRepeating) { - if (inputVector.noNulls || !inputVector.isNull[0]) { - myagg.value += batchSize; - } - return; - } - - if (inputVector.noNulls) { - myagg.value += batchSize; - return; - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull); - } - else { - iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected); - } - } - - private void iterateSelectionHasNulls( - Aggregation myagg, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - int i = selected[j]; - if (!isNull[i]) { - myagg.value += 1; - } - } - } - - private void iterateNoSelectionHasNulls( - Aggregation myagg, - int batchSize, - boolean[] isNull) { - - for (int i=0; i< batchSize; ++i) { - if (!isNull[i]) { - myagg.value += 1; - } - } - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new Aggregation(); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; - } - - @Override - public Object evaluateOutput(AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - result.set (myagg.value); - return result; - } - } - - @Override - public ObjectInspector getOutputObjectInspector() { - return PrimitiveObjectInspectorFactory.writableLongObjectInspector; - } - -} - diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java new file mode 100644 index 0000000..d4eea4a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen; + +import java.util.ArrayList; +import java.util.Arrays; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. 
+ VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.io.BytesWritable; + +/** +* VectorUDAFMaxString. Vectorized implementation for MIN/MAX aggregates. +*/ +@Description(name = "max", value = "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: string)") +public class VectorUDAFMaxString extends VectorAggregateExpression { + + /** + * Class for storing the current aggregate value. + */ + static private final class Aggregation implements AggregationBuffer { + + final static int MIN_BUFFER_SIZE = 16; + byte[] bytes = new byte[MIN_BUFFER_SIZE]; + int length; + boolean isNull; + + public void checkValue(byte[] bytes, int start, int length) { + if (isNull) { + isNull = false; + assign(bytes, start, length); + } else if (StringExpr.compare( + bytes, start, length, + this.bytes, 0, this.length) > 0) { + assign(bytes, start, length); + } + } + + public void assign(byte[] bytes, int start, int length) { + // Avoid new allocation if possible + if (this.bytes.length < length) { + this.bytes = new byte[length]; + } + System.arraycopy(bytes, start, this.bytes, 0, length); + this.length = length; + } + } + + private VectorExpression inputExpression; + private BytesWritable result; + + public VectorUDAFMaxString(VectorExpression inputExpression) { + super(); + this.inputExpression = inputExpression; + result = new BytesWritable(); + } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex); + return myagg; + } + +@Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + BytesColumnVector inputColumn = (BytesColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()]; + + if (inputColumn.noNulls) { + if (inputColumn.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize); + } + } + } else { + if (inputColumn.isRepeating) { + // All nulls, no-op for min/max + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize, batch.selected); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize) { + + byte[] bytes = inputColumn.vector[0]; + int start = inputColumn.start[0]; + int length = inputColumn.length[0]; + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(bytes, start, length); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + int row = selection[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[row], + inputColumn.start[row], + inputColumn.length[row]); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize, + int[] selection) { + + for (int i=0; i < batchSize; ++i) { + int row = selection[i]; + if (!inputColumn.isNull[row]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[row], + inputColumn.start[row], + inputColumn.length[row]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + if (!inputColumn.isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + } + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { + + inputExpression.evaluate(batch); + + BytesColumnVector inputColumn = (BytesColumnVector)batch. 
+ cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + Aggregation myagg = (Aggregation)agg; + + if (inputColumn.isRepeating) { + // isRepeating: isNull[0] governs nullity even when noNulls is false + if (inputColumn.noNulls || !inputColumn.isNull[0]) { + myagg.checkValue(inputColumn.vector[0], + inputColumn.start[0], + inputColumn.length[0]); + } + return; + } + + if (!batch.selectedInUse && inputColumn.noNulls) { + iterateNoSelectionNoNulls(myagg, inputColumn, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNulls(myagg, inputColumn, batchSize); + } + else if (inputColumn.noNulls){ + iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected); + } + else { + iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected); + } + } + + private void iterateSelectionHasNulls( + Aggregation myagg, + BytesColumnVector inputColumn, + int batchSize, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + if (!inputColumn.isNull[i]) { + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + } + + private void iterateSelectionNoNulls( + Aggregation myagg, + BytesColumnVector inputColumn, + int batchSize, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + + private void iterateNoSelectionHasNulls( + Aggregation myagg, + BytesColumnVector inputColumn, + int batchSize) { + + for (int i=0; i< batchSize; ++i) { + if (!inputColumn.isNull[i]) { + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + } + + private void iterateNoSelectionNoNulls( + Aggregation myagg, + BytesColumnVector inputColumn, + int batchSize) { + for (int i=0; i< batchSize; ++i) { + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + return new Aggregation(); + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + Aggregation myAgg = (Aggregation) agg; + myAgg.isNull = true; + } + + @Override + public Object evaluateOutput( + AggregationBuffer agg) throws HiveException { + Aggregation myagg = (Aggregation) agg; + if (myagg.isNull) { + return null; + } + else { + result.set(myagg.bytes, 0, myagg.length); + return result; + } + } + + @Override + public ObjectInspector getOutputObjectInspector() { + return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + } +} + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java new file mode 100644 index 0000000..a8ef3e0 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen; + +import java.util.ArrayList; +import java.util.Arrays; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. + VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.io.BytesWritable; + +/** +* VectorUDAFMinString. Vectorized implementation for MIN/MAX aggregates. +*/ +@Description(name = "min", value = "_FUNC_(expr) - Returns the minimum value of expr (vectorized, type: string)") +public class VectorUDAFMinString extends VectorAggregateExpression { + + /** + * Class for storing the current aggregate value.
+ */ + static private final class Aggregation implements AggregationBuffer { + + final static int MIN_BUFFER_SIZE = 16; + byte[] bytes = new byte[MIN_BUFFER_SIZE]; + int length; + boolean isNull; + + public void checkValue(byte[] bytes, int start, int length) { + if (isNull) { + isNull = false; + assign(bytes, start, length); + } else if (StringExpr.compare( + bytes, start, length, + this.bytes, 0, this.length) < 0) { + assign(bytes, start, length); + } + } + + public void assign(byte[] bytes, int start, int length) { + // Avoid new allocation if possible + if (this.bytes.length < length) { + this.bytes = new byte[length]; + } + System.arraycopy(bytes, start, this.bytes, 0, length); + this.length = length; + } + } + + private VectorExpression inputExpression; + private BytesWritable result; + + public VectorUDAFMinString(VectorExpression inputExpression) { + super(); + this.inputExpression = inputExpression; + result = new BytesWritable(); + } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex); + return myagg; + } + +@Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + BytesColumnVector inputColumn = (BytesColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + + if (inputColumn.noNulls) { + if (inputColumn.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize); + } + } + } else { + if (inputColumn.isRepeating) { + // All nulls, no-op for min/max + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize, batch.selected); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize) { + + byte[] bytes = inputColumn.vector[0]; + int start = inputColumn.start[0]; + int length = inputColumn.length[0]; + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(bytes, start, length); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + int row = selection[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[row], + inputColumn.start[row], + inputColumn.length[row]); + } + } + + private void 
iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize, + int[] selection) { + + for (int i=0; i < batchSize; ++i) { + int row = selection[i]; + if (!inputColumn.isNull[row]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[row], + inputColumn.start[row], + inputColumn.length[row]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + if (!inputColumn.isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + } + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { + + inputExpression.evaluate(batch); + + BytesColumnVector inputColumn = (BytesColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + Aggregation myagg = (Aggregation)agg; + + if (inputColumn.isRepeating) { + // isRepeating: isNull[0] governs nullity even when noNulls is false + if (inputColumn.noNulls || !inputColumn.isNull[0]) { + myagg.checkValue(inputColumn.vector[0], + inputColumn.start[0], + inputColumn.length[0]); + } + return; + } + + if (!batch.selectedInUse && inputColumn.noNulls) { + iterateNoSelectionNoNulls(myagg, inputColumn, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNulls(myagg, inputColumn, batchSize); + } + else if (inputColumn.noNulls){ + iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected); + } + else { + iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected); + } + } + + private void iterateSelectionHasNulls( + Aggregation myagg, + BytesColumnVector inputColumn, + int batchSize, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + if (!inputColumn.isNull[i]) { + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + } + + private void iterateSelectionNoNulls( + Aggregation myagg, + BytesColumnVector inputColumn, + int batchSize, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + + private void iterateNoSelectionHasNulls( + Aggregation myagg, + BytesColumnVector inputColumn, + int batchSize) { + + for (int i=0; i< batchSize; ++i) { + if (!inputColumn.isNull[i]) { + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + } + + private void iterateNoSelectionNoNulls( + Aggregation myagg, + BytesColumnVector inputColumn, + int batchSize) { + for (int i=0; i< batchSize; ++i) { + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + + @Override + public
AggregationBuffer getNewAggregationBuffer() throws HiveException { + return new Aggregation(); + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + Aggregation myAgg = (Aggregation) agg; + myAgg.isNull = true; + } + + @Override + public Object evaluateOutput( + AggregationBuffer agg) throws HiveException { + Aggregation myagg = (Aggregation) agg; + if (myagg.isNull) { + return null; + } + else { + result.set(myagg.bytes, 0, myagg.length); + return result; + } + } + + @Override + public ObjectInspector getOutputObjectInspector() { + return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + } +} + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java index 3ef2aa8..4d0d309 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java @@ -194,9 +194,8 @@ {"VectorUDAFMinMax", "VectorUDAFMaxLong", "long", ">", "max", "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: long)"}, {"VectorUDAFMinMax", "VectorUDAFMaxDouble", "double", ">", "max", "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: double)"}, - //template, <ClassName>, <ValueType> - {"VectorUDAFCount", "VectorUDAFCountLong", "long"}, - {"VectorUDAFCount", "VectorUDAFCountDouble", "double"}, + {"VectorUDAFMinMaxString", "VectorUDAFMinString", "<", "min", "_FUNC_(expr) - Returns the minimum value of expr (vectorized, type: string)"}, + {"VectorUDAFMinMaxString", "VectorUDAFMaxString", ">", "max", "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: string)"}, //template, <ClassName>, <ValueType> {"VectorUDAFSum", "VectorUDAFSumLong", "long"}, @@ -280,6 +279,8 @@ private void generate() throws Exception { generateVectorUDAFCount(tdesc); } else if (tdesc[0].equals("VectorUDAFMinMax")) { generateVectorUDAFMinMax(tdesc); + } else if (tdesc[0].equals("VectorUDAFMinMaxString")) { + generateVectorUDAFMinMaxString(tdesc); } else if (tdesc[0].equals("VectorUDAFSum")) { generateVectorUDAFSum(tdesc); } else if (tdesc[0].equals("VectorUDAFAvg")) { @@ -323,6 +324,25 @@ private void generateVectorUDAFMinMax(String[] tdesc) throws Exception { } + private void generateVectorUDAFMinMaxString(String[] tdesc) throws Exception { + String className = tdesc[1]; + String operatorSymbol = tdesc[2]; + String descName = tdesc[3]; + String descValue = tdesc[4]; + + String outputFile = joinPath(this.outputDirectory, className + ".java"); + String templateFile = joinPath(this.templateDirectory, tdesc[0] + ".txt"); + + String templateString = readFile(templateFile); + templateString = templateString.replaceAll("<ClassName>", className); + templateString = templateString.replaceAll("<OperatorSymbol>", operatorSymbol); + templateString = templateString.replaceAll("<DescriptionName>", descName); + templateString = templateString.replaceAll("<DescriptionValue>", descValue); + writeFile(outputFile, templateString); + + } + + private void generateVectorUDAFCount(String[] tdesc) throws IOException { String className = tdesc[1]; String valueType = tdesc[2]; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt index 54c80ec..69ff67b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt +++
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt @@ -44,7 +44,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn /** * <ClassName>. Vectorized implementation for COUNT aggregates. */ -@Description(name = "count", value = "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: <ValueType>)") +@Description(name = "count", value = "_FUNC_(expr) - Returns the count (vectorized)") public class <ClassName> extends VectorAggregateExpression { /** @@ -94,7 +94,7 @@ public class <ClassName> extends VectorAggregateExpression { inputExpression.evaluate(batch); - <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch. + ColumnVector inputVector = (ColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; if (inputVector.noNulls) { @@ -172,7 +172,7 @@ public class <ClassName> extends VectorAggregateExpression { inputExpression.evaluate(batch); - <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch. + ColumnVector inputVector = (ColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; int batchSize = batch.size; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt new file mode 100644 index 0000000..f66bf91 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen; + +import java.util.ArrayList; +import java.util.Arrays; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
+ VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.io.BytesWritable; + +/** +* <ClassName>. Vectorized implementation for MIN/MAX aggregates. +*/ +@Description(name = "<DescriptionName>", value = "<DescriptionValue>") +public class <ClassName> extends VectorAggregateExpression { + + /** + * Class for storing the current aggregate value. + */ + static private final class Aggregation implements AggregationBuffer { + + final static int MIN_BUFFER_SIZE = 16; + byte[] bytes = new byte[MIN_BUFFER_SIZE]; + int length; + boolean isNull; + + public void checkValue(byte[] bytes, int start, int length) { + if (isNull) { + isNull = false; + assign(bytes, start, length); + } else if (StringExpr.compare( + bytes, start, length, + this.bytes, 0, this.length) <OperatorSymbol> 0) { + assign(bytes, start, length); + } + } + + public void assign(byte[] bytes, int start, int length) { + // Avoid new allocation if possible + if (this.bytes.length < length) { + this.bytes = new byte[length]; + } + System.arraycopy(bytes, start, this.bytes, 0, length); + this.length = length; + } + } + + private VectorExpression inputExpression; + private BytesWritable result; + + public <ClassName>(VectorExpression inputExpression) { + super(); + this.inputExpression = inputExpression; + result = new BytesWritable(); + } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex); + return myagg; + } + +@Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + BytesColumnVector inputColumn = (BytesColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()]; + + if (inputColumn.noNulls) { + if (inputColumn.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize); + } + } + } else { + if (inputColumn.isRepeating) { + // All nulls, no-op for min/max + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize, batch.selected); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + inputColumn, batchSize); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize) { + + byte[] bytes = inputColumn.vector[0]; + int start = inputColumn.start[0]; + int length = inputColumn.length[0]; + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(bytes, start, length); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + int row = selection[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[row], + inputColumn.start[row], + inputColumn.length[row]); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize, + int[] selection) { + + for (int i=0; i < batchSize; ++i) { + int row = selection[i]; + if (!inputColumn.isNull[row]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[row], + inputColumn.start[row], + inputColumn.length[row]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + BytesColumnVector inputColumn, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + if (!inputColumn.isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(inputColumn.vector[i], + inputColumn.start[i], + inputColumn.length[i]); + } + } + } + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { + + inputExpression.evaluate(batch); + + BytesColumnVector inputColumn = (BytesColumnVector)batch. 
+  @Override
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+      throws HiveException {
+
+    inputExpression.evaluate(batch);
+
+    BytesColumnVector inputColumn = (BytesColumnVector) batch.
+        cols[this.inputExpression.getOutputColumn()];
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    Aggregation myagg = (Aggregation) agg;
+
+    if (inputColumn.isRepeating) {
+      // A repeating vector may hold a non-null value even when the noNulls
+      // flag is unset, so check isNull[0] rather than noNulls alone.
+      if (inputColumn.noNulls || !inputColumn.isNull[0]) {
+        myagg.checkValue(inputColumn.vector[0],
+          inputColumn.start[0],
+          inputColumn.length[0]);
+      }
+      return;
+    }
+
+    if (!batch.selectedInUse && inputColumn.noNulls) {
+      iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+    }
+    else if (!batch.selectedInUse) {
+      iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+    }
+    else if (inputColumn.noNulls) {
+      iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+    else {
+      iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+  }
+
+  private void iterateSelectionHasNulls(
+      Aggregation myagg,
+      BytesColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      if (!inputColumn.isNull[i]) {
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+  }
+
+  private void iterateSelectionNoNulls(
+      Aggregation myagg,
+      BytesColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      // Map through the selection vector; unselected rows must be skipped.
+      int i = selected[j];
+      myagg.checkValue(inputColumn.vector[i],
+        inputColumn.start[i],
+        inputColumn.length[i]);
+    }
+  }
+
+  private void iterateNoSelectionHasNulls(
+      Aggregation myagg,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i< batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+  }
+
+  private void iterateNoSelectionNoNulls(
+      Aggregation myagg,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i< batchSize; ++i) {
+      myagg.checkValue(inputColumn.vector[i],
+        inputColumn.start[i],
+        inputColumn.length[i]);
+    }
+  }
+
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    return new Aggregation();
+  }
+
+  @Override
+  public void reset(AggregationBuffer agg) throws HiveException {
+    Aggregation myAgg = (Aggregation) agg;
+    myAgg.isNull = true;
+  }
+
+  @Override
+  public Object evaluateOutput(
+      AggregationBuffer agg) throws HiveException {
+    Aggregation myagg = (Aggregation) agg;
+    if (myagg.isNull) {
+      return null;
+    }
+    else {
+      result.set(myagg.bytes, 0, myagg.length);
+      return result;
+    }
+  }
+
+  @Override
+  public ObjectInspector getOutputObjectInspector() {
+    // The aggregate surfaces its result as raw bytes (BytesWritable).
+    return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+  }
+}
+
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index 6fc230f..9e6372f 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -66,9 +67,10 @@ private static ExprNodeDesc buildColumnDesc(
 
   private static AggregationDesc buildAggregationDesc(
       VectorizationContext ctx,
       String aggregate,
-      String column) {
+      String column,
+      TypeInfo typeInfo) {
 
-    ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, TypeInfoFactory.longTypeInfo);
+    ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, typeInfo);
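+    // (typeInfo selects the input column's type, so this one helper now
+    // backs both the long- and the string-typed aggregate tests.)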
     ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
     params.add(inputColumn);
@@ -88,12 +90,13 @@ private static AggregationDesc buildAggregationDescCountStar(
   }
 
-  private static GroupByDesc buildGroupByDesc(
+  private static GroupByDesc buildGroupByDescLong(
       VectorizationContext ctx,
       String aggregate,
       String column) {
 
-    AggregationDesc agg = buildAggregationDesc(ctx, aggregate, column);
+    AggregationDesc agg = buildAggregationDesc(ctx, aggregate,
+        column, TypeInfoFactory.longTypeInfo);
 
     ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
     aggs.add(agg);
@@ -106,6 +109,28 @@ private static GroupByDesc buildGroupByDesc(
     return desc;
   }
 
+  private static GroupByDesc buildGroupByDescString(
+      VectorizationContext ctx,
+      String aggregate,
+      String column) {
+
+    AggregationDesc agg = buildAggregationDesc(ctx, aggregate,
+        column, TypeInfoFactory.stringTypeInfo);
+    ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
+    aggs.add(agg);
+
+    ArrayList<String> outputColumnNames = new ArrayList<String>();
+    outputColumnNames.add("_col0");
+
+    GroupByDesc desc = new GroupByDesc();
+    desc.setOutputColumnNames(outputColumnNames);
+    desc.setAggregators(aggs);
+
+    return desc;
+  }
+
 
   private static GroupByDesc buildGroupByDescCountStar(
       VectorizationContext ctx) {
@@ -131,7 +156,7 @@ private static GroupByDesc buildKeyGroupByDesc(
       TypeInfo typeInfo,
       String key) {
 
-    GroupByDesc desc = buildGroupByDesc(ctx, aggregate, column);
+    GroupByDesc desc = buildGroupByDescLong(ctx, aggregate, column);
 
     ExprNodeDesc keyExp = buildColumnDesc(ctx, key, typeInfo);
     ArrayList<ExprNodeDesc> keys = new ArrayList<ExprNodeDesc>();
@@ -150,6 +175,76 @@ public void testCountStar () throws HiveException {
   }
 
   @Test
+  public void testCountString () throws HiveException {
+    testAggregateString(
+        "count",
+        2,
+        Arrays.asList(new Object[]{"A", "B", "C"}),
+        3L);
+  }
+
+  @Test
+  public void testMaxString () throws HiveException {
+    testAggregateString(
+        "max",
+        2,
+        Arrays.asList(new Object[]{"A", "B", "C"}),
+        "C");
+    testAggregateString(
+        "max",
+        2,
+        Arrays.asList(new Object[]{"C", "B", "A"}),
+        "C");
+  }
+
+  @Test
+  public void testMinString () throws HiveException {
+    testAggregateString(
+        "min",
+        2,
+        Arrays.asList(new Object[]{"A", "B", "C"}),
+        "A");
+    testAggregateString(
+        "min",
+        2,
+        Arrays.asList(new Object[]{"C", "B", "A"}),
+        "A");
+  }
+
+  @Test
+  public void testMaxNullString () throws HiveException {
+    testAggregateString(
+        "max",
+        2,
+        Arrays.asList(new Object[]{"A", "B", null}),
+        "B");
+    testAggregateString(
+        "max",
+        2,
+        Arrays.asList(new Object[]{null, null, null}),
+        null);
+  }
+
+  @Test
+  public void testCountStringWithNull () throws HiveException {
+    testAggregateString(
+        "count",
+        2,
+        Arrays.asList(new Object[]{"A", null, "C", "D", null}),
+        3L);
+  }
+
+  @Test
+  public void testCountStringAllNull () throws HiveException {
+    testAggregateString(
+        "count",
+        4,
+        Arrays.asList(new Object[]{null, null, null, null, null}),
+        0L);
+  }
+
+  @Test
   public void testMinLongNullStringKeys() throws HiveException {
     testAggregateStringKeyAggregate(
         "min",
@@ -969,6 +1064,19 @@ public void testAggregateLongKeyAggregate (
     testAggregateLongKeyIterable (aggregateName, fdr, expected);
   }
 
+  public void testAggregateString (
+      String aggregateName,
+      int batchSize,
+      Iterable<Object> values,
+      Object expected) throws HiveException {
+
+    @SuppressWarnings("unchecked")
+    FakeVectorRowBatchFromObjectIterables fdr = new FakeVectorRowBatchFromObjectIterables(
+        batchSize, new String[] {"string"}, values);
+    testAggregateStringIterable (aggregateName, fdr, expected);
+  }
+
   public void testAggregateLongAggregate (
       String aggregateName,
       int batchSize,
@@ -1001,14 +1109,19 @@ public void validate(Object expected, Object result) {
 
       assertEquals(true, result instanceof Object[]);
       Object[] arr = (Object[]) result;
-      assertEquals (1, arr.length);
+      assertEquals(1, arr.length);
 
       if (expected == null) {
         assertNull (arr[0]);
-      } else {
-        assertEquals (true, arr[0] instanceof LongWritable);
+      } else if (arr[0] instanceof LongWritable) {
         LongWritable lw = (LongWritable) arr[0];
-        assertEquals ((Long) expected, (Long) lw.get());
+        assertEquals((Long) expected, (Long) lw.get());
+      } else if (arr[0] instanceof BytesWritable) {
+        BytesWritable bw = (BytesWritable) arr[0];
+        // Copy only the valid bytes: getBytes() returns the backing array,
+        // which can be longer than the logical length.
+        String sbw = new String(bw.getBytes(), 0, bw.getLength());
+        assertEquals((String) expected, sbw);
+      } else {
+        Assert.fail("Unsupported result type: " + arr[0].getClass().getName());
       }
     }
   }
@@ -1159,6 +1272,37 @@ public void testAggregateCountStarIterable (
     validator.validate(expected, result);
   }
 
+  public void testAggregateStringIterable (
+      String aggregateName,
+      Iterable<VectorizedRowBatch> data,
+      Object expected) throws HiveException {
+    Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+    mapColumnNames.put("A", 0);
+    VectorizationContext ctx = new VectorizationContext(mapColumnNames, 1);
+
+    GroupByDesc desc = buildGroupByDescString (ctx, aggregateName, "A");
+
+    VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+
+    FakeCaptureOutputOperator out = FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+    vgo.initialize(null, null);
+
+    for (VectorizedRowBatch unit: data) {
+      vgo.process(unit, 0);
+    }
+    vgo.close(false);
+
+    List<Object> outBatchList = out.getCapturedRows();
+    assertNotNull(outBatchList);
+    assertEquals(1, outBatchList.size());
+
+    Object result = outBatchList.get(0);
+
+    Validator validator = getValidator(aggregateName);
+    validator.validate(expected, result);
+  }
+
   public void testAggregateLongIterable (
       String aggregateName,
       Iterable<VectorizedRowBatch> data,
@@ -1167,7 +1311,7 @@ public void testAggregateLongIterable (
     mapColumnNames.put("A", 0);
     VectorizationContext ctx = new VectorizationContext(mapColumnNames, 1);
 
-    GroupByDesc desc = buildGroupByDesc (ctx, aggregateName, "A");
+    GroupByDesc desc = buildGroupByDescLong (ctx, aggregateName, "A");
 
     VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);