diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index f9da781..fbbdc0f 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -51,7 +51,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.serde.serdeConstants;; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java new file mode 100644 index 0000000..35712d0 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * A hash map key wrapper for vectorized processing. + * It stores the key values as primitives in arrays for each supported primitive type. + * This works in conjunction with + * {@link org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapperBatch VectorHashKeyWrapperBatch} + * to hash vectorized processing units (batches). 
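+ * A note on the design (an editor's gloss, consistent with the code below):
+ * storing the key values in primitive arrays rather than as boxed Objects
+ * keeps the per-row inner loops allocation free, and the hash code is
+ * computed once per batch by setHashKey() and then cached for reuse.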
+ */ +public class VectorHashKeyWrapper extends KeyWrapper { + + private long[] longValues; + private double[] doubleValues; + private boolean[] isNull; + private int hashcode; + + public VectorHashKeyWrapper(int longValuesCount, int doubleValuesCount) { + longValues = new long[longValuesCount]; + doubleValues = new double[doubleValuesCount]; + isNull = new boolean[longValuesCount + doubleValuesCount]; + } + + private VectorHashKeyWrapper() { + } + + @Override + void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException { + throw new HiveException("Should not be called"); + } + + @Override + void setHashKey() { + hashcode = Arrays.hashCode(longValues) ^ + Arrays.hashCode(doubleValues) ^ + Arrays.hashCode(isNull); + } + + @Override + public int hashCode() { + return hashcode; + } + + @Override + public boolean equals(Object that) { + if (that instanceof VectorHashKeyWrapper) { + VectorHashKeyWrapper keyThat = (VectorHashKeyWrapper)that; + return hashcode == keyThat.hashcode && + Arrays.equals(longValues, keyThat.longValues) && + Arrays.equals(doubleValues, keyThat.doubleValues) && + Arrays.equals(isNull, keyThat.isNull); + } + return false; + } + + @Override + protected Object clone() { + VectorHashKeyWrapper clone = new VectorHashKeyWrapper(); + clone.longValues = longValues.clone(); + clone.doubleValues = doubleValues.clone(); + clone.isNull = isNull.clone(); + clone.hashcode = hashcode; + return clone; + } + + @Override + public KeyWrapper copyKey() { + return (KeyWrapper) clone(); + } + + @Override + void copyKey(KeyWrapper oldWrapper) { + // TODO Auto-generated method stub + + } + + @Override + Object[] getKeyArray() { + // TODO Auto-generated method stub + return null; + } + + public void assignDouble(int index, double d) { + doubleValues[index] = d; + isNull[longValues.length + index] = false; + } + + public void assignNullDouble(int index) { + doubleValues[index] = 0; // assign 0 to simplify hashcode + isNull[longValues.length + index] = true; + } + + public void assignLong(int index, long v) { + longValues[index] = v; + isNull[index] = false; + } + + public void assignNullLong(int index) { + longValues[index] = 0; // assign 0 to simplify hashcode + isNull[index] = true; + } + + @Override + public String toString() + { + return String.format("%d[%s] %d[%s]", + longValues.length, Arrays.toString(longValues), + doubleValues.length, Arrays.toString(doubleValues)); + } + + public boolean getIsNull(int i) { + return isNull[i]; + } + + public long getLongValue(int i) { + return longValues[i]; + } + + public double getDoubleValue(int i) { + return doubleValues[i - longValues.length]; + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java new file mode 100644 index 0000000..c23614c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.util.Arrays; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * Class for handling vectorized hash map key wrappers. It evaluates the key columns in a + * row batch in a vectorized fashion. + * This class stores additional information about keys needed to evaluate and output the key values. + * + */ +public class VectorHashKeyWrapperBatch { + + /** + * Helper class for looking up a key value based on key index + * + */ + private static class KeyLookupHelper { + public int longIndex; + public int doubleIndex; + } + + /** + * The key expressions that require evaluation and output the primitive values for each key. + */ + private VectorExpression[] keyExpressions; + + /** + * indices of LONG primitive keys + */ + private int[] longIndices; + + /** + * indices of DOUBLE primitive keys + */ + private int[] doubleIndices; + + /** + * pre-allocated batch size vector of keys wrappers. + * N.B. these keys are **mutable** and should never be used in a HashMap. + * Always clone the key wrapper to obtain an immutable keywrapper suitable + * to use a key in a HashMap. 
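+   * For example (using only methods defined on VectorHashKeyWrapper):
+   * after assignLong(...)/setHashKey(), the wrapper may be used to probe a
+   * HashMap, but only an immutable copy may be stored, i.e.
+   * map.put(kw.copyKey(), state) rather than map.put(kw, state).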
+   */
+  private VectorHashKeyWrapper[] vectorHashKeyWrappers;
+
+  /**
+   * lookup vector to map from key index to primitive type index
+   */
+  private KeyLookupHelper[] indexLookup;
+
+  /**
+   * preallocated and reused LongWritable objects for emitting row mode key values
+   */
+  private LongWritable[] longKeyValueOutput;
+
+  /**
+   * preallocated and reused DoubleWritable objects for emitting row mode key values
+   */
+  private DoubleWritable[] doubleKeyValueOutput;
+
+  /**
+   * Accessor for the batch-sized array of key wrappers
+   */
+  public VectorHashKeyWrapper[] getVectorHashKeyWrappers() {
+    return vectorHashKeyWrappers;
+  }
+
+  /**
+   * Processes a batch: evaluates each key expression on the batch, assigns
+   * the resulting primitive values (or nulls) to the reusable key wrappers,
+   * and computes and caches the hash code of each wrapper.
+   *
+   * @param vrb the batch to process
+   * @throws HiveException
+   */
+  public void evaluateBatch(VectorizedRowBatch vrb) throws HiveException {
+    for(int i = 0; i < keyExpressions.length; ++i) {
+      keyExpressions[i].evaluate(vrb);
+    }
+    for(int i = 0; i < longIndices.length; ++i) {
+      int keyIndex = longIndices[i];
+      int columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      LongColumnVector columnVector = (LongColumnVector) vrb.cols[columnIndex];
+      if (columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+        assignLongNoNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+      } else if (columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+        assignLongNoNullsNoRepeatingSelection(i, vrb.size, columnVector, vrb.selected);
+      } else if (columnVector.noNulls && columnVector.isRepeating) {
+        assignLongNoNullsRepeating(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+        assignLongNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && columnVector.isRepeating) {
+        assignLongNullsRepeating(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+        assignLongNullsNoRepeatingSelection(i, vrb.size, columnVector, vrb.selected);
+      } else {
+        throw new HiveException(String.format(
+            "Unimplemented Long null/repeat/selected combination %b/%b/%b",
+            columnVector.noNulls, columnVector.isRepeating, vrb.selectedInUse));
+      }
+    }
+    for(int i = 0; i < doubleIndices.length; ++i) {
+      int keyIndex = doubleIndices[i];
+      int columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      DoubleColumnVector columnVector = (DoubleColumnVector) vrb.cols[columnIndex];
+      if (columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+        assignDoubleNoNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+      } else if (columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+        assignDoubleNoNullsNoRepeatingSelection(i, vrb.size, columnVector, vrb.selected);
+      } else if (columnVector.noNulls && columnVector.isRepeating) {
+        assignDoubleNoNullsRepeating(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+        assignDoubleNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && columnVector.isRepeating) {
+        assignDoubleNullsRepeating(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+        assignDoubleNullsNoRepeatingSelection(i, vrb.size, columnVector, vrb.selected);
+      } else {
+        throw new HiveException(String.format(
+            "Unimplemented Double null/repeat/selected combination %b/%b/%b",
+            columnVector.noNulls, columnVector.isRepeating, vrb.selectedInUse));
+      }
+    }
+    for(int i = 0; i < vrb.size; ++i) {
+      vectorHashKeyWrappers[i].setHashKey();
+    }
+  }
+
+  private void assignLongNoNullsNoRepeatingNoSelection(int index, int size,
+      LongColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[r]);
+    }
+  }
+
+  private void assignLongNoNullsNoRepeatingSelection(int index, int size,
+      LongColumnVector columnVector, int[] selected) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[selected[r]]);
+    }
+  }
+
+  private void assignLongNoNullsRepeating(int index, int size,
+      LongColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[0]);
+    }
+  }
+
+  private void assignLongNullsNoRepeatingNoSelection(int index, int size,
+      LongColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      if (columnVector.isNull[r]) {
+        vectorHashKeyWrappers[r].assignNullLong(index);
+      } else {
+        vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[r]);
+      }
+    }
+  }
+
+  private void assignLongNullsRepeating(int index, int size,
+      LongColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      if (columnVector.isNull[0]) {
+        vectorHashKeyWrappers[r].assignNullLong(index);
+      } else {
+        vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[0]);
+      }
+    }
+  }
+
+  private void assignLongNullsNoRepeatingSelection(int index, int size,
+      LongColumnVector columnVector, int[] selected) {
+    for(int r = 0; r < size; ++r) {
+      int row = selected[r];
+      if (columnVector.isNull[row]) {
+        vectorHashKeyWrappers[r].assignNullLong(index);
+      } else {
+        vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[row]);
+      }
+    }
+  }
+
+  private void assignDoubleNoNullsNoRepeatingNoSelection(int index, int size,
+      DoubleColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[r]);
+    }
+  }
+
+  private void assignDoubleNoNullsNoRepeatingSelection(int index, int size,
+      DoubleColumnVector columnVector, int[] selected) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[selected[r]]);
+    }
+  }
+
+  private void assignDoubleNoNullsRepeating(int index, int size,
+      DoubleColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[0]);
+    }
+  }
+
+  private void assignDoubleNullsNoRepeatingNoSelection(int index, int size,
+      DoubleColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      if (columnVector.isNull[r]) {
+        vectorHashKeyWrappers[r].assignNullDouble(index);
+      } else {
+        vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[r]);
+      }
+    }
+  }
+
+  private void assignDoubleNullsRepeating(int index, int size,
+      DoubleColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      if (columnVector.isNull[0]) {
+        vectorHashKeyWrappers[r].assignNullDouble(index);
+      } else {
+        vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[0]);
+      }
+    }
+  }
+
+  private void assignDoubleNullsNoRepeatingSelection(int index, int size,
+      DoubleColumnVector columnVector, int[] selected) {
+    for(int r = 0; r < size; ++r) {
+      int row = selected[r];
+      if (columnVector.isNull[row]) {
+        vectorHashKeyWrappers[r].assignNullDouble(index);
+      } else {
+        vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[row]);
+      }
+    }
+  }
+
+  /**
+   * Prepares a VectorHashKeyWrapperBatch to work for a specific set of keys.
+   * Computes the fast-access lookup indices and preallocates the wrappers
+   * and the writables used to emit row mode key values.
+   */
+  public static VectorHashKeyWrapperBatch compileKeyWrapperBatch(
+      VectorExpression[] keyExpressions) throws HiveException {
+    VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch();
+    compiledKeyWrapperBatch.keyExpressions = keyExpressions;
+    int[] longIndices = new int[keyExpressions.length];
+    int longIndicesIndex = 0;
+    int[] doubleIndices = new int[keyExpressions.length];
+    int doubleIndicesIndex = 0;
+    KeyLookupHelper[] indexLookup = new KeyLookupHelper[keyExpressions.length];
+    // classify each key expression by the primitive type of its output column
+    for(int i = 0; i < keyExpressions.length; ++i) {
+      indexLookup[i] = new KeyLookupHelper();
+      String outputType = keyExpressions[i].getOutputType();
+      if (outputType.equalsIgnoreCase("long") || outputType.equalsIgnoreCase("bigint")) {
+        longIndices[longIndicesIndex] = i;
+        indexLookup[i].longIndex = longIndicesIndex;
+        indexLookup[i].doubleIndex = -1;
+        ++longIndicesIndex;
+      } else if (outputType.equalsIgnoreCase("double")) {
+        doubleIndices[doubleIndicesIndex] = i;
+        indexLookup[i].longIndex = -1;
+        indexLookup[i].doubleIndex = doubleIndicesIndex;
+        ++doubleIndicesIndex;
+      } else {
+        throw new HiveException(String.format("Unsupported key type: %s", outputType));
+      }
+    }
+    compiledKeyWrapperBatch.indexLookup = indexLookup;
+    compiledKeyWrapperBatch.longIndices = Arrays.copyOf(longIndices, longIndicesIndex);
+    compiledKeyWrapperBatch.doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex);
+    compiledKeyWrapperBatch.vectorHashKeyWrappers =
+        new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE];
+    for(int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; ++i) {
+      compiledKeyWrapperBatch.vectorHashKeyWrappers[i] =
+          new VectorHashKeyWrapper(longIndicesIndex, doubleIndicesIndex);
+    }
+    compiledKeyWrapperBatch.longKeyValueOutput = new LongWritable[longIndicesIndex];
+    for(int i = 0; i < longIndicesIndex; ++i) {
+      compiledKeyWrapperBatch.longKeyValueOutput[i] = new LongWritable();
+    }
+    compiledKeyWrapperBatch.doubleKeyValueOutput = new DoubleWritable[doubleIndicesIndex];
+    for(int i = 0; i < doubleIndicesIndex; ++i) {
+      compiledKeyWrapperBatch.doubleKeyValueOutput[i] = new DoubleWritable();
+    }
+    return compiledKeyWrapperBatch;
+  }
+
+  /**
+   * Returns the writable representation of a key value, by key index.
+   * The returned writables are preallocated and reused across calls.
+   */
+  public Object getWritableKeyValue(VectorHashKeyWrapper kw, int i)
+      throws HiveException {
+    KeyLookupHelper klh = indexLookup[i];
+    if (klh.longIndex >= 0) {
+      longKeyValueOutput[klh.longIndex].set(kw.getLongValue(i));
+      return longKeyValueOutput[klh.longIndex];
+    } else if (klh.doubleIndex >= 0) {
+      doubleKeyValueOutput[klh.doubleIndex].set(kw.getDoubleValue(i));
+      return doubleKeyValueOutput[klh.doubleIndex];
+    } else {
+      throw new HiveException(String.format(
+          "Internal inconsistent KeyLookupHelper at index [%d]:%d %d",
+          i, klh.longIndex, klh.doubleIndex));
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
new file mode 100644
index 0000000..030a73c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +/** + * This maps a batch to the aggregation buffers sets to use for each row (key) + * + */ +public class VectorAggregationBufferBatch { + + /** + * Batch sized array of aggregation buffer sets. + * The array is preallocated and is reused for each batch, but the individual entries + * will reference different aggregation buffer set from batch to batch. + * the array is not reset between batches, content past this.index will be stale. + */ + private VectorAggregationBufferRow[] aggregationBuffers; + + /** + * the selection vector that maps row within a batch to the + * specific aggregation buffer set to use. + */ + private int[] selection; + + /** + * versioning number gets incremented on each batch. This allows us to cache the selection + * mapping info in the aggregation buffer set themselves while still being able to + * detect stale info. + */ + private int version; + + /** + * Get the number of distinct aggregation buffer sets (ie. keys) used in current batch. + */ + private int distinctCount; + + /** + * the array of aggregation buffers for the current batch. + * content past the {@link #getDistinctBufferSetCount()} index + * is stale from previous batches. + * @return + */ + public VectorAggregationBufferRow[] getAggregationBuffers() { + return aggregationBuffers; + } + + /** + * number of distinct aggregation buffer sets (ie. keys) in the current batch. + * @return + */ + public int getDistinctBufferSetCount () { + return distinctCount; + } + + /** + * gets the selection vector to use for the current batch. This maps the batch rows by position + * (row number) to an index in the {@link #getAggregationBuffers()} array. + * @return + */ + public int[] getSelectionVector() { + return selection; + } + + public VectorAggregationBufferBatch() { + aggregationBuffers = new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE]; + selection = new int [VectorizedRowBatch.DEFAULT_SIZE]; + } + + /** + * resets the internal aggregation buffers sets index and increments the versioning + * used to optimize the selection vector population. + */ + public void startBatch() { + version++; + distinctCount = 0; + } + + /** + * assigns the given aggregation buffer set to a given batch row (by row number). + * populates the selection vector appropriately. This is where the versioning numbers + * play a role in determining if the index cached on the aggregation buffer set is stale. 
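+   * For example, on the batch with version 7: the first row of a fresh key
+   * finds bufferSet.getVersion() != 7, so the set is stamped with
+   * (version 7, index distinctCount) and distinctCount is incremented;
+   * later rows of the same key see a matching version and reuse the cached
+   * index, with no per-batch reset of the buffer sets required.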
+ */ + public void mapAggregationBufferSet(VectorAggregationBufferRow bufferSet, int row) { + if (version != bufferSet.getVersion()) { + bufferSet.setVersionAndIndex(version, distinctCount); + ++distinctCount; + } + aggregationBuffers[row] = bufferSet; + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java new file mode 100644 index 0000000..7aa4b11 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; + +/** + * Represents a set of aggregation buffers to be used for a specific key for UDAF GROUP BY. + * + */ +public class VectorAggregationBufferRow { + private VectorAggregateExpression.AggregationBuffer[] aggregationBuffers; + private int version; + private int index; + + public VectorAggregationBufferRow( + VectorAggregateExpression.AggregationBuffer[] aggregationBuffers) { + this.aggregationBuffers = aggregationBuffers; + } + + /** + * returns the aggregation buffer for an aggregation expression, by index. + */ + public VectorAggregateExpression.AggregationBuffer getAggregationBuffer(int bufferIndex) { + return aggregationBuffers[bufferIndex]; + } + + /** + * returns the array of aggregation buffers (the entire set). + */ + public VectorAggregateExpression.AggregationBuffer[] getAggregationBuffers() { + return aggregationBuffers; + } + + /** + * Versioning used to detect staleness of the index cached for benefit of + * {@link org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferBatch VectorAggregationBufferBatch}. + */ + public int getVersion() { + return version; + } + + /** + * cached index used by VectorAggregationBufferBatch. + * @return + */ + public int getIndex() { + return index; + } + + /** + * accessor for VectorAggregationBufferBatch to set its caching info on this set. 
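+   * In the code shown here it is called only from
+   * VectorAggregationBufferBatch.mapAggregationBufferSet, and the stored
+   * (version, index) pair is meaningful only while the version matches the
+   * current batch version.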
+ */ + public void setVersionAndIndex(int version, int index) { + this.index = index; + this.version = version; + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index bcee45c..91366dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -20,14 +20,20 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.KeyWrapper; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapper; +import org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapperBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates - .VectorAggregateExpression.AggregationBuffer; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; @@ -37,19 +43,44 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; /** - * Vectorized GROUP BY operator impelementation. Consumes the vectorized input and - * stores the aggregates operators intermediate states. Emits row mode output. + * Vectorized GROUP BY operator implementation. Consumes the vectorized input and + * stores the aggregate operators' intermediate states. Emits row mode output. * */ public class VectorGroupByOperator extends Operator implements Serializable { + private static final Log LOG = LogFactory.getLog( + VectorGroupByOperator.class.getName()); + private final VectorizationContext vContext; - protected transient VectorAggregateExpression[] aggregators; - protected transient AggregationBuffer[] aggregationBuffers; + /** + * This is the vector of aggregators. They are stateless and only implement + * the algorithm of how to compute the aggregation. state is kept in the + * aggregation buffers and is our responsibility to match the proper state for each key. + */ + private transient VectorAggregateExpression[] aggregators; + + /** + * Key vector expressions. + */ + private transient VectorExpression[] keyExpressions; + + /** + * The aggregation buffers to use for the current batch. + */ + private transient VectorAggregationBufferBatch aggregationBatchInfo; + + /** + * The current batch key wrappers. + * The very same instance gets reused for all batches. + */ + private transient VectorHashKeyWrapperBatch keyWrappersBatch; - transient int heartbeatInterval; - transient int countAfterReport; + /** + * The global key-aggregation hash map. 
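+   * Maps an immutable copy of each key wrapper seen so far to the
+   * VectorAggregationBufferRow that accumulates that key's state.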
+   */
+  private transient Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
 
   private static final long serialVersionUID = 1L;
 
@@ -68,17 +99,22 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     vContext.setOperatorType(OperatorType.GROUPBY);
 
     ArrayList<AggregationDesc> aggrDesc = conf.getAggregators();
 
+    keyExpressions = vContext.getVectorExpressions(conf.getKeys());
+
+    for(int i = 0; i < keyExpressions.length; ++i) {
+      objectInspectors.add(vContext.createObjectInspector(keyExpressions[i]));
+    }
 
     aggregators = new VectorAggregateExpression[aggrDesc.size()];
-    aggregationBuffers = new AggregationBuffer[aggrDesc.size()];
     for (int i = 0; i < aggrDesc.size(); ++i) {
       AggregationDesc desc = aggrDesc.get(i);
       aggregators[i] = vContext.getAggregatorExpression (desc);
-      aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
-      aggregators[i].reset(aggregationBuffers[i]);
-      objectInspectors.add(aggregators[i].getOutputObjectInspector());
     }
+
+    keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
+    aggregationBatchInfo = new VectorAggregationBufferBatch();
+    mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>();
 
     List<String> outputFieldNames = conf.getOutputColumnNames();
     outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
@@ -94,22 +130,120 @@ protected void initializeOp(Configuration hconf) throws HiveException {
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+    VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+    // First we traverse the batch to evaluate and prepare the KeyWrappers
+    // After this the KeyWrappers are properly set and hash code is computed
+    keyWrappersBatch.evaluateBatch(batch);
+
+    // Next we locate the aggregation buffer set for each key
+    prepareBatchAggregationBufferSets(batch);
+
+    // Finally, evaluate the aggregators
+    processAggregators(batch);
+  }
+
+  /**
+   * Evaluates the aggregators on the current batch.
+   * The aggregationBatchInfo must have been prepared
+   * by calling {@link #prepareBatchAggregationBufferSets} first.
+   */
+  private void processAggregators(VectorizedRowBatch batch) throws HiveException {
+    // We now have a vector of aggregation buffer sets to use for each row.
+    // We can start computing the aggregates.
+    // If the number of distinct keys in the batch is 1 we can
+    // use the optimized code path of aggregateInput.
+    VectorAggregationBufferRow[] aggregationBufferSets =
+        aggregationBatchInfo.getAggregationBuffers();
+    if (aggregationBatchInfo.getDistinctBufferSetCount() == 1) {
+      VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+          aggregationBufferSets[0].getAggregationBuffers();
+      for (int i = 0; i < aggregators.length; ++i) {
+        aggregators[i].aggregateInput(aggregationBuffers[i], batch);
+      }
+    } else {
+      for (int i = 0; i < aggregators.length; ++i) {
+        aggregators[i].aggregateInputSelection(
+            aggregationBufferSets,
+            i,
+            batch);
+      }
+    }
+  }
+
+  /**
+   * Locates the aggregation buffer sets to use for each key in the current batch.
+   * The keyWrappersBatch must have evaluated the current batch first.
+   */
+  private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws HiveException {
+    // The aggregation batch vector needs to know when we start a new batch
+    // to bump its internal version.
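+    // (The version check inside mapAggregationBufferSet is what lets us
+    // skip clearing the per-key cached indices between batches.)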
+    aggregationBatchInfo.startBatch();
+
+    // We now have to probe the global hash and find-or-allocate
+    // the aggregation buffers to use for each key present in the batch
+    VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers();
+    for (int i=0; i < batch.size; ++i) {
+      VectorHashKeyWrapper kw = keyWrappers[i];
+      VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw);
+      if (null == aggregationBuffer) {
+        // the probe failed, we must allocate a set of aggregation buffers
+        // and push the (keywrapper,buffers) pair into the hash.
+        // it is very important to clone the keywrapper: the one we have from our
+        // keyWrappersBatch is going to be reset/reused on the next batch.
+        aggregationBuffer = allocateAggregationBuffer();
+        mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer);
+      }
+      aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i);
+    }
+  }
 
-    //TODO: proper group by hash
-    for (int i = 0; i < aggregators.length; ++i) {
-      aggregators[i].aggregateInput(aggregationBuffers[i], vrg);
+  /**
+   * allocates a new aggregation buffer set.
+   */
+  private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
+    VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+        new VectorAggregateExpression.AggregationBuffer[aggregators.length];
+    for (int i=0; i < aggregators.length; ++i) {
+      aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+      aggregators[i].reset(aggregationBuffers[i]);
     }
+    VectorAggregationBufferRow bufferSet = new VectorAggregationBufferRow(aggregationBuffers);
+    return bufferSet;
   }
 
   @Override
   public void closeOp(boolean aborted) throws HiveException {
     if (!aborted) {
-      Object[] forwardCache = new Object[aggregators.length];
-      for (int i = 0; i < aggregators.length; ++i) {
-        forwardCache[i] = aggregators[i].evaluateOutput(aggregationBuffers[i]);
+      Object[] forwardCache = new Object[keyExpressions.length + aggregators.length];
+      if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) {
+
+        // if this is a global aggregation (no keys) and empty set, must still emit NULLs
+        VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer();
+        for (int i = 0; i < aggregators.length; ++i) {
+          forwardCache[i] = aggregators[i].evaluateOutput(emptyBuffers.getAggregationBuffer(i));
+        }
+        forward(forwardCache, outputObjInspector);
+      } else {
+
+        /* Iterate the global (keywrapper,aggregationbuffers) map and emit
+           a row for each key */
+        for(Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair:
+            mapKeysAggregationBuffers.entrySet()){
+          int fi = 0;
+          for (int i = 0; i < keyExpressions.length; ++i) {
+            VectorHashKeyWrapper kw = (VectorHashKeyWrapper)pair.getKey();
+            forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (kw, i);
+          }
+          for (int i = 0; i < aggregators.length; ++i) {
+            forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue()
+                .getAggregationBuffer(i));
+          }
+          LOG.debug(String.format("forwarding keys: %s: %s",
+              pair.getKey().toString(), Arrays.toString(forwardCache)));
+          forward(forwardCache, outputObjInspector);
+        }
       }
-      forward(forwardCache, outputObjInspector);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 1a0d98a..1ef4955 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -86,6 +86,7 @@ import
org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; /** @@ -209,6 +210,9 @@ private VectorExpression getVectorExpression(ExprNodeColumnDesc public VectorExpression[] getVectorExpressions(List exprNodes) throws HiveException { int i = 0; + if (null == exprNodes) { + return new VectorExpression[0]; + } VectorExpression[] ret = new VectorExpression[exprNodes.size()]; for (ExprNodeDesc e : exprNodes) { ret[i++] = getVectorExpression(e); @@ -1058,5 +1062,18 @@ public ColumnVector allocateColumnVector(String type, int defaultSize) { return new LongColumnVector(defaultSize); } } + + public ObjectInspector createObjectInspector(VectorExpression vectorExpression) + throws HiveException { + String columnType = vectorExpression.getOutputType(); + if (columnType.equalsIgnoreCase("long") || + columnType.equalsIgnoreCase("bigint")) { + return PrimitiveObjectInspectorFactory.writableLongObjectInspector; + } else if (columnType.equalsIgnoreCase("double")) { + return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; + } else { + throw new HiveException(String.format("Must implement type %s", columnType)); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java index 8623fe5..8ab9f43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -30,6 +31,8 @@ public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException; public abstract void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException; + public abstract void aggregateInputSelection(VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, VectorizedRowBatch vrg) throws HiveException; public abstract void reset(AggregationBuffer agg) throws HiveException; public abstract Object evaluateOutput(AggregationBuffer agg) throws HiveException; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java index bf6f786..54102a4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java @@ -24,7 +24,8 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. 
- VectorAggregateExpression.AggregationBuffer; + VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -42,19 +43,27 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -/** -* VectorUDAFAvgDouble. Vectorized implementation for AVG aggregates. -*/ +import org.apache.hadoop.hive.ql.io.orc.*; + @Description(name = "avg", value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: double)") public class VectorUDAFAvgDouble extends VectorAggregateExpression { - /** - /* class for storing the current aggregate value. - */ - static private final class Aggregation implements AggregationBuffer { + /** class for storing the current aggregate value. */ + static class Aggregation implements AggregationBuffer { double sum; long count; boolean isNull; + + public void sumValue(double value) { + if (isNull) { + sum = value; + count = 1; + isNull = false; + } else { + sum += value; + count++; + } + } } private VectorExpression inputExpression; @@ -67,14 +76,16 @@ public VectorUDAFAvgDouble(VectorExpression inputExpression) { super(); this.inputExpression = inputExpression; partialResult = new Object[2]; - partialResult[0] = resultCount = new LongWritable(); - partialResult[1] = resultSum = new DoubleWritable(); + resultCount = new LongWritable(); + resultSum = new DoubleWritable(); + partialResult[0] = resultCount; + partialResult[1] = resultSum; initPartialResultInspector(); } - private void initPartialResultInspector () { - ArrayList foi = new ArrayList(); + private void initPartialResultInspector() { + ArrayList foi = new ArrayList(); foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); ArrayList fname = new ArrayList(); @@ -83,52 +94,239 @@ private void initPartialResultInspector () { soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); } + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(bufferIndex); + return myagg; + } @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) - throws HiveException { - - inputExpression.evaluate(unit); + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + VectorizedRowBatch batch) throws HiveException { - DoubleColumnVector inputVector = (DoubleColumnVector)unit. - cols[this.inputExpression.getOutputColumn()]; - - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; } - Aggregation myagg = (Aggregation)agg; - - double[] vector = inputVector.vector; + inputExpression.evaluate(batch); - if (inputVector.isRepeating) { - if (inputVector.noNulls || !inputVector.isNull[0]) { - if (myagg.isNull) { - myagg.isNull = false; - myagg.sum = 0; - myagg.count = 0; + LongColumnVector inputVector = (LongColumnVector)batch. 
+ cols[this.inputExpression.getOutputColumn()]; + long[] vector = inputVector.vector; + + if (inputVector.noNulls) { + if (inputVector.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector[0], batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector, batchSize); } - myagg.sum += vector[0]*batchSize; - myagg.count += batchSize; } - return; + } else { + if (inputVector.isRepeating) { + if (batch.selectedInUse) { + iterateHasNullsRepeatingSelectionWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector[0], batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsRepeatingWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector[0], batchSize, inputVector.isNull); + } + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector, batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector, batchSize, inputVector.isNull); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(value); } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long[] values, + int[] selection, + int batchSize) { - if (!unit.selectedInUse && inputVector.noNulls) { - iterateNoSelectionNoNulls(myagg, vector, batchSize); + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(values[selection[i]]); } - else if (!unit.selectedInUse) { - iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long[] values, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(values[i]); } - else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + } + + private void iterateHasNullsRepeatingSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long value, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[selection[i]]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(value); + } } - else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + + } + + private void iterateHasNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long value, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + 
aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(value); + } } } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long[] values, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + j); + myagg.sumValue(values[i]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long[] values, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(values[i]); + } + } + } + + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { + + inputExpression.evaluate(batch); + + DoubleColumnVector inputVector = (DoubleColumnVector)batch.cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + Aggregation myagg = (Aggregation)agg; + + double[] vector = inputVector.vector; + + if (inputVector.isRepeating) { + if (inputVector.noNulls || !inputVector.isNull[0]) { + if (myagg.isNull) { + myagg.isNull = false; + myagg.sum = 0; + myagg.count = 0; + } + myagg.sum += vector[0]*batchSize; + myagg.count += batchSize; + } + return; + } + + if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNulls(myagg, vector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); + } + else { + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); + } + } private void iterateSelectionHasNulls( Aggregation myagg, @@ -236,7 +434,7 @@ public Object evaluateOutput( @Override public ObjectInspector getOutputObjectInspector() { - return soi; - } + return soi; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java index 23aa6dd..8c6844b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java @@ -24,7 +24,8 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. - VectorAggregateExpression.AggregationBuffer; + VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -42,19 +43,27 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -/** -* VectorUDAFAvgLong. Vectorized implementation for AVG aggregates. 
-*/ +import org.apache.hadoop.hive.ql.io.orc.*; + @Description(name = "avg", value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: long)") public class VectorUDAFAvgLong extends VectorAggregateExpression { - /** - /* class for storing the current aggregate value. - */ - static private final class Aggregation implements AggregationBuffer { + /** class for storing the current aggregate value. */ + static class Aggregation implements AggregationBuffer { long sum; long count; boolean isNull; + + public void sumValue(long value) { + if (isNull) { + sum = value; + count = 1; + isNull = false; + } else { + sum += value; + count++; + } + } } private VectorExpression inputExpression; @@ -67,14 +76,16 @@ public VectorUDAFAvgLong(VectorExpression inputExpression) { super(); this.inputExpression = inputExpression; partialResult = new Object[2]; - partialResult[0] = resultCount = new LongWritable(); - partialResult[1] = resultSum = new DoubleWritable(); + resultCount = new LongWritable(); + resultSum = new DoubleWritable(); + partialResult[0] = resultCount; + partialResult[1] = resultSum; initPartialResultInspector(); } - private void initPartialResultInspector () { - ArrayList foi = new ArrayList(); + private void initPartialResultInspector() { + ArrayList foi = new ArrayList(); foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); ArrayList fname = new ArrayList(); @@ -83,52 +94,239 @@ private void initPartialResultInspector () { soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); } + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(bufferIndex); + return myagg; + } @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) - throws HiveException { - - inputExpression.evaluate(unit); + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + VectorizedRowBatch batch) throws HiveException { - LongColumnVector inputVector = (LongColumnVector)unit. - cols[this.inputExpression.getOutputColumn()]; - - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; } - Aggregation myagg = (Aggregation)agg; - - long[] vector = inputVector.vector; + inputExpression.evaluate(batch); - if (inputVector.isRepeating) { - if (inputVector.noNulls || !inputVector.isNull[0]) { - if (myagg.isNull) { - myagg.isNull = false; - myagg.sum = 0; - myagg.count = 0; + LongColumnVector inputVector = (LongColumnVector)batch. 
+ cols[this.inputExpression.getOutputColumn()]; + long[] vector = inputVector.vector; + + if (inputVector.noNulls) { + if (inputVector.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector[0], batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector, batchSize); + } + } + } else { + if (inputVector.isRepeating) { + if (batch.selectedInUse) { + iterateHasNullsRepeatingSelectionWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector[0], batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsRepeatingWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector[0], batchSize, inputVector.isNull); + } + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector, batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, bufferIndex, + vector, batchSize, inputVector.isNull); } - myagg.sum += vector[0]*batchSize; - myagg.count += batchSize; } - return; } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(value); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long[] values, + int[] selection, + int batchSize) { - if (!unit.selectedInUse && inputVector.noNulls) { - iterateNoSelectionNoNulls(myagg, vector, batchSize); + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(values[selection[i]]); } - else if (!unit.selectedInUse) { - iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long[] values, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(values[i]); } - else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + } + + private void iterateHasNullsRepeatingSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long value, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[selection[i]]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(value); + } } - else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + + } + + private void iterateHasNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long value, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + 
aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(value); + } } } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long[] values, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + j); + myagg.sumValue(values[i]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int bufferIndex, + long[] values, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + bufferIndex, + i); + myagg.sumValue(values[i]); + } + } + } + + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch.cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + Aggregation myagg = (Aggregation)agg; + + long[] vector = inputVector.vector; + + if (inputVector.isRepeating) { + if (inputVector.noNulls || !inputVector.isNull[0]) { + if (myagg.isNull) { + myagg.isNull = false; + myagg.sum = 0; + myagg.count = 0; + } + myagg.sum += vector[0]*batchSize; + myagg.count += batchSize; + } + return; + } + + if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNulls(myagg, vector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); + } + else { + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); + } + } private void iterateSelectionHasNulls( Aggregation myagg, @@ -236,7 +434,7 @@ public Object evaluateOutput( @Override public ObjectInspector getOutputObjectInspector() { - return soi; - } + return soi; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java index 0995a01..7a0c22b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. 
VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -52,6 +53,13 @@ static class Aggregation implements AggregationBuffer { long value; boolean isNull; + + public void initIfNull() { + if (isNull) { + isNull = false; + value = 0; + } + } } private VectorExpression inputExpression; @@ -62,17 +70,112 @@ public VectorUDAFCountDouble(VectorExpression inputExpression) { this.inputExpression = inputExpression; result = new LongWritable(0); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + DoubleColumnVector inputVector = (DoubleColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + + if (inputVector.noNulls) { + // if there are no nulls then the iteration is the same on all cases + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, batchSize); + } else if (!batch.selectedInUse) { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + batchSize, inputVector.isNull); + } else if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + batchSize, batch.selected, inputVector.isNull); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.initIfNull(); + myagg.value++; + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.initIfNull(); + myagg.value++; + } + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + myagg.initIfNull(); + myagg.value++; + } + } + } + + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - DoubleColumnVector inputVector = (DoubleColumnVector)unit. + DoubleColumnVector inputVector = (DoubleColumnVector)batch. 
cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -80,11 +183,8 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) Aggregation myagg = (Aggregation)agg; - if (myagg.isNull) { - myagg.value = 0; - myagg.isNull = false; - } - + myagg.initIfNull(); + if (inputVector.isRepeating) { if (inputVector.noNulls || !inputVector.isNull[0]) { myagg.value += batchSize; @@ -96,11 +196,11 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) myagg.value += batchSize; return; } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull); } else { - iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java index 5f595c5..c63892c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -52,6 +53,13 @@ static class Aggregation implements AggregationBuffer { long value; boolean isNull; + + public void initIfNull() { + if (isNull) { + isNull = false; + value = 0; + } + } } private VectorExpression inputExpression; @@ -62,17 +70,112 @@ public VectorUDAFCountLong(VectorExpression inputExpression) { this.inputExpression = inputExpression; result = new LongWritable(0); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch. 
+ cols[this.inputExpression.getOutputColumn()]; + + if (inputVector.noNulls) { + // if there are no nulls then the iteration is the same on all cases + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, batchSize); + } else if (!batch.selectedInUse) { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + batchSize, inputVector.isNull); + } else if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + batchSize, batch.selected, inputVector.isNull); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.initIfNull(); + myagg.value++; + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.initIfNull(); + myagg.value++; + } + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + myagg.initIfNull(); + myagg.value++; + } + } + } + + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - LongColumnVector inputVector = (LongColumnVector)unit. + LongColumnVector inputVector = (LongColumnVector)batch. 
cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -80,11 +183,8 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) Aggregation myagg = (Aggregation)agg; - if (myagg.isNull) { - myagg.value = 0; - myagg.isNull = false; - } - + myagg.initIfNull(); + if (inputVector.isRepeating) { if (inputVector.noNulls || !inputVector.isNull[0]) { myagg.value += batchSize; @@ -96,11 +196,11 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) myagg.value += batchSize; return; } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull); } else { - iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java index 7616752..bc7f852 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -53,6 +54,15 @@ static private final class Aggregation implements AggregationBuffer { double value; boolean isNull; + + public void checkValue(double value) { + if (isNull) { + isNull = false; + this.value = value; + } else if (value > this.value) { + this.value = value; + } + } } private VectorExpression inputExpression; @@ -64,15 +74,205 @@ public VectorUDAFMaxDouble(VectorExpression inputExpression) { result = new DoubleWritable(); } + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex); + return myagg; + } + +@Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + DoubleColumnVector inputVector = (DoubleColumnVector)batch. 
+ cols[this.inputExpression.getOutputColumn()]; + double[] vector = inputVector.vector; + + if (inputVector.noNulls) { + if (inputVector.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize); + } + } + } else { + if (inputVector.isRepeating) { + if (batch.selectedInUse) { + iterateHasNullsRepeatingSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize, inputVector.isNull); + } + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize, inputVector.isNull); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double[] values, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[selection[i]]); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double[] values, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[i]); + } + } + + private void iterateHasNullsRepeatingSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double value, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[selection[i]]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + + } + + private void iterateHasNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double value, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double[] values, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + 
Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + j); + myagg.checkValue(values[i]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double[] values, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[i]); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException { + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - DoubleColumnVector inputVector = (DoubleColumnVector)unit. + DoubleColumnVector inputVector = (DoubleColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -91,17 +291,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throw return; } - if (!unit.selectedInUse && inputVector.noNulls) { + if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java index 3c16d33..6ba416e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. 
VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -53,6 +54,15 @@ static private final class Aggregation implements AggregationBuffer { long value; boolean isNull; + + public void checkValue(long value) { + if (isNull) { + isNull = false; + this.value = value; + } else if (value > this.value) { + this.value = value; + } + } } private VectorExpression inputExpression; @@ -64,15 +74,205 @@ public VectorUDAFMaxLong(VectorExpression inputExpression) { result = new LongWritable(); } + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex); + return myagg; + } + +@Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + long[] vector = inputVector.vector; + + if (inputVector.noNulls) { + if (inputVector.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize); + } + } + } else { + if (inputVector.isRepeating) { + if (batch.selectedInUse) { + iterateHasNullsRepeatingSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize, inputVector.isNull); + } + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize, inputVector.isNull); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long[] values, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[selection[i]]); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] 
aggregationBufferSets, + int aggregrateIndex, + long[] values, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[i]); + } + } + + private void iterateHasNullsRepeatingSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long value, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[selection[i]]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + + } + + private void iterateHasNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long value, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long[] values, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + j); + myagg.checkValue(values[i]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long[] values, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[i]); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException { + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - LongColumnVector inputVector = (LongColumnVector)unit. + LongColumnVector inputVector = (LongColumnVector)batch. 
cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -91,17 +291,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throw return; } - if (!unit.selectedInUse && inputVector.noNulls) { + if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java index 7067c1c..d982fc2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -53,6 +54,15 @@ static private final class Aggregation implements AggregationBuffer { double value; boolean isNull; + + public void checkValue(double value) { + if (isNull) { + isNull = false; + this.value = value; + } else if (value < this.value) { + this.value = value; + } + } } private VectorExpression inputExpression; @@ -64,15 +74,205 @@ public VectorUDAFMinDouble(VectorExpression inputExpression) { result = new DoubleWritable(); } + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex); + return myagg; + } + +@Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + DoubleColumnVector inputVector = (DoubleColumnVector)batch. 
+ cols[this.inputExpression.getOutputColumn()]; + double[] vector = inputVector.vector; + + if (inputVector.noNulls) { + if (inputVector.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize); + } + } + } else { + if (inputVector.isRepeating) { + if (batch.selectedInUse) { + iterateHasNullsRepeatingSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize, inputVector.isNull); + } + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize, inputVector.isNull); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double[] values, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[selection[i]]); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double[] values, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[i]); + } + } + + private void iterateHasNullsRepeatingSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double value, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[selection[i]]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + + } + + private void iterateHasNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double value, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double[] values, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + 
Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + j); + myagg.checkValue(values[i]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + double[] values, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[i]); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException { + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - DoubleColumnVector inputVector = (DoubleColumnVector)unit. + DoubleColumnVector inputVector = (DoubleColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -91,17 +291,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throw return; } - if (!unit.selectedInUse && inputVector.noNulls) { + if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java index 9f813b2..a8f5531 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. 
VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -53,6 +54,15 @@ static private final class Aggregation implements AggregationBuffer { long value; boolean isNull; + + public void checkValue(long value) { + if (isNull) { + isNull = false; + this.value = value; + } else if (value < this.value) { + this.value = value; + } + } } private VectorExpression inputExpression; @@ -64,15 +74,205 @@ public VectorUDAFMinLong(VectorExpression inputExpression) { result = new LongWritable(); } + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex); + return myagg; + } + +@Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + long[] vector = inputVector.vector; + + if (inputVector.noNulls) { + if (inputVector.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize); + } + } + } else { + if (inputVector.isRepeating) { + if (batch.selectedInUse) { + iterateHasNullsRepeatingSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector[0], batchSize, inputVector.isNull); + } + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregrateIndex, + vector, batchSize, inputVector.isNull); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long[] values, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[selection[i]]); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] 
aggregationBufferSets, + int aggregrateIndex, + long[] values, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[i]); + } + } + + private void iterateHasNullsRepeatingSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long value, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[selection[i]]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + + } + + private void iterateHasNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long value, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(value); + } + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long[] values, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + j); + myagg.checkValue(values[i]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregrateIndex, + long[] values, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregrateIndex, + i); + myagg.checkValue(values[i]); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException { + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - LongColumnVector inputVector = (LongColumnVector)unit. + LongColumnVector inputVector = (LongColumnVector)batch. 
cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -91,17 +291,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throw return; } - if (!unit.selectedInUse && inputVector.noNulls) { + if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java index 022b449..a4084b0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates .VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -50,13 +51,13 @@ /** /* class for storing the current aggregate value. */ - static private final class Aggregation implements AggregationBuffer { + private static final class Aggregation implements AggregationBuffer { double sum; long count; double variance; boolean isNull; - public void init () { + public void init() { isNull = false; sum = 0; count = 0; @@ -86,7 +87,7 @@ public VectorUDAFStdPopDouble(VectorExpression inputExpression) { initPartialResultInspector(); } - private void initPartialResultInspector () { + private void initPartialResultInspector() { ArrayList foi = new ArrayList(); foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); @@ -99,17 +100,200 @@ private void initPartialResultInspector () { soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + inputExpression.evaluate(batch); + + DoubleColumnVector inputVector = (DoubleColumnVector)batch. 
+ cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + double[] vector = inputVector.vector; + + if (inputVector.isRepeating) { + if (inputVector.noNulls || !inputVector.isNull[0]) { + iterateRepeatingNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector[0], batchSize); + } + } + else if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected); + } + else { + iterateSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, + inputVector.isNull, batch.selected); + } + + } + private void iterateRepeatingNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + boolean[] isNull, + int[] selected) { + + for (int j=0; j < batchSize; ++j) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + int i = selected[j]; + if (!isNull[i]) { + double value = vector[i]; + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + int[] selected) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + double value = vector[selected[i]]; + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateNoSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + double value = vector[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateNoSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + double value = vector[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { -
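// Worked note (editorial, not part of the patch): the StdPop/StdSamp classes all
// share this streaming update. With n = myagg.count and s = myagg.sum after
// absorbing a value x:
//
//   t = n*x - s                        // n times the deviation of x from the new mean s/n
//   myagg.variance += t*t / (n*(n-1))
//
// accumulates sum((x_i - mean)^2) incrementally (the Youngs-Cramer recurrence), so
// the division by n (population) or n-1 (sample) can be deferred until the result
// is read. Quick check on {1, 2}: after x=2 we have s=3, n=2, t = 2*2-3 = 1, and
// the accumulator becomes 1/(2*1) = 0.5, matching (1-1.5)^2 + (2-1.5)^2 = 0.5.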
inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - DoubleColumnVector inputVector = (DoubleColumnVector)unit. + DoubleColumnVector inputVector = (DoubleColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) iterateRepeatingNoNulls(myagg, vector[0], batchSize); } } - else if (!unit.selectedInUse && inputVector.noNulls) { + else if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java index 9f658d1..28fdb36 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates .VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -50,13 +51,13 @@ /** /* class for storing the current aggregate value. 
*/ - static private final class Aggregation implements AggregationBuffer { + private static final class Aggregation implements AggregationBuffer { double sum; long count; double variance; boolean isNull; - public void init () { + public void init() { isNull = false; sum = 0; count = 0; @@ -86,7 +87,7 @@ public VectorUDAFStdPopLong(VectorExpression inputExpression) { initPartialResultInspector(); } - private void initPartialResultInspector () { + private void initPartialResultInspector() { ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>(); foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); @@ -99,17 +100,200 @@ private void initPartialResultInspector () { soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + long[] vector = inputVector.vector; + + if (inputVector.isRepeating) { + if (inputVector.noNulls || !inputVector.isNull[0]) { + iterateRepeatingNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector[0], batchSize); + } + } + else if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected); + } + else { + iterateSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, + inputVector.isNull, batch.selected); + } + + } + private void iterateRepeatingNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize, + boolean[] isNull, + int[] selected) { + + for (int j=0; j < batchSize; ++j) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + int i = selected[j]; + if (!isNull[i]) { + long value = vector[i]; + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize, + int[] selected) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + long value = vector[selected[i]]; + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateNoSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + long value = vector[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateNoSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + long value = vector[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - LongColumnVector inputVector = (LongColumnVector)unit. + LongColumnVector inputVector = (LongColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) iterateRepeatingNoNulls(myagg, vector[0], batchSize); } } - else if (!unit.selectedInUse && inputVector.noNulls) { + else if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java index cb5f47d..4fa52ff 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates .VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -50,13 +51,13 @@ /** /* class for storing the
current aggregate value. */ - static private final class Aggregation implements AggregationBuffer { + private static final class Aggregation implements AggregationBuffer { double sum; long count; double variance; boolean isNull; - public void init () { + public void init() { isNull = false; sum = 0; count = 0; @@ -86,7 +87,7 @@ public VectorUDAFStdSampDouble(VectorExpression inputExpression) { initPartialResultInspector(); } - private void initPartialResultInspector () { + private void initPartialResultInspector() { ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>(); foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); @@ -99,17 +100,200 @@ private void initPartialResultInspector () { soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + inputExpression.evaluate(batch); + + DoubleColumnVector inputVector = (DoubleColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + double[] vector = inputVector.vector; + + if (inputVector.isRepeating) { + if (inputVector.noNulls || !inputVector.isNull[0]) { + iterateRepeatingNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector[0], batchSize); + } + } + else if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected); + } + else { + iterateSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, + inputVector.isNull, batch.selected); + } + + } + private void iterateRepeatingNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + boolean[] isNull, + int[] selected) { + + for (int j=0; j < batchSize; ++j) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + int i = selected[j]; + if (!isNull[i]) { + double value = vector[i]; + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + int[] selected) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + double value = vector[selected[i]]; + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateNoSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + double value = vector[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateNoSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + double value = vector[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - DoubleColumnVector inputVector = (DoubleColumnVector)unit. + DoubleColumnVector inputVector = (DoubleColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) iterateRepeatingNoNulls(myagg, vector[0], batchSize); } } - else if (!unit.selectedInUse && inputVector.noNulls) { + else if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java index 882795b..551ae8a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates .VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -50,13 +51,13
@@ /** /* class for storing the current aggregate value. */ - static private final class Aggregation implements AggregationBuffer { + private static final class Aggregation implements AggregationBuffer { double sum; long count; double variance; boolean isNull; - public void init () { + public void init() { isNull = false; sum = 0; count = 0; @@ -86,7 +87,7 @@ public VectorUDAFStdSampLong(VectorExpression inputExpression) { initPartialResultInspector(); } - private void initPartialResultInspector () { + private void initPartialResultInspector() { ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>(); foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); @@ -99,17 +100,200 @@ private void initPartialResultInspector () { soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + long[] vector = inputVector.vector; + + if (inputVector.isRepeating) { + if (inputVector.noNulls || !inputVector.isNull[0]) { + iterateRepeatingNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector[0], batchSize); + } + } + else if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected); + } + else { + iterateSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, + inputVector.isNull, batch.selected); + } + + } + private void iterateRepeatingNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize, + boolean[] isNull, + int[] selected) { + + for (int j=0; j < batchSize; ++j) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + int i = selected[j]; + if (!isNull[i]) { + long value = vector[i]; + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize, + int[] selected) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + long value = vector[selected[i]]; + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateNoSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + long value = vector[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateNoSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + long value = vector[i]; + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + if (myagg.isNull) { + myagg.init(); + } + myagg.sum += value; + myagg.count += 1; + if (myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - LongColumnVector inputVector = (LongColumnVector)unit. + LongColumnVector inputVector = (LongColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) iterateRepeatingNoNulls(myagg, vector[0], batchSize); } } - else if (!unit.selectedInUse && inputVector.noNulls) { + else if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java index b4b425d..a2e8fb3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -53,6 +54,15 @@ static private final class Aggregation implements AggregationBuffer { double sum; boolean isNull; + + public void sumValue(double value) { + if (isNull) { + sum = value; + isNull = false; + } else { + sum += value; + } + } } VectorExpression inputExpression; @@ -63,17 +73,207 @@ public VectorUDAFSumDouble(VectorExpression inputExpression) { this.inputExpression = inputExpression; result = new DoubleWritable(); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + long[] vector = inputVector.vector; + + if (inputVector.noNulls) { + if (inputVector.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector[0], batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batchSize); + } + } + } else { + if (inputVector.isRepeating) { + if (batch.selectedInUse) { + iterateHasNullsRepeatingSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector[0], batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector[0], batchSize, inputVector.isNull); + } + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batchSize, inputVector.isNull); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(value); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(values[selection[i]]); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int 
aggregateIndex, + long[] values, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(values[i]); + } + } + + private void iterateHasNullsRepeatingSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[selection[i]]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(value); + } + } + + } + + private void iterateHasNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(value); + } + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + myagg.sumValue(values[i]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(values[i]); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - DoubleColumnVector inputVector = (DoubleColumnVector)unit. + DoubleColumnVector inputVector = (DoubleColumnVector)batch. 
cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -94,17 +294,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) return; } - if (!unit.selectedInUse && inputVector.noNulls) { + if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumLong.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumLong.java index 9420aa2..71b2e3d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumLong.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumLong.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -53,6 +54,15 @@ static private final class Aggregation implements AggregationBuffer { long sum; boolean isNull; + + public void sumValue(long value) { + if (isNull) { + sum = value; + isNull = false; + } else { + sum += value; + } + } } VectorExpression inputExpression; @@ -63,17 +73,207 @@ public VectorUDAFSumLong(VectorExpression inputExpression) { this.inputExpression = inputExpression; result = new LongWritable(); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch. 
+ cols[this.inputExpression.getOutputColumn()]; + long[] vector = inputVector.vector; + + if (inputVector.noNulls) { + if (inputVector.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector[0], batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batchSize); + } + } + } else { + if (inputVector.isRepeating) { + if (batch.selectedInUse) { + iterateHasNullsRepeatingSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector[0], batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector[0], batchSize, inputVector.isNull); + } + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + vector, batchSize, inputVector.isNull); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(value); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(values[selection[i]]); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(values[i]); + } + } + + private void iterateHasNullsRepeatingSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[selection[i]]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(value); + } + } + + } + + private void iterateHasNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(value); + } + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + 
aggregationBufferSets, + aggregateIndex, + j); + myagg.sumValue(values[i]); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] values, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(values[i]); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - LongColumnVector inputVector = (LongColumnVector)unit. + LongColumnVector inputVector = (LongColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -94,17 +294,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) return; } - if (!unit.selectedInUse && inputVector.noNulls) { + if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopDouble.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopDouble.java index 0d58bb9..2dfbfa3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopDouble.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopDouble.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates .VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -50,13 +51,13 @@ /** /* class for storing the current aggregate value. 
*/ - static private final class Aggregation implements AggregationBuffer { + private static final class Aggregation implements AggregationBuffer { double sum; long count; double variance; boolean isNull; - public void init () { + public void init() { isNull = false; sum = 0; count = 0; @@ -86,7 +87,7 @@ public VectorUDAFVarPopDouble(VectorExpression inputExpression) { initPartialResultInspector(); } - private void initPartialResultInspector () { + private void initPartialResultInspector() { ArrayList foi = new ArrayList(); foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); @@ -99,17 +100,200 @@ private void initPartialResultInspector () { soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + inputExpression.evaluate(batch); + + DoubleColumnVector inputVector = (DoubleColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + double[] vector = inputVector.vector; + + if (inputVector.isRepeating) { + if (inputVector.noNulls || !inputVector.isNull[0]) { + iterateRepeatingNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector[0], batchSize); + } + } + else if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected); + } + else { + iterateSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, + inputVector.isNull, batch.selected); + } + + } + private void iterateRepeatingNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double value, + int batchSize) { + + for (int i=0; i 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + boolean[] isNull, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + int i = selected[j]; + if (!isNull[i]) { + double value = vector[i]; + if (myagg.isNull) { + myagg.init (); + } + myagg.sum += value; + myagg.count += 1; + if(myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] 
aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + int[] selected) { + + for (int i=0; i< batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + double value = vector[selected[i]]; + if (myagg.isNull) { + myagg.init (); + } + myagg.sum += value; + myagg.count += 1; + if(myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateNoSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + boolean[] isNull) { + + for(int i=0;i 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateNoSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize) { + + for (int i=0; i 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - DoubleColumnVector inputVector = (DoubleColumnVector)unit. + DoubleColumnVector inputVector = (DoubleColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) iterateRepeatingNoNulls(myagg, vector[0], batchSize); } } - else if (!unit.selectedInUse && inputVector.noNulls) { + else if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopLong.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopLong.java index 2c27e8f..de4811d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopLong.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopLong.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates .VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -50,13 +51,13 @@ /** /* class for storing the 
current aggregate value. */ - static private final class Aggregation implements AggregationBuffer { + private static final class Aggregation implements AggregationBuffer { double sum; long count; double variance; boolean isNull; - public void init () { + public void init() { isNull = false; sum = 0; count = 0; @@ -86,7 +87,7 @@ public VectorUDAFVarPopLong(VectorExpression inputExpression) { initPartialResultInspector(); } - private void initPartialResultInspector () { + private void initPartialResultInspector() { ArrayList foi = new ArrayList(); foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); @@ -99,17 +100,200 @@ private void initPartialResultInspector () { soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + inputExpression.evaluate(batch); + + LongColumnVector inputVector = (LongColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + long[] vector = inputVector.vector; + + if (inputVector.isRepeating) { + if (inputVector.noNulls || !inputVector.isNull[0]) { + iterateRepeatingNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector[0], batchSize); + } + } + else if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected); + } + else { + iterateSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, + inputVector.isNull, batch.selected); + } + + } + private void iterateRepeatingNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long value, + int batchSize) { + + for (int i=0; i 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize, + boolean[] isNull, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + int i = selected[j]; + if (!isNull[i]) { + long value = vector[i]; + if (myagg.isNull) { + myagg.init (); + } + myagg.sum += value; + myagg.count += 1; + if(myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateSelectionNoNullsWithAggregationSelection( + 
VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize, + int[] selected) { + + for (int i=0; i< batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + long value = vector[selected[i]]; + if (myagg.isNull) { + myagg.init (); + } + myagg.sum += value; + myagg.count += 1; + if(myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateNoSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize, + boolean[] isNull) { + + for(int i=0;i 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateNoSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + long[] vector, + int batchSize) { + + for (int i=0; i 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - LongColumnVector inputVector = (LongColumnVector)unit. + LongColumnVector inputVector = (LongColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) iterateRepeatingNoNulls(myagg, vector[0], batchSize); } } - else if (!unit.selectedInUse && inputVector.noNulls) { + else if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampDouble.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampDouble.java index 4156e18..5a21f44 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampDouble.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampDouble.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates .VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; @@ -50,13 +51,13 @@ /** /* 
class for storing the current aggregate value. */ - static private final class Aggregation implements AggregationBuffer { + private static final class Aggregation implements AggregationBuffer { double sum; long count; double variance; boolean isNull; - public void init () { + public void init() { isNull = false; sum = 0; count = 0; @@ -86,7 +87,7 @@ public VectorUDAFVarSampDouble(VectorExpression inputExpression) { initPartialResultInspector(); } - private void initPartialResultInspector () { + private void initPartialResultInspector() { ArrayList foi = new ArrayList(); foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); @@ -99,17 +100,200 @@ private void initPartialResultInspector () { soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + inputExpression.evaluate(batch); + + DoubleColumnVector inputVector = (DoubleColumnVector)batch. + cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + double[] vector = inputVector.vector; + + if (inputVector.isRepeating) { + if (inputVector.noNulls || !inputVector.isNull[0]) { + iterateRepeatingNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector[0], batchSize); + } + } + else if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected); + } + else { + iterateSelectionHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, vector, batchSize, + inputVector.isNull, batch.selected); + } + + } + private void iterateRepeatingNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double value, + int batchSize) { + + for (int i=0; i 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + boolean[] isNull, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + int i = selected[j]; + if (!isNull[i]) { + double value = vector[i]; + if (myagg.isNull) { + myagg.init (); + } + myagg.sum += value; + myagg.count += 1; + if(myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void 
iterateSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + int[] selected) { + + for (int i=0; i< batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + double value = vector[selected[i]]; + if (myagg.isNull) { + myagg.init (); + } + myagg.sum += value; + myagg.count += 1; + if(myagg.count > 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + + private void iterateNoSelectionHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize, + boolean[] isNull) { + + for(int i=0;i 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + } + + private void iterateNoSelectionNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double[] vector, + int batchSize) { + + for (int i=0; i 1) { + double t = myagg.count*value - myagg.sum; + myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); + } + } + } + @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException { - inputExpression.evaluate(unit); + inputExpression.evaluate(batch); - DoubleColumnVector inputVector = (DoubleColumnVector)unit. + DoubleColumnVector inputVector = (DoubleColumnVector)batch. cols[this.inputExpression.getOutputColumn()]; - int batchSize = unit.size; + int batchSize = batch.size; if (batchSize == 0) { return; @@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) iterateRepeatingNoNulls(myagg, vector[0], batchSize); } } - else if (!unit.selectedInUse && inputVector.noNulls) { + else if (!batch.selectedInUse && inputVector.noNulls) { iterateNoSelectionNoNulls(myagg, vector, batchSize); } - else if (!unit.selectedInUse) { + else if (!batch.selectedInUse) { iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); } else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected); + iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); } else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected); + iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampLong.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampLong.java index 7042382..7b88c4f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampLong.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampLong.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates .VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import 
org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@
 /**
 /* class for storing the current aggregate value.
 */
-  static private final class Aggregation implements AggregationBuffer {
+  private static final class Aggregation implements AggregationBuffer {
     double sum;
     long count;
     double variance;
     boolean isNull;

-    public void init () {
+    public void init() {
       isNull = false;
       sum = 0;
       count = 0;
@@ -86,7 +87,7 @@ public VectorUDAFVarSampLong(VectorExpression inputExpression) {
     initPartialResultInspector();
   }

-  private void initPartialResultInspector () {
+  private void initPartialResultInspector() {
     ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
     foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
     foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ private void initPartialResultInspector () {
     soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
   }

+  private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      int row) {
+    VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+    Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+    return myagg;
+  }
+
+
+  @Override
+  public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+    inputExpression.evaluate(batch);
+
+    LongColumnVector inputVector = (LongColumnVector)batch.
+        cols[this.inputExpression.getOutputColumn()];
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    long[] vector = inputVector.vector;
+
+    if (inputVector.isRepeating) {
+      if (inputVector.noNulls || !inputVector.isNull[0]) {
+        iterateRepeatingNoNullsWithAggregationSelection(
+          aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+      }
+    }
+    else if (!batch.selectedInUse && inputVector.noNulls) {
+      iterateNoSelectionNoNullsWithAggregationSelection(
+        aggregationBufferSets, aggregateIndex, vector, batchSize);
+    }
+    else if (!batch.selectedInUse) {
+      iterateNoSelectionHasNullsWithAggregationSelection(
+        aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+    }
+    else if (inputVector.noNulls){
+      iterateSelectionNoNullsWithAggregationSelection(
+        aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+    }
+    else {
+      iterateSelectionHasNullsWithAggregationSelection(
+        aggregationBufferSets, aggregateIndex, vector, batchSize,
+        inputVector.isNull, batch.selected);
+    }
+
+  }
+
+  private void iterateRepeatingNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long value,
+      int batchSize) {
+
+    for (int i=0; i<batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+        double t = myagg.count*value - myagg.sum;
+        myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+      }
+    }
+  }
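The accumulated variance field is this raw sum of squared deviations, which is why the VarPop, VarSamp and Std* classes can share identical iteration code and differ only in how the final value is produced. The standard finishers are sketched below (illustrative only; whether evaluateOutput applies the division itself or leaves it to the merge side is not visible in this hunk):

    // Standard definitions, not patch code: ssd is the accumulated
    // sum of squared deviations, n the row count.
    final class VarianceFinishers {
      static double varPop(double ssd, long n)  { return ssd / n; }
      static double varSamp(double ssd, long n) { return ssd / (n - 1); }
      static double stdPop(double ssd, long n)  { return Math.sqrt(ssd / n); }
      static double stdSamp(double ssd, long n) { return Math.sqrt(ssd / (n - 1)); }
    }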
+  private void iterateSelectionHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long[] vector,
+      int batchSize,
+      boolean[] isNull,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        j);
+      int i = selected[j];
+      if (!isNull[i]) {
+        long value = vector[i];
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+          double t = myagg.count*value - myagg.sum;
+          myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+        }
+      }
+    }
+  }
+
+  private void iterateSelectionNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long[] vector,
+      int batchSize,
+      int[] selected) {
+
+    for (int i=0; i< batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      long value = vector[selected[i]];
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+        double t = myagg.count*value - myagg.sum;
+        myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+      }
+    }
+  }
+
+  private void iterateNoSelectionHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long[] vector,
+      int batchSize,
+      boolean[] isNull) {
+
+    for(int i=0;i<batchSize;++i) {
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        long value = vector[i];
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+          double t = myagg.count*value - myagg.sum;
+          myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+        }
+      }
+    }
+  }
+
+  private void iterateNoSelectionNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long[] vector,
+      int batchSize) {
+
+    for (int i=0; i<batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      long value = vector[i];
+      if (myagg.isNull) {
+        myagg.init ();
+      }
+      myagg.sum += value;
+      myagg.count += 1;
+      if(myagg.count > 1) {
+        double t = myagg.count*value - myagg.sum;
+        myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+      }
+    }
+  }

   @Override
-  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
   throws HiveException {

-    inputExpression.evaluate(unit);
+    inputExpression.evaluate(batch);

-    LongColumnVector inputVector = (LongColumnVector)unit.
+    LongColumnVector inputVector = (LongColumnVector)batch.
       cols[this.inputExpression.getOutputColumn()];

-    int batchSize = unit.size;
+    int batchSize = batch.size;

     if (batchSize == 0) {
       return;
@@ -124,17 +308,17 @@ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
         iterateRepeatingNoNulls(myagg, vector[0], batchSize);
       }
     }
-    else if (!unit.selectedInUse && inputVector.noNulls) {
+    else if (!batch.selectedInUse && inputVector.noNulls) {
       iterateNoSelectionNoNulls(myagg, vector, batchSize);
     }
-    else if (!unit.selectedInUse) {
+    else if (!batch.selectedInUse) {
       iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
     }
     else if (inputVector.noNulls){
-      iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+      iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
     }
     else {
-      iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+      iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
     }
   }
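The CodeGen.java change below is what ties the generated classes above to the templates that follow: generate() dispatches on a descriptor table and expands one VectorUDAF*.txt template per (aggregate, type) pair into a concrete class under the aggregates.gen package, substituting placeholders such as <ClassName> and <ValueType>. A rough sketch of that substitution idea (hypothetical helper; CodeGen's actual file handling is not reproduced here):

    import java.util.Map;

    // Illustrative placeholder expansion, assuming the templates contain
    // literal markers like <ClassName>; this is not CodeGen's real API.
    public class TemplateExpander {
      public static String expand(String template, Map<String, String> subs) {
        String out = template;
        for (Map.Entry<String, String> e : subs.entrySet()) {
          out = out.replace(e.getKey(), e.getValue());
        }
        return out;
      }

      public static void main(String[] args) {
        String line = "public class <ClassName> extends VectorAggregateExpression {";
        System.out.println(expand(line,
            Map.of("<ClassName>", "VectorUDAFSumLong", "<ValueType>", "long")));
      }
    }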
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
index 53d9a7a..fec7cbc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
@@ -269,6 +269,16 @@ private void generate() throws Exception {
       generateColumnArithmeticColumn(tdesc);
     } else if (tdesc[0].equals("ColumnUnaryMinus")) {
       generateColumnUnaryMinus(tdesc);
+    } else if (tdesc[0].equals("VectorUDAFCount")) {
+      generateVectorUDAFCount(tdesc);
+    } else if (tdesc[0].equals("VectorUDAFMinMax")) {
+      generateVectorUDAFMinMax(tdesc);
+    } else if (tdesc[0].equals("VectorUDAFSum")) {
+      generateVectorUDAFSum(tdesc);
+    } else if (tdesc[0].equals("VectorUDAFAvg")) {
+      generateVectorUDAFAvg(tdesc);
+    } else if (tdesc[0].equals("VectorUDAFVar")) {
+      generateVectorUDAFVar(tdesc);
     } else if (tdesc[0].equals("FilterStringColumnCompareScalar")) {
       generateFilterStringColumnCompareScalar(tdesc);
     } else if (tdesc[0].equals("FilterStringColumnCompareColumn")) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFAvg.txt b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFAvg.txt
new file mode 100644
index 0000000..d85346d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFAvg.txt
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
+  VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+import org.apache.hadoop.hive.ql.io.orc.*;
+
+@Description(name = "avg", value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: <ValueType>)")
+public class <ClassName> extends VectorAggregateExpression {
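The template's partial result for AVG is the (count, sum) struct declared below rather than a running mean: partials merge by fieldwise addition and only the final step divides, which keeps the merge associative. A compact model of that contract (illustrative names, not part of the patch):

    // Why (count, sum) and not a running mean: merging stays a plain add.
    final class AvgPartial {
      long count;
      double sum;

      void merge(AvgPartial other) {   // combine two partial aggregations
        count += other.count;
        sum += other.sum;
      }

      Double finish() {                // null mirrors the isNull convention
        return count == 0 ? null : sum / count;
      }
    }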
+
+  /** class for storing the current aggregate value.
+  */
+  static class Aggregation implements AggregationBuffer {
+    <ValueType> sum;
+    long count;
+    boolean isNull;
+
+    public void sumValue(<ValueType> value) {
+      if (isNull) {
+        sum = value;
+        count = 1;
+        isNull = false;
+      } else {
+        sum += value;
+        count++;
+      }
+    }
+  }
+
+  private VectorExpression inputExpression;
+  private Object[] partialResult;
+  private LongWritable resultCount;
+  private DoubleWritable resultSum;
+  private StructObjectInspector soi;
+
+  public <ClassName>(VectorExpression inputExpression) {
+    super();
+    this.inputExpression = inputExpression;
+    partialResult = new Object[2];
+    resultCount = new LongWritable();
+    resultSum = new DoubleWritable();
+    partialResult[0] = resultCount;
+    partialResult[1] = resultSum;
+
+    initPartialResultInspector();
+  }
+
+  private void initPartialResultInspector() {
+    ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
+    foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+    foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+    ArrayList<String> fname = new ArrayList<String>();
+    fname.add("count");
+    fname.add("sum");
+    soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
+  }
+
+  private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      int row) {
+    VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+    Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(bufferIndex);
+    return myagg;
+  }
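aggregateInputSelection below is the hash-GroupBy entry point: each row of a batch may belong to a different group, so the caller passes one buffer row per batch row and getCurrentAggregationBuffer picks this aggregate's slot inside that row's set. A minimal model of the double indexing (illustrative types, not the real classes):

    // aggregationBufferSets[row] is the buffer row of the group that batch
    // row `row` hashed to; bufferIndex selects this aggregate's slot in it.
    final class BufferRowModel {
      Object[][] aggregationBufferSets;  // [row][bufferIndex]

      Object currentBuffer(int bufferIndex, int row) {
        return aggregationBufferSets[row][bufferIndex];
      }
    }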
+
+  @Override
+  public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    inputExpression.evaluate(batch);
+
+    LongColumnVector inputVector = (LongColumnVector)batch.
+        cols[this.inputExpression.getOutputColumn()];
+    long[] vector = inputVector.vector;
+
+    if (inputVector.noNulls) {
+      if (inputVector.isRepeating) {
+        iterateNoNullsRepeatingWithAggregationSelection(
+          aggregationBufferSets, bufferIndex,
+          vector[0], batchSize);
+      } else {
+        if (batch.selectedInUse) {
+          iterateNoNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, bufferIndex,
+            vector, batch.selected, batchSize);
+        } else {
+          iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, bufferIndex,
+            vector, batchSize);
+        }
+      }
+    } else {
+      if (inputVector.isRepeating) {
+        if (batch.selectedInUse) {
+          iterateHasNullsRepeatingSelectionWithAggregationSelection(
+            aggregationBufferSets, bufferIndex,
+            vector[0], batchSize, batch.selected, inputVector.isNull);
+        } else {
+          iterateHasNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, bufferIndex,
+            vector[0], batchSize, inputVector.isNull);
+        }
+      } else {
+        if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, bufferIndex,
+            vector, batchSize, batch.selected, inputVector.isNull);
+        } else {
+          iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, bufferIndex,
+            vector, batchSize, inputVector.isNull);
+        }
+      }
+    }
+  }
+
+  private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long value,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        bufferIndex,
+        i);
+      myagg.sumValue(value);
+    }
+  }
+
+  private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int[] selection,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        bufferIndex,
+        i);
+      myagg.sumValue(values[selection[i]]);
+    }
+  }
+
+  private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int batchSize) {
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        bufferIndex,
+        i);
+      myagg.sumValue(values[i]);
+    }
+  }
+
+  private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long value,
+      int batchSize,
+      int[] selection,
+      boolean[] isNull) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!isNull[selection[i]]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          bufferIndex,
+          i);
+        myagg.sumValue(value);
+      }
+    }
+
+  }
+
+  private void iterateHasNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long value,
+      int batchSize,
+      boolean[] isNull) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          bufferIndex,
+          i);
+        myagg.sumValue(value);
+      }
+    }
+  }
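Note the index split in the selected-rows variant that follows: values and null flags live at the physical position selected[j], while the aggregation buffer is looked up by the logical position j, because the buffer-set array is built per batch slot. The same pattern in a small ungrouped sketch (illustrative, not patch code):

    // selected[] holds physical indices of live rows; buffers go by j.
    static void sumSelected(long[] values, boolean[] isNull, int[] selected,
        int batchSize, long[] perSlotSums) {
      for (int j = 0; j < batchSize; ++j) {
        int i = selected[j];            // physical row in the column vector
        if (!isNull[i]) {
          perSlotSums[j] += values[i];  // buffer chosen by logical slot j
        }
      }
    }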
+
+  private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int batchSize,
+      int[] selection,
+      boolean[] isNull) {
+
+    for (int j=0; j < batchSize; ++j) {
+      int i = selection[j];
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          bufferIndex,
+          j);
+        myagg.sumValue(values[i]);
+      }
+    }
+  }
+
+  private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int batchSize,
+      boolean[] isNull) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          bufferIndex,
+          i);
+        myagg.sumValue(values[i]);
+      }
+    }
+  }
+
+
+  @Override
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException {
+
+    inputExpression.evaluate(batch);
+
+    <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.cols[this.inputExpression.getOutputColumn()];
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    Aggregation myagg = (Aggregation)agg;
+
+    <ValueType>[] vector = inputVector.vector;
+
+    if (inputVector.isRepeating) {
+      if (inputVector.noNulls || !inputVector.isNull[0]) {
+        if (myagg.isNull) {
+          myagg.isNull = false;
+          myagg.sum = 0;
+          myagg.count = 0;
+        }
+        myagg.sum += vector[0]*batchSize;
+        myagg.count += batchSize;
+      }
+      return;
+    }
+
+    if (!batch.selectedInUse && inputVector.noNulls) {
+      iterateNoSelectionNoNulls(myagg, vector, batchSize);
+    }
+    else if (!batch.selectedInUse) {
+      iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+    }
+    else if (inputVector.noNulls){
+      iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
+    }
+    else {
+      iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
+    }
+  }
+
+  private void iterateSelectionHasNulls(
+      Aggregation myagg,
+      <ValueType>[] vector,
+      int batchSize,
+      boolean[] isNull,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      if (!isNull[i]) {
+        <ValueType> value = vector[i];
+        if (myagg.isNull) {
+          myagg.isNull = false;
+          myagg.sum = 0;
+          myagg.count = 0;
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+      }
+    }
+  }
+
+  private void iterateSelectionNoNulls(
+      Aggregation myagg,
+      <ValueType>[] vector,
+      int batchSize,
+      int[] selected) {
+
+    if (myagg.isNull) {
+      myagg.isNull = false;
+      myagg.sum = 0;
+      myagg.count = 0;
+    }
+
+    for (int i=0; i< batchSize; ++i) {
+      <ValueType> value = vector[selected[i]];
+      myagg.sum += value;
+      myagg.count += 1;
+    }
+  }
+
+  private void iterateNoSelectionHasNulls(
+      Aggregation myagg,
+      <ValueType>[] vector,
+      int batchSize,
+      boolean[] isNull) {
+
+    for(int i=0;i<batchSize;++i) {
+      if (!isNull[i]) {
+        <ValueType> value = vector[i];
+        if (myagg.isNull) {
+          myagg.isNull = false;
+          myagg.sum = 0;
+          myagg.count = 0;
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+      }
+    }
+  }
+
+  private void iterateNoSelectionNoNulls(
+      Aggregation myagg,
+      <ValueType>[] vector,
+      int batchSize) {
+    if (myagg.isNull) {
+      myagg.isNull = false;
+      myagg.sum = 0;
+      myagg.count = 0;
+    }
+
+    for (int i=0;i<batchSize;++i) {
+      <ValueType> value = vector[i];
+      myagg.sum += value;
+      myagg.count += 1;
+    }
+  }
+
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    return new Aggregation();
+  }
+
+  @Override
+  public void reset(AggregationBuffer agg) throws HiveException {
+    Aggregation myAgg = (Aggregation) agg;
+    myAgg.isNull = true;
+  }
+
+  @Override
+  public Object evaluateOutput(
+      AggregationBuffer agg) throws HiveException {
+    Aggregation myagg = (Aggregation) agg;
+    if (myagg.isNull) {
+      return null;
+    }
+    else {
+      assert(0 < myagg.count);
+      resultCount.set (myagg.count);
+      resultSum.set (myagg.sum);
+      return partialResult;
+    }
+  }
+
+  @Override
+  public ObjectInspector getOutputObjectInspector() {
+    return soi;
+  }
+}
+
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
index b51e526..54c80ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
   VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -52,6 +53,13 @@ public class <ClassName> extends VectorAggregateExpression {
   static class Aggregation implements AggregationBuffer {
     long value;
     boolean isNull;
+
+    public void initIfNull() {
+      if (isNull) {
+        isNull = false;
+        value = 0;
+      }
+    }
   }

   private VectorExpression inputExpression;
@@ -62,17 +70,112 @@ public class <ClassName> extends VectorAggregateExpression {
     this.inputExpression = inputExpression;
     result = new LongWritable(0);
   }
+
+  private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      int row) {
+    VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+    Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+    return myagg;
+  }
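COUNT needs only three iteration variants below because the aggregated value is never read: when the column has no nulls, every row in the batch counts and the selection vector is irrelevant, so only the has-nulls paths split on selectedInUse. For the single-buffer case the logic reduces to this sketch (illustrative; the grouped paths above still walk per-row buffers):

    static long countBatch(int batchSize, boolean noNulls,
        boolean selectedInUse, int[] selected, boolean[] isNull) {
      if (noNulls) {
        return batchSize;                    // every (selected) row counts
      }
      long c = 0;
      if (!selectedInUse) {
        for (int i = 0; i < batchSize; ++i) {
          if (!isNull[i]) c++;
        }
      } else {
        for (int j = 0; j < batchSize; ++j) {
          if (!isNull[selected[j]]) c++;     // nullness read through selection
        }
      }
      return c;
    }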
+      cols[this.inputExpression.getOutputColumn()];
+
+    if (inputVector.noNulls) {
+      // if there are no nulls then the iteration is the same on all cases
+      iterateNoNullsWithAggregationSelection(
+        aggregationBufferSets, aggregateIndex, batchSize);
+    } else if (!batch.selectedInUse) {
+      iterateHasNullsWithAggregationSelection(
+        aggregationBufferSets, aggregateIndex,
+        batchSize, inputVector.isNull);
+    } else if (batch.selectedInUse) {
+      iterateHasNullsSelectionWithAggregationSelection(
+        aggregationBufferSets, aggregateIndex,
+        batchSize, batch.selected, inputVector.isNull);
+    }
+  }
+
+  private void iterateNoNullsWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregateIndex,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregateIndex,
+        i);
+      myagg.initIfNull();
+      myagg.value++;
+    }
+  }
+
+  private void iterateHasNullsWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregateIndex,
+    int batchSize,
+    boolean[] isNull) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        myagg.initIfNull();
+        myagg.value++;
+      }
+    }
+  }
+
+  private void iterateHasNullsSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregateIndex,
+    int batchSize,
+    int[] selection,
+    boolean[] isNull) {
+
+    for (int j=0; j < batchSize; ++j) {
+      int i = selection[j];
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          j);
+        myagg.initIfNull();
+        myagg.value++;
+      }
+    }
+  }
+
+
+  @Override
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
   throws HiveException {
 
-    inputExpression.evaluate(unit);
+    inputExpression.evaluate(batch);
 
-    inputVector = (<InputColumnVectorType>)unit.
+    inputVector = (<InputColumnVectorType>)batch.
       cols[this.inputExpression.getOutputColumn()];
 
-    int batchSize = unit.size;
+    int batchSize = batch.size;
 
     if (batchSize == 0) {
       return;
@@ -80,11 +183,8 @@ public class <ClassName> extends VectorAggregateExpression {
 
     Aggregation myagg = (Aggregation)agg;
 
-    if (myagg.isNull) {
-      myagg.value = 0;
-      myagg.isNull = false;
-    }
-
+    myagg.initIfNull();
+
     if (inputVector.isRepeating) {
       if (inputVector.noNulls || !inputVector.isNull[0]) {
         myagg.value += batchSize;
@@ -96,11 +196,11 @@ public class <ClassName> extends VectorAggregateExpression {
       myagg.value += batchSize;
       return;
     }
-    else if (!unit.selectedInUse) {
+    else if (!batch.selectedInUse) {
       iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull);
     }
     else {
-      iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, unit.selected);
+      iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected);
     }
   }
 
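Note that the count template above needs only three specialized loops where the other aggregates carry four or six: when inputVector.noNulls is set, every one of the batchSize rows contributes exactly 1, whether or not a selection vector is in use, hence the comment "the iteration is the same on all cases". A small self-contained check of that equivalence (all names here are illustrative):

    public class CountDispatchSketch {
      public static void main(String[] args) {
        int batchSize = 3;
        int[] selected = {0, 5, 9};          // physical row ids; irrelevant when nothing is null
        boolean[] isNull = new boolean[16];  // noNulls case: all entries false

        long plain = 0;
        for (int i = 0; i < batchSize; ++i) {
          plain++;                           // iterateNoNullsWithAggregationSelection path
        }

        long viaSelection = 0;
        for (int j = 0; j < batchSize; ++j) {
          int i = selected[j];
          if (!isNull[i]) {
            viaSelection++;                  // iterateHasNullsSelectionWithAggregationSelection path
          }
        }
        System.out.println(plain == viaSelection); // true: both count 3
      }
    }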
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt
index 8c952c1..d00d9ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
     VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -53,6 +54,15 @@ public class <ClassName> extends VectorAggregateExpression {
   static private final class Aggregation implements AggregationBuffer {
     <ValueType> value;
     boolean isNull;
+
+    public void checkValue(<ValueType> value) {
+      if (isNull) {
+        isNull = false;
+        this.value = value;
+      } else if (value <OperatorSymbol> this.value) {
+        this.value = value;
+      }
+    }
   }
 
   private VectorExpression inputExpression;
@@ -64,15 +74,205 @@ public class <ClassName> extends VectorAggregateExpression {
     result = new <OutputType>();
   }
 
+  private Aggregation getCurrentAggregationBuffer(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    int row) {
+    VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+    Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+    return myagg;
+  }
+
+@Override
+  public void aggregateInputSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    VectorizedRowBatch batch) throws HiveException {
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    inputExpression.evaluate(batch);
+
+    inputVector = (<InputColumnVectorType>)batch.
+      cols[this.inputExpression.getOutputColumn()];
+    <ValueType>[] vector = inputVector.vector;
+
+    if (inputVector.noNulls) {
+      if (inputVector.isRepeating) {
+        iterateNoNullsRepeatingWithAggregationSelection(
+          aggregationBufferSets, aggregrateIndex,
+          vector[0], batchSize);
+      } else {
+        if (batch.selectedInUse) {
+          iterateNoNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregrateIndex,
+            vector, batch.selected, batchSize);
+        } else {
+          iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregrateIndex,
+            vector, batchSize);
+        }
+      }
+    } else {
+      if (inputVector.isRepeating) {
+        if (batch.selectedInUse) {
+          iterateHasNullsRepeatingSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregrateIndex,
+            vector[0], batchSize, batch.selected, inputVector.isNull);
+        } else {
+          iterateHasNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, aggregrateIndex,
+            vector[0], batchSize, inputVector.isNull);
+        }
+      } else {
+        if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregrateIndex,
+            vector, batchSize, batch.selected, inputVector.isNull);
+        } else {
+          iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, aggregrateIndex,
+            vector, batchSize, inputVector.isNull);
+        }
+      }
+    }
+  }
+
+  private void iterateNoNullsRepeatingWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    <ValueType> value,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregrateIndex,
+        i);
+      myagg.checkValue(value);
+    }
+  }
+
+  private void iterateNoNullsSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    <ValueType>[] values,
+    int[] selection,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregrateIndex,
+        i);
+      myagg.checkValue(values[selection[i]]);
+    }
+  }
+
+  private void iterateNoNullsWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    <ValueType>[] values,
+    int batchSize) {
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregrateIndex,
+        i);
+      myagg.checkValue(values[i]);
+    }
+  }
+
+  private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    <ValueType> value,
+    int batchSize,
+    int[] selection,
+    boolean[] isNull) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!isNull[selection[i]]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+        myagg.checkValue(value);
+      }
+    }
+
+  }
+
+  private void iterateHasNullsRepeatingWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    <ValueType> value,
+    int batchSize,
+    boolean[] isNull) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+        myagg.checkValue(value);
+      }
+    }
+  }
+
+  private void iterateHasNullsSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    <ValueType>[] values,
+    int batchSize,
+    int[] selection,
+    boolean[] isNull) {
+
+    for (int j=0; j < batchSize; ++j) {
+      int i = selection[j];
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          j);
+        myagg.checkValue(values[i]);
+      }
+    }
+  }
+
+  private void iterateHasNullsWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    <ValueType>[] values,
+    int batchSize,
+    boolean[] isNull) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+        myagg.checkValue(values[i]);
+      }
+    }
+  }
+
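These .txt files are code-generation templates: the angle-bracket tokens (<ClassName>, <ValueType>, <InputColumnVectorType>, <OperatorSymbol>) are substituted at build time to produce one concrete class per aggregate and column type. As an illustration only, and assuming the substitutions <ValueType> -> long and <OperatorSymbol> -> '<' for a generated min-over-long class, checkValue would plausibly expand along these lines:

    public class LongMinExpansionSketch {
      // Hypothetical expansion of the template's Aggregation for min over longs.
      static final class LongMinAggregation {
        long value;
        boolean isNull = true;

        void checkValue(long candidate) {
          if (isNull) {
            isNull = false;
            value = candidate;        // first non-null value seeds the running min
          } else if (candidate < value) {
            value = candidate;        // '<' is what <OperatorSymbol> becomes for min
          }
        }
      }

      public static void main(String[] args) {
        LongMinAggregation min = new LongMinAggregation();
        for (long v : new long[]{13, 5, 7, 19}) {
          min.checkValue(v);
        }
        System.out.println(min.value); // 5, matching the "min" expectations in the tests below
      }
    }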
   @Override
-  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) throws HiveException {
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+  throws HiveException {
 
-    inputExpression.evaluate(unit);
+    inputExpression.evaluate(batch);
 
-    inputVector = (<InputColumnVectorType>)unit.
+    inputVector = (<InputColumnVectorType>)batch.
       cols[this.inputExpression.getOutputColumn()];
 
-    int batchSize = unit.size;
+    int batchSize = batch.size;
 
     if (batchSize == 0) {
       return;
@@ -91,17 +291,17 @@ public class <ClassName> extends VectorAggregateExpression {
       return;
     }
 
-    if (!unit.selectedInUse && inputVector.noNulls) {
+    if (!batch.selectedInUse && inputVector.noNulls) {
       iterateNoSelectionNoNulls(myagg, vector, batchSize);
     }
-    else if (!unit.selectedInUse) {
+    else if (!batch.selectedInUse) {
       iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
     }
     else if (inputVector.noNulls){
-      iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+      iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
     }
     else {
-      iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+      iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt
index b321654..aaaa6ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
     VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,9 +51,18 @@ public class <ClassName> extends VectorAggregateExpression {
 
     /**
     /* class for storing the current aggregate value.
     */
-    static private final class Aggregation implements AggregationBuffer {
+    private static final class Aggregation implements AggregationBuffer {
      <ValueType> sum;
      boolean isNull;
+
+     public void sumValue(<ValueType> value) {
+       if (isNull) {
+         sum = value;
+         isNull = false;
+       } else {
+         sum += value;
+       }
+     }
    }
 
    VectorExpression inputExpression;
@@ -63,17 +73,207 @@ public class <ClassName> extends VectorAggregateExpression {
      this.inputExpression = inputExpression;
      result = new <OutputType>();
    }
+
+    private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      int row) {
+      VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+      Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+      return myagg;
+    }
+
+    @Override
+    public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+      int batchSize = batch.size;
+
+      if (batchSize == 0) {
+        return;
+      }
+
+      inputExpression.evaluate(batch);
+
+      LongColumnVector inputVector = (LongColumnVector)batch.
+        cols[this.inputExpression.getOutputColumn()];
+      long[] vector = inputVector.vector;
+
+      if (inputVector.noNulls) {
+        if (inputVector.isRepeating) {
+          iterateNoNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            vector[0], batchSize);
+        } else {
+          if (batch.selectedInUse) {
+            iterateNoNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              vector, batch.selected, batchSize);
+          } else {
+            iterateNoNullsWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              vector, batchSize);
+          }
+        }
+      } else {
+        if (inputVector.isRepeating) {
+          if (batch.selectedInUse) {
+            iterateHasNullsRepeatingSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              vector[0], batchSize, batch.selected, inputVector.isNull);
+          } else {
+            iterateHasNullsRepeatingWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              vector[0], batchSize, inputVector.isNull);
+          }
+        } else {
+          if (batch.selectedInUse) {
+            iterateHasNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              vector, batchSize, batch.selected, inputVector.isNull);
+          } else {
+            iterateHasNullsWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              vector, batchSize, inputVector.isNull);
+          }
+        }
+      }
+    }
+
+    private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long value,
+      int batchSize) {
+
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        myagg.sumValue(value);
+      }
+    }
+
+    private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long[] values,
+      int[] selection,
+      int batchSize) {
+
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        myagg.sumValue(values[selection[i]]);
+      }
+    }
+
+    private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long[] values,
+      int batchSize) {
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        myagg.sumValue(values[i]);
+      }
+    }
+
+    private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long value,
+      int batchSize,
+      int[] selection,
+      boolean[] isNull) {
+
+      for (int i=0; i < batchSize; ++i) {
+        if (!isNull[selection[i]]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregateIndex,
+            i);
+          myagg.sumValue(value);
+        }
+      }
+
+    }
+
+    private void iterateHasNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long value,
+      int batchSize,
+      boolean[] isNull) {
+
+      for (int i=0; i < batchSize; ++i) {
+        if (!isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregateIndex,
+            i);
+          myagg.sumValue(value);
+        }
+      }
+    }
+
+    private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long[] values,
+      int batchSize,
+      int[] selection,
+      boolean[] isNull) {
+
+      for (int j=0; j < batchSize; ++j) {
+        int i = selection[j];
+        if (!isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregateIndex,
+            j);
+          myagg.sumValue(values[i]);
+        }
+      }
+    }
+
+    private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      long[] values,
+      int batchSize,
+      boolean[] isNull) {
+
+      for (int i=0; i < batchSize; ++i) {
+        if (!isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregateIndex,
+            i);
+          myagg.sumValue(values[i]);
+        }
+      }
+    }
+
     @Override
-    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+    throws HiveException {
 
-      inputExpression.evaluate(unit);
+      inputExpression.evaluate(batch);
 
-      inputVector = (<InputColumnVectorType>)unit.
+      inputVector = (<InputColumnVectorType>)batch.
        cols[this.inputExpression.getOutputColumn()];
 
-      int batchSize = unit.size;
+      int batchSize = batch.size;
 
      if (batchSize == 0) {
        return;
@@ -94,17 +294,17 @@ public class <ClassName> extends VectorAggregateExpression {
        return;
      }
 
-      if (!unit.selectedInUse && inputVector.noNulls) {
+      if (!batch.selectedInUse && inputVector.noNulls) {
        iterateNoSelectionNoNulls(myagg, vector, batchSize);
      }
-      else if (!unit.selectedInUse) {
+      else if (!batch.selectedInUse) {
        iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
      }
      else if (inputVector.noNulls){
-        iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+        iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
      }
      else {
-        iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+        iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
      }
    }
 
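Both the sum and avg templates short-circuit repeating batches in aggregateInput: when inputVector.isRepeating is set, every row carries vector[0], so adding it batchSize times collapses into a single multiply (and, for avg, count += batchSize). The arithmetic in isolation:

    public class RepeatingSumSketch {
      public static void main(String[] args) {
        long repeatedValue = 5;
        int batchSize = 1024;

        long loopSum = 0;
        for (int i = 0; i < batchSize; ++i) {
          loopSum += repeatedValue;               // what the generic per-row path would do
        }
        long fastSum = repeatedValue * batchSize; // what the isRepeating branch does

        System.out.println(loopSum == fastSum);   // true: both are 5120
      }
    }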
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt
index 29abddf..daae57b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
     .VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@ public class <ClassName> extends VectorAggregateExpression {
 
     /**
     /* class for storing the current aggregate value.
     */
-    static private final class Aggregation implements AggregationBuffer {
+    private static final class Aggregation implements AggregationBuffer {
       double sum;
       long count;
       double variance;
       boolean isNull;
 
-      public void init () {
+      public void init() {
         isNull = false;
         sum = 0;
         count = 0;
@@ -86,7 +87,7 @@ public class <ClassName> extends VectorAggregateExpression {
       initPartialResultInspector();
     }
 
-    private void initPartialResultInspector () {
+    private void initPartialResultInspector() {
       ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
       foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
       foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ public class <ClassName> extends VectorAggregateExpression {
       soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
     }
 
+
+    private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      int row) {
+      VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+      Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+      return myagg;
+    }
+
+
+    @Override
+    public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+      inputExpression.evaluate(batch);
+
+      inputVector = (<InputColumnVectorType>)batch.
+        cols[this.inputExpression.getOutputColumn()];
+
+      int batchSize = batch.size;
+
+      if (batchSize == 0) {
+        return;
+      }
+
+      <ValueType>[] vector = inputVector.vector;
+
+      if (inputVector.isRepeating) {
+        if (inputVector.noNulls || !inputVector.isNull[0]) {
+          iterateRepeatingNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+        }
+      }
+      else if (!batch.selectedInUse && inputVector.noNulls) {
+        iterateNoSelectionNoNullsWithAggregationSelection(
+          aggregationBufferSets, aggregateIndex, vector, batchSize);
+      }
+      else if (!batch.selectedInUse) {
+        iterateNoSelectionHasNullsWithAggregationSelection(
+          aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+      }
+      else if (inputVector.noNulls){
+        iterateSelectionNoNullsWithAggregationSelection(
+          aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+      }
+      else {
+        iterateSelectionHasNullsWithAggregationSelection(
+          aggregationBufferSets, aggregateIndex, vector, batchSize,
+          inputVector.isNull, batch.selected);
+      }
+
+    }
+
+    private void iterateRepeatingNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      <ValueType> value,
+      int batchSize) {
+
+      for (int i=0; i<batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if (myagg.count > 1) {
+          double t = myagg.count*value - myagg.sum;
+          myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+        }
+      }
+    }
+
+    private void iterateSelectionHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      <ValueType>[] vector,
+      int batchSize,
+      boolean[] isNull,
+      int[] selected) {
+
+      for (int j=0; j< batchSize; ++j) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          j);
+        int i = selected[j];
+        if (!isNull[i]) {
+          <ValueType> value = vector[i];
+          if (myagg.isNull) {
+            myagg.init ();
+          }
+          myagg.sum += value;
+          myagg.count += 1;
+          if(myagg.count > 1) {
+            double t = myagg.count*value - myagg.sum;
+            myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+          }
+        }
+      }
+    }
+
+    private void iterateSelectionNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      <ValueType>[] vector,
+      int batchSize,
+      int[] selected) {
+
+      for (int i=0; i< batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        <ValueType> value = vector[selected[i]];
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+          double t = myagg.count*value - myagg.sum;
+          myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+        }
+      }
+    }
+
+    private void iterateNoSelectionHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      <ValueType>[] vector,
+      int batchSize,
+      boolean[] isNull) {
+
+      for(int i=0;i<batchSize;++i) {
+        if (!isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregateIndex,
+            i);
+          <ValueType> value = vector[i];
+          if (myagg.isNull) {
+            myagg.init ();
+          }
+          myagg.sum += value;
+          myagg.count += 1;
+          if(myagg.count > 1) {
+            double t = myagg.count*value - myagg.sum;
+            myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+          }
+        }
+      }
+    }
+
+    private void iterateNoSelectionNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      <ValueType>[] vector,
+      int batchSize) {
+
+      for (int i=0; i<batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+        if (myagg.isNull) {
+          myagg.init ();
+        }
+        <ValueType> value = vector[i];
+        myagg.sum += value;
+        myagg.count += 1;
+        if(myagg.count > 1) {
+          double t = myagg.count*value - myagg.sum;
+          myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+        }
+      }
+    }
+
     @Override
-    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+    throws HiveException {
 
-      inputExpression.evaluate(unit);
+      inputExpression.evaluate(batch);
 
-      inputVector = (<InputColumnVectorType>)unit.
+      inputVector = (<InputColumnVectorType>)batch.
        cols[this.inputExpression.getOutputColumn()];
 
-      int batchSize = unit.size;
+      int batchSize = batch.size;
 
      if (batchSize == 0) {
        return;
@@ -124,17 +308,17 @@ public class <ClassName> extends VectorAggregateExpression {
        iterateRepeatingNoNulls(myagg, vector[0], batchSize);
      }
    }
-    else if (!unit.selectedInUse && inputVector.noNulls) {
+    else if (!batch.selectedInUse && inputVector.noNulls) {
      iterateNoSelectionNoNulls(myagg, vector, batchSize);
    }
-    else if (!unit.selectedInUse) {
+    else if (!batch.selectedInUse) {
      iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
    }
    else if (inputVector.noNulls){
-      iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+      iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
    }
    else {
-      iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+      iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
    }
  }
 
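The running update used throughout the var template, t = count*value - sum followed by variance += t*t / (count*(count-1)), accumulates the sum of squared deviations incrementally (sum already includes the new value when t is computed), so dividing by count at output time gives the population variance. A standalone check against the two-pass formula, using the {18, 12, 15} group that the variance test below expects to produce 6.0:

    public class VarUpdateSketch {
      public static void main(String[] args) {
        double[] values = {18, 12, 15};

        double sum = 0, m2 = 0;
        long count = 0;
        for (double value : values) {
          sum += value;
          count += 1;
          if (count > 1) {
            double t = count * value - sum;          // the template's update, verbatim
            m2 += (t * t) / ((double) count * (count - 1));
          }
        }

        double mean = sum / count;
        double twoPass = 0;
        for (double value : values) {
          twoPass += (value - mean) * (value - mean);
        }

        System.out.println(m2 / count);      // 6.0
        System.out.println(twoPass / count); // 6.0
      }
    }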
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index 8415498..b3b5cd2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -28,8 +28,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.vector.util.FakeCaptureOutputOperator;
 import org.apache.hadoop.hive.ql.exec.vector.util.FakeVectorRowBatchFromConcat;
@@ -93,8 +95,144 @@ private static GroupByDesc buildGroupByDesc(
     return desc;
   }
 
+  private static GroupByDesc buildKeyGroupByDesc(
+      VectorizationContext ctx,
+      String aggregate,
+      String column,
+      String key) {
+
+    GroupByDesc desc = buildGroupByDesc(ctx, aggregate, column);
+
+    ExprNodeDesc keyExp = buildColumnDesc(ctx, key);
+    ArrayList<ExprNodeDesc> keys = new ArrayList<ExprNodeDesc>();
+    keys.add(keyExp);
+    desc.setKeys(keys);
+
+    return desc;
+  }
+
+  @Test
+  public void testMinLongKeyGroupByCompactBatch() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "min",
+        2,
+        Arrays.asList(new Long[]{01L,1L,2L,02L}),
+        Arrays.asList(new Long[]{13L,5L,7L,19L}),
+        buildHashMap(1L, 5L, 2L, 7L));
+  }
+
+  @Test
+  public void testMinLongKeyGroupBySingleBatch() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "min",
+        4,
+        Arrays.asList(new Long[]{01L,1L,2L,02L}),
+        Arrays.asList(new Long[]{13L,5L,7L,19L}),
+        buildHashMap(1L, 5L, 2L, 7L));
+  }
+
+  @Test
+  public void testMinLongKeyGroupByCrossBatch() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "min",
+        2,
+        Arrays.asList(new Long[]{01L,2L,1L,02L}),
+        Arrays.asList(new Long[]{13L,5L,7L,19L}),
+        buildHashMap(1L, 7L, 2L, 5L));
+  }
 
   @Test
-  public void testMinLongSimple () throws HiveException {
+  public void testMinLongNullKeyGroupByCrossBatch() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "min",
+        2,
+        Arrays.asList(new Long[]{null,2L,null,02L}),
+        Arrays.asList(new Long[]{13L,5L,7L,19L}),
+        buildHashMap(null, 7L, 2L, 5L));
+  }
+
+  @Test
+  public void testMinLongNullKeyGroupBySingleBatch() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "min",
+        4,
+        Arrays.asList(new Long[]{null,2L,null,02L}),
+        Arrays.asList(new Long[]{13L,5L,7L,19L}),
+        buildHashMap(null, 7L, 2L, 5L));
+  }
+
+  @Test
+  public void testMaxLongNullKeyGroupBySingleBatch() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "max",
+        4,
+        Arrays.asList(new Long[]{null,2L,null,02L}),
+        Arrays.asList(new Long[]{13L,5L,7L,19L}),
+        buildHashMap(null, 13L, 2L, 19L));
+  }
+
+  @Test
+  public void testCountLongNullKeyGroupBySingleBatch() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "count",
+        4,
+        Arrays.asList(new Long[]{null,2L,null,02L}),
+        Arrays.asList(new Long[]{13L,5L,7L,19L}),
+        buildHashMap(null, 2L, 2L, 2L));
+  }
+
+  @Test
+  public void testSumLongNullKeyGroupBySingleBatch() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "sum",
+        4,
+        Arrays.asList(new Long[]{null,2L,null,02L}),
+        Arrays.asList(new Long[]{13L,5L,7L,19L}),
+        buildHashMap(null, 20L, 2L, 24L));
+  }
+
+  @Test
+  public void testAvgLongNullKeyGroupBySingleBatch() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "avg",
+        4,
+        Arrays.asList(new Long[]{null,2L,null,02L}),
+        Arrays.asList(new Long[]{13L,5L,7L,19L}),
+        buildHashMap(null, 10.0, 2L, 12.0));
+  }
+
+  @Test
+  public void testVarLongNullKeyGroupBySingleBatch() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "variance",
+        4,
+        Arrays.asList(new Long[]{null,2L,01L,02L,01L,01L}),
+        Arrays.asList(new Long[]{13L, 5L,18L,19L,12L,15L}),
+        buildHashMap(null, 0.0, 2L, 49.0, 01L, 6.0));
+  }
+
+  @Test
+  public void testMinNullLongNullKeyGroupBy() throws HiveException {
+    testAggregateLongKeyAggregate(
+        "min",
+        4,
+        Arrays.asList(new Long[]{null,2L,null,02L}),
+        Arrays.asList(new Long[]{null, null, null, null}),
+        buildHashMap(null, null, 2L, null));
+  }
+
+  @Test
+  public void testMinLongGroupBy() throws HiveException {
+    testAggregateLongAggregate(
+        "min",
+        2,
+        Arrays.asList(new Long[]{13L,5L,7L,19L}),
+        5L);
+  }
+
+
+  @Test
+  public void testMinLongSimple() throws HiveException {
     testAggregateLongAggregate(
         "min",
         2,
@@ -735,7 +873,28 @@ public void testAggregateLongRepeats (
         new Long[] {value}, repeat, batchSize);
     testAggregateLongIterable (aggregateName, fdr, expected);
   }
+
+  public HashMap<Object, Object> buildHashMap(Object... pairs) {
+    HashMap<Object, Object> map = new HashMap<Object, Object>();
+    for(int i = 0; i < pairs.length; i += 2) {
+      map.put(pairs[i], pairs[i+1]);
+    }
+    return map;
+  }
+
+
+  public void testAggregateLongKeyAggregate (
+      String aggregateName,
+      int batchSize,
+      Iterable<Long> keys,
+      Iterable<Long> values,
+      HashMap<Object, Object> expected) throws HiveException {
+    @SuppressWarnings("unchecked")
+    FakeVectorRowBatchFromIterables fdr = new FakeVectorRowBatchFromIterables(batchSize, keys, values);
+    testAggregateLongKeyIterable (aggregateName, fdr, expected);
+  }
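The expected results in the tests above are written with buildHashMap, which reads its varargs as alternating key/value pairs; a null key stands for the NULL grouping key. The convention in isolation (the class wrapper is illustrative):

    import java.util.HashMap;

    public class BuildHashMapSketch {
      static HashMap<Object, Object> buildHashMap(Object... pairs) {
        HashMap<Object, Object> map = new HashMap<Object, Object>();
        for (int i = 0; i < pairs.length; i += 2) {
          map.put(pairs[i], pairs[i + 1]); // pairs[0]->pairs[1], pairs[2]->pairs[3], ...
        }
        return map;
      }

      public static void main(String[] args) {
        HashMap<Object, Object> expected = buildHashMap(1L, 5L, 2L, 7L, null, 13L);
        System.out.println(expected.get(1L));   // 5
        System.out.println(expected.get(null)); // 13
      }
    }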
 
   public void testAggregateLongAggregate (
       String aggregateName,
       int batchSize,
@@ -915,5 +1074,68 @@ public void testAggregateLongIterable (
     Validator validator = getValidator(aggregateName);
     validator.validate(expected, result);
   }
+
+  public void testAggregateLongKeyIterable (
+      String aggregateName,
+      Iterable<VectorizedRowBatch> data,
+      HashMap<Object, Object> expected) throws HiveException {
+    Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+    mapColumnNames.put("Key", 0);
+    mapColumnNames.put("Value", 1);
+    VectorizationContext ctx = new VectorizationContext(mapColumnNames, 2);
+    Set<Object> keys = new HashSet<Object>();
+
+    GroupByDesc desc = buildKeyGroupByDesc (ctx, aggregateName, "Value", "Key");
+
+    VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+
+    FakeCaptureOutputOperator out = FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+    vgo.initialize(null, null);
+    out.setOutputInspector(new FakeCaptureOutputOperator.OutputInspector() {
+
+      private int rowIndex;
+      private String aggregateName;
+      private HashMap<Object, Object> expected;
+      private Set<Object> keys;
+
+      @Override
+      public void inspectRow(Object row, int tag) throws HiveException {
+        assertTrue(row instanceof Object[]);
+        Object[] fields = (Object[]) row;
+        assertEquals(2, fields.length);
+        Object key = fields[0];
+        Long keyValue = null;
+        if (null != key) {
+          assertTrue(key instanceof LongWritable);
+          LongWritable lwKey = (LongWritable)key;
+          keyValue = lwKey.get();
+        }
+        assertTrue(expected.containsKey(keyValue));
+        Object expectedValue = expected.get(keyValue);
+        Object value = fields[1];
+        Validator validator = getValidator(aggregateName);
+        validator.validate(expectedValue, new Object[] {value});
+        keys.add(keyValue);
+      }
+
+      private FakeCaptureOutputOperator.OutputInspector init(
+          String aggregateName, HashMap<Object, Object> expected, Set<Object> keys) {
+        this.aggregateName = aggregateName;
+        this.expected = expected;
+        this.keys = keys;
+        return this;
+      }
+    }.init(aggregateName, expected, keys));
+
+    for (VectorizedRowBatch unit: data) {
+      vgo.process(unit, 0);
+    }
+    vgo.close(false);
+
+    List<Object> outBatchList = out.getCapturedRows();
+    assertNotNull(outBatchList);
+    assertEquals(expected.size(), outBatchList.size());
+    assertEquals(expected.size(), keys.size());
+  }
 }
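testAggregateLongKeyIterable above wires its assertions into the capture operator through an anonymous OutputInspector, passing test state in via a private init(...) that returns this, since an anonymous class cannot declare a constructor. The idiom in isolation; the interface shape mirrors the patch, everything else is a stand-in:

    import java.util.HashSet;
    import java.util.Set;

    public class InspectorIdiomSketch {
      interface OutputInspector {
        void inspectRow(Object row, int tag);
      }

      public static void main(String[] args) {
        final Set<Object> seen = new HashSet<Object>();

        OutputInspector inspector = new OutputInspector() {
          private Set<Object> keys;

          @Override
          public void inspectRow(Object row, int tag) {
            keys.add(row);   // record each forwarded row for later assertions
          }

          private OutputInspector init(Set<Object> keys) {
            this.keys = keys;
            return this;     // hand back the configured instance
          }
        }.init(seen);

        inspector.inspectRow("row-1", 0);
        System.out.println(seen.contains("row-1")); // true
      }
    }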
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java
index f2e5399..c656ce4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.util.VectorizedRowGroupGenUtil;
 import org.junit.Test;
 
 public class TestConstantVectorExpression {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
index e4cd0b6..43458d9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
@@ -20,6 +20,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +36,20 @@ public class FakeCaptureOutputOperator extends Operator
   implements Serializable {
 
   private static final long serialVersionUID = 1L;
+
+  public interface OutputInspector {
+    public void inspectRow(Object row, int tag) throws HiveException;
+  }
+
+  private OutputInspector outputInspector;
+
+  public void setOutputInspector(OutputInspector outputInspector) {
+    this.outputInspector = outputInspector;
+  }
+
+  public OutputInspector getOutputInspector() {
+    return outputInspector;
+  }
 
   private transient List<Object> rows;
 
@@ -52,6 +67,7 @@ public static FakeCaptureOutputOperator addCaptureOutputChild(
     return out;
   }
 
+
   public List<Object> getCapturedRows() {
     return rows;
   }
@@ -64,6 +80,9 @@ public void initializeOp(Configuration conf) throws HiveException {
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     rows.add(row);
+    if (null != outputInspector) {
+      outputInspector.inspectRow(row, tag);
+    }
   }
 
   @Override