diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 735122e..49e4170 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -30,6 +30,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
@@ -104,12 +105,12 @@
   /**
    * The number of new entries that must be added to the hashtable before a memory size check.
    */
-  private static final int FLUSH_CHECK_THRESHOLD = 10000;
+  private transient int checkInterval = 10000;
 
   /**
    * Percent of entries to flush when memory threshold exceeded.
    */
-  private static final float PERCENT_ENTRIES_TO_FLUSH = 0.1f;
+  private transient float percentEntriesToFlush = 0.1f;
 
   /**
    * The global key-aggregation hash map.
@@ -139,6 +140,14 @@ public VectorGroupByOperator() {
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
+
+    // hconf is null in unit testing
+    if (null != hconf) {
+      this.percentEntriesToFlush = HiveConf.getFloatVar(hconf,
+          HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+      this.checkInterval = HiveConf.getIntVar(hconf,
+          HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
+    }
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
@@ -226,8 +235,21 @@ public void processOp(Object row, int tag) throws HiveException {
     processAggregators(batch);
 
     //Flush if memory limits were reached
-    if (shouldFlush(batch)) {
+    // We keep flushing until the memory usage is under the threshold
+    int preFlushEntriesCount = numEntriesHashTable;
+    while (shouldFlush(batch)) {
       flush(false);
+
+      // Validate that some progress is being made
+      if (!(numEntriesHashTable < preFlushEntriesCount)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Flush did not progress: %d entries before, %d entries after",
+              preFlushEntriesCount,
+              numEntriesHashTable));
+        }
+        break;
+      }
+      preFlushEntriesCount = numEntriesHashTable;
     }
 
     if (sumBatchSize == 0 && 0 != batch.size) {
@@ -247,7 +269,7 @@ public void processOp(Object row, int tag) throws HiveException {
   private void flush(boolean all) throws HiveException {
 
     int entriesToFlush = all ? numEntriesHashTable :
-      (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH);
+      (int)(numEntriesHashTable * this.percentEntriesToFlush);
     int entriesFlushed = 0;
 
     if (LOG.isDebugEnabled()) {
@@ -309,13 +331,16 @@ private void flush(boolean all) throws HiveException {
    * Returns true if the memory threshold for the hash table was reached.
    */
   private boolean shouldFlush(VectorizedRowBatch batch) {
-    if (numEntriesSinceCheck < FLUSH_CHECK_THRESHOLD ||
-        batch.size == 0) {
+    if (batch.size == 0) {
       return false;
     }
-    // Were going to update the average variable row size by sampling the current batch
-    updateAvgVariableSize(batch);
-    numEntriesSinceCheck = 0;
+    // numEntriesSinceCheck is the number of entries added to the hash table
+    // since the last time we checked the average variable size
+    if (numEntriesSinceCheck >= this.checkInterval) {
+      // We're going to update the average variable row size by sampling the current batch
+      updateAvgVariableSize(batch);
+      numEntriesSinceCheck = 0;
+    }
     return numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory;
   }
 
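Note on the flush loop: the patch turns a single flush(false) call into a loop that keeps evicting a fraction of the hash table entries until the estimated footprint drops below maxHashTblMemory, with a guard that breaks out if a pass fails to shrink the table. Below is a minimal, self-contained sketch of that pattern. Every name in it (MemoryBoundedMap, add, estimatedMemory, flushSome) is a hypothetical stand-in chosen for illustration, not a Hive API; only the loop-with-progress-guard shape mirrors the patch.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical example: an aggregation map that evicts a fraction of its
// entries whenever its estimated memory footprint exceeds a fixed budget.
public class MemoryBoundedMap {
  private final Map<String, Long> table = new HashMap<String, Long>();
  private final long maxMemoryBytes;   // plays the role of maxHashTblMemory
  private final float percentToFlush;  // plays the role of percentEntriesToFlush
  private final long bytesPerEntry;    // stand-in for fixedHashEntrySize + avgVariableSize

  public MemoryBoundedMap(long maxMemoryBytes, float percentToFlush, long bytesPerEntry) {
    this.maxMemoryBytes = maxMemoryBytes;
    this.percentToFlush = percentToFlush;
    this.bytesPerEntry = bytesPerEntry;
  }

  public void add(String key, long value) {
    Long old = table.get(key);
    table.put(key, old == null ? value : old + value);

    // Keep flushing until the estimate is under the budget, but break out if a
    // pass makes no progress -- the same guard the patch adds to processOp().
    int preFlushCount = table.size();
    while (estimatedMemory() > maxMemoryBytes) {
      flushSome();
      if (!(table.size() < preFlushCount)) {
        break; // no progress; avoid looping forever
      }
      preFlushCount = table.size();
    }
  }

  private long estimatedMemory() {
    return (long) table.size() * bytesPerEntry;
  }

  // Evict a fixed fraction of entries, emitting them downstream (stdout here).
  private void flushSome() {
    int toFlush = (int) (table.size() * percentToFlush);
    Iterator<Map.Entry<String, Long>> it = table.entrySet().iterator();
    for (int i = 0; i < toFlush && it.hasNext(); i++) {
      Map.Entry<String, Long> e = it.next();
      System.out.println(e.getKey() + " -> " + e.getValue());
      it.remove();
    }
  }
}

The progress guard matters because the flush size is computed as a fraction of the current entry count: once the table is small, (int)(size * percentToFlush) rounds to zero, so a pass can evict nothing while the memory estimate still exceeds the budget. Without the guard, that combination would spin forever.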