diff --git ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt index 547a60a..6f7ee6a 100644 --- ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt +++ ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt @@ -75,6 +75,13 @@ public class extends VectorAggregateExpression { public int getVariableSize() { throw new UnsupportedOperationException(); } + + @Override + public void reset () { + isNull = true; + sum = 0; + count = 0L; + } } private VectorExpression inputExpression; @@ -430,7 +437,7 @@ public class extends VectorAggregateExpression { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } @Override diff --git ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt index dcc1dfb..0595f71 100644 --- ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt +++ ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt @@ -68,6 +68,12 @@ public class extends VectorAggregateExpression { public int getVariableSize() { throw new UnsupportedOperationException(); } + + @Override + public void reset () { + isNull = true; + value = 0; + } } private VectorExpression inputExpression; @@ -405,7 +411,7 @@ public class extends VectorAggregateExpression { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } @Override diff --git ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt index 37ce103..ea3666a 100644 --- ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt +++ ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt @@ -73,6 +73,12 @@ public class extends VectorAggregateExpression { public int getVariableSize() { throw new UnsupportedOperationException(); } + + @Override + public void reset () { + isNull = true; + value.zeroClear(); + } } private VectorExpression inputExpression; @@ -423,7 +429,7 @@ public class extends VectorAggregateExpression { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } @Override diff --git ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt index 1f8b28c..7e0dda6 100644 --- ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt +++ ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt @@ -83,6 +83,13 @@ public class extends VectorAggregateExpression { JavaDataModel model = JavaDataModel.get(); return model.lengthForByteArrayOfSize(bytes.length); } + + @Override + public void reset () { + isNull = true; + length = 0; + } + } private VectorExpression inputExpression; @@ -352,7 +359,7 @@ public class extends VectorAggregateExpression { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } @Override diff --git ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt index cb0be33..4f70733 100644 --- ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt +++ ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt @@ -69,6 +69,12 @@ public class extends 
VectorAggregateExpression { public int getVariableSize() { throw new UnsupportedOperationException(); } + + @Override + public void reset () { + isNull = true; + sum = 0; + } } private VectorExpression inputExpression; @@ -396,7 +402,7 @@ public class extends VectorAggregateExpression { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } @Override diff --git ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt index 49b0edd..c6c9c52 100644 --- ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt +++ ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt @@ -73,6 +73,14 @@ public class extends VectorAggregateExpression { public int getVariableSize() { throw new UnsupportedOperationException(); } + + @Override + public void reset () { + isNull = true; + sum = 0; + count = 0; + variance = 0; + } } private VectorExpression inputExpression; @@ -475,7 +483,7 @@ public class extends VectorAggregateExpression { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } @Override diff --git ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt index c5af930..c58f9b6 100644 --- ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt +++ ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt @@ -79,6 +79,14 @@ public class extends VectorAggregateExpression { throw new UnsupportedOperationException(); } + @Override + public void reset () { + isNull = true; + sum.zeroClear(); + count = 0; + variance = 0f; + } + public void updateValueWithCheckAndInit(Decimal128 scratch, Decimal128 value, short scale) { if (this.isNull) { this.init(); @@ -437,7 +445,7 @@ public class extends VectorAggregateExpression { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index c4c85fa..fc8aa72 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -120,7 +120,7 @@ transient boolean firstRow; transient long totalMemory; - transient boolean hashAggr; + protected transient boolean hashAggr; // The reduction is happening on the reducer, and the grouping key and // reduction keys are different.
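Every reset() added above follows the same pattern: flip isNull back to true and zero out every accumulator field, so a buffer can be recycled for the next group instead of being reallocated. As a rough sketch of the extended contract (the class and field names below are illustrative, not taken from the patch), a fixed-size buffer now looks like this:

// Illustrative sketch only; mirrors the avg/sum templates, not patch code.
class LongSumCountBuffer implements VectorAggregateExpression.AggregationBuffer {
  transient boolean isNull = true;
  transient double sum;
  transient long count;

  @Override
  public int getVariableSize() {
    // fixed-size buffer: variable-size accounting does not apply
    throw new UnsupportedOperationException();
  }

  @Override
  public void reset() {
    // clear every accumulator, not just isNull, so a recycled buffer
    // cannot leak the previous group's partial sum or count
    isNull = true;
    sum = 0d;
    count = 0L;
  }
}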
// For example: select a, count(distinct b) from T group by a @@ -130,8 +130,8 @@ transient long numRowsInput; transient long numRowsHashTbl; transient int groupbyMapAggrInterval; - transient long numRowsCompareHashAggr; - transient float minReductionHashAggr; + protected transient long numRowsCompareHashAggr; + protected transient float minReductionHashAggr; // current Key ObjectInspectors are standard ObjectInspectors protected transient ObjectInspector[] currentKeyObjectInspectors; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java index 7aa4b11..99cf9b3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java @@ -71,5 +71,14 @@ public void setVersionAndIndex(int version, int index) { this.index = index; this.version = version; } + + /** + * Resets the aggregation buffers for reuse + */ + public void reset() { + for(int i = 0; i < aggregationBuffers.length; ++i) { + aggregationBuffers[i].reset(); + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 4568496..6c3d4f5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -82,46 +82,470 @@ */ private transient VectorHashKeyWrapperBatch keyWrappersBatch; - /** - * Total per hashtable entry fixed memory (does not depend on key/agg values). - */ - private transient int fixedHashEntrySize; + private transient Object[] forwardCache; /** - * Average per hashtable entry variable size memory (depends on key/agg value). + * Interface for processing mode: global, hash or streaming */ - private transient int avgVariableSize; + private static interface IProcessingMode { + public void initialize(Configuration hconf) throws HiveException; + public void processBatch(VectorizedRowBatch batch) throws HiveException; + public void close(boolean aborted) throws HiveException; + } /** - * Number of entries added to the hashtable since the last check if it should flush. + * Base class for all processing modes */ - private transient int numEntriesSinceCheck; + private abstract class ProcessingModeBase implements IProcessingMode { + /** + * Evaluates the aggregators on the current batch. + * The aggregationBatchInfo must have been prepared + * by calling {@link #prepareBatchAggregationBufferSets} first. + */ + protected void processAggregators(VectorizedRowBatch batch) throws HiveException { + // We now have a vector of aggregation buffer sets to use for each row + // We can start computing the aggregates. 
+ // If the number of distinct keys in the batch is 1 we can + // use the optimized code path of aggregateInput + VectorAggregationBufferRow[] aggregationBufferSets = + aggregationBatchInfo.getAggregationBuffers(); + if (aggregationBatchInfo.getDistinctBufferSetCount() == 1) { + VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = + aggregationBufferSets[0].getAggregationBuffers(); + for (int i = 0; i < aggregators.length; ++i) { + aggregators[i].aggregateInput(aggregationBuffers[i], batch); + } + } else { + for (int i = 0; i < aggregators.length; ++i) { + aggregators[i].aggregateInputSelection( + aggregationBufferSets, + i, + batch); + } + } + } - /** - * Sum of batch size processed (ie. rows). - */ - private transient long sumBatchSize; - - /** - * Max number of entries in the vector group by aggregation hashtables. - * Exceeding this will trigger a flush irrelevant of memory pressure condition. - */ - private transient int maxHtEntries = 1000000; + /** + * allocates a new aggregation buffer set. + */ + protected VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException { + VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = + new VectorAggregateExpression.AggregationBuffer[aggregators.length]; + for (int i=0; i < aggregators.length; ++i) { + aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer(); + aggregators[i].reset(aggregationBuffers[i]); + } + VectorAggregationBufferRow bufferSet = new VectorAggregationBufferRow(aggregationBuffers); + return bufferSet; + } + + } /** - * The number of new entries that must be added to the hashtable before a memory size check. + * Global aggregates (no GROUP BY clause, no keys) + * This mode is very simple, there are no keys to consider, and only flushes one row at closing + * The one row must flush even if no input was seen (NULLs) */ - private transient int checkInterval = 10000; + private class ProcessingModeGlobalAggregate extends ProcessingModeBase { + + private VectorAggregationBufferRow aggregationBuffers; + + @Override + public void initialize(Configuration hconf) throws HiveException { + aggregationBuffers = allocateAggregationBuffer(); + LOG.info("using global aggregation processing mode"); + } + + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { + for (int i = 0; i < aggregators.length; ++i) { + aggregators[i].aggregateInput(aggregationBuffers.getAggregationBuffer(i), batch); + } + } + + @Override + public void close(boolean aborted) throws HiveException { + if (!aborted) { + flushSingleRow(null, aggregationBuffers); + } + } + } /** - * Percent of entries to flush when memory threshold exceeded. + * Hash Aggregate mode processing */ - private transient float percentEntriesToFlush = 0.1f; + private class ProcessingModeHashAggregate extends ProcessingModeBase { + + /** + * The global key-aggregation hash map. + */ + private transient Map mapKeysAggregationBuffers; + + /** + * Total per hashtable entry fixed memory (does not depend on key/agg values). + */ + private transient int fixedHashEntrySize; + + /** + * Average per hashtable entry variable size memory (depends on key/agg value). + */ + private transient int avgVariableSize; + + /** + * Number of entries added to the hashtable since the last check if it should flush. + */ + private transient int numEntriesSinceCheck; + + /** + * Sum of batch size processed (ie. rows). + */ + private transient long sumBatchSize; + + /** + * Max number of entries in the vector group by aggregation hashtables. 
+ * Exceeding this will trigger a flush irrelevant of memory pressure condition. + */ + private transient int maxHtEntries = 1000000; + + /** + * The number of new entries that must be added to the hashtable before a memory size check. + */ + private transient int checkInterval = 10000; + + /** + * Percent of entries to flush when memory threshold exceeded. + */ + private transient float percentEntriesToFlush = 0.1f; + + /** + * Count of rows since the last check for changing from aggregate to streaming mode + */ + private transient long lastModeCheckRowCount = 0; + + @Override + public void initialize(Configuration hconf) throws HiveException { + // hconf is null in unit testing + if (null != hconf) { + this.percentEntriesToFlush = HiveConf.getFloatVar(hconf, + HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT); + this.checkInterval = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL); + this.maxHtEntries = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES); + } + + mapKeysAggregationBuffers = new HashMap(); + computeMemoryLimits(); + LOG.info("using hash aggregation processing mode"); + } + + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { + + // First we traverse the batch to evaluate and prepare the KeyWrappers + // After this the KeyWrappers are properly set and hash code is computed + keyWrappersBatch.evaluateBatch(batch); + + // Next we locate the aggregation buffer set for each key + prepareBatchAggregationBufferSets(batch); + + // Finally, evaluate the aggregators + processAggregators(batch); + + //Flush if memory limits were reached + // We keep flushing until the memory is under threshold + int preFlushEntriesCount = numEntriesHashTable; + while (shouldFlush(batch)) { + flush(false); + + //Validate that some progress is being made + if (!(numEntriesHashTable < preFlushEntriesCount)) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Flush did not progress: %d entries before, %d entries after", + preFlushEntriesCount, + numEntriesHashTable)); + } + break; + } + preFlushEntriesCount = numEntriesHashTable; + } + + if (sumBatchSize == 0 && 0 != batch.size) { + // Sample the first batch processed for variable sizes. + updateAvgVariableSize(batch); + } + + sumBatchSize += batch.size; + lastModeCheckRowCount += batch.size; + + // Check if we should turn into streaming mode + checkHashModeEfficiency(); + } + + @Override + public void close(boolean aborted) throws HiveException { + if (!aborted) { + flush(true); + } + } + + /** + * Locates the aggregation buffer sets to use for each key in the current batch. + * The keyWrappersBatch must have evaluated the current batch first. + */ + private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws HiveException { + // The aggregation batch vector needs to know when we start a new batch + // to bump its internal version. + aggregationBatchInfo.startBatch(); + + // We now have to probe the global hash and find-or-allocate + // the aggregation buffers to use for each key present in the batch + VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers(); + for (int i=0; i < batch.size; ++i) { + VectorHashKeyWrapper kw = keyWrappers[i]; + VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw); + if (null == aggregationBuffer) { + // the probe failed, we must allocate a set of aggregation buffers + // and push the (keywrapper,buffers) pair into the hash. 
+ // it is very important to clone the keywrapper, the one we have from our + // keyWrappersBatch is going to be reset/reused on next batch. + aggregationBuffer = allocateAggregationBuffer(); + mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer); + numEntriesHashTable++; + numEntriesSinceCheck++; + } + aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i); + } + } + + /** + * Computes the memory limits for hash table flush (spill). + */ + private void computeMemoryLimits() { + JavaDataModel model = JavaDataModel.get(); + + fixedHashEntrySize = + model.hashMapEntry() + + keyWrappersBatch.getKeysFixedSize() + + aggregationBatchInfo.getAggregatorsFixedSize(); + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + memoryThreshold = conf.getMemoryThreshold(); + // Tests may leave this uninitialized, so better set it to 1 + if (memoryThreshold == 0.0f) { + memoryThreshold = 1.0f; + } + + maxHashTblMemory = (int)(maxMemory * memoryThreshold); + + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)", + maxHashTblMemory/1024/1024, + maxMemory/1024/1024, + memoryThreshold, + fixedHashEntrySize, + keyWrappersBatch.getKeysFixedSize(), + aggregationBatchInfo.getAggregatorsFixedSize())); + } + } + + /** + * Flushes the entries in the hash table by emitting output (forward). + * When parameter 'all' is true all the entries are flushed. + * @param all + * @throws HiveException + */ + private void flush(boolean all) throws HiveException { + + int entriesToFlush = all ? numEntriesHashTable : + (int)(numEntriesHashTable * this.percentEntriesToFlush); + int entriesFlushed = 0; + + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)", + entriesToFlush, all ? "(all)" : "", + numEntriesHashTable, fixedHashEntrySize, avgVariableSize, + numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024, + maxHashTblMemory/1024/1024)); + } + + /* Iterate the global (keywrapper,aggregationbuffers) map and emit + a row for each key */ + Iterator> iter = + mapKeysAggregationBuffers.entrySet().iterator(); + while(iter.hasNext()) { + Map.Entry pair = iter.next(); + + flushSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue()); + + if (!all) { + iter.remove(); + --numEntriesHashTable; + if (++entriesFlushed >= entriesToFlush) { + break; + } + } + } + + if (all) { + mapKeysAggregationBuffers.clear(); + numEntriesHashTable = 0; + } + } + + /** + * Returns true if the memory threshold for the hash table was reached. + */ + private boolean shouldFlush(VectorizedRowBatch batch) { + if (batch.size == 0) { + return false; + } + //numEntriesSinceCheck is the number of entries added to the hash table + // since the last time we checked the average variable size + if (numEntriesSinceCheck >= this.checkInterval) { + // We're going to update the average variable row size by sampling the current batch + updateAvgVariableSize(batch); + numEntriesSinceCheck = 0; + } + return numEntriesHashTable > this.maxHtEntries || + numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory; + } + + /** + * Updates the average variable size of the hash table entries. + * The average is only updated by probing the batch that added the entry in the hash table + * that caused the check threshold to be reached.
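The flush decision above boils down to a single size estimate: the number of hash table entries multiplied by (fixed entry size + sampled average variable size), compared against a fraction of the heap. A back-of-the-envelope version follows, with made-up numbers standing in for what JavaDataModel, the MemoryMXBean and HiveConf actually supply:

// Hedged illustration of the shouldFlush() arithmetic; all constants are invented.
long maxMemory = 1024L * 1024 * 1024;            // e.g. 1 GB heap max, per MemoryMXBean
float memoryThreshold = 0.5f;                    // conf.getMemoryThreshold()
long maxHashTblMemory = (long) (maxMemory * memoryThreshold);

int fixedHashEntrySize = 96;                     // hashMapEntry + fixed key/agg sizes
int avgVariableSize = 24;                        // sampled by updateAvgVariableSize
int numEntriesHashTable = 1500000;
int maxHtEntries = 1000000;                      // HIVE_VECTORIZATION_GROUPBY_MAXENTRIES

boolean flush = numEntriesHashTable > maxHtEntries
    || (long) numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory;
// Here 1.5M entries already exceed maxHtEntries, so flush(false) would evict
// roughly percentEntriesToFlush (10% by default) of the map.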
+ */ + private void updateAvgVariableSize(VectorizedRowBatch batch) { + int keyVariableSize = keyWrappersBatch.getVariableSize(batch.size); + int aggVariableSize = aggregationBatchInfo.getVariableSize(batch.size); + + // This assumes the distribution of variable size keys/aggregates in the input + // is the same as the distribution of variable sizes in the hash entries + avgVariableSize = (int)((avgVariableSize * sumBatchSize + keyVariableSize + aggVariableSize) / + (sumBatchSize + batch.size)); + } + + private void checkHashModeEfficiency() throws HiveException { + if (numRowsCompareHashAggr > lastModeCheckRowCount) { + lastModeCheckRowCount = 0; + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("checkHashModeEfficiency: HT:%d RC:%d MIN:%d", + numEntriesHashTable, sumBatchSize, (long)(sumBatchSize * minReductionHashAggr))); + } + if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) { + flush(true); + + changeToStreamingMode(); + } + } + } + } /** - * The global key-aggregation hash map. + * Streaming processing mode. Intermediate values are flushed each time the key changes. + * In this mode we rely on the MR shuffle to merge the intermediates in the reducer. */ - private transient Map mapKeysAggregationBuffers; + private class ProcessingModeStreaming extends ProcessingModeBase { + + /** + * The aggregation buffers used in streaming mode + */ + private VectorAggregationBufferRow currentStreamingAggregators; + + /** + * The current key, used in streaming mode + */ + private VectorHashKeyWrapper streamingKey; + + private final VectorHashKeyWrapper[] keysToFlush = + new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE]; + + private final VectorAggregationBufferRow[] rowsToFlush = + new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE]; + + /** + * A pool of VectorAggregationBufferRow to avoid repeated allocations + */ + private VectorUtilBatchObjectPool + streamAggregationBufferRowPool; + + @Override + public void initialize(Configuration hconf) throws HiveException { + streamAggregationBufferRowPool = new VectorUtilBatchObjectPool( + VectorizedRowBatch.DEFAULT_SIZE, + new VectorUtilBatchObjectPool.IAllocator() { + + @Override + public VectorAggregationBufferRow alloc() throws HiveException { + return allocateAggregationBuffer(); + } + + @Override + public void free(VectorAggregationBufferRow t) { + // Nothing to do + } + }); + LOG.info("using streaming aggregation processing mode"); + } + + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { + // First we traverse the batch to evaluate and prepare the KeyWrappers + // After this the KeyWrappers are properly set and hash code is computed + keyWrappersBatch.evaluateBatch(batch); + + VectorHashKeyWrapper[] batchKeys = keyWrappersBatch.getVectorHashKeyWrappers(); + + if (streamingKey == null) { + // This is the first batch we process after switching from hash mode + currentStreamingAggregators = streamAggregationBufferRowPool.getFromPool(); + streamingKey = (VectorHashKeyWrapper) batchKeys[0].copyKey(); + } + + aggregationBatchInfo.startBatch(); + int flushMark = 0; + + for(int i = 0; i < batch.size; ++i) { + if (!batchKeys[i].equals(streamingKey)) { + // We've encountered a new key, so we must save the current one + // We can't forward yet, the aggregators have not been evaluated + rowsToFlush[flushMark] = currentStreamingAggregators; + if (keysToFlush[flushMark] == null) { + keysToFlush[flushMark] = (VectorHashKeyWrapper) streamingKey.copyKey(); + } + else { +
streamingKey.duplicateTo(keysToFlush[flushMark]); + } + + currentStreamingAggregators = streamAggregationBufferRowPool.getFromPool(); + batchKeys[i].duplicateTo(streamingKey); + ++flushMark; + } + aggregationBatchInfo.mapAggregationBufferSet(currentStreamingAggregators, i); + } + + // evaluate the aggregators + processAggregators(batch); + + // Now flush/forward all keys/rows, except the last (current) one + for (int i = 0; i < flushMark; ++i) { + flushSingleRow(keysToFlush[i], rowsToFlush[i]); + rowsToFlush[i].reset(); + streamAggregationBufferRowPool.putInPool(rowsToFlush[i]); + } + } + + @Override + public void close(boolean aborted) throws HiveException { + if (!aborted && null != streamingKey) { + flushSingleRow(streamingKey, currentStreamingAggregators); + } + } + } + + private transient IProcessingMode processingMode; private static final long serialVersionUID = 1L; @@ -146,15 +570,10 @@ public VectorGroupByOperator() { @Override protected void initializeOp(Configuration hconf) throws HiveException { - - // hconf is null in unit testing - if (null != hconf) { - this.percentEntriesToFlush = HiveConf.getFloatVar(hconf, - HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT); - this.checkInterval = HiveConf.getIntVar(hconf, - HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL); - this.maxHtEntries = HiveConf.getIntVar(hconf, - HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES); + + if (hconf != null) { + // Initialize Super for things like this.hashAggr, this.minReductionHashAggr + super.initializeOp(hconf); } List objectInspectors = new ArrayList(); @@ -178,7 +597,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); aggregationBatchInfo = new VectorAggregationBufferBatch(); aggregationBatchInfo.compileAggregationBatchInfo(aggregators); - mapKeysAggregationBuffers = new HashMap(); List outputFieldNames = conf.getOutputColumnNames(); outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector( @@ -190,260 +608,69 @@ protected void initializeOp(Configuration hconf) throws HiveException { throw new HiveException(e); } - computeMemoryLimits(); - initializeChildren(hconf); - } - - /** - * Computes the memory limits for hash table flush (spill). 
- */ - private void computeMemoryLimits() { - JavaDataModel model = JavaDataModel.get(); - - fixedHashEntrySize = - model.hashMapEntry() + - keyWrappersBatch.getKeysFixedSize() + - aggregationBatchInfo.getAggregatorsFixedSize(); - - MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); - memoryThreshold = conf.getMemoryThreshold(); - // Tests may leave this unitialized, so better set it to 1 - if (memoryThreshold == 0.0f) { - memoryThreshold = 1.0f; - } - maxHashTblMemory = (int)(maxMemory * memoryThreshold); + forwardCache =new Object[keyExpressions.length + aggregators.length]; - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)", - maxHashTblMemory/1024/1024, - maxMemory/1024/1024, - memoryThreshold, - fixedHashEntrySize, - keyWrappersBatch.getKeysFixedSize(), - aggregationBatchInfo.getAggregatorsFixedSize())); + if (keyExpressions.length == 0) { + processingMode = this.new ProcessingModeGlobalAggregate(); } + else { + //TODO: consider if parent can offer order guarantees + // If input is sorted, is more efficient to use the streaming mode + processingMode = this.new ProcessingModeHashAggregate(); + } + processingMode.initialize(hconf); + } + /** + * changes the processing mode to streaming + * This is done at the request of the hash agg mode, if the number of keys + * exceeds the minReductionHashAggr factor + * @throws HiveException + */ + private void changeToStreamingMode() throws HiveException { + processingMode = this.new ProcessingModeStreaming(); + processingMode.initialize(null); + LOG.trace("switched to streaming mode"); } @Override public void processOp(Object row, int tag) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) row; - // First we traverse the batch to evaluate and prepare the KeyWrappers - // After this the KeyWrappers are properly set and hash code is computed - keyWrappersBatch.evaluateBatch(batch); - - // Next we locate the aggregation buffer set for each key - prepareBatchAggregationBufferSets(batch); - - // Finally, evaluate the aggregators - processAggregators(batch); - - //Flush if memory limits were reached - // We keep flushing until the memory is under threshold - int preFlushEntriesCount = numEntriesHashTable; - while (shouldFlush(batch)) { - flush(false); - - //Validate that some progress is being made - if (!(numEntriesHashTable < preFlushEntriesCount)) { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Flush did not progress: %d entries before, %d entries after", - preFlushEntriesCount, - numEntriesHashTable)); - } - break; - } - preFlushEntriesCount = numEntriesHashTable; + if (batch.size > 0) { + processingMode.processBatch(batch); } - - if (sumBatchSize == 0 && 0 != batch.size) { - // Sample the first batch processed for variable sizes. - updateAvgVariableSize(batch); - } - - sumBatchSize += batch.size; } /** - * Flushes the entries in the hash table by emiting output (forward). - * When parameter 'all' is true all the entries are flushed. - * @param all + * Emits a single row, made from the key and the row aggregation buffers values + * kw is null if keyExpressions.length is 0 + * @param kw + * @param agg * @throws HiveException */ - private void flush(boolean all) throws HiveException { - - int entriesToFlush = all ? 
numEntriesHashTable : - (int)(numEntriesHashTable * this.percentEntriesToFlush); - int entriesFlushed = 0; - - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)", - entriesToFlush, all ? "(all)" : "", - numEntriesHashTable, fixedHashEntrySize, avgVariableSize, - numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024, - maxHashTblMemory/1024/1024)); - } - - Object[] forwardCache = new Object[keyExpressions.length + aggregators.length]; - if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) { - // if this is a global aggregation (no keys) and empty set, must still emit NULLs - VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer(); - for (int i = 0; i < aggregators.length; ++i) { - forwardCache[i] = aggregators[i].evaluateOutput(emptyBuffers.getAggregationBuffer(i)); - } - forward(forwardCache, outputObjInspector); - } else { - /* Iterate the global (keywrapper,aggregationbuffers) map and emit - a row for each key */ - Iterator> iter = - mapKeysAggregationBuffers.entrySet().iterator(); - while(iter.hasNext()) { - Map.Entry pair = iter.next(); - int fi = 0; - for (int i = 0; i < keyExpressions.length; ++i) { - VectorHashKeyWrapper kw = (VectorHashKeyWrapper)pair.getKey(); - forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( - kw, i, keyOutputWriters[i]); - } - for (int i = 0; i < aggregators.length; ++i) { - forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue() - .getAggregationBuffer(i)); - } - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("forwarding keys: %s: %s", - pair.getKey().toString(), Arrays.toString(forwardCache))); - } - forward(forwardCache, outputObjInspector); - - if (!all) { - iter.remove(); - --numEntriesHashTable; - if (++entriesFlushed >= entriesToFlush) { - break; - } - } - } - } - - if (all) { - mapKeysAggregationBuffers.clear(); - numEntriesHashTable = 0; - } - } - - /** - * Returns true if the memory threshold for the hash table was reached. - */ - private boolean shouldFlush(VectorizedRowBatch batch) { - if (batch.size == 0) { - return false; - } - //numEntriesSinceCheck is the number of entries added to the hash table - // since the last time we checked the average variable size - if (numEntriesSinceCheck >= this.checkInterval) { - // Were going to update the average variable row size by sampling the current batch - updateAvgVariableSize(batch); - numEntriesSinceCheck = 0; - } - return numEntriesHashTable > this.maxHtEntries || - numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory; - } - - /** - * Updates the average variable size of the hash table entries. - * The average is only updates by probing the batch that added the entry in the hash table - * that caused the check threshold to be reached. - */ - private void updateAvgVariableSize(VectorizedRowBatch batch) { - int keyVariableSize = keyWrappersBatch.getVariableSize(batch.size); - int aggVariableSize = aggregationBatchInfo.getVariableSize(batch.size); - - // This assumes the distribution of variable size keys/aggregates in the input - // is the same as the distribution of variable sizes in the hash entries - avgVariableSize = (int)((avgVariableSize * sumBatchSize + keyVariableSize +aggVariableSize) / - (sumBatchSize + batch.size)); - } - - /** - * Evaluates the aggregators on the current batch. - * The aggregationBatchInfo must have been prepared - * by calling {@link #prepareBatchAggregationBufferSets} first. 
- */ - private void processAggregators(VectorizedRowBatch batch) throws HiveException { - // We now have a vector of aggregation buffer sets to use for each row - // We can start computing the aggregates. - // If the number of distinct keys in the batch is 1 we can - // use the optimized code path of aggregateInput - VectorAggregationBufferRow[] aggregationBufferSets = - aggregationBatchInfo.getAggregationBuffers(); - if (aggregationBatchInfo.getDistinctBufferSetCount() == 1) { - VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = - aggregationBufferSets[0].getAggregationBuffers(); - for (int i = 0; i < aggregators.length; ++i) { - aggregators[i].aggregateInput(aggregationBuffers[i], batch); - } - } else { - for (int i = 0; i < aggregators.length; ++i) { - aggregators[i].aggregateInputSelection( - aggregationBufferSets, - i, - batch); - } + private void flushSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg) + throws HiveException { + int fi = 0; + for (int i = 0; i < keyExpressions.length; ++i) { + forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( + kw, i, keyOutputWriters[i]); } - } - - /** - * Locates the aggregation buffer sets to use for each key in the current batch. - * The keyWrappersBatch must have evaluated the current batch first. - */ - private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws HiveException { - // The aggregation batch vector needs to know when we start a new batch - // to bump its internal version. - aggregationBatchInfo.startBatch(); - - // We now have to probe the global hash and find-or-allocate - // the aggregation buffers to use for each key present in the batch - VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers(); - for (int i=0; i < batch.size; ++i) { - VectorHashKeyWrapper kw = keyWrappers[i]; - VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw); - if (null == aggregationBuffer) { - // the probe failed, we must allocate a set of aggregation buffers - // and push the (keywrapper,buffers) pair into the hash. - // is very important to clone the keywrapper, the one we have from our - // keyWrappersBatch is going to be reset/reused on next batch. - aggregationBuffer = allocateAggregationBuffer(); - mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer); - numEntriesHashTable++; - numEntriesSinceCheck++; - } - aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i); + for (int i = 0; i < aggregators.length; ++i) { + forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)); } - } - - /** - * allocates a new aggregation buffer set. 
- */ - private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException { - VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = - new VectorAggregateExpression.AggregationBuffer[aggregators.length]; - for (int i=0; i < aggregators.length; ++i) { - aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer(); - aggregators[i].reset(aggregationBuffers[i]); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("forwarding keys: %s: %s", + kw, Arrays.toString(forwardCache))); } - VectorAggregationBufferRow bufferSet = new VectorAggregationBufferRow(aggregationBuffers); - return bufferSet; + forward(forwardCache, outputObjInspector); } @Override public void closeOp(boolean aborted) throws HiveException { - if (!aborted) { - flush(true); - } + processingMode.close(aborted); } static public String getOperatorName() { @@ -467,4 +694,3 @@ public void setAggregators(VectorAggregateExpression[] aggregators) { } } - diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java index a2a7266..2229079 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java @@ -143,6 +143,11 @@ private boolean bytesEquals(VectorHashKeyWrapper keyThat) { @Override protected Object clone() { VectorHashKeyWrapper clone = new VectorHashKeyWrapper(); + duplicateTo(clone); + return clone; + } + + public void duplicateTo(VectorHashKeyWrapper clone) { clone.longValues = longValues.clone(); clone.doubleValues = doubleValues.clone(); clone.isNull = isNull.clone(); @@ -167,7 +172,6 @@ protected Object clone() { } clone.hashcode = hashcode; assert clone.equals(this); - return clone; } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java index bd6c24b..4ebf924 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java @@ -617,9 +617,8 @@ public static VectorHashKeyWrapperBatch compileKeyWrapperBatch(VectorExpression[ compiledKeyWrapperBatch.vectorHashKeyWrappers = new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE]; for(int i=0;i { + private final T[] buffer; + + /** + * Head of the pool. This is where where we should insert the next + * object returned to the pool + */ + private int head = 0; + + /** + * Count of available elements. 
They are behind the head, with wrap-around + * The head itself is not free, is null + */ + private int count = 0; + + private IAllocator allocator; + + public static interface IAllocator { + public T alloc() throws HiveException; + public void free(T t); + } + + @SuppressWarnings("unchecked") + public VectorUtilBatchObjectPool(int size, IAllocator allocator) { + buffer = (T[]) new Object[size]; + this.allocator = allocator; + } + + public T getFromPool() throws HiveException { + T ret = null; + if (count == 0) { + // Pool is exhausted, return a new object + ret = allocator.alloc(); + } + else { + int tail = (head + buffer.length - count) % buffer.length; + ret = buffer[tail]; + buffer[tail] = null; + --count; + } + + return ret; + } + + public void putInPool(T object) { + if (count < buffer.length) { + buffer[head] = object; + ++count; + ++head; + if (head == buffer.length) { + head = 0; + } + } + else { + allocator.free(object); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java index 1836169..96e62cf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java @@ -38,6 +38,8 @@ */ public static interface AggregationBuffer extends Serializable { int getVariableSize(); + + void reset(); }; public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java index 8418587..e263e48 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java @@ -82,6 +82,13 @@ public void sumValueNoCheck(Decimal128 value, short scale) { public int getVariableSize() { throw new UnsupportedOperationException(); } + + @Override + public void reset() { + isNull = true; + sum.zeroClear(); + count = 0L; + } } private VectorExpression inputExpression; @@ -462,7 +469,7 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java index 086f91f..5aa4d4c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java @@ -60,6 +60,12 @@ public void initIfNull() { public int getVariableSize() { throw new UnsupportedOperationException(); } + + @Override + public void reset() { + isNull = true; + value = 0L; + } } private VectorExpression inputExpression = null; @@ -240,7 +246,7 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } 
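The ring-buffer pool introduced above (VectorUtilBatchObjectPool) keeps up to its configured capacity of recycled objects behind the head index and falls back to the supplied allocator only when it runs dry. A hedged usage sketch follows, with the allocator body standing in for the operator's own allocateAggregationBuffer():

// Sketch of how the streaming mode wires up the pool; not patch code verbatim.
VectorUtilBatchObjectPool<VectorAggregationBufferRow> pool =
    new VectorUtilBatchObjectPool<VectorAggregationBufferRow>(
        VectorizedRowBatch.DEFAULT_SIZE,
        new VectorUtilBatchObjectPool.IAllocator<VectorAggregationBufferRow>() {
          @Override
          public VectorAggregationBufferRow alloc() throws HiveException {
            return allocateAggregationBuffer();   // fresh buffers only when the pool is empty
          }
          @Override
          public void free(VectorAggregationBufferRow t) {
            // nothing to release; overflow objects are simply garbage collected
          }
        });

VectorAggregationBufferRow row = pool.getFromPool();  // reuse a pooled row when one is free
// ... aggregate one key run, flush it forward ...
row.reset();            // clear per-group state via the new AggregationBuffer.reset()
pool.putInPool(row);    // handed to free(...) instead if the ring is already full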
@Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java index 4926f6c..ad45da5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java @@ -51,6 +51,12 @@ public int getVariableSize() { throw new UnsupportedOperationException(); } + + @Override + public void reset() { + isNull = true; + value = 0L; + } } transient private final LongWritable result; @@ -117,7 +123,7 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java index 0089ad3..341b695 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java @@ -64,6 +64,12 @@ public void sumValue(Decimal128 value, short scale) { public int getVariableSize() { throw new UnsupportedOperationException(); } + + @Override + public void reset() { + isNull = true; + sum.zeroClear(); + } } private VectorExpression inputExpression; @@ -411,7 +417,7 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException { @Override public void reset(AggregationBuffer agg) throws HiveException { Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; + myAgg.reset(); } @Override
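Putting the pieces together, the hash-to-streaming switch is an arithmetic check run periodically over the rows seen so far: if the hash table holds more distinct keys than the configured reduction factor allows, hashing is not paying for itself. A small worked example with illustrative numbers (the real thresholds come from settings such as hive.groupby.mapaggr.checkinterval and hive.map.aggr.hash.min.reduction):

// Hedged example of the comparison made by checkHashModeEfficiency().
long numRowsCompareHashAggr = 100000;   // re-evaluate roughly every 100k input rows
float minReductionHashAggr = 0.5f;      // hash mode must at least halve the row count

long sumBatchSize = 100000;             // rows aggregated in hash mode so far
int numEntriesHashTable = 80000;        // distinct keys currently held

if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
  // 80,000 > 50,000: the keys are nearly unique, so the table is flushed in
  // full and changeToStreamingMode() swaps in ProcessingModeStreaming, which
  // keeps only one key's aggregates at a time and leaves merging to the reducer.
}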