diff --git metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index f9da781..fbbdc0f 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -51,7 +51,7 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.serde.serdeConstants;;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java
new file mode 100644
index 0000000..35712d0
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * A hash map key wrapper for vectorized processing.
+ * It stores the key values as primitives in arrays for each supported primitive type.
+ * This works in conjunction with
+ * {@link org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapperBatch VectorHashKeyWrapperBatch}
+ * to hash vectorized processing units (batches).
+ */
+public class VectorHashKeyWrapper extends KeyWrapper {
+
+ private long[] longValues;
+ private double[] doubleValues;
+ private boolean[] isNull;
+ private int hashcode;
+
+ public VectorHashKeyWrapper(int longValuesCount, int doubleValuesCount) {
+ longValues = new long[longValuesCount];
+ doubleValues = new double[doubleValuesCount];
+ isNull = new boolean[longValuesCount + doubleValuesCount];
+ }
+
+ private VectorHashKeyWrapper() {
+ }
+
+ @Override
+ void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException {
+ throw new HiveException("Should not be called");
+ }
+
+ @Override
+ void setHashKey() {
+ hashcode = Arrays.hashCode(longValues) ^
+ Arrays.hashCode(doubleValues) ^
+ Arrays.hashCode(isNull);
+ }
+
+ @Override
+ public int hashCode() {
+ return hashcode;
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof VectorHashKeyWrapper) {
+ VectorHashKeyWrapper keyThat = (VectorHashKeyWrapper)that;
+ return hashcode == keyThat.hashcode &&
+ Arrays.equals(longValues, keyThat.longValues) &&
+ Arrays.equals(doubleValues, keyThat.doubleValues) &&
+ Arrays.equals(isNull, keyThat.isNull);
+ }
+ return false;
+ }
+
+ @Override
+ protected Object clone() {
+ VectorHashKeyWrapper clone = new VectorHashKeyWrapper();
+ clone.longValues = longValues.clone();
+ clone.doubleValues = doubleValues.clone();
+ clone.isNull = isNull.clone();
+ clone.hashcode = hashcode;
+ return clone;
+ }
+
+ @Override
+ public KeyWrapper copyKey() {
+ return (KeyWrapper) clone();
+ }
+
+ @Override
+ void copyKey(KeyWrapper oldWrapper) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ Object[] getKeyArray() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void assignDouble(int index, double d) {
+ doubleValues[index] = d;
+ isNull[longValues.length + index] = false;
+ }
+
+ public void assignNullDouble(int index) {
+ doubleValues[index] = 0; // assign 0 to simplify hashcode
+ isNull[longValues.length + index] = true;
+ }
+
+ public void assignLong(int index, long v) {
+ longValues[index] = v;
+ isNull[index] = false;
+ }
+
+ public void assignNullLong(int index) {
+ longValues[index] = 0; // assign 0 to simplify hashcode
+ isNull[index] = true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%d[%s] %d[%s]",
+ longValues.length, Arrays.toString(longValues),
+ doubleValues.length, Arrays.toString(doubleValues));
+ }
+
+ public boolean getIsNull(int i) {
+ return isNull[i];
+ }
+
+ public long getLongValue(int i) {
+ return longValues[i];
+ }
+
+ public double getDoubleValue(int i) {
+ return doubleValues[i - longValues.length];
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java
new file mode 100644
index 0000000..c23614c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Class for handling vectorized hash map key wrappers. It evaluates the key columns in a
+ * row batch in a vectorized fashion.
+ * This class stores additional information about keys needed to evaluate and output the key values.
+ *
+ */
+public class VectorHashKeyWrapperBatch {
+
+ /**
+ * Helper class for looking up a key value based on key index
+ *
+ */
+ private static class KeyLookupHelper {
+ public int longIndex;
+ public int doubleIndex;
+ }
+
+ /**
+ * The key expressions that require evaluation and output the primitive values for each key.
+ */
+ private VectorExpression[] keyExpressions;
+
+ /**
+ * indices of LONG primitive keys
+ */
+ private int[] longIndices;
+
+ /**
+ * indices of DOUBLE primitive keys
+ */
+ private int[] doubleIndices;
+
+ /**
+ * pre-allocated batch size vector of keys wrappers.
+ * N.B. these keys are **mutable** and should never be used in a HashMap.
+ * Always clone the key wrapper to obtain an immutable keywrapper suitable
+ * to use a key in a HashMap.
+ */
+ private VectorHashKeyWrapper[] vectorHashKeyWrappers;
+
+ /**
+ * lookup vector to map from key index to primitive type index
+ */
+ private KeyLookupHelper[] indexLookup;
+
+ /**
+ * preallocated and reused LongWritable objects for emiting row mode key values
+ */
+ private LongWritable[] longKeyValueOutput;
+
+ /**
+ * preallocated and reused DoubleWritable objects for emiting row mode key values
+ */
+ private DoubleWritable[] doubleKeyValueOutput;
+
+ /**
+ * Accessor for the batch-sized array of key wrappers
+ */
+ public VectorHashKeyWrapper[] getVectorHashKeyWrappers() {
+ return vectorHashKeyWrappers;
+ }
+
+ /**
+ * Processes a batch:
+ * <ul>
+ * <li>Evaluates each key vector expression.</li>
+ * <li>Copies out each key's primitive values into the key wrappers</li>
+ * <li>computes the hashcode of the key wrappers</li>
+ * </ul>
+ *
+ * @param vrb
+ * @throws HiveException
+ */
+ public void evaluateBatch (VectorizedRowBatch vrb) throws HiveException {
+ for(int i = 0; i < keyExpressions.length; ++i) {
+ keyExpressions[i].evaluate(vrb);
+ }
+ for(int i = 0; i< longIndices.length; ++i) {
+ int keyIndex = longIndices[i];
+ int columnIndex = keyExpressions[keyIndex].getOutputColumn();
+ LongColumnVector columnVector = (LongColumnVector) vrb.cols[columnIndex];
+ if (columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+ assignLongNoNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+ } else if (columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+ assignLongNoNullsNoRepeatingSelection(i, vrb.size, columnVector, vrb.selected);
+ } else if (columnVector.noNulls && columnVector.isRepeating) {
+ assignLongNoNullsRepeating(i, vrb.size, columnVector);
+ } else if (!columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+ assignLongNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+ } else if (!columnVector.noNulls && columnVector.isRepeating) {
+ assignLongNullsRepeating(i, vrb.size, columnVector);
+ } else if (!columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+ assignLongNullsNoRepeatingSelection (i, vrb.size, columnVector, vrb.selected);
+ } else {
+ throw new HiveException (String.format("Unimplemented Long null/repeat/selected combination %b/%b/%b",
+ columnVector.noNulls, columnVector.isRepeating, vrb.selectedInUse));
+ }
+ }
+ if (klh.longIndex >= 0) {
+ longKeyValueOutput[klh.longIndex].set(kw.getLongValue(i));
+ return longKeyValueOutput[klh.longIndex];
+ } else if (klh.doubleIndex >= 0) {
+ doubleKeyValueOutput[klh.doubleIndex].set(kw.getDoubleValue(i));
+ return doubleKeyValueOutput[klh.doubleIndex];
+ } else {
+ throw new HiveException(String.format(
+ "Internal inconsistent KeyLookupHelper at index [%d]:%d %d",
+ i, klh.longIndex, klh.doubleIndex));
+ }
+ }
+}
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
new file mode 100644
index 0000000..030a73c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+/**
+ * This maps a batch to the aggregation buffers sets to use for each row (key)
+ *
+ */
+public class VectorAggregationBufferBatch {
+
+ /**
+ * Batch sized array of aggregation buffer sets.
+ * The array is preallocated and is reused for each batch, but the individual entries
+ * will reference different aggregation buffer set from batch to batch.
+ * the array is not reset between batches, content past this.index will be stale.
+ */
+ private VectorAggregationBufferRow[] aggregationBuffers;
+
+ /**
+ * the selection vector that maps row within a batch to the
+ * specific aggregation buffer set to use.
+ */
+ private int[] selection;
+
+ /**
+ * versioning number gets incremented on each batch. This allows us to cache the selection
+ * mapping info in the aggregation buffer set themselves while still being able to
+ * detect stale info.
+ */
+ private int version;
+
+ /**
+ * Get the number of distinct aggregation buffer sets (ie. keys) used in current batch.
+ */
+ private int distinctCount;
+
+ /**
+ * the array of aggregation buffers for the current batch.
+ * content past the {@link #getDistinctBufferSetCount()} index
+ * is stale from previous batches.
+ * @return
+ */
+ public VectorAggregationBufferRow[] getAggregationBuffers() {
+ return aggregationBuffers;
+ }
+
+ /**
+ * number of distinct aggregation buffer sets (ie. keys) in the current batch.
+ * @return
+ */
+ public int getDistinctBufferSetCount () {
+ return distinctCount;
+ }
+
+ /**
+ * gets the selection vector to use for the current batch. This maps the batch rows by position
+ * (row number) to an index in the {@link #getAggregationBuffers()} array.
+ * @return
+ */
+ public int[] getSelectionVector() {
+ return selection;
+ }
+
+ public VectorAggregationBufferBatch() {
+ aggregationBuffers = new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE];
+ selection = new int [VectorizedRowBatch.DEFAULT_SIZE];
+ }
+
+ /**
+ * resets the internal aggregation buffers sets index and increments the versioning
+ * used to optimize the selection vector population.
+ */
+ public void startBatch() {
+ version++;
+ distinctCount = 0;
+ }
+
+ /**
+ * assigns the given aggregation buffer set to a given batch row (by row number).
+ * populates the selection vector appropriately. This is where the versioning numbers
+ * play a role in determining if the index cached on the aggregation buffer set is stale.
+ */
+ public void mapAggregationBufferSet(VectorAggregationBufferRow bufferSet, int row) {
+ if (version != bufferSet.getVersion()) {
+ bufferSet.setVersionAndIndex(version, distinctCount);
+ ++distinctCount;
+ }
+ aggregationBuffers[row] = bufferSet;
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
new file mode 100644
index 0000000..7aa4b11
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+
+/**
+ * Represents a set of aggregation buffers to be used for a specific key for UDAF GROUP BY.
+ *
+ */
+public class VectorAggregationBufferRow {
+ private VectorAggregateExpression.AggregationBuffer[] aggregationBuffers;
+ private int version;
+ private int index;
+
+ public VectorAggregationBufferRow(
+ VectorAggregateExpression.AggregationBuffer[] aggregationBuffers) {
+ this.aggregationBuffers = aggregationBuffers;
+ }
+
+ /**
+ * returns the aggregation buffer for an aggregation expression, by index.
+ */
+ public VectorAggregateExpression.AggregationBuffer getAggregationBuffer(int bufferIndex) {
+ return aggregationBuffers[bufferIndex];
+ }
+
+ /**
+ * returns the array of aggregation buffers (the entire set).
+ */
+ public VectorAggregateExpression.AggregationBuffer[] getAggregationBuffers() {
+ return aggregationBuffers;
+ }
+
+ /**
+ * Versioning used to detect staleness of the index cached for benefit of
+ * {@link org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferBatch VectorAggregationBufferBatch}.
+ */
+ public int getVersion() {
+ return version;
+ }
+
+ /**
+ * cached index used by VectorAggregationBufferBatch.
+ * @return
+ */
+ public int getIndex() {
+ return index;
+ }
+
+ /**
+ * accessor for VectorAggregationBufferBatch to set its caching info on this set.
+ */
+ public void setVersionAndIndex(int version, int index) {
+ this.index = index;
+ this.version = version;
+ }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index bcee45c..91366dd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -20,14 +20,20 @@
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapper;
+import org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapperBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
- .VectorAggregateExpression.AggregationBuffer;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
@@ -37,19 +43,44 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
/**
- * Vectorized GROUP BY operator impelementation. Consumes the vectorized input and
- * stores the aggregates operators intermediate states. Emits row mode output.
+ * Vectorized GROUP BY operator implementation. Consumes the vectorized input and
+ * stores the aggregate operators' intermediate states. Emits row mode output.
*
*/
 public class VectorGroupByOperator extends Operator<GroupByDesc> implements Serializable {
+ private static final Log LOG = LogFactory.getLog(
+ VectorGroupByOperator.class.getName());
+
private final VectorizationContext vContext;
- protected transient VectorAggregateExpression[] aggregators;
- protected transient AggregationBuffer[] aggregationBuffers;
+ /**
+ * This is the vector of aggregators. They are stateless and only implement
+ * the algorithm of how to compute the aggregation. state is kept in the
+ * aggregation buffers and is our responsibility to match the proper state for each key.
+ */
+ private transient VectorAggregateExpression[] aggregators;
+
+ /**
+ * Key vector expressions.
+ */
+ private transient VectorExpression[] keyExpressions;
+
+ /**
+ * The aggregation buffers to use for the current batch.
+ */
+ private transient VectorAggregationBufferBatch aggregationBatchInfo;
+
+ /**
+ * The current batch key wrappers.
+ * The very same instance gets reused for all batches.
+ */
+ private transient VectorHashKeyWrapperBatch keyWrappersBatch;
- transient int heartbeatInterval;
- transient int countAfterReport;
+ /**
+ * The global key-aggregation hash map.
+ */
+ private transient Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
private static final long serialVersionUID = 1L;
@@ -68,17 +99,22 @@ protected void initializeOp(Configuration hconf) throws HiveException {
vContext.setOperatorType(OperatorType.GROUPBY);
 ArrayList<AggregationDesc> aggrDesc = conf.getAggregators();
+ keyExpressions = vContext.getVectorExpressions(conf.getKeys());
+
+ for(int i = 0; i < keyExpressions.length; ++i) {
+ objectInspectors.add(vContext.createObjectInspector(keyExpressions[i]));
+ }
aggregators = new VectorAggregateExpression[aggrDesc.size()];
- aggregationBuffers = new AggregationBuffer[aggrDesc.size()];
for (int i = 0; i < aggrDesc.size(); ++i) {
AggregationDesc desc = aggrDesc.get(i);
aggregators[i] = vContext.getAggregatorExpression (desc);
- aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
- aggregators[i].reset(aggregationBuffers[i]);
-
objectInspectors.add(aggregators[i].getOutputObjectInspector());
}
+
+ keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
+ aggregationBatchInfo = new VectorAggregationBufferBatch();
+ mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>();
 List<String> outputFieldNames = conf.getOutputColumnNames();
outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
@@ -94,22 +130,120 @@ protected void initializeOp(Configuration hconf) throws HiveException {
@Override
public void processOp(Object row, int tag) throws HiveException {
- VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+ VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+ // First we traverse the batch to evaluate and prepare the KeyWrappers
+ // After this the KeyWrappers are properly set and hash code is computed
+ keyWrappersBatch.evaluateBatch(batch);
+
+ // Next we locate the aggregation buffer set for each key
+ prepareBatchAggregationBufferSets(batch);
+
+ // Finally, evaluate the aggregators
+ processAggregators(batch);
+ }
+
+ /**
+ * Evaluates the aggregators on the current batch.
+ * The aggregationBatchInfo must have been prepared
+ * by calling {@link #prepareBatchAggregationBufferSets} first.
+ */
+ private void processAggregators(VectorizedRowBatch batch) throws HiveException {
+ // We now have a vector of aggregation buffer sets to use for each row
+ // We can start computing the aggregates.
+ // If the number of distinct keys in the batch is 1 we can
+ // use the optimized code path of aggregateInput
+ VectorAggregationBufferRow[] aggregationBufferSets =
+ aggregationBatchInfo.getAggregationBuffers();
+ if (aggregationBatchInfo.getDistinctBufferSetCount() == 1) {
+ VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+ aggregationBufferSets[0].getAggregationBuffers();
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].aggregateInput(aggregationBuffers[i], batch);
+ }
+ } else {
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].aggregateInputSelection(
+ aggregationBufferSets,
+ i,
+ batch);
+ }
+ }
+ }
+
+ /**
+ * Locates the aggregation buffer sets to use for each key in the current batch.
+ * The keyWrappersBatch must have evaluated the current batch first.
+ */
+ private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws HiveException {
+ // The aggregation batch vector needs to know when we start a new batch
+ // to bump its internal version.
+ aggregationBatchInfo.startBatch();
+
+ // We now have to probe the global hash and find-or-allocate
+ // the aggregation buffers to use for each key present in the batch
+ VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers();
+ for (int i=0; i < batch.size; ++i) {
+ VectorHashKeyWrapper kw = keyWrappers[i];
+ VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw);
+ if (null == aggregationBuffer) {
+ // the probe failed, we must allocate a set of aggregation buffers
+ // and push the (keywrapper,buffers) pair into the hash.
+ // is very important to clone the keywrapper, the one we have from our
+ // keyWrappersBatch is going to be reset/reused on next batch.
+ aggregationBuffer = allocateAggregationBuffer();
+ mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer);
+ }
+ aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i);
+ }
+ }
- //TODO: proper group by hash
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].aggregateInput(aggregationBuffers[i], vrg);
+ /**
+ * allocates a new aggregation buffer set.
+ */
+ private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
+ VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+ new VectorAggregateExpression.AggregationBuffer[aggregators.length];
+ for (int i=0; i < aggregators.length; ++i) {
+ aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+ aggregators[i].reset(aggregationBuffers[i]);
}
+ VectorAggregationBufferRow bufferSet = new VectorAggregationBufferRow(aggregationBuffers);
+ return bufferSet;
}
@Override
public void closeOp(boolean aborted) throws HiveException {
if (!aborted) {
- Object[] forwardCache = new Object[aggregators.length];
- for (int i = 0; i < aggregators.length; ++i) {
- forwardCache[i] = aggregators[i].evaluateOutput(aggregationBuffers[i]);
+ Object[] forwardCache = new Object[keyExpressions.length + aggregators.length];
+ if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) {
+
+ // if this is a global aggregation (no keys) and empty set, must still emit NULLs
+ VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer();
+ for (int i = 0; i < aggregators.length; ++i) {
+ forwardCache[i] = aggregators[i].evaluateOutput(emptyBuffers.getAggregationBuffer(i));
+ }
+ forward(forwardCache, outputObjInspector);
+ } else {
+
+ /* Iterate the global (keywrapper,aggregationbuffers) map and emit
+ a row for each key */
+ for(Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair:
+ mapKeysAggregationBuffers.entrySet()){
+ int fi = 0;
+ for (int i = 0; i < keyExpressions.length; ++i) {
+ VectorHashKeyWrapper kw = (VectorHashKeyWrapper)pair.getKey();
+ forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (kw, i);
+ }
+ for (int i = 0; i < aggregators.length; ++i) {
+ forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue()
+ .getAggregationBuffer(i));
+ }
+ LOG.debug(String.format("forwarding keys: %s: %s",
+ pair.getKey().toString(), Arrays.toString(forwardCache)));
+ forward(forwardCache, outputObjInspector);
+ }
}
- forward(forwardCache, outputObjInspector);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 1a0d98a..1ef4955 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -86,6 +86,7 @@
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/**
@@ -209,6 +210,9 @@ private VectorExpression getVectorExpression(ExprNodeColumnDesc
 public VectorExpression[] getVectorExpressions(List<ExprNodeDesc> exprNodes) throws HiveException {
int i = 0;
+ if (null == exprNodes) {
+ return new VectorExpression[0];
+ }
VectorExpression[] ret = new VectorExpression[exprNodes.size()];
for (ExprNodeDesc e : exprNodes) {
ret[i++] = getVectorExpression(e);
@@ -1058,5 +1062,18 @@ public ColumnVector allocateColumnVector(String type, int defaultSize) {
return new LongColumnVector(defaultSize);
}
}
+
+ public ObjectInspector createObjectInspector(VectorExpression vectorExpression)
+ throws HiveException {
+ String columnType = vectorExpression.getOutputType();
+ if (columnType.equalsIgnoreCase("long") ||
+ columnType.equalsIgnoreCase("bigint")) {
+ return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ } else if (columnType.equalsIgnoreCase("double")) {
+ return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ } else {
+ throw new HiveException(String.format("Must implement type %s", columnType));
+ }
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
index 8623fe5..8ab9f43 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -30,6 +31,8 @@
public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
public abstract void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
throws HiveException;
+ public abstract void aggregateInputSelection(VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex, VectorizedRowBatch vrg) throws HiveException;
public abstract void reset(AggregationBuffer agg) throws HiveException;
public abstract Object evaluateOutput(AggregationBuffer agg) throws HiveException;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java
index bf6f786..54102a4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java
@@ -24,7 +24,8 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
- VectorAggregateExpression.AggregationBuffer;
+ VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -42,19 +43,27 @@
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-/**
-* VectorUDAFAvgDouble. Vectorized implementation for AVG aggregates.
-*/
+import org.apache.hadoop.hive.ql.io.orc.*;
+
@Description(name = "avg", value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: double)")
public class VectorUDAFAvgDouble extends VectorAggregateExpression {
- /**
- /* class for storing the current aggregate value.
- */
- static private final class Aggregation implements AggregationBuffer {
+ /** class for storing the current aggregate value. */
+ static class Aggregation implements AggregationBuffer {
double sum;
long count;
boolean isNull;
+
+ public void sumValue(double value) {
+ if (isNull) {
+ sum = value;
+ count = 1;
+ isNull = false;
+ } else {
+ sum += value;
+ count++;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -67,14 +76,16 @@ public VectorUDAFAvgDouble(VectorExpression inputExpression) {
super();
this.inputExpression = inputExpression;
partialResult = new Object[2];
- partialResult[0] = resultCount = new LongWritable();
- partialResult[1] = resultSum = new DoubleWritable();
+ resultCount = new LongWritable();
+ resultSum = new DoubleWritable();
+ partialResult[0] = resultCount;
+ partialResult[1] = resultSum;
initPartialResultInspector();
}
- private void initPartialResultInspector () {
- ArrayList foi = new ArrayList();
+ private void initPartialResultInspector() {
+ ArrayList foi = new ArrayList();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
ArrayList fname = new ArrayList();
@@ -83,52 +94,239 @@ private void initPartialResultInspector () {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(bufferIndex);
+ return myagg;
+ }
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
- throws HiveException {
-
- inputExpression.evaluate(unit);
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ VectorizedRowBatch batch) throws HiveException {
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
- cols[this.inputExpression.getOutputColumn()];
-
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
}
- Aggregation myagg = (Aggregation)agg;
-
- double[] vector = inputVector.vector;
+ inputExpression.evaluate(batch);
- if (inputVector.isRepeating) {
- if (inputVector.noNulls || !inputVector.isNull[0]) {
- if (myagg.isNull) {
- myagg.isNull = false;
- myagg.sum = 0;
- myagg.count = 0;
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ long[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize);
}
- myagg.sum += vector[0]*batchSize;
- myagg.count += batchSize;
}
- return;
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize, inputVector.isNull);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
}
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int[] selection,
+ int batchSize) {
- if (!unit.selectedInUse && inputVector.noNulls) {
- iterateNoSelectionNoNulls(myagg, vector, batchSize);
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[selection[i]]);
}
- else if (!unit.selectedInUse) {
- iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[i]);
}
- else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
}
- else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
}
}
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ j);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation)agg;
+
+ double[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ if (myagg.isNull) {
+ myagg.isNull = false;
+ myagg.sum = 0;
+ myagg.count = 0;
+ }
+ myagg.sum += vector[0]*batchSize;
+ myagg.count += batchSize;
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNulls(myagg, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
+ }
+ }
private void iterateSelectionHasNulls(
Aggregation myagg,
@@ -236,7 +434,7 @@ public Object evaluateOutput(
@Override
public ObjectInspector getOutputObjectInspector() {
- return soi;
- }
+ return soi;
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java
index 23aa6dd..8c6844b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java
@@ -24,7 +24,8 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
- VectorAggregateExpression.AggregationBuffer;
+ VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -42,19 +43,27 @@
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-/**
-* VectorUDAFAvgLong. Vectorized implementation for AVG aggregates.
-*/
+import org.apache.hadoop.hive.ql.io.orc.*;
+
@Description(name = "avg", value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: long)")
public class VectorUDAFAvgLong extends VectorAggregateExpression {
- /**
- /* class for storing the current aggregate value.
- */
- static private final class Aggregation implements AggregationBuffer {
+ /** class for storing the current aggregate value. */
+ static class Aggregation implements AggregationBuffer {
long sum;
long count;
boolean isNull;
+
+ public void sumValue(long value) {
+ if (isNull) {
+ sum = value;
+ count = 1;
+ isNull = false;
+ } else {
+ sum += value;
+ count++;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -67,14 +76,16 @@ public VectorUDAFAvgLong(VectorExpression inputExpression) {
super();
this.inputExpression = inputExpression;
partialResult = new Object[2];
- partialResult[0] = resultCount = new LongWritable();
- partialResult[1] = resultSum = new DoubleWritable();
+ resultCount = new LongWritable();
+ resultSum = new DoubleWritable();
+ partialResult[0] = resultCount;
+ partialResult[1] = resultSum;
initPartialResultInspector();
}
- private void initPartialResultInspector () {
- ArrayList foi = new ArrayList();
+ private void initPartialResultInspector() {
+ ArrayList foi = new ArrayList();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
ArrayList fname = new ArrayList();
@@ -83,52 +94,239 @@ private void initPartialResultInspector () {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(bufferIndex);
+ return myagg;
+ }
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
- throws HiveException {
-
- inputExpression.evaluate(unit);
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ VectorizedRowBatch batch) throws HiveException {
- LongColumnVector inputVector = (LongColumnVector)unit.
- cols[this.inputExpression.getOutputColumn()];
-
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
}
- Aggregation myagg = (Aggregation)agg;
-
- long[] vector = inputVector.vector;
+ inputExpression.evaluate(batch);
- if (inputVector.isRepeating) {
- if (inputVector.noNulls || !inputVector.isNull[0]) {
- if (myagg.isNull) {
- myagg.isNull = false;
- myagg.sum = 0;
- myagg.count = 0;
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ long[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize, inputVector.isNull);
}
- myagg.sum += vector[0]*batchSize;
- myagg.count += batchSize;
}
- return;
}
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int[] selection,
+ int batchSize) {
- if (!unit.selectedInUse && inputVector.noNulls) {
- iterateNoSelectionNoNulls(myagg, vector, batchSize);
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[selection[i]]);
}
- else if (!unit.selectedInUse) {
- iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[i]);
}
- else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
}
- else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
}
}
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ j);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation)agg;
+
+ long[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ if (myagg.isNull) {
+ myagg.isNull = false;
+ myagg.sum = 0;
+ myagg.count = 0;
+ }
+ myagg.sum += vector[0]*batchSize;
+ myagg.count += batchSize;
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNulls(myagg, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
+ }
+ }
private void iterateSelectionHasNulls(
Aggregation myagg,
@@ -236,7 +434,7 @@ public Object evaluateOutput(
@Override
public ObjectInspector getOutputObjectInspector() {
- return soi;
- }
+ return soi;
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java
index 0995a01..7a0c22b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -52,6 +53,13 @@
static class Aggregation implements AggregationBuffer {
long value;
boolean isNull;
+
+ public void initIfNull() {
+ if (isNull) {
+ isNull = false;
+ value = 0;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -62,17 +70,112 @@ public VectorUDAFCountDouble(VectorExpression inputExpression) {
this.inputExpression = inputExpression;
result = new LongWritable(0);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ if (inputVector.noNulls) {
+ // if there are no nulls then the iteration is the same on all cases
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, batchSize);
+ } else if (!batch.selectedInUse) {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, inputVector.isNull);
+ } else if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, batch.selected, inputVector.isNull);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+ }
+
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -80,11 +183,8 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
Aggregation myagg = (Aggregation)agg;
- if (myagg.isNull) {
- myagg.value = 0;
- myagg.isNull = false;
- }
-
+ myagg.initIfNull();
+
if (inputVector.isRepeating) {
if (inputVector.noNulls || !inputVector.isNull[0]) {
myagg.value += batchSize;
@@ -96,11 +196,11 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
myagg.value += batchSize;
return;
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull);
}
else {
- iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java
index 5f595c5..c63892c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -52,6 +53,13 @@
static class Aggregation implements AggregationBuffer {
long value;
boolean isNull;
+
+ public void initIfNull() {
+ if (isNull) {
+ isNull = false;
+ value = 0;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -62,17 +70,112 @@ public VectorUDAFCountLong(VectorExpression inputExpression) {
this.inputExpression = inputExpression;
result = new LongWritable(0);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ if (inputVector.noNulls) {
+ // if there are no nulls then the iteration is the same on all cases
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, batchSize);
+ } else if (!batch.selectedInUse) {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, inputVector.isNull);
+ } else if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, batch.selected, inputVector.isNull);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+ }
+
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- LongColumnVector inputVector = (LongColumnVector)unit.
+ LongColumnVector inputVector = (LongColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -80,11 +183,8 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
Aggregation myagg = (Aggregation)agg;
- if (myagg.isNull) {
- myagg.value = 0;
- myagg.isNull = false;
- }
-
+ myagg.initIfNull();
+
if (inputVector.isRepeating) {
if (inputVector.noNulls || !inputVector.isNull[0]) {
myagg.value += batchSize;
@@ -96,11 +196,11 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
myagg.value += batchSize;
return;
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull);
}
else {
- iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java
index 7616752..bc7f852 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -53,6 +54,15 @@
static private final class Aggregation implements AggregationBuffer {
double value;
boolean isNull;
+
+ public void checkValue(double value) {
+ if (isNull) {
+ isNull = false;
+ this.value = value;
+ } else if (value > this.value) {
+ this.value = value;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -64,15 +74,205 @@ public VectorUDAFMaxDouble(VectorExpression inputExpression) {
result = new DoubleWritable();
}
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+@Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ double[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize, inputVector.isNull);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[selection[i]]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[i]);
+ }
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ j);
+ myagg.checkValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[i]);
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException {
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -91,17 +291,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throw
return;
}
- if (!unit.selectedInUse && inputVector.noNulls) {
+ if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java
index 3c16d33..6ba416e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -53,6 +54,15 @@
static private final class Aggregation implements AggregationBuffer {
long value;
boolean isNull;
+
+ public void checkValue(long value) {
+ if (isNull) {
+ isNull = false;
+ this.value = value;
+ } else if (value > this.value) {
+ this.value = value;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -64,15 +74,205 @@ public VectorUDAFMaxLong(VectorExpression inputExpression) {
result = new LongWritable();
}
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+@Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ long[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize, inputVector.isNull);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[selection[i]]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[i]);
+ }
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ j);
+ myagg.checkValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[i]);
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException {
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- LongColumnVector inputVector = (LongColumnVector)unit.
+ LongColumnVector inputVector = (LongColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -91,17 +291,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throw
return;
}
- if (!unit.selectedInUse && inputVector.noNulls) {
+ if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java
index 7067c1c..d982fc2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -53,6 +54,15 @@
static private final class Aggregation implements AggregationBuffer {
double value;
boolean isNull;
+
+ public void checkValue(double value) {
+ if (isNull) {
+ isNull = false;
+ this.value = value;
+ } else if (value < this.value) {
+ this.value = value;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -64,15 +74,205 @@ public VectorUDAFMinDouble(VectorExpression inputExpression) {
result = new DoubleWritable();
}
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+@Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ double[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize, inputVector.isNull);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[selection[i]]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[i]);
+ }
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ j);
+ myagg.checkValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ double[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[i]);
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException {
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -91,17 +291,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throw
return;
}
- if (!unit.selectedInUse && inputVector.noNulls) {
+ if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java
index 9f813b2..a8f5531 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -53,6 +54,15 @@
static private final class Aggregation implements AggregationBuffer {
long value;
boolean isNull;
+
+ public void checkValue(long value) {
+ if (isNull) {
+ isNull = false;
+ this.value = value;
+ } else if (value < this.value) {
+ this.value = value;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -64,15 +74,205 @@ public VectorUDAFMinLong(VectorExpression inputExpression) {
result = new LongWritable();
}
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+@Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ long[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize, inputVector.isNull);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[selection[i]]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[i]);
+ }
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ j);
+ myagg.checkValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ long[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[i]);
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException {
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- LongColumnVector inputVector = (LongColumnVector)unit.
+ LongColumnVector inputVector = (LongColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -91,17 +291,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throw
return;
}
- if (!unit.selectedInUse && inputVector.noNulls) {
+ if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java
index 022b449..a4084b0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
double sum;
long count;
double variance;
boolean isNull;
- public void init () {
+ public void init() {
isNull = false;
sum = 0;
count = 0;
@@ -86,7 +87,7 @@ public VectorUDAFStdPopDouble(VectorExpression inputExpression) {
initPartialResultInspector();
}
- private void initPartialResultInspector () {
+ private void initPartialResultInspector() {
ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ private void initPartialResultInspector () {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ double[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateRepeatingNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+ }
+ }
+ else if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize,
+ inputVector.isNull, batch.selected);
+ }
+
+ }
+ private void iterateRepeatingNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ int i = selected[j];
+ if (!isNull[i]) {
+ double value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ double value = vector[selected[i]];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+ for(int i=0;i<batchSize;++i) {
+ if (!isNull[i]) {
+ double value = vector[i];
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ double value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
iterateRepeatingNoNulls(myagg, vector[0], batchSize);
}
}
- else if (!unit.selectedInUse && inputVector.noNulls) {
+ else if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java
index 9f658d1..28fdb36 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
double sum;
long count;
double variance;
boolean isNull;
- public void init () {
+ public void init() {
isNull = false;
sum = 0;
count = 0;
@@ -86,7 +87,7 @@ public VectorUDAFStdPopLong(VectorExpression inputExpression) {
initPartialResultInspector();
}
- private void initPartialResultInspector () {
+ private void initPartialResultInspector() {
    ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ private void initPartialResultInspector () {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ long[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateRepeatingNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+ }
+ }
+ else if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize,
+ inputVector.isNull, batch.selected);
+ }
+
+ }
+ private void iterateRepeatingNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize) {
+
+    for (int i=0; i<batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+        double t = myagg.count*value - myagg.sum;
+        myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+      }
+    }
+  }
+
+ private void iterateSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ int i = selected[j];
+ if (!isNull[i]) {
+ long value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ long value = vector[selected[i]];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+    for(int i=0;i<batchSize;++i) {
+      if (!isNull[i]) {
+        long value = vector[i];
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+          double t = myagg.count*value - myagg.sum;
+          myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+        }
+      }
+    }
+  }
+
+ private void iterateNoSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize) {
+
+    for (int i=0; i<batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      long value = vector[i];
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+        double t = myagg.count*value - myagg.sum;
+        myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+      }
+    }
+  }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- LongColumnVector inputVector = (LongColumnVector)unit.
+ LongColumnVector inputVector = (LongColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
iterateRepeatingNoNulls(myagg, vector[0], batchSize);
}
}
- else if (!unit.selectedInUse && inputVector.noNulls) {
+ else if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java
index cb5f47d..4fa52ff 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
double sum;
long count;
double variance;
boolean isNull;
- public void init () {
+ public void init() {
isNull = false;
sum = 0;
count = 0;
@@ -86,7 +87,7 @@ public VectorUDAFStdSampDouble(VectorExpression inputExpression) {
initPartialResultInspector();
}
- private void initPartialResultInspector () {
+ private void initPartialResultInspector() {
    ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ private void initPartialResultInspector () {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ double[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateRepeatingNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+ }
+ }
+ else if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize,
+ inputVector.isNull, batch.selected);
+ }
+
+ }
+ private void iterateRepeatingNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double value,
+ int batchSize) {
+
+    for (int i=0; i<batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+        double t = myagg.count*value - myagg.sum;
+        myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+      }
+    }
+  }
+
+ private void iterateSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ int i = selected[j];
+ if (!isNull[i]) {
+ double value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ double value = vector[selected[i]];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+    for(int i=0;i<batchSize;++i) {
+      if (!isNull[i]) {
+        double value = vector[i];
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+          double t = myagg.count*value - myagg.sum;
+          myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+        }
+      }
+    }
+  }
+
+ private void iterateNoSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize) {
+
+    for (int i=0; i<batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      double value = vector[i];
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+        double t = myagg.count*value - myagg.sum;
+        myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+      }
+    }
+  }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
iterateRepeatingNoNulls(myagg, vector[0], batchSize);
}
}
- else if (!unit.selectedInUse && inputVector.noNulls) {
+ else if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java
index 882795b..551ae8a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
double sum;
long count;
double variance;
boolean isNull;
- public void init () {
+ public void init() {
isNull = false;
sum = 0;
count = 0;
@@ -86,7 +87,7 @@ public VectorUDAFStdSampLong(VectorExpression inputExpression) {
initPartialResultInspector();
}
- private void initPartialResultInspector () {
+ private void initPartialResultInspector() {
    ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ private void initPartialResultInspector () {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ long[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateRepeatingNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+ }
+ }
+ else if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize,
+ inputVector.isNull, batch.selected);
+ }
+
+ }
+ private void iterateRepeatingNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize) {
+
+    for (int i=0; i<batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+        double t = myagg.count*value - myagg.sum;
+        myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+      }
+    }
+  }
+
+ private void iterateSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ int i = selected[j];
+ if (!isNull[i]) {
+ long value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ long value = vector[selected[i]];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+    for(int i=0;i<batchSize;++i) {
+      if (!isNull[i]) {
+        long value = vector[i];
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+          double t = myagg.count*value - myagg.sum;
+          myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+        }
+      }
+    }
+  }
+
+ private void iterateNoSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize) {
+
+    for (int i=0; i<batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      long value = vector[i];
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+        double t = myagg.count*value - myagg.sum;
+        myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+      }
+    }
+  }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- LongColumnVector inputVector = (LongColumnVector)unit.
+ LongColumnVector inputVector = (LongColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
iterateRepeatingNoNulls(myagg, vector[0], batchSize);
}
}
- else if (!unit.selectedInUse && inputVector.noNulls) {
+ else if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java
index b4b425d..a2e8fb3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -53,6 +54,15 @@
static private final class Aggregation implements AggregationBuffer {
double sum;
boolean isNull;
+
+ public void sumValue(double value) {
+ if (isNull) {
+ sum = value;
+ isNull = false;
+ } else {
+ sum += value;
+ }
+ }
}
VectorExpression inputExpression;
@@ -63,17 +73,207 @@ public VectorUDAFSumDouble(VectorExpression inputExpression) {
this.inputExpression = inputExpression;
result = new DoubleWritable();
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+    DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+      cols[this.inputExpression.getOutputColumn()];
+    double[] vector = inputVector.vector;
+
+    if (inputVector.noNulls) {
+      if (inputVector.isRepeating) {
+        iterateNoNullsRepeatingWithAggregationSelection(
+          aggregationBufferSets, aggregateIndex,
+          vector[0], batchSize);
+      } else {
+        if (batch.selectedInUse) {
+          iterateNoNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            vector, batch.selected, batchSize);
+        } else {
+          iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            vector, batchSize);
+        }
+      }
+    } else {
+      if (inputVector.isRepeating) {
+        if (batch.selectedInUse) {
+          iterateHasNullsRepeatingSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            vector[0], batchSize, batch.selected, inputVector.isNull);
+        } else {
+          iterateHasNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            vector[0], batchSize, inputVector.isNull);
+        }
+      } else {
+        if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            vector, batchSize, batch.selected, inputVector.isNull);
+        } else {
+          iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            vector, batchSize, inputVector.isNull);
+        }
+      }
+    }
+  }
+
+  private void iterateNoNullsRepeatingWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregateIndex,
+    double value,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      myagg.sumValue(value);
+    }
+  }
+
+  private void iterateNoNullsSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregateIndex,
+    double[] values,
+    int[] selection,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      myagg.sumValue(values[selection[i]]);
+    }
+  }
+
+  private void iterateNoNullsWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregateIndex,
+    double[] values,
+    int batchSize) {
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      myagg.sumValue(values[i]);
+    }
+  }
+
+  private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregateIndex,
+    double value,
+    int batchSize,
+    int[] selection,
+    boolean[] isNull) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!isNull[selection[i]]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        myagg.sumValue(value);
+      }
+    }
+
+  }
+
+  private void iterateHasNullsRepeatingWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregateIndex,
+    double value,
+    int batchSize,
+    boolean[] isNull) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        myagg.sumValue(value);
+      }
+    }
+  }
+
+  private void iterateHasNullsSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregateIndex,
+    double[] values,
+    int batchSize,
+    int[] selection,
+    boolean[] isNull) {
+
+    for (int j=0; j < batchSize; ++j) {
+      int i = selection[j];
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          j);
+        myagg.sumValue(values[i]);
+      }
+    }
+  }
+
+  private void iterateHasNullsWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregateIndex,
+    double[] values,
+    int batchSize,
+    boolean[] isNull) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        myagg.sumValue(values[i]);
+      }
+    }
+  }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -94,17 +294,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
return;
}
- if (!unit.selectedInUse && inputVector.noNulls) {
+ if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumLong.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumLong.java
index 9420aa2..71b2e3d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumLong.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumLong.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -53,6 +54,15 @@
static private final class Aggregation implements AggregationBuffer {
long sum;
boolean isNull;
+
+ public void sumValue(long value) {
+ if (isNull) {
+ sum = value;
+ isNull = false;
+ } else {
+ sum += value;
+ }
+ }
}
VectorExpression inputExpression;
@@ -63,17 +73,207 @@ public VectorUDAFSumLong(VectorExpression inputExpression) {
this.inputExpression = inputExpression;
result = new LongWritable();
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ long[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batchSize, inputVector.isNull);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(values[selection[i]]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- LongColumnVector inputVector = (LongColumnVector)unit.
+ LongColumnVector inputVector = (LongColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -94,17 +294,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
return;
}
- if (!unit.selectedInUse && inputVector.noNulls) {
+ if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopDouble.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopDouble.java
index 0d58bb9..2dfbfa3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopDouble.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopDouble.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
double sum;
long count;
double variance;
boolean isNull;
- public void init () {
+ public void init() {
isNull = false;
sum = 0;
count = 0;
@@ -86,7 +87,7 @@ public VectorUDAFVarPopDouble(VectorExpression inputExpression) {
initPartialResultInspector();
}
- private void initPartialResultInspector () {
+ private void initPartialResultInspector() {
    ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ private void initPartialResultInspector () {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ double[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateRepeatingNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+ }
+ }
+ else if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize,
+ inputVector.isNull, batch.selected);
+ }
+
+ }
+ private void iterateRepeatingNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double value,
+ int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ int i = selected[j];
+ if (!isNull[i]) {
+ double value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ double value = vector[selected[i]];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+    for(int i=0;i<batchSize;++i) {
+      if (!isNull[i]) {
+        double value = vector[i];
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      double value = vector[i];
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
iterateRepeatingNoNulls(myagg, vector[0], batchSize);
}
}
- else if (!unit.selectedInUse && inputVector.noNulls) {
+ else if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopLong.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopLong.java
index 2c27e8f..de4811d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopLong.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopLong.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
double sum;
long count;
double variance;
boolean isNull;
- public void init () {
+ public void init() {
isNull = false;
sum = 0;
count = 0;
@@ -86,7 +87,7 @@ public VectorUDAFVarPopLong(VectorExpression inputExpression) {
initPartialResultInspector();
}
- private void initPartialResultInspector () {
+ private void initPartialResultInspector() {
    ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ private void initPartialResultInspector () {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ long[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateRepeatingNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+ }
+ }
+ else if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize,
+ inputVector.isNull, batch.selected);
+ }
+
+ }
+ private void iterateRepeatingNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ int i = selected[j];
+ if (!isNull[i]) {
+ long value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ long value = vector[selected[i]];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+    for(int i=0;i<batchSize;++i) {
+      if (!isNull[i]) {
+        long value = vector[i];
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      long value = vector[i];
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- LongColumnVector inputVector = (LongColumnVector)unit.
+ LongColumnVector inputVector = (LongColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
iterateRepeatingNoNulls(myagg, vector[0], batchSize);
}
}
- else if (!unit.selectedInUse && inputVector.noNulls) {
+ else if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampDouble.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampDouble.java
index 4156e18..5a21f44 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampDouble.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampDouble.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
double sum;
long count;
double variance;
boolean isNull;
- public void init () {
+ public void init() {
isNull = false;
sum = 0;
count = 0;
@@ -86,7 +87,7 @@ public VectorUDAFVarSampDouble(VectorExpression inputExpression) {
initPartialResultInspector();
}
- private void initPartialResultInspector () {
+ private void initPartialResultInspector() {
    ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ private void initPartialResultInspector () {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ double[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateRepeatingNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+ }
+ }
+ else if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize,
+ inputVector.isNull, batch.selected);
+ }
+
+ }
+ private void iterateRepeatingNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double value,
+ int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ int i = selected[j];
+ if (!isNull[i]) {
+ double value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ double value = vector[selected[i]];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+    for(int i=0;i<batchSize;++i) {
+      if (!isNull[i]) {
+        double value = vector[i];
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ double[] vector,
+ int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      double value = vector[i];
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
iterateRepeatingNoNulls(myagg, vector[0], batchSize);
}
}
- else if (!unit.selectedInUse && inputVector.noNulls) {
+ else if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampLong.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampLong.java
index 7042382..7b88c4f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampLong.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampLong.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
double sum;
long count;
double variance;
boolean isNull;
- public void init () {
+ public void init() {
isNull = false;
sum = 0;
count = 0;
@@ -86,7 +87,7 @@ public VectorUDAFVarSampLong(VectorExpression inputExpression) {
initPartialResultInspector();
}
- private void initPartialResultInspector () {
+ private void initPartialResultInspector() {
    ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ private void initPartialResultInspector () {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ long[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateRepeatingNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+ }
+ }
+ else if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize,
+ inputVector.isNull, batch.selected);
+ }
+
+ }
+ private void iterateRepeatingNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ int i = selected[j];
+ if (!isNull[i]) {
+ long value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ long value = vector[selected[i]];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+    for(int i=0;i<batchSize;++i) {
+      if (!isNull[i]) {
+        long value = vector[i];
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] vector,
+ int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      long value = vector[i];
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- LongColumnVector inputVector = (LongColumnVector)unit.
+ LongColumnVector inputVector = (LongColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
iterateRepeatingNoNulls(myagg, vector[0], batchSize);
}
}
- else if (!unit.selectedInUse && inputVector.noNulls) {
+ else if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
index 53d9a7a..fec7cbc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
@@ -269,6 +269,16 @@ private void generate() throws Exception {
generateColumnArithmeticColumn(tdesc);
} else if (tdesc[0].equals("ColumnUnaryMinus")) {
generateColumnUnaryMinus(tdesc);
+ } else if (tdesc[0].equals("VectorUDAFCount")) {
+ generateVectorUDAFCount(tdesc);
+ } else if (tdesc[0].equals("VectorUDAFMinMax")) {
+ generateVectorUDAFMinMax(tdesc);
+ } else if (tdesc[0].equals("VectorUDAFSum")) {
+ generateVectorUDAFSum(tdesc);
+ } else if (tdesc[0].equals("VectorUDAFAvg")) {
+ generateVectorUDAFAvg(tdesc);
+ } else if (tdesc[0].equals("VectorUDAFVar")) {
+ generateVectorUDAFVar(tdesc);
} else if (tdesc[0].equals("FilterStringColumnCompareScalar")) {
generateFilterStringColumnCompareScalar(tdesc);
} else if (tdesc[0].equals("FilterStringColumnCompareColumn")) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFAvg.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFAvg.txt
new file mode 100644
index 0000000..d85346d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFAvg.txt
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
+ VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+import org.apache.hadoop.hive.ql.io.orc.*;
+
+@Description(name = "avg", value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: <ValueType>)")
+public class <ClassName> extends VectorAggregateExpression {
+
+ /** class for storing the current aggregate value. */
+ static class Aggregation implements AggregationBuffer {
+ <ValueType> sum;
+ long count;
+ boolean isNull;
+
+ public void sumValue(<ValueType> value) {
+ if (isNull) {
+ sum = value;
+ count = 1;
+ isNull = false;
+ } else {
+ sum += value;
+ count++;
+ }
+ }
+ }
+
+ private VectorExpression inputExpression;
+ private Object[] partialResult;
+ private LongWritable resultCount;
+ private DoubleWritable resultSum;
+ private StructObjectInspector soi;
+
+ public <ClassName>(VectorExpression inputExpression) {
+ super();
+ this.inputExpression = inputExpression;
+ partialResult = new Object[2];
+ resultCount = new LongWritable();
+ resultSum = new DoubleWritable();
+ partialResult[0] = resultCount;
+ partialResult[1] = resultSum;
+
+ initPartialResultInspector();
+ }
+
+ private void initPartialResultInspector() {
+ ArrayList foi = new ArrayList();
+ foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ ArrayList fname = new ArrayList();
+ fname.add("count");
+ fname.add("sum");
+ soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
+ }
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(bufferIndex);
+ return myagg;
+ }
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ long[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize, inputVector.isNull);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[selection[i]]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ j);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation)agg;
+
+ <ValueType>[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ if (myagg.isNull) {
+ myagg.isNull = false;
+ myagg.sum = 0;
+ myagg.count = 0;
+ }
+ myagg.sum += vector[0]*batchSize;
+ myagg.count += batchSize;
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNulls(myagg, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
+ }
+ }
+
+ private void iterateSelectionHasNulls(
+ Aggregation myagg,
+ <ValueType>[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ int i = selected[j];
+ if (!isNull[i]) {
+ <ValueType> value = vector[i];
+ if (myagg.isNull) {
+ myagg.isNull = false;
+ myagg.sum = 0;
+ myagg.count = 0;
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ }
+ }
+ }
+
+ private void iterateSelectionNoNulls(
+ Aggregation myagg,
+ <ValueType>[] vector,
+ int batchSize,
+ int[] selected) {
+
+ if (myagg.isNull) {
+ myagg.isNull = false;
+ myagg.sum = 0;
+ myagg.count = 0;
+ }
+
+ for (int i=0; i< batchSize; ++i) {
+ <ValueType> value = vector[selected[i]];
+ myagg.sum += value;
+ myagg.count += 1;
+ }
+ }
+
+ private void iterateNoSelectionHasNulls(
+ Aggregation myagg,
+ <ValueType>[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+ for(int i=0;i<batchSize;++i) {
+ if (!isNull[i]) {
+ <ValueType> value = vector[i];
+ if (myagg.isNull) {
+ myagg.isNull = false;
+ myagg.sum = 0;
+ myagg.count = 0;
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNulls(
+ Aggregation myagg,
+ <ValueType>[] vector,
+ int batchSize) {
+ if (myagg.isNull) {
+ myagg.isNull = false;
+ myagg.sum = 0;
+ myagg.count = 0;
+ }
+
+ for (int i=0;i<batchSize;++i) {
+ <ValueType> value = vector[i];
+ myagg.sum += value;
+ myagg.count += 1;
+ }
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ return new Aggregation();
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ Aggregation myAgg = (Aggregation) agg;
+ myAgg.isNull = true;
+ }
+
+ @Override
+ public Object evaluateOutput(
+ AggregationBuffer agg) throws HiveException {
+ Aggregation myagg = (Aggregation) agg;
+ if (myagg.isNull) {
+ return null;
+ }
+ else {
+ assert(0 < myagg.count);
+ resultCount.set (myagg.count);
+ resultSum.set (myagg.sum);
+ return partialResult;
+ }
+ }
+
+ @Override
+ public ObjectInspector getOutputObjectInspector() {
+ return soi;
+ }
+}
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
index b51e526..54c80ec 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -52,6 +53,13 @@ public class extends VectorAggregateExpression {
static class Aggregation implements AggregationBuffer {
long value;
boolean isNull;
+
+ public void initIfNull() {
+ if (isNull) {
+ isNull = false;
+ value = 0;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -62,17 +70,112 @@ public class extends VectorAggregateExpression {
this.inputExpression = inputExpression;
result = new LongWritable(0);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ if (inputVector.noNulls) {
+ // if there are no nulls then the iteration is the same on all cases
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, batchSize);
+ } else if (!batch.selectedInUse) {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, inputVector.isNull);
+ } else if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, batch.selected, inputVector.isNull);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+ }
+
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- <InputColumnVectorType> inputVector = (<InputColumnVectorType>)unit.
+ <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -80,11 +183,8 @@ public class extends VectorAggregateExpression {
Aggregation myagg = (Aggregation)agg;
- if (myagg.isNull) {
- myagg.value = 0;
- myagg.isNull = false;
- }
-
+ myagg.initIfNull();
+
if (inputVector.isRepeating) {
if (inputVector.noNulls || !inputVector.isNull[0]) {
myagg.value += batchSize;
@@ -96,11 +196,11 @@ public class extends VectorAggregateExpression {
myagg.value += batchSize;
return;
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull);
}
else {
- iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt
index 8c952c1..d00d9ae 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -53,6 +54,15 @@ public class extends VectorAggregateExpression {
static private final class Aggregation implements AggregationBuffer {
value;
boolean isNull;
+
+ public void checkValue(<ValueType> value) {
+ if (isNull) {
+ isNull = false;
+ this.value = value;
+ } else if (value <OperatorSymbol> this.value) {
+ this.value = value;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -64,15 +74,205 @@ public class extends VectorAggregateExpression {
result = new ();
}
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+@Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ <ValueType>[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ vector, batchSize, inputVector.isNull);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ <ValueType> value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ <ValueType>[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[selection[i]]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ <ValueType>[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[i]);
+ }
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ <ValueType> value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ <ValueType> value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(value);
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ <ValueType>[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ j);
+ myagg.checkValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ <ValueType>[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(values[i]);
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException {
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- <InputColumnVectorType> inputVector = (<InputColumnVectorType>)unit.
+ <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -91,17 +291,17 @@ public class extends VectorAggregateExpression {
return;
}
- if (!unit.selectedInUse && inputVector.noNulls) {
+ if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt
index b321654..aaaa6ad 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,9 +51,18 @@ public class extends VectorAggregateExpression {
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
sum;
boolean isNull;
+
+ public void sumValue(<ValueType> value) {
+ if (isNull) {
+ sum = value;
+ isNull = false;
+ } else {
+ sum += value;
+ }
+ }
}
VectorExpression inputExpression;
@@ -63,17 +73,207 @@ public class extends VectorAggregateExpression {
this.inputExpression = inputExpression;
result = new ();
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ long[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batchSize, inputVector.isNull);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(values[selection[i]]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- <InputColumnVectorType> inputVector = (<InputColumnVectorType>)unit.
+ <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -94,17 +294,17 @@ public class extends VectorAggregateExpression {
return;
}
- if (!unit.selectedInUse && inputVector.noNulls) {
+ if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt
index 29abddf..daae57b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@ public class <ClassName> extends VectorAggregateExpression {
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
double sum;
long count;
double variance;
boolean isNull;
- public void init () {
+ public void init() {
isNull = false;
sum = 0;
count = 0;
@@ -86,7 +87,7 @@ public class extends VectorAggregateExpression {
initPartialResultInspector();
}
- private void initPartialResultInspector () {
+ private void initPartialResultInspector() {
ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ public class extends VectorAggregateExpression {
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ inputVector = (<InputColumnVectorType>)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ <ValueType>[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateRepeatingNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+ }
+ }
+ else if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize,
+ inputVector.isNull, batch.selected);
+ }
+
+ }
+ private void iterateRepeatingNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ <ValueType> value,
+ int batchSize) {
+
+ for (int i=0; i<batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ <ValueType>[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ int i = selected[j];
+ if (!isNull[i]) {
+ <ValueType> value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ <ValueType>[] vector,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ <ValueType> value = vector[selected[i]];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ <ValueType>[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+ for(int i=0;i<batchSize;++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ <ValueType> value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ <ValueType>[] vector,
+ int batchSize) {
+
+ for (int i=0; i<batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ <ValueType> value = vector[i];
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- inputVector = (<InputColumnVectorType>)unit.
+ inputVector = (<InputColumnVectorType>)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -124,17 +308,17 @@ public class <ClassName> extends VectorAggregateExpression {
iterateRepeatingNoNulls(myagg, vector[0], batchSize);
}
}
- else if (!unit.selectedInUse && inputVector.noNulls) {
+ else if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index 8415498..b3b5cd2 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -28,8 +28,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.hive.ql.exec.vector.util.FakeCaptureOutputOperator;
import org.apache.hadoop.hive.ql.exec.vector.util.FakeVectorRowBatchFromConcat;
@@ -93,8 +95,144 @@ private static GroupByDesc buildGroupByDesc(
return desc;
}
+ private static GroupByDesc buildKeyGroupByDesc(
+ VectorizationContext ctx,
+ String aggregate,
+ String column,
+ String key) {
+
+ GroupByDesc desc = buildGroupByDesc(ctx, aggregate, column);
+
+ ExprNodeDesc keyExp = buildColumnDesc(ctx, key);
+ ArrayList<ExprNodeDesc> keys = new ArrayList<ExprNodeDesc>();
+ keys.add(keyExp);
+ desc.setKeys(keys);
+
+ return desc;
+ }
+
+ @Test
+ public void testMinLongKeyGroupByCompactBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 2,
+ Arrays.asList(new Long[]{01L,1L,2L,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(1L, 5L, 2L, 7L));
+ }
+
+ @Test
+ public void testMinLongKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 4,
+ Arrays.asList(new Long[]{01L,1L,2L,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(1L, 5L, 2L, 7L));
+ }
+
+ @Test
+ public void testMinLongKeyGroupByCrossBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 2,
+ Arrays.asList(new Long[]{01L,2L,1L,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(1L, 7L, 2L, 5L));
+ }
+
@Test
- public void testMinLongSimple () throws HiveException {
+ public void testMinLongNullKeyGroupByCrossBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 2,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 7L, 2L, 5L));
+ }
+
+ @Test
+ public void testMinLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 7L, 2L, 5L));
+ }
+
+ @Test
+ public void testMaxLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "max",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 13L, 2L, 19L));
+ }
+
+ @Test
+ public void testCountLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "count",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 2L, 2L, 2L));
+ }
+
+ @Test
+ public void testSumLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "sum",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 20L, 2L, 24L));
+ }
+
+ @Test
+ public void testAvgLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "avg",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 10.0, 2L, 12.0));
+ }
+
+ @Test
+ public void testVarLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "variance",
+ 4,
+ Arrays.asList(new Long[]{null,2L,01L,02L,01L,01L}),
+ Arrays.asList(new Long[]{13L, 5L,18L,19L,12L,15L}),
+ buildHashMap(null, 0.0, 2L, 49.0, 01L, 6.0));
+ }
+
+ @Test
+ public void testMinNullLongNullKeyGroupBy() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{null, null, null, null}),
+ buildHashMap(null, null, 2L, null));
+ }
+
+ @Test
+ public void testMinLongGroupBy() throws HiveException {
+ testAggregateLongAggregate(
+ "min",
+ 2,
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ 5L);
+ }
+
+
+ @Test
+ public void testMinLongSimple() throws HiveException {
testAggregateLongAggregate(
"min",
2,
@@ -735,7 +873,28 @@ public void testAggregateLongRepeats (
new Long[] {value}, repeat, batchSize);
testAggregateLongIterable (aggregateName, fdr, expected);
}
+
+ public HashMap