diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 4568496..7fb007e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -20,6 +20,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; +import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -117,6 +118,9 @@ * Percent of entries to flush when memory threshold exceeded. */ private transient float percentEntriesToFlush = 0.1f; + + private transient SoftReference gcCanary = new SoftReference(new Object()); + private transient long gcCanaryFlushes = 0L; /** * The global key-aggregation hash map. @@ -248,6 +252,10 @@ public void processOp(Object row, int tag) throws HiveException { while (shouldFlush(batch)) { flush(false); + if(gcCanary.get() == null) { + gcCanaryFlushes++; + gcCanary = new SoftReference(new Object()); + } //Validate that some progress is being made if (!(numEntriesHashTable < preFlushEntriesCount)) { if (LOG.isDebugEnabled()) { @@ -281,11 +289,11 @@ private void flush(boolean all) throws HiveException { int entriesFlushed = 0; if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)", + LOG.debug(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb gcCanary: %s)", entriesToFlush, all ? "(all)" : "", numEntriesHashTable, fixedHashEntrySize, avgVariableSize, numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024, - maxHashTblMemory/1024/1024)); + maxHashTblMemory/1024/1024, gcCanary.get() == null ? "dead" : "alive")); } Object[] forwardCache = new Object[keyExpressions.length + aggregators.length]; @@ -349,8 +357,12 @@ private boolean shouldFlush(VectorizedRowBatch batch) { updateAvgVariableSize(batch); numEntriesSinceCheck = 0; } - return numEntriesHashTable > this.maxHtEntries || - numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory; + if(numEntriesHashTable > this.maxHtEntries || + numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory) { + return true; + } else { + return (gcCanary.get() == null); + } } /** @@ -441,6 +453,9 @@ private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveExcept @Override public void closeOp(boolean aborted) throws HiveException { + if(LOG.isDebugEnabled()) { + LOG.debug(String.format("GC canary caused %d flushes", gcCanaryFlushes)); + } if (!aborted) { flush(true); }