diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 2d74387..3ae3dae 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -841,6 +841,10 @@ //Vectorization enabled HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false), + HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL("hive.vectorized.groupby.checkinterval", 100000), + HIVE_VECTORIZATION_GROUPBY_MAXENTRIES("hive.vectorized.groupby.maxentries", 1000000), + HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT("hive.vectorized.groupby.flush.percent", (float) 0.1), + HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true), diff --git conf/hive-default.xml.template conf/hive-default.xml.template index c574ab5..193babb 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -2056,6 +2056,24 @@ + hive.vectorized.groupby.maxentries + 1000000 + Max number of entries in the vector group by aggregation hashtables. Exceeding this will trigger a flush regardless of memory pressure condition. + + + + hive.vectorized.groupby.checkinterval + 100000 + Number of entries added to the group by aggregation hash before a recomputation of average entry size is performed. + + + + hive.vectorized.groupby.flush.percent + 0.1 + Percent of entries in the group by aggregation hash flushed when the memory threshold is exceeded. 
+ + + hive.compute.query.using.stats false @@ -2065,7 +2083,6 @@ - hive.metastore.schema.verification false diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 735122e..4568496 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.KeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; @@ -100,16 +101,22 @@ * Sum of batch size processed (ie. rows). */ private transient long sumBatchSize; + + /** + * Max number of entries in the vector group by aggregation hashtables. + * Exceeding this will trigger a flush regardless of memory pressure condition. + */ + private transient int maxHtEntries = 1000000; /** * The number of new entries that must be added to the hashtable before a memory size check. */ - private static final int FLUSH_CHECK_THRESHOLD = 10000; + private transient int checkInterval = 10000; /** * Percent of entries to flush when memory threshold exceeded. */ - private static final float PERCENT_ENTRIES_TO_FLUSH = 0.1f; + private transient float percentEntriesToFlush = 0.1f; /** * The global key-aggregation hash map. 
@@ -139,6 +146,16 @@ public VectorGroupByOperator() { @Override protected void initializeOp(Configuration hconf) throws HiveException { + + // hconf is null in unit testing + if (null != hconf) { + this.percentEntriesToFlush = HiveConf.getFloatVar(hconf, + HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT); + this.checkInterval = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL); + this.maxHtEntries = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES); + } List objectInspectors = new ArrayList(); @@ -226,8 +243,21 @@ public void processOp(Object row, int tag) throws HiveException { processAggregators(batch); //Flush if memory limits were reached - if (shouldFlush(batch)) { + // We keep flushing until the memory is under threshold + int preFlushEntriesCount = numEntriesHashTable; + while (shouldFlush(batch)) { flush(false); + + //Validate that some progress is being made + if (!(numEntriesHashTable < preFlushEntriesCount)) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Flush did not progress: %d entries before, %d entries after", + preFlushEntriesCount, + numEntriesHashTable)); + } + break; + } + preFlushEntriesCount = numEntriesHashTable; } if (sumBatchSize == 0 && 0 != batch.size) { @@ -247,7 +277,7 @@ public void processOp(Object row, int tag) throws HiveException { private void flush(boolean all) throws HiveException { int entriesToFlush = all ? numEntriesHashTable : - (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH); + (int)(numEntriesHashTable * this.percentEntriesToFlush); int entriesFlushed = 0; if (LOG.isDebugEnabled()) { @@ -309,14 +339,18 @@ private void flush(boolean all) throws HiveException { * Returns true if the memory threshold for the hash table was reached. 
*/ private boolean shouldFlush(VectorizedRowBatch batch) { - if (numEntriesSinceCheck < FLUSH_CHECK_THRESHOLD || - batch.size == 0) { + if (batch.size == 0) { return false; } - // Were going to update the average variable row size by sampling the current batch - updateAvgVariableSize(batch); - numEntriesSinceCheck = 0; - return numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory; + //numEntriesSinceCheck is the number of entries added to the hash table + // since the last time we checked the average variable size + if (numEntriesSinceCheck >= this.checkInterval) { + // We're going to update the average variable row size by sampling the current batch + updateAvgVariableSize(batch); + numEntriesSinceCheck = 0; + } + return numEntriesHashTable > this.maxHtEntries || + numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory; } /** diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java index d1d2ea9..aaed36d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java @@ -23,12 +23,15 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; import java.lang.reflect.Constructor; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -155,6 +158,88 @@ private static GroupByDesc buildKeyGroupByDesc( return desc; } + + long outputRowCount = 0; + + @Test + public void testMemoryPressureFlush() throws HiveException { + + Map mapColumnNames = new HashMap(); + mapColumnNames.put("Key", 0); + 
mapColumnNames.put("Value", 1); + VectorizationContext ctx = new VectorizationContext(mapColumnNames, 2); + + GroupByDesc desc = buildKeyGroupByDesc (ctx, "max", + "Value", TypeInfoFactory.longTypeInfo, + "Key", TypeInfoFactory.longTypeInfo); + + // Set the memory threshold so that we get 100Kb before we need to flush. + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + long maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + + float treshold = 100.0f*1024.0f/maxMemory; + desc.setMemoryThreshold(treshold); + + VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc); + + FakeCaptureOutputOperator out = FakeCaptureOutputOperator.addCaptureOutputChild(vgo); + vgo.initialize(null, null); + + this.outputRowCount = 0; + out.setOutputInspector(new FakeCaptureOutputOperator.OutputInspector() { + @Override + public void inspectRow(Object row, int tag) throws HiveException { + ++outputRowCount; + } + }); + + Iterable it = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator () { + long value = 0; + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Object next() { + return ++value; + } + + @Override + public void remove() { + } + }; + } + }; + + FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables( + 100, + new String[] {"long", "long"}, + it, + it); + + // The 'it' data source will produce data w/o ever ending + // We want to see that memory pressure kicks in and some + // entries in the VGBY are flushed. 
+ long countRowsProduced = 0; + for (VectorizedRowBatch unit: data) { + countRowsProduced += 100; + vgo.process(unit, 0); + if (0 < outputRowCount) { + break; + } + // Set an upper bound on how much we're willing to push before it should flush + // we've set the memory threshold at 100kb, each key is distinct + // It should not go beyond 100k/16 (key+data) + assertTrue(countRowsProduced < 100*1024/16); + } + + assertTrue(0 < outputRowCount); + } @Test public void testMultiKeyIntStringInt() throws HiveException {