Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 725928)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -87,6 +87,8 @@
     HIVEALIAS("hive.alias", ""),
     HIVEMAPSIDEAGGREGATE("hive.map.aggr", "false"),
     HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
+    HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hashmemory", Runtime.getRuntime().maxMemory()),
+    HIVEMAPAGGRHASHNUMROWS("hive.map.aggr.numrows", 100000),
 
     // Default file format for CREATE TABLE statement
     // Options: TextFile, SequenceFile
@@ -95,6 +97,7 @@
     public final String varname;
     public final String defaultVal;
     public final int defaultIntVal;
+    public final long defaultLongVal;
     public final Class valClass;
     public final boolean defaultBoolVal;
 
@@ -103,6 +106,7 @@
       this.defaultVal = defaultVal;
       this.valClass = String.class;
       this.defaultIntVal = -1;
+      this.defaultLongVal = -1;
       this.defaultBoolVal = false;
     }
 
@@ -110,14 +114,25 @@
       this.varname = varname;
       this.defaultVal = null;
       this.defaultIntVal = defaultIntVal;
+      this.defaultLongVal = -1;
       this.valClass = Integer.class;
       this.defaultBoolVal = false;
     }
 
+    ConfVars(String varname, long defaultLongVal) {
+      this.varname = varname;
+      this.defaultVal = null;
+      this.defaultIntVal = -1;
+      this.defaultLongVal = defaultLongVal;
+      this.valClass = Long.class;
+      this.defaultBoolVal = false;
+    }
+
     ConfVars(String varname, boolean defaultBoolVal) {
       this.varname = varname;
       this.defaultVal = null;
       this.defaultIntVal = -1;
+      this.defaultLongVal = -1;
       this.valClass = Boolean.class;
       this.defaultBoolVal = defaultBoolVal;
     }
@@ -136,6 +151,15 @@
     return getIntVar(this, var);
   }
 
+  public static long getLongVar(Configuration conf, ConfVars var) {
+    assert(var.valClass == Long.class);
+    return conf.getLong(var.varname, var.defaultLongVal);
+  }
+
+  public long getLongVar(ConfVars var) {
+    return getLongVar(this, var);
+  }
+
   public static boolean getBoolVar(Configuration conf, ConfVars var) {
     assert(var.valClass == Boolean.class);
     return conf.getBoolean(var.varname, var.defaultBoolVal);
Index: ql/src/test/results/clientpositive/groupby4_map.q.out
===================================================================
--- ql/src/test/results/clientpositive/groupby4_map.q.out	(revision 725928)
+++ ql/src/test/results/clientpositive/groupby4_map.q.out	(working copy)
@@ -24,6 +24,7 @@
               value expressions:
                     expr: 0
                     type: bigint
+      # Reducers: 1
       Reduce Operator Tree:
         Group By Operator
           aggregations:
Index: ql/src/test/results/clientpositive/groupby5_map.q.out
===================================================================
--- ql/src/test/results/clientpositive/groupby5_map.q.out	(revision 725928)
+++ ql/src/test/results/clientpositive/groupby5_map.q.out	(working copy)
@@ -27,6 +27,7 @@
               value expressions:
                     expr: 0
                     type: double
+      # Reducers: 1
       Reduce Operator Tree:
         Group By Operator
          aggregations:
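The HiveConf hunks above only add the plumbing for long-valued configuration variables plus the two new properties themselves. As a minimal sketch of how they are meant to be consumed (illustrative only, not part of the patch; it assumes the stock Hadoop Configuration API together with the accessors added above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical helper class, not in the patch: reads and overrides the two new settings.
public class HashAggrConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // override the defaults (JVM max memory, 100000 rows) like any other Hadoop property
    conf.setLong(HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY.varname, 64L * 1024 * 1024); // 64 MB
    conf.setInt(HiveConf.ConfVars.HIVEMAPAGGRHASHNUMROWS.varname, 50000);

    // the new static accessor from the hunk above; hive.map.aggr.numrows stays an int variable
    long hashMemory = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
    int numRows = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHNUMROWS);

    System.out.println("hash memory = " + hashMemory + " bytes, configured rows = " + numRows);
  }
}

The added "# Reducers: 1" lines in the two .q.out files come from the SemanticAnalyzer change further down, which forces a single reducer for the optimized map-side-aggregation plan.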
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java	(revision 725928)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java	(working copy)
@@ -37,6 +37,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.ql.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.conf.HiveConf;
 
 /**
  * GroupBy operator implementation.
@@ -44,6 +47,10 @@
 public class GroupByOperator extends Operator<groupByDesc> implements Serializable {
 
   private static final long serialVersionUID = 1L;
+
+  // number of entries that can be accommodated in the hash table - only applicable in case of map-side aggregation
+  transient protected int maxEntriesHashAggr;
+
   transient protected ExprNodeEvaluator[] keyFields;
   transient protected ExprNodeEvaluator[][] aggregationParameterFields;
   // In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in the same SQL clause,
@@ -162,8 +169,74 @@
     }
 
     firstRow = true;
+    // estimate the number of hash table entries based on the size of each entry. Since the size of an entry
+    // is not known up front, estimate it from the key and aggregation types
+    if (conf.getMode() == groupByDesc.Mode.HASH)
+      maxEntriesHashAggr = computeMaxEntriesHashAggr(hconf);
   }
 
+  /**
+   * Estimate the number of entries the map-side hash table can hold.
+   * The user can specify the total amount of memory to be used by the map-side hash. By default, all available
+   * memory is used. The size of each row is estimated, rather crudely, and the number of entries is figured out
+   * based on that.
+   * @return number of entries that can fit in the hash table - useful for map-side aggregation only
+   **/
+  private int computeMaxEntriesHashAggr(Configuration hconf) {
+    long maxMemory = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+    int rowSize = estimateRowSize();
+    int maxRows = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRHASHNUMROWS);
+    int maxCompRows = (int)(maxMemory/rowSize);
+    return (maxRows > maxCompRows) ? maxRows : maxCompRows;
+  }
+
+  private static final int javaHashEntryOverHead = 64;
+  private static final int javaSizePrimitiveType = 16;
+  private static final int javaSizeUnknownType = 256;
+
+  /**
+   * @return the size of this datatype
+   **/
+  private int getSize(Class c) {
+    if (c.isPrimitive() ||
+        c.isInstance(new Boolean(true)) ||
+        c.isInstance(new Byte((byte)0)) ||
+        c.isInstance(new Short((short)0)) ||
+        c.isInstance(new Integer(0)) ||
+        c.isInstance(new Long(0)) ||
+        c.isInstance(new Float(0)) ||
+        c.isInstance(new Double(0)))
+      return javaSizePrimitiveType;
+
+    return javaSizeUnknownType;
+  }
+
+  /**
+   * @return the size of this datatype
+   **/
+  private int getSize(TypeInfo typeInfo) {
+    if (typeInfo instanceof PrimitiveTypeInfo)
+      return getSize(typeInfo.getPrimitiveClass());
+    return javaSizeUnknownType;
+  }
+
+  /**
+   * @return the estimated size of each row
+   **/
+  private int estimateRowSize() {
+    // estimate the size of each entry -
+    // a datatype of unknown size (String/Struct etc.) is assumed to be 256 bytes for now,
+    // and 64 bytes is the overhead for a hash table entry
+    int rowSize = javaHashEntryOverHead;
+    for (exprNodeDesc key : conf.getKeys())
+      rowSize += getSize(key.getTypeInfo());
+
+    for (Method m : aggregationsEvaluateMethods)
+      rowSize += getSize(m.getReturnType());
+
+    return rowSize;
+  }
+
   protected UDAF[] newAggregations() throws HiveException {
     UDAF[] aggs = new UDAF[aggregationClasses.length];
     for(int i=0; i<aggregationClasses.length; i++) {
@@ ... @@
-    if (hashAggregations.size() >= total)
+  /**
+   * based on user parameters, should the hash table be flushed
+   **/
+  private boolean shouldBeFlushed() {
+    if (hashAggregations.size() >= maxEntriesHashAggr)
       return true;
     return false;
   }
 
   private void flush() throws HiveException {
+
     // Currently, the algorithm flushes 10% of the entries - this can be
     // changed in the future
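To make the sizing heuristic above concrete: each hash-table row is costed at 64 bytes of entry overhead, plus 16 bytes per column of primitive type and 256 bytes per column of unknown size (String, Struct, etc.), and the table may grow to the larger of hive.map.aggr.numrows and hive.map.aggr.hashmemory divided by that row size. Below is a self-contained sketch of the arithmetic (illustrative only, not the GroupByOperator code; the class name and example values are made up):

// Standalone illustration of the row-size / max-entries estimate introduced by the patch.
public class HashAggrSizingSketch {
  static final int ENTRY_OVERHEAD = 64;   // javaHashEntryOverHead in the patch
  static final int PRIMITIVE_SIZE = 16;   // javaSizePrimitiveType
  static final int UNKNOWN_SIZE = 256;    // javaSizeUnknownType

  // crude per-row estimate: fixed overhead plus the cost of each key/aggregation column
  static int estimateRowSize(int primitiveColumns, int unknownColumns) {
    return ENTRY_OVERHEAD + primitiveColumns * PRIMITIVE_SIZE + unknownColumns * UNKNOWN_SIZE;
  }

  // the patch keeps the larger of the configured row count and the memory-derived bound
  static long maxHashEntries(long hashMemoryBytes, int configuredRows, int rowSize) {
    return Math.max(configuredRows, hashMemoryBytes / rowSize);
  }

  public static void main(String[] args) {
    // hypothetical query: one string grouping key, one bigint aggregate
    int rowSize = estimateRowSize(1, 1);                          // 64 + 16 + 256 = 336 bytes
    long entries = maxHashEntries(200L * 1024 * 1024, 100000, rowSize);
    System.out.println("row size = " + rowSize + " bytes, max entries = " + entries);
    // once the in-memory table reaches this many entries, the operator flushes (about 10% at a time)
  }
}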
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(revision 725928)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(working copy)
@@ -1476,6 +1476,14 @@
   private Operator genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
     String dest, Operator inputOperatorInfo) throws SemanticException {
+
+    return genGroupByPlanReduceSinkOperator(parseInfo, dest, inputOperatorInfo, -1, false);
+  }
+
+  @SuppressWarnings("nls")
+  private Operator genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
+    String dest, Operator inputOperatorInfo, int numReducers, boolean inferNumReducers)
+    throws SemanticException {
     RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo).getRR();
     RowResolver reduceSinkOutputRowResolver = new RowResolver();
     reduceSinkOutputRowResolver.setIsExprResolver(true);
@@ -1533,7 +1541,7 @@
     return putOpInsertMap(
       OperatorFactory.getAndMakeChild(
         PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, -1,
-          (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1 : Integer.MAX_VALUE), -1, false),
+          (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1 : Integer.MAX_VALUE), numReducers, inferNumReducers),
         new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()),
         inputOperatorInfo),
       reduceSinkOutputRowResolver);
@@ -1814,12 +1822,13 @@
     Operator groupByOperatorInfo = genGroupByPlanMapGroupByOperator(qb,
       dest, inputOperatorInfo, groupByDesc.Mode.HASH);
 
-    // ////// Generate ReduceSink Operator
-    Operator reduceSinkOperatorInfo =
-      genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo);
-
     // Optimize the scenario when there are no grouping keys and no distinct - 2 map-reduce jobs are not needed
     if (!optimizeMapAggrGroupBy(dest, qb)) {
+
+      // ////// Generate ReduceSink Operator
+      Operator reduceSinkOperatorInfo =
+        genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo);
+
       // ////// Generate GroupbyOperator for a partial aggregation
       Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo, dest, reduceSinkOperatorInfo,
                                                                      groupByDesc.Mode.PARTIAL2);
@@ -1831,8 +1840,13 @@
       // ////// Generate GroupbyOperator3
       return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, groupByDesc.Mode.FINAL);
     }
-    else
+    else {
+      // ////// Generate ReduceSink Operator
+      Operator reduceSinkOperatorInfo =
+        genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo, 1, false);
+
       return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo, groupByDesc.Mode.FINAL);
+    }
   }
 
   @SuppressWarnings("nls")
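The SemanticAnalyzer change moves ReduceSink creation into the two branches so that the optimized plan (map-side aggregation with no grouping keys and no DISTINCT, handled in a single map-reduce job) can request exactly one reducer; that is what the added "# Reducers: 1" lines in groupby4_map.q.out and groupby5_map.q.out reflect. A toy sketch of why one reducer suffices in that case (illustrative Java, not Hive code): every mapper's hash aggregation emits a single partial result, so the lone reducer only has to merge a handful of partials.

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone illustration; the partial counts below are made-up mapper outputs.
public class SingleReducerMergeSketch {
  public static void main(String[] args) {
    // one partial count per mapper, produced by map-side hash aggregation
    List<Long> partialCounts = Arrays.asList(120L, 98L, 305L);

    long finalCount = 0;
    for (long partial : partialCounts) {  // the single reducer folds all partials together
      finalCount += partial;
    }
    System.out.println("count = " + finalCount);  // 523
  }
}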