diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index e47a6f93ab..255d12951b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -148,6 +148,7 @@ private transient boolean[][] allGroupingSetsOverrideIsNulls; private transient int numEntriesHashTable; + private transient long numFlushedOutEntriesBeforeFinalFlush; private transient long maxHashTblMemory; @@ -421,8 +422,12 @@ public void initialize(Configuration hconf) throws HiveException { mapKeysAggregationBuffers = new HashMap(); if (groupingSets != null && groupingSets.length > 0) { - this.maxHtEntries = this.maxHtEntries / groupingSets.length; - LOG.info("New maxHtEntries: {}, groupingSets len: {}", maxHtEntries, groupingSets.length); + // Adjust for grouping sets (mem intensive). Bail out early if they are not effective. + this.numRowsCompareHashAggr = this.numRowsCompareHashAggr / groupingSets.length; + this.minReductionHashAggr = this.minReductionHashAggr / groupingSets.length; + LOG.info("New maxHtEntries: {}, groupingSets len: {}, numRowsCompareHashAggr: {}, " + + "minReductionHashAggr:{} ", maxHtEntries, groupingSets.length, + numRowsCompareHashAggr, minReductionHashAggr); } computeMemoryLimits(); LOG.debug("using hash aggregation processing mode"); @@ -691,10 +696,15 @@ private void flush(boolean all) throws HiveException { } } + if (!all) { + numFlushedOutEntriesBeforeFinalFlush += entriesFlushed; + } + if (all) { mapKeysAggregationBuffers.clear(); totalAccessCount = 0; numEntriesHashTable = 0; + numFlushedOutEntriesBeforeFinalFlush = 0; } if (all && LOG.isDebugEnabled()) { @@ -772,11 +782,14 @@ private void checkHashModeEfficiency() throws HiveException { * instead of sending 1 from the hash. * */ - final int groupingExpansion = (groupingSets != null) ? groupingSets.length : 1; - final long intermediateKeyCount = sumBatchSize * groupingExpansion; - if (numEntriesHashTable > intermediateKeyCount * minReductionHashAggr) { - flush(true); - changeToStreamingMode(); + final long inputRecords = sumBatchSize; + final long outputRecords = numEntriesHashTable + numFlushedOutEntriesBeforeFinalFlush; + final float ratio = (outputRecords) / (inputRecords * 1.0f); + if (ratio > minReductionHashAggr) { + if (inputRecords > maxHtEntries) { // Don't bail out too soon. + flush(true); + changeToStreamingMode(); + } } } }