diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2d74387..3ae3dae 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -841,6 +841,10 @@
//Vectorization enabled
HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false),
+ HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL("hive.vectorized.groupby.checkinterval", 100000),
+ HIVE_VECTORIZATION_GROUPBY_MAXENTRIES("hive.vectorized.groupby.maxentries", 1000000),
+ HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT("hive.vectorized.groupby.flush.percent", (float) 0.1),
+
HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true),
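
The three new knobs above can also be overridden programmatically. A minimal sketch, assuming HiveConf's typed setters (setIntVar/setFloatVar), which mirror the getIntVar/getFloatVar calls this patch adds to VectorGroupByOperator below:

```java
import org.apache.hadoop.hive.conf.HiveConf;

public class GroupByKnobsExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Assumed API: the typed setters matching the getters used in the operator.
    HiveConf.setIntVar(conf,
        HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL, 50000);
    HiveConf.setIntVar(conf,
        HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES, 500000);
    HiveConf.setFloatVar(conf,
        HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT, 0.2f);
  }
}
```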
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index c574ab5..193babb 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -2056,6 +2056,24 @@
+<property>
+  <name>hive.vectorized.groupby.maxentries</name>
+  <value>1000000</value>
+  <description>Max number of entries in the vector group by aggregation hashtables. Exceeding this will trigger a flush regardless of memory pressure.</description>
+</property>
+
+<property>
+  <name>hive.vectorized.groupby.checkinterval</name>
+  <value>100000</value>
+  <description>Number of entries added to the group by aggregation hash before a recomputation of the average entry size is performed.</description>
+</property>
+
+<property>
+  <name>hive.vectorized.groupby.flush.percent</name>
+  <value>0.1</value>
+  <description>Percent of entries in the group by aggregation hash flushed when the memory threshold is exceeded.</description>
+</property>
+
  <name>hive.compute.query.using.stats</name>
  <value>false</value>
@@ -2065,7 +2083,6 @@
-
  <name>hive.metastore.schema.verification</name>
  <value>false</value>
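
Taken together, the template defaults above determine how often the average entry size is re-sampled and how much a single flush evicts. A worked example with those defaults (illustrative arithmetic only):

```java
public class GroupByDefaultsMath {
  public static void main(String[] args) {
    int maxEntries = 1000000;    // hive.vectorized.groupby.maxentries
    int checkInterval = 100000;  // hive.vectorized.groupby.checkinterval
    float flushPercent = 0.1f;   // hive.vectorized.groupby.flush.percent

    // The average entry size is re-sampled once per checkInterval new entries,
    // so a table that grows to the cap is re-measured ten times on the way up.
    System.out.println(maxEntries / checkInterval);        // 10

    // Each flush evicts flushPercent of the current table.
    System.out.println((int) (maxEntries * flushPercent)); // 100000
  }
}
```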
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 735122e..4568496 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.KeyWrapper;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
@@ -100,16 +101,22 @@
* Sum of batch size processed (ie. rows).
*/
private transient long sumBatchSize;
+
+ /**
+ * Max number of entries in the vector group by aggregation hashtables.
+ * Exceeding this will trigger a flush regardless of memory pressure.
+ */
+ private transient int maxHtEntries = 1000000;
/**
* The number of new entries that must be added to the hashtable before a memory size check.
*/
- private static final int FLUSH_CHECK_THRESHOLD = 10000;
+ private transient int checkInterval = 10000;
/**
* Percent of entries to flush when memory threshold exceeded.
*/
- private static final float PERCENT_ENTRIES_TO_FLUSH = 0.1f;
+ private transient float percentEntriesToFlush = 0.1f;
/**
* The global key-aggregation hash map.
@@ -139,6 +146,16 @@ public VectorGroupByOperator() {
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
+
+ // hconf is null in unit testing
+ if (null != hconf) {
+ this.percentEntriesToFlush = HiveConf.getFloatVar(hconf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT);
+ this.checkInterval = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL);
+ this.maxHtEntries = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES);
+ }
List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
@@ -226,8 +243,21 @@ public void processOp(Object row, int tag) throws HiveException {
processAggregators(batch);
//Flush if memory limits were reached
- if (shouldFlush(batch)) {
+ // We keep flushing until the memory is under threshold
+ int preFlushEntriesCount = numEntriesHashTable;
+ while (shouldFlush(batch)) {
flush(false);
+
+ //Validate that some progress is being made
+ if (!(numEntriesHashTable < preFlushEntriesCount)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Flush did not progress: %d entries before, %d entries after",
+ preFlushEntriesCount,
+ numEntriesHashTable));
+ }
+ break;
+ }
+ preFlushEntriesCount = numEntriesHashTable;
}
if (sumBatchSize == 0 && 0 != batch.size) {
@@ -247,7 +277,7 @@ public void processOp(Object row, int tag) throws HiveException {
private void flush(boolean all) throws HiveException {
int entriesToFlush = all ? numEntriesHashTable :
- (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH);
+ (int)(numEntriesHashTable * this.percentEntriesToFlush);
int entriesFlushed = 0;
if (LOG.isDebugEnabled()) {
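
The (int) cast above is also why the new flush loop in processOp needs its progress guard: for a small enough table the cast truncates to zero and flush(false) evicts nothing. A worked example of that degenerate case (values assumed):

```java
public class FlushTruncationExample {
  public static void main(String[] args) {
    // flush(false) evicts (int)(numEntriesHashTable * percentEntriesToFlush) entries.
    int numEntriesHashTable = 9;
    float percentEntriesToFlush = 0.1f; // default hive.vectorized.groupby.flush.percent
    int entriesToFlush = (int) (numEntriesHashTable * percentEntriesToFlush);
    // Prints 0: nothing is evicted, so without the preFlushEntriesCount check
    // the while (shouldFlush(batch)) loop could spin without making progress.
    System.out.println(entriesToFlush);
  }
}
```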
@@ -309,14 +339,18 @@ private void flush(boolean all) throws HiveException {
* Returns true if the memory threshold for the hash table was reached.
*/
private boolean shouldFlush(VectorizedRowBatch batch) {
- if (numEntriesSinceCheck < FLUSH_CHECK_THRESHOLD ||
- batch.size == 0) {
+ if (batch.size == 0) {
return false;
}
- // Were going to update the average variable row size by sampling the current batch
- updateAvgVariableSize(batch);
- numEntriesSinceCheck = 0;
- return numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory;
+ //numEntriesSinceCheck is the number of entries added to the hash table
+ // since the last time we checked the average variable size
+ if (numEntriesSinceCheck >= this.checkInterval) {
+ // We're going to update the average variable row size by sampling the current batch
+ updateAvgVariableSize(batch);
+ numEntriesSinceCheck = 0;
+ }
+ return numEntriesHashTable > this.maxHtEntries ||
+ numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory;
}
/**
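
The reworked shouldFlush has two independent triggers: the absolute entry cap and the sampled memory estimate. A sketch of the same predicate in isolation, with all sizes assumed for illustration:

```java
public class ShouldFlushExample {
  public static void main(String[] args) {
    // Values assumed; in the operator they come from configuration and
    // from sampling batches via updateAvgVariableSize().
    int numEntriesHashTable = 1200000;
    int maxHtEntries = 1000000;        // hive.vectorized.groupby.maxentries
    long fixedHashEntrySize = 64;      // assumed fixed per-entry overhead, bytes
    long avgVariableSize = 24;         // assumed sampled variable-size average
    long maxHashTblMemory = 64L << 20; // assumed ~64MB hash table budget

    boolean overCap = numEntriesHashTable > maxHtEntries; // true: 1.2M > 1M
    boolean overMem = numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)
        > maxHashTblMemory;                               // true: ~100MB > 64MB
    System.out.println("flush: " + (overCap || overMem));
  }
}
```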
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index d1d2ea9..aaed36d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -23,12 +23,15 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
import java.lang.reflect.Constructor;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -155,6 +158,88 @@ private static GroupByDesc buildKeyGroupByDesc(
return desc;
}
+
+ long outputRowCount = 0;
+
+ @Test
+ public void testMemoryPressureFlush() throws HiveException {
+
+ Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+ mapColumnNames.put("Key", 0);
+ mapColumnNames.put("Value", 1);
+ VectorizationContext ctx = new VectorizationContext(mapColumnNames, 2);
+
+ GroupByDesc desc = buildKeyGroupByDesc (ctx, "max",
+ "Value", TypeInfoFactory.longTypeInfo,
+ "Key", TypeInfoFactory.longTypeInfo);
+
+ // Set the memory threshold so that we get 100KB before we need to flush.
+ MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+ long maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+
+ float threshold = 100.0f * 1024.0f / maxMemory;
+ desc.setMemoryThreshold(threshold);
+
+ VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+
+ FakeCaptureOutputOperator out = FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+ vgo.initialize(null, null);
+
+ this.outputRowCount = 0;
+ out.setOutputInspector(new FakeCaptureOutputOperator.OutputInspector() {
+ @Override
+ public void inspectRow(Object row, int tag) throws HiveException {
+ ++outputRowCount;
+ }
+ });
+
+ Iterable
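
The test's threshold is a fraction of the maximum heap, which is how the 100KB figure in the comment comes about. A worked version of that arithmetic, assuming a concrete heap size (the test reads the real value from the MemoryMXBean instead):

```java
public class ThresholdMath {
  public static void main(String[] args) {
    long maxMemory = 1024L * 1024L * 1024L;          // assume a 1GB max heap
    float threshold = 100.0f * 1024.0f / maxMemory;  // ~9.54e-5
    // The operator presumably derives its budget as threshold * max heap,
    // recovering the intended ~100KB cap before flushes kick in.
    System.out.println((long) (threshold * maxMemory)); // ~102400 bytes
  }
}
```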