diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/JavaObjectSizeUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/JavaObjectSizeUtil.java new file mode 100644 index 0000000..74018d3 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/JavaObjectSizeUtil.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +/** + * Helper class to estimate the memory size of common Java objects and arrays. + * Does not distinguish between 32-bit and 64-bit platforms; all computations assume 64 bit. + * + */ +public final class JavaObjectSizeUtil { + public static final int OBJECT_HEADER_SIZE = 16; + public static final int OBJECT_REFERENCE_SIZE = 8; + public static final int MEMORY_ALIGN = 8; + public static final int LENGTH_FIELD_SIZE = 4; + public static final int LONG_ALIGN = 8; + public static final int LONG_SIZE = 8; + public static final int INT_ALIGN = 4; + public static final int INT_SIZE = 4; + public static final int BOOLEAN_SIZE = 1; + public static final int BYTE_SIZE = 1; + public static final int HASHMAP_OVERHEAD = 64; + + /** + * Rounds value up to the next multiple of align (align must be a power of two). + */ + public static int alignUp(int value, int align) { + return (value + align - 1) & ~(align - 1); + } + + /** + * Returns the size of a wide (Long, Double, Object) array of the given length + */ + public static int getWideArraySize(int length) { + return alignUp(OBJECT_HEADER_SIZE + + alignUp(LENGTH_FIELD_SIZE, LONG_ALIGN) + + length * LONG_SIZE, + MEMORY_ALIGN); + } + + /** + * Returns the size of a boolean array of the given length + */ + public static int getBooleanArraySize(int length) { + return alignUp(OBJECT_HEADER_SIZE + + alignUp(LENGTH_FIELD_SIZE, INT_ALIGN) + + length * BOOLEAN_SIZE, + MEMORY_ALIGN); + } + + /** + * Returns the size of an int array of the given length + */ + public static int getIntArraySize(int length) { + return alignUp(OBJECT_HEADER_SIZE + + alignUp(LENGTH_FIELD_SIZE, INT_ALIGN) + + length * INT_SIZE, + MEMORY_ALIGN); + } + + /** + * Returns the size of a byte array of the given length + */ + public static int getByteArraySize(int length) { + return alignUp(OBJECT_HEADER_SIZE + + alignUp(LENGTH_FIELD_SIZE, INT_ALIGN) + + length * BYTE_SIZE, + MEMORY_ALIGN); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java index 030a73c..7c6ca80 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java @@ -18,38 +18,67 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; + /** - * This maps a batch to the aggregation buffers sets to use for each row (key) + * This maps a batch to the aggregation buffer sets to use for each row (key) * */ public class
VectorAggregationBufferBatch { - + /** - * Batch sized array of aggregation buffer sets. + * Batch-sized array of aggregation buffer sets. * The array is preallocated and is reused for each batch, but the individual entries * will reference different aggregation buffer set from batch to batch. * the array is not reset between batches, content past this.index will be stale. */ - private VectorAggregationBufferRow[] aggregationBuffers; - + private final VectorAggregationBufferRow[] aggregationBuffers; + /** - * the selection vector that maps row within a batch to the - * specific aggregation buffer set to use. + * Same as aggregationBuffers, but holding only the distinct buffer sets */ - private int[] selection; - + private final VectorAggregationBufferRow[] distinctAggregationBuffers; + /** * versioning number gets incremented on each batch. This allows us to cache the selection - * mapping info in the aggregation buffer set themselves while still being able to + * mapping info in the aggregation buffer sets themselves while still being able to * detect stale info. */ private int version; - + /** * Get the number of distinct aggregation buffer sets (ie. keys) used in current batch. */ private int distinctCount; - + + /** + * Fixed memory, in bytes, consumed by a set of aggregation buffers + */ + private int aggregatorsFixedSize; + + /** + * Array of indexes of the aggregators that have variable size + */ + private int[] variableSizeAggregators; + + /** + * Returns true if any of the aggregators has a variable size. + * @return true if at least one aggregator is variable-sized + */ + public boolean getHasVariableSize() { + return variableSizeAggregators.length > 0; + } + + /** + * Returns the fixed size consumed by a set of aggregation buffers. + * @return the fixed size, in bytes + */ + public int getAggregatorsFixedSize() { + return aggregatorsFixedSize; + } + /** * the array of aggregation buffers for the current batch. * content past the {@link #getDistinctBufferSetCount()} index @@ -59,49 +88,73 @@ public VectorAggregationBufferRow[] getAggregationBuffers() { return aggregationBuffers; } - + /** - * number of distinct aggregation buffer sets (ie. keys) in the current batch. + * number of distinct aggregation buffer sets (i.e. keys) in the current batch. * @return */ public int getDistinctBufferSetCount () { return distinctCount; } - /** - * gets the selection vector to use for the current batch. This maps the batch rows by position - * (row number) to an index in the {@link #getAggregationBuffers()} array. - * @return - */ - public int[] getSelectionVector() { - return selection; - } - + public VectorAggregationBufferBatch() { aggregationBuffers = new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE]; - selection = new int [VectorizedRowBatch.DEFAULT_SIZE]; + distinctAggregationBuffers = new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE]; } - + /** - * resets the internal aggregation buffers sets index and increments the versioning + * resets the internal aggregation buffer sets index and increments the versioning * used to optimize the selection vector population. */ public void startBatch() { version++; distinctCount = 0; } - + /** * assigns the given aggregation buffer set to a given batch row (by row number). * populates the selection vector appropriately. This is where the versioning numbers - play a role in determining if the index cached on the aggregation buffer set is stale. + play a role in determining if the index cached on the aggregation buffer set is stale.
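* A buffer set stamped with an older version has not yet been seen in the current batch: it is appended to distinctAggregationBuffers and re-stamped with the current version and the next distinct index.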
*/ public void mapAggregationBufferSet(VectorAggregationBufferRow bufferSet, int row) { if (version != bufferSet.getVersion()) { bufferSet.setVersionAndIndex(version, distinctCount); + distinctAggregationBuffers[distinctCount] = bufferSet; ++distinctCount; } aggregationBuffers[row] = bufferSet; } + public void compileAggregationBatchInfo(VectorAggregateExpression[] aggregators) { + int[] variableSizeAggregators = new int[aggregators.length]; + int indexVariableSizes = 0; + aggregatorsFixedSize = JavaObjectSizeUtil.alignUp(JavaObjectSizeUtil.OBJECT_HEADER_SIZE + + JavaObjectSizeUtil.INT_SIZE * 2 + + JavaObjectSizeUtil.OBJECT_REFERENCE_SIZE, + JavaObjectSizeUtil.MEMORY_ALIGN); + aggregatorsFixedSize += JavaObjectSizeUtil.getWideArraySize(aggregators.length); + for(int i=0;i mapKeysAggregationBuffers; @@ -103,15 +156,15 @@ protected void initializeOp(Configuration hconf) throws HiveException { List<ExprNodeDesc> keysDesc = conf.getKeys(); keyExpressions = vContext.getVectorExpressions(keysDesc); - + keyOutputWriters = new VectorExpressionWriter[keyExpressions.length]; - + for(int i = 0; i < keyExpressions.length; ++i) { keyOutputWriters[i] = VectorExpressionWriterFactory. genVectorExpressionWritable(keysDesc.get(i)); objectInspectors.add(keyOutputWriters[i].getObjectInspector()); } - + ArrayList<AggregationDesc> aggrDesc = conf.getAggregators(); aggregators = new VectorAggregateExpression[aggrDesc.size()]; for (int i = 0; i < aggrDesc.size(); ++i) { @@ -119,9 +172,10 @@ protected void initializeOp(Configuration hconf) throws HiveException { aggregators[i] = vContext.getAggregatorExpression (desc); objectInspectors.add(aggregators[i].getOutputObjectInspector()); } - + keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); aggregationBatchInfo = new VectorAggregationBufferBatch(); + aggregationBatchInfo.compileAggregationBatchInfo(aggregators); mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>(); List<String> outputFieldNames = conf.getOutputColumnNames(); @@ -133,38 +187,176 @@ protected void initializeOp(Configuration hconf) throws HiveException { } catch (Throwable e) { throw new HiveException(e); } + + computeMemoryLimits(); + initializeChildren(hconf); } + /** + * Computes the memory limits for hash table flush (spill).
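+ * The budget is maxHashTblMemory = maxHeapSize * memoryThreshold; each hash table entry is + * costed as HASHMAP_OVERHEAD plus the fixed sizes of its key wrapper and aggregation buffers, + * with the variable-size portion sampled at runtime (see updateAvgVariableSize).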
+ */ + private void computeMemoryLimits() { + fixedHashEntrySize = + JavaObjectSizeUtil.HASHMAP_OVERHEAD + + keyWrappersBatch.getKeysFixedSize() + + aggregationBatchInfo.getAggregatorsFixedSize(); + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + memoryThreshold = conf.getMemoryThreshold(); + // Tests may leave this uninitialized, so default it to 1 + if (memoryThreshold == 0.0f) { + memoryThreshold = 1.0f; + } + + maxHashTblMemory = (int)(maxMemory * memoryThreshold); + + LOG.info(String.format("maxHashTblMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)", + maxHashTblMemory/1024/1024, + maxMemory/1024/1024, + memoryThreshold, + fixedHashEntrySize, + keyWrappersBatch.getKeysFixedSize(), + aggregationBatchInfo.getAggregatorsFixedSize())); + + } + @Override public void processOp(Object row, int tag) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) row; - + // First we traverse the batch to evaluate and prepare the KeyWrappers // After this the KeyWrappers are properly set and hash code is computed keyWrappersBatch.evaluateBatch(batch); // Next we locate the aggregation buffer set for each key prepareBatchAggregationBufferSets(batch); - + // Finally, evaluate the aggregators processAggregators(batch); + + // Flush if the memory limits were reached + if (shouldFlush(batch)) { + flush(false); + } + + if (sumBatchSize == 0 && 0 != batch.size) { + // Sample the first batch processed for variable sizes. + updateAvgVariableSize(batch); + } + + sumBatchSize += batch.size; } - + + /** + * Flushes the entries in the hash table by emitting output (forward). + * When parameter 'all' is true all the entries are flushed. + * @param all true to flush every entry, false to flush only a fraction + * @throws HiveException + */ + private void flush(boolean all) throws HiveException { + + int entriesToFlush = all ? numEntriesHashTable : + (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH); + int entriesFlushed = 0; + + LOG.info(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)", + entriesToFlush, all ?
"(all)" : "", + numEntriesHashTable, fixedHashEntrySize, avgVariableSize, + numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024, + maxHashTblMemory/1024/1024)); + + Object[] forwardCache = new Object[keyExpressions.length + aggregators.length]; + if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) { + + // if this is a global aggregation (no keys) and empty set, must still emit NULLs + VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer(); + for (int i = 0; i < aggregators.length; ++i) { + forwardCache[i] = aggregators[i].evaluateOutput(emptyBuffers.getAggregationBuffer(i)); + } + forward(forwardCache, outputObjInspector); + } else { + + /* Iterate the global (keywrapper,aggregationbuffers) map and emit + a row for each key */ + Iterator> iter = + mapKeysAggregationBuffers.entrySet().iterator(); + while(iter.hasNext()) { + Map.Entry pair = iter.next(); + int fi = 0; + for (int i = 0; i < keyExpressions.length; ++i) { + VectorHashKeyWrapper kw = (VectorHashKeyWrapper)pair.getKey(); + forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( + kw, i, keyOutputWriters[i]); + } + for (int i = 0; i < aggregators.length; ++i) { + forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue() + .getAggregationBuffer(i)); + } + LOG.debug(String.format("forwarding keys: %s: %s", + pair.getKey().toString(), Arrays.toString(forwardCache))); + forward(forwardCache, outputObjInspector); + + if (!all) { + iter.remove(); + --numEntriesHashTable; + if (++entriesFlushed >= entriesToFlush) { + break; + } + } + } + } + + if (all) { + mapKeysAggregationBuffers.clear(); + numEntriesHashTable = 0; + } + } + + /** + * Returns true if the memory threshold for the hash table was reached. + */ + private boolean shouldFlush(VectorizedRowBatch batch) { + if (numEntriesSinceCheck < FLUSH_CHECK_THRESHOLD || + batch.size == 0) { + return false; + } + // Were going to update the average variable row size by sampling the current batch + updateAvgVariableSize(batch); + numEntriesSinceCheck = 0; + return numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory; + } + + /** + * Updates the average variable size of the hash table entries. + * The average is only updates by probing the batch that added the entry in the hash table + * that caused the check threshold to be reached. + */ + private void updateAvgVariableSize(VectorizedRowBatch batch) { + int keyVariableSize = keyWrappersBatch.getVariableSize(batch.size); + int aggVariableSize = aggregationBatchInfo.getVariableSize(batch.size); + + // This assumes the distribution of variable size keys/aggregates in the input + // is the same as the distribution of variable sizes in the hash entries + avgVariableSize = (int)((avgVariableSize * sumBatchSize + keyVariableSize +aggVariableSize) / + (sumBatchSize + batch.size)); + } + /** * Evaluates the aggregators on the current batch. * The aggregationBatchInfo must have been prepared - * by calling {@link #prepareBatchAggregationBufferSets} first. + * by calling {@link #prepareBatchAggregationBufferSets} first. */ private void processAggregators(VectorizedRowBatch batch) throws HiveException { // We now have a vector of aggregation buffer sets to use for each row // We can start computing the aggregates. 
// If the number of distinct keys in the batch is 1 we can // use the optimized code path of aggregateInput - VectorAggregationBufferRow[] aggregationBufferSets = + VectorAggregationBufferRow[] aggregationBufferSets = aggregationBatchInfo.getAggregationBuffers(); if (aggregationBatchInfo.getDistinctBufferSetCount() == 1) { - VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = + VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = aggregationBufferSets[0].getAggregationBuffers(); for (int i = 0; i < aggregators.length; ++i) { aggregators[i].aggregateInput(aggregationBuffers[i], batch); @@ -178,16 +370,16 @@ private void processAggregators(VectorizedRowBatch batch) throws HiveException { } } } - + /** * Locates the aggregation buffer sets to use for each key in the current batch. * The keyWrappersBatch must have evaluated the current batch first. */ private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws HiveException { // The aggregation batch vector needs to know when we start a new batch - // to bump its internal version. + // to bump its internal version. aggregationBatchInfo.startBatch(); - + // We now have to probe the global hash and find-or-allocate // the aggregation buffers to use for each key present in the batch VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers(); @@ -195,12 +387,14 @@ private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws VectorHashKeyWrapper kw = keyWrappers[i]; VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw); if (null == aggregationBuffer) { - // the probe failed, we must allocate a set of aggregation buffers + // the probe failed, we must allocate a set of aggregation buffers // and push the (keywrapper,buffers) pair into the hash. // is very important to clone the keywrapper, the one we have from our // keyWrappersBatch is going to be reset/reused on next batch. aggregationBuffer = allocateAggregationBuffer(); mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer); + numEntriesHashTable++; + numEntriesSinceCheck++; } aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i); } @@ -210,7 +404,7 @@ private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws * allocates a new aggregation buffer set. 
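* The returned row holds one AggregationBuffer per aggregator; it is the value stored against a (copied) key wrapper in mapKeysAggregationBuffers.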
*/ private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException { - VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = + VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = new VectorAggregateExpression.AggregationBuffer[aggregators.length]; for (int i=0; i < aggregators.length; ++i) { aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer(); @@ -223,36 +417,7 @@ private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveExcept @Override public void closeOp(boolean aborted) throws HiveException { if (!aborted) { - Object[] forwardCache = new Object[keyExpressions.length + aggregators.length]; - if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) { - - // if this is a global aggregation (no keys) and empty set, must still emit NULLs - VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer(); - for (int i = 0; i < aggregators.length; ++i) { - forwardCache[i] = aggregators[i].evaluateOutput(emptyBuffers.getAggregationBuffer(i)); - } - forward(forwardCache, outputObjInspector); - } else { - - /* Iterate the global (keywrapper,aggregationbuffers) map and emit - a row for each key */ - for(Map.Entry pair: - mapKeysAggregationBuffers.entrySet()){ - int fi = 0; - for (int i = 0; i < keyExpressions.length; ++i) { - VectorHashKeyWrapper kw = (VectorHashKeyWrapper)pair.getKey(); - forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( - kw, i, keyOutputWriters[i]); - } - for (int i = 0; i < aggregators.length; ++i) { - forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue() - .getAggregationBuffer(i)); - } - LOG.debug(String.format("forwarding keys: %s: %s", - pair.getKey().toString(), Arrays.toString(forwardCache))); - forward(forwardCache, outputObjInspector); - } - } + flush(true); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java index 01dd7be..72da2d2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java @@ -233,6 +233,14 @@ public int getByteLength(int i) { return byteLengths[i - longValues.length - doubleValues.length]; } + public int getVariableSize() { + int variableSize = 0; + for (int i=0; i extends VectorAggregateExpression { count++; } } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } } private VectorExpression inputExpression; @@ -435,5 +441,14 @@ public class extends VectorAggregateExpression { public ObjectInspector getOutputObjectInspector() { return soi; } + + @Override + public int getAggregationBufferFixedSize() { + return JavaObjectSizeUtil.alignUp( + JavaObjectSizeUtil.OBJECT_HEADER_SIZE + + JavaObjectSizeUtil.LONG_SIZE + + JavaObjectSizeUtil.LONG_SIZE, + JavaObjectSizeUtil.MEMORY_ALIGN); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt deleted file mode 100644 index 69ff67b..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen; - -import java.util.ArrayList; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. - VectorAggregateExpression.AggregationBuffer; -import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; - -/** -* . Vectorized implementation for COUNT aggregates. -*/ -@Description(name = "count", value = "_FUNC_(expr) - Returns the maximum value of expr (vectorized)") -public class extends VectorAggregateExpression { - - /** - /* class for storing the current aggregate value. - */ - static class Aggregation implements AggregationBuffer { - long value; - boolean isNull; - - public void initIfNull() { - if (isNull) { - isNull = false; - value = 0; - } - } - } - - private VectorExpression inputExpression; - private LongWritable result; - - public (VectorExpression inputExpression) { - super(); - this.inputExpression = inputExpression; - result = new LongWritable(0); - } - - private Aggregation getCurrentAggregationBuffer( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int row) { - VectorAggregationBufferRow mySet = aggregationBufferSets[row]; - Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); - return myagg; - } - - @Override - public void aggregateInputSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - VectorizedRowBatch batch) throws HiveException { - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - inputExpression.evaluate(batch); - - VectorColumn inputVector = (VectorColumn)batch. 
- cols[this.inputExpression.getOutputColumn()]; - - if (inputVector.noNulls) { - // if there are no nulls then the iteration is the same on all cases - iterateNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, batchSize); - } else if (!batch.selectedInUse) { - iterateHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, - batchSize, inputVector.isNull); - } else if (batch.selectedInUse) { - iterateHasNullsSelectionWithAggregationSelection( - aggregationBufferSets, aggregateIndex, - batchSize, batch.selected, inputVector.isNull); - } - } - - private void iterateNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize) { - - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - myagg.initIfNull(); - myagg.value++; - } - } - - private void iterateHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize, - boolean[] isNull) { - - for (int i=0; i < batchSize; ++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - myagg.initIfNull(); - myagg.value++; - } - } - } - - private void iterateHasNullsSelectionWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize, - int[] selection, - boolean[] isNull) { - - for (int j=0; j < batchSize; ++j) { - int i = selection[j]; - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - j); - myagg.initIfNull(); - myagg.value++; - } - } - } - - - @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) - throws HiveException { - - inputExpression.evaluate(batch); - - VectorColumn inputVector = (VectorColumn)batch. 
- cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - Aggregation myagg = (Aggregation)agg; - - myagg.initIfNull(); - - if (inputVector.isRepeating) { - if (inputVector.noNulls || !inputVector.isNull[0]) { - myagg.value += batchSize; - } - return; - } - - if (inputVector.noNulls) { - myagg.value += batchSize; - return; - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull); - } - else { - iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected); - } - } - - private void iterateSelectionHasNulls( - Aggregation myagg, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - int i = selected[j]; - if (!isNull[i]) { - myagg.value += 1; - } - } - } - - private void iterateNoSelectionHasNulls( - Aggregation myagg, - int batchSize, - boolean[] isNull) { - - for (int i=0; i< batchSize; ++i) { - if (!isNull[i]) { - myagg.value += 1; - } - } - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new Aggregation(); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; - } - - @Override - public Object evaluateOutput(AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - result.set (myagg.value); - return result; - } - } - - @Override - public ObjectInspector getOutputObjectInspector() { - return PrimitiveObjectInspectorFactory.writableLongObjectInspector; - } - -} - diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt index d00d9ae..7894f94 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. 
VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.JavaObjectSizeUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; @@ -63,6 +64,11 @@ public class extends VectorAggregateExpression { this.value = value; } } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } } private VectorExpression inputExpression; @@ -411,5 +417,14 @@ public class extends VectorAggregateExpression { public ObjectInspector getOutputObjectInspector() { return ; } + + @Override + public int getAggregationBufferFixedSize() { + return JavaObjectSizeUtil.alignUp( + JavaObjectSizeUtil.OBJECT_HEADER_SIZE + + JavaObjectSizeUtil.LONG_SIZE, + JavaObjectSizeUtil.MEMORY_ALIGN); + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt index 96ce80c..5084477 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.JavaObjectSizeUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; @@ -74,6 +75,11 @@ public class extends VectorAggregateExpression { System.arraycopy(bytes, start, this.bytes, 0, length); this.length = length; } + @Override + public int getVariableSize() { + return JavaObjectSizeUtil.getByteArraySize(bytes.length); + } + } private VectorExpression inputExpression; @@ -359,5 +365,21 @@ public class extends VectorAggregateExpression { public ObjectInspector getOutputObjectInspector() { return PrimitiveObjectInspectorFactory.writableStringObjectInspector; } + + @Override + public int getAggregationBufferFixedSize() { + return JavaObjectSizeUtil.alignUp( + JavaObjectSizeUtil.OBJECT_HEADER_SIZE + + JavaObjectSizeUtil.OBJECT_REFERENCE_SIZE + + JavaObjectSizeUtil.INT_SIZE + + JavaObjectSizeUtil.BOOLEAN_SIZE, + JavaObjectSizeUtil.MEMORY_ALIGN); + } + + @Override + public boolean hasVariableSize() { + return true; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt index c4e6a19..1ad5d4c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import 
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.JavaObjectSizeUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; @@ -62,6 +63,11 @@ public class extends VectorAggregateExpression { sum += value; } } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } } VectorExpression inputExpression; @@ -404,5 +410,15 @@ public class extends VectorAggregateExpression { public ObjectInspector getOutputObjectInspector() { return ; } + + @Override + public int getAggregationBufferFixedSize() { + return JavaObjectSizeUtil.alignUp( + JavaObjectSizeUtil.OBJECT_HEADER_SIZE + + JavaObjectSizeUtil.LONG_SIZE, + JavaObjectSizeUtil.MEMORY_ALIGN); + } + + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt index 9c8ad94..b8a3f96 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer; +import org.apache.hadoop.hive.ql.exec.vector.JavaObjectSizeUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; @@ -62,6 +63,11 @@ public class extends VectorAggregateExpression { count = 0; variance = 0; } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } } private VectorExpression inputExpression; @@ -482,5 +488,17 @@ public class extends VectorAggregateExpression { public ObjectInspector getOutputObjectInspector() { return soi; } + + @Override + public int getAggregationBufferFixedSize() { + return JavaObjectSizeUtil.alignUp( + JavaObjectSizeUtil.OBJECT_HEADER_SIZE + + JavaObjectSizeUtil.LONG_SIZE + + JavaObjectSizeUtil.LONG_SIZE + + JavaObjectSizeUtil.LONG_SIZE + + JavaObjectSizeUtil.BOOLEAN_SIZE, + JavaObjectSizeUtil.MEMORY_ALIGN); + } + } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java index 0050ebc..48a26c6 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java @@ -43,18 +43,18 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.ByteWritable; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; - +import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Test; @@ -151,12 +151,12 @@ private static GroupByDesc buildKeyGroupByDesc( ArrayList keys = new ArrayList(); keys.add(keyExp); desc.setKeys(keys); - + desc.getOutputColumnNames().add("_col1"); return desc; } - + @Test public void testDoubleValueTypeSum() throws HiveException { testKeyTypeAggregate( @@ -168,7 +168,7 @@ public void testDoubleValueTypeSum() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 20.0, null, 19.0)); } - + @Test public void testDoubleValueTypeSumOneKey() throws HiveException { testKeyTypeAggregate( @@ -179,8 +179,8 @@ public void testDoubleValueTypeSumOneKey() throws HiveException { Arrays.asList(new Object[]{ 1, 1, 1, 1}), Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 39.0)); - } - + } + @Test public void testDoubleValueTypeCount() throws HiveException { testKeyTypeAggregate( @@ -192,7 +192,7 @@ public void testDoubleValueTypeCount() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 2L, null, 1L)); } - + public void testDoubleValueTypeCountOneKey() throws HiveException { testKeyTypeAggregate( "count", @@ -202,8 +202,8 @@ public void testDoubleValueTypeCountOneKey() throws HiveException { Arrays.asList(new Object[]{ 1, 1, 1, 1}), Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 3L)); - } - + } + @Test public void testDoubleValueTypeAvg() throws HiveException { testKeyTypeAggregate( @@ -215,7 +215,7 @@ public void testDoubleValueTypeAvg() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 10.0, null, 19.0)); } - + @Test public void testDoubleValueTypeAvgOneKey() throws HiveException { testKeyTypeAggregate( @@ -226,8 +226,8 @@ public void testDoubleValueTypeAvgOneKey() throws HiveException { Arrays.asList(new Object[]{ 1, 1, 1, 1}), Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 13.0)); - } - + } + @Test public void testDoubleValueTypeMin() throws HiveException { testKeyTypeAggregate( @@ -239,7 +239,7 @@ public void testDoubleValueTypeMin() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 7.0, null, 19.0)); } - + @Test public void testDoubleValueTypeMinOneKey() throws HiveException { testKeyTypeAggregate( @@ -251,7 +251,7 @@ public void testDoubleValueTypeMinOneKey() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 7.0)); } - + @Test public void testDoubleValueTypeMax() throws HiveException { testKeyTypeAggregate( @@ -287,7 +287,7 @@ public void testDoubleValueTypeVariance() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 9.0, null, 0.0)); } - + @Test public void testDoubleValueTypeVarianceOneKey() throws HiveException { testKeyTypeAggregate( @@ -298,7 +298,7 @@ public void testDoubleValueTypeVarianceOneKey() throws HiveException { Arrays.asList(new Object[]{ 1, 1, 1, 
1}), Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 24.0)); - } + } @Test public void testTinyintKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -310,7 +310,7 @@ public void testTinyintKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((byte)1, 20L, null, 19L)); } - + @Test public void testSmallintKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -321,8 +321,8 @@ public void testSmallintKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{ 1,null, 1, null}), Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((short)1, 20L, null, 19L)); - } - + } + @Test public void testIntKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -334,7 +334,7 @@ public void testIntKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((int)1, 20L, null, 19L)); } - + @Test public void testBigintKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -346,7 +346,7 @@ public void testBigintKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((long)1L, 20L, null, 19L)); } - + @Test public void testBooleanKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -358,7 +358,7 @@ public void testBooleanKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap(true, 20L, null, 19L)); } - + @Test public void testTimestampKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -369,8 +369,8 @@ public void testTimestampKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{new Timestamp(1),null, new Timestamp(1), null}), Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap(new Timestamp(1), 20L, null, 19L)); - } - + } + @Test public void testFloatKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -382,7 +382,7 @@ public void testFloatKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((float)1.0, 20L, null, 19L)); } - + @Test public void testDoubleKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -393,8 +393,8 @@ public void testDoubleKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{ 1,null, 1, null}), Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((double)1.0, 20L, null, 19L)); - } - + } + @Test public void testCountStar() throws HiveException { testAggregateCountStar( @@ -1262,18 +1262,18 @@ public void testStdDevSampLongRepeat () throws HiveException { 1024, (double)0); } - + private void testKeyTypeAggregate( String aggregateName, FakeVectorRowBatchFromObjectIterables data, Map expected) throws HiveException { - + Map mapColumnNames = new HashMap(); mapColumnNames.put("Key", 0); mapColumnNames.put("Value", 1); VectorizationContext ctx = new VectorizationContext(mapColumnNames, 2); Set keys = new HashSet(); - + AggregationDesc agg = buildAggregationDesc(ctx, aggregateName, "Value", TypeInfoFactory.getPrimitiveTypeInfo(data.getTypes()[1])); ArrayList aggs = new ArrayList(); @@ -1287,7 +1287,7 @@ private void testKeyTypeAggregate( desc.setOutputColumnNames(outputColumnNames); desc.setAggregators(aggs); - ExprNodeDesc keyExp = buildColumnDesc(ctx, "Key", + ExprNodeDesc keyExp = buildColumnDesc(ctx, "Key", TypeInfoFactory.getPrimitiveTypeInfo(data.getTypes()[0])); ArrayList keysDesc = new ArrayList(); keysDesc.add(keyExp); @@ -1338,10 +1338,10 @@ public void 
inspectRow(Object row, int tag) throws HiveException { BooleanWritable bwKey = (BooleanWritable)key; keyValue = bwKey.get(); } else { - Assert.fail(String.format("Not implemented key output type %s: %s", + Assert.fail(String.format("Not implemented key output type %s: %s", key.getClass().getName(), key)); } - + assertTrue(expected.containsKey(keyValue)); Object expectedValue = expected.get(keyValue); Object value = fields[1]; @@ -1367,8 +1367,8 @@ public void inspectRow(Object row, int tag) throws HiveException { List outBatchList = out.getCapturedRows(); assertNotNull(outBatchList); assertEquals(expected.size(), outBatchList.size()); - assertEquals(expected.size(), keys.size()); - } + assertEquals(expected.size(), keys.size()); + } public void testAggregateLongRepeats ( @@ -1502,6 +1502,10 @@ public void validate(Object expected, Object result) { BytesWritable bw = (BytesWritable) arr[0]; String sbw = new String(bw.getBytes()); assertEquals((String) expected, sbw); + } else if (arr[0] instanceof Text) { + Text tx = (Text) arr[0]; + String sTx = tx.toString(); + assertEquals((String) expected, sTx); } else if (arr[0] instanceof DoubleWritable) { DoubleWritable dw = (DoubleWritable) arr[0]; assertEquals ((Double) expected, (Double) dw.get()); @@ -1849,9 +1853,9 @@ public void inspectRow(Object row, int tag) throws HiveException { Object key = fields[0]; String keyValue = null; if (null != key) { - assertTrue(key instanceof BytesWritable); - BytesWritable bwKey = (BytesWritable)key; - keyValue = new String(bwKey.get()); + assertTrue(key instanceof Text); + Text txKey = (Text)key; + keyValue = txKey.toString(); } assertTrue(expected.containsKey(keyValue)); Object expectedValue = expected.get(keyValue);
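
Reviewer note (not part of the patch): a self-contained sketch of the sizing arithmetic introduced above. The class below mirrors JavaObjectSizeUtil's constants and the shape of the shouldFlush() check for illustration only; the entry counts and sizes in main() are hypothetical.

public class HashMemoryEstimateDemo {
  // Constants mirrored from JavaObjectSizeUtil (64-bit JVM assumptions).
  static final int OBJECT_HEADER_SIZE = 16;
  static final int LENGTH_FIELD_SIZE = 4;
  static final int LONG_ALIGN = 8;
  static final int LONG_SIZE = 8;
  static final int MEMORY_ALIGN = 8;

  // Rounds value up to the next multiple of align (align must be a power of two).
  static int alignUp(int value, int align) {
    return (value + align - 1) & ~(align - 1);
  }

  // Footprint of a long[]/double[]/Object[]: header + aligned length field
  // + 8 bytes per element, rounded up to the 8-byte boundary.
  static int getWideArraySize(int length) {
    return alignUp(OBJECT_HEADER_SIZE
        + alignUp(LENGTH_FIELD_SIZE, LONG_ALIGN)
        + length * LONG_SIZE,
        MEMORY_ALIGN);
  }

  public static void main(String[] args) {
    System.out.println("long[16] ~ " + getWideArraySize(16) + " bytes"); // 16 + 8 + 128 = 152

    // The flush check, shaped like shouldFlush(): entries * (fixed size +
    // sampled average variable size) compared against the heap budget.
    long numEntriesHashTable = 3000000;  // hypothetical entry count
    int fixedHashEntrySize = 200;        // hypothetical: HASHMAP_OVERHEAD + key + buffers
    int avgVariableSize = 24;            // hypothetical sampled average
    long maxHashTblMemory = (long) (1024L * 1024 * 1024 * 0.5f); // 1 GB heap * 0.5 threshold
    boolean shouldFlush =
        numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory;
    System.out.println("should flush: " + shouldFlush); // true (~672 MB > 512 MB)
  }
}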