Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java	(revision 744783)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java	(working copy)
@@ -49,10 +49,11 @@
  */
 public class GroupByOperator extends Operator implements Serializable {
 
-  static final private Log LOG = LogFactory.getLog(JoinOperator.class.getName());
+  static final private Log LOG = LogFactory.getLog(GroupByOperator.class.getName());
 
   private static final long serialVersionUID = 1L;
   private static final int NUMROWSESTIMATESIZE = 1000;
+  private static final int NUMROWSCHECKHASHAGGR = 100000;
 
   transient protected ExprNodeEvaluator[] keyFields;
   transient protected ExprNodeEvaluator[][] aggregationParameterFields;
@@ -78,6 +79,8 @@
   transient boolean firstRow;
   transient long totalMemory;
   transient boolean hashAggr;
+  transient long numRowsInput;
+  transient long numRowsHashTbl;
 
   /**
    * This is used to store the position and field names for variable length fields.
@@ -114,6 +117,8 @@
   public void initialize(Configuration hconf, Reporter reporter) throws HiveException {
     super.initialize(hconf, reporter);
     totalMemory = Runtime.getRuntime().totalMemory();
+    numRowsInput = 0;
+    numRowsHashTbl = 0;
 
     // init keyFields
     keyFields = new ExprNodeEvaluator[conf.getKeys().size()];
@@ -186,6 +191,7 @@
       hashAggr = false;
     } else {
       hashAggregations = new HashMap<ArrayList<Object>, UDAFEvaluator[]>();
+      aggregations = newAggregations();
       hashAggr = true;
       keyPositionsSize = new ArrayList();
       aggrPositions = new ArrayList();
@@ -393,7 +399,22 @@
   }
 
   public void process(Object row, ObjectInspector rowInspector) throws HiveException {
-
+    // Total number of input rows is needed for hash aggregation only
+    if (hashAggr) {
+      numRowsInput++;
+      // if hash aggregation is not behaving properly, disable it
+      if ((numRowsInput % NUMROWSCHECKHASHAGGR) == 0) {
+        // map-side aggregation should reduce the entries by at least half
+        if ((numRowsHashTbl * 2) > numRowsInput) {
+          LOG.trace("Disable Hash Aggr: #hash table = " + numRowsHashTbl + " #total = " + numRowsInput);
+          flush(true);
+          hashAggr = false;
+        }
+        else
+          LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl + " #total = " + numRowsInput);
+      }
+    }
+
     try {
       // Compute the keys
       ArrayList newKeys = new ArrayList(keyFields.length);
@@ -436,6 +457,7 @@
           aggs = newAggregations();
           hashAggregations.put(newKeys, aggs);
           newEntry = true;
+          numRowsHashTbl++;      // new entry in the hash table
         }
 
         // Update the aggs
@@ -443,7 +465,7 @@
 
        // based on used-specified pramaters, check if the hash table needs to be flushed
        if (shouldBeFlushed(newKeys)) {
-          flush();
+          flush(false);
        }
      }
 
@@ -522,12 +544,24 @@
     return false;
   }
 
-  private void flush() throws HiveException {
+  private void flush(boolean complete) throws HiveException {
 
     // Currently, the algorithm flushes 10% of the entries - this can be
     // changed in the future
+
+    if (complete) {
+      Iterator iter = hashAggregations.entrySet().iterator();
+      while (iter.hasNext()) {
+        Map.Entry<ArrayList<Object>, UDAFEvaluator[]> m = (Map.Entry)iter.next();
+        forward(m.getKey(), m.getValue());
+      }
+      hashAggregations.clear();
+      hashAggregations = null;
+      return;
+    }
+
     int oldSize = hashAggregations.size();
+    LOG.trace("Hash Tbl flush: #hash table = " + oldSize);
     Iterator iter = hashAggregations.entrySet().iterator();
     int numDel = 0;
     while (iter.hasNext()) {
@@ -571,16 +605,17 @@
   public void close(boolean abort) throws HiveException {
     if (!abort) {
       try {
-        if (aggregations != null) {
+        if (hashAggregations != null) {
+          // hash-based aggregations
+          for (ArrayList key: hashAggregations.keySet()) {
+            forward(key, hashAggregations.get(key));
+          }
+        }
+        else if (aggregations != null) {
           // sort-based aggregations
           if (currentKeys != null) {
             forward(currentKeys, aggregations);
           }
-        } else if (hashAggregations != null) {
-          // hash-based aggregations
-          for (ArrayList key: hashAggregations.keySet()) {
-            forward(key, hashAggregations.get(key));
-          }
         } else {
           // The GroupByOperator is not initialized, which means there is no data
           // (since we initialize the operators when we see the first record).
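
Note: the core of this patch is the row-counting check added at the top of process(). The standalone sketch below illustrates that heuristic in isolation; the class and method names (HashAggrMonitor, shouldDisable) are illustrative only and do not appear in the patch.

    /**
     * Minimal sketch of the map-side hash-aggregation check added to
     * GroupByOperator.process(): every CHECK_INTERVAL input rows, compare
     * the number of distinct keys in the hash table with the number of
     * rows seen so far; if the table has not cut the input by at least
     * half, the caller should flush the table and fall back to sort-based
     * aggregation.
     */
    public class HashAggrMonitor {
      // mirrors NUMROWSCHECKHASHAGGR in the patch
      private static final int CHECK_INTERVAL = 100000;

      private long numRowsInput = 0;   // rows processed so far
      private long numRowsHashTbl = 0; // distinct keys in the hash table

      /** Call once per input row; returns true when hash aggregation should be disabled. */
      public boolean shouldDisable(boolean newKeyInserted) {
        numRowsInput++;
        if (newKeyInserted) {
          numRowsHashTbl++;
        }
        if (numRowsInput % CHECK_INTERVAL != 0) {
          return false;               // only re-evaluate periodically
        }
        // effective aggregation should shrink the input by at least half
        return numRowsHashTbl * 2 > numRowsInput;
      }
    }

Checking only once every 100,000 rows (NUMROWSCHECKHASHAGGR), rather than on every row, presumably keeps the bookkeeping cheap and gives the hash table enough input for the ratio to be meaningful.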