diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e64e8fc..632b211 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -687,6 +687,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         " Orderby without limit."),
     HIVEALIAS("hive.alias", "", ""),
     HIVEMAPSIDEAGGREGATE("hive.map.aggr", true, "Whether to use map-side aggregation in Hive Group By queries"),
+    HIVEMAPSIDEAGGREGATEINITCAPACITY("hive.map.aggr.init.capacity", 256, new RangeValidator(256, 256 << 10),
+        "Initial capacity for map-side aggregation in Hive Group By queries"),
     HIVEGROUPBYSKEW("hive.groupby.skewindata", false, "Whether there is skew in data to optimize group by queries"),
     HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000,
         "How many rows in the right-most join operand Hive should buffer before emitting the join result."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index dfee3a5..c463529 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -341,7 +341,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       aggregations = newAggregations();
       hashAggr = false;
     } else {
-      hashAggregations = new HashMap<KeyWrapper, AggregationBuffer[]>(256);
+      int initCapacity = conf.getHashInitCapacity();
+      hashAggregations = new HashMap<KeyWrapper, AggregationBuffer[]>(initCapacity);
       aggregations = newAggregations();
       hashAggr = true;
       keyPositionsSize = new ArrayList<Integer>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index 8546d21..4fe6c17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -300,7 +300,7 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
     GroupByDesc groupBy =
         new GroupByDesc(GroupByDesc.Mode.HASH, outputNames, groupByExprs,
-            new ArrayList<AggregationDesc>(), false, groupByMemoryUsage, memoryThreshold,
+            new ArrayList<AggregationDesc>(), groupByMemoryUsage, memoryThreshold,
             null, false, 0, true);

     GroupByOperator groupByOp =
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index b515525..66b4257 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.RecordReader;
 import org.apache.hadoop.hive.ql.exec.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -139,6 +140,7 @@ import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc;
 import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
@@ -181,6 +183,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.ResourceType;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
@@ -4277,7 +4280,7 @@ private Operator genGroupByPlanGroupByOperator(QBParseInfo parseInfo,
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false, -1, numDistinctUDFs > 0),
+            groupByMemoryUsage, memoryThreshold, null, false, -1, numDistinctUDFs > 0),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()), input),
         groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
@@ -4541,7 +4544,7 @@ private Operator genGroupByPlanGroupByOperator1(QBParseInfo parseInfo,
     // additional rows corresponding to grouping sets need to be created here.
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            groupByMemoryUsage, memoryThreshold,
+            groupByMemoryUsage, memoryThreshold,
             groupingSets, groupingSetsPresent && groupingSetsNeedAdditionalMRJob,
             groupingSetsPosition, containsDistinctAggr),
@@ -4580,8 +4583,6 @@ private void createNewGroupingKey(List<ExprNodeDesc> groupByKeys,
    * (qb.getParseInfo().getXXX(dest)). The new GroupByOperator will be a child
    * of the inputOperatorInfo.
    *
-   * @param mode
-   *          The mode of the aggregation (HASH)
    * @param genericUDAFEvaluators
    *          If not null, this function will store the mapping from Aggregation
    *          StringTree to the genericUDAFEvaluator in this parameter, so it
@@ -4593,11 +4594,11 @@ private Operator genGroupByPlanMapGroupByOperator(QB qb,
       String dest,
       List<ASTNode> grpByExprs,
       Operator inputOperatorInfo,
-      GroupByDesc.Mode mode,
       Map<String, GenericUDAFEvaluator> genericUDAFEvaluators,
       List<Integer> groupingSetKeys,
       boolean groupingSetsPresent) throws SemanticException {

+    GroupByDesc.Mode mode = GroupByDesc.Mode.HASH;
     RowResolver groupByInputRowResolver = opParseCtx.get(inputOperatorInfo)
         .getRowResolver();
     QBParseInfo parseInfo = qb.getParseInfo();
@@ -4709,11 +4710,14 @@ private Operator genGroupByPlanMapGroupByOperator(QB qb,
     float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
     float memoryThreshold = HiveConf
         .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+
+    GroupByDesc groupByDesc = new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
+        groupByMemoryUsage, memoryThreshold,
+        groupingSetKeys, groupingSetsPresent, groupingSetsPosition, containsDistinctAggr);
+    groupByDesc.setHashInitCapacity(getHashInitCapacity(inputOperatorInfo, groupByKeys));
+
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
-        new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold,
-            groupingSetKeys, groupingSetsPresent, groupingSetsPosition, containsDistinctAggr),
-        new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+        groupByDesc, new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         inputOperatorInfo), groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
     return op;
@@ -5249,7 +5253,7 @@ private Operator genGroupByPlanGroupByOperator2MR(QBParseInfo parseInfo,
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false,
+            groupByMemoryUsage, memoryThreshold, null, false,
             groupingSetsPosition, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
         reduceSinkOperatorInfo2), groupByOutputRowResolver2);
@@ -5705,7 +5709,6 @@ private Operator genGroupByPlanMapAggrNoSkew(String dest, QB qb,
         dest,
         grpByExprs,
         inputOperatorInfo,
-        GroupByDesc.Mode.HASH,
         genericUDAFEvaluators,
         groupingSets,
         groupingSetsPresent && !groupingSetsNeedAdditionalMRJob);
@@ -5871,7 +5874,7 @@ private Operator genGroupByPlanMapAggr2MR(String dest, QB qb,
         new LinkedHashMap<String, GenericUDAFEvaluator>();
     GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanMapGroupByOperator(
-        qb, dest, grpByExprs, inputOperatorInfo, GroupByDesc.Mode.HASH,
+        qb, dest, grpByExprs, inputOperatorInfo,
         genericUDAFEvaluators, groupingSets, groupingSetsPresent);

     groupOpToInputTables.put(groupByOperatorInfo, opParseCtx.get(
@@ -7544,8 +7547,7 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
         // generate a groupby operator (HASH mode) for a map-side partial
         // aggregation for semijoin
-        srcOps[pos++] = genMapGroupByForSemijoin(qb, fields, srcOp,
-            GroupByDesc.Mode.HASH);
+        srcOps[pos++] = genMapGroupByForSemijoin(fields, srcOp);
       } else {
         srcOps[pos++] = srcOp;
       }
@@ -7613,16 +7615,10 @@ private Operator insertSelectForSemijoin(ArrayList<ASTNode> fields,
     return output;
   }

-  private Operator genMapGroupByForSemijoin(QB qb, ArrayList<ASTNode> fields, // the
-                                                                              // ASTNode
-                                                                              // of
-                                                                              // the
-                                                                              // join
-                                                                              // key
-                                                                              // "tab.col"
-      Operator inputOperatorInfo, GroupByDesc.Mode mode)
-      throws SemanticException {
-
+  // fields : the ASTNode of the join key "tab.col"
+  private Operator genMapGroupByForSemijoin(ArrayList<ASTNode> fields,
+      Operator inputOperatorInfo) throws SemanticException {
+    GroupByDesc.Mode mode = GroupByDesc.Mode.HASH;
     RowResolver groupByInputRowResolver = opParseCtx.get(inputOperatorInfo)
         .getRowResolver();
     RowResolver groupByOutputRowResolver = new RowResolver();
@@ -7630,7 +7626,6 @@ private Operator genMapGroupByForSemijoin(QB qb, ArrayList<ASTNode> fields, // t
     ArrayList<String> outputColumnNames = new ArrayList<String>();
     ArrayList<AggregationDesc> aggregations = new ArrayList<AggregationDesc>();
     Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-    qb.getParseInfo();
     groupByOutputRowResolver.setIsExprResolver(true); // join keys should only
                                                       // be columns but not be
@@ -7658,16 +7653,42 @@ private Operator genMapGroupByForSemijoin(QB qb, ArrayList<ASTNode> fields, // t
     float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
     float memoryThreshold = HiveConf
         .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+
+    GroupByDesc groupByDesc = new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
+        groupByMemoryUsage, memoryThreshold, null, false, -1, false);
+    groupByDesc.setHashInitCapacity(getHashInitCapacity(inputOperatorInfo, groupByKeys));
+
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
-        new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false, -1, false),
-        new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+        groupByDesc, new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         inputOperatorInfo), groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
     return op;
   }

+  private int getHashInitCapacity(Operator input, List<ExprNodeDesc> exprs) throws SemanticException {
+    int initCapacity = conf.getIntVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATEINITCAPACITY);
+    if (exprs.size() != 1) {
+      return initCapacity; // TODO: estimate a capacity for multi-column group-by keys as well
+    }
+    ExprNodeDesc expr = exprs.get(0);
+    TableScanOperator ts =
+        OperatorUtils.findSingleOperatorUpstream(input, TableScanOperator.class);
+    Table table = topToTable.get(ts);
+    if (table != null) {
+      ExprNodeDesc column = ExprNodeDescUtils.backtrack(expr, input, ts);
+      if (column instanceof ExprNodeColumnDesc) {
+        List<ColStatistics> stats = StatsUtils.getTableColumnStats(
+            table, Arrays.asList(((ExprNodeColumnDesc) column).getColumn()));
+        if (stats != null && stats.size() == 1) {
+          ColStatistics colStats = stats.get(0);
+          initCapacity = (int) (colStats.getCountDistint() >> 3); // heuristic: expect about NDV / 8 entries
+        }
+      }
+    }
+    return Math.min(Math.max(initCapacity, 256), 256 << 10);
+  }
+
   private ExprNodeDesc[][] genJoinOperatorTypeCheck(ExprNodeDesc[][] keys)
       throws SemanticException {
     // keys[i] -> ArrayList<exprNodeDesc> for the i-th join operator key list
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
index d6aad9f..a1a3fa5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.List;

-import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.udf.UDFType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hive.common.util.AnnotationUtils;
@@ -69,6 +68,8 @@ transient private boolean isDistinct;

   private boolean dontResetAggrsDistinct;

+  private int hashInitCapacity;
+
   // Extra parameters only for vectorization.
   private VectorGroupByDesc vectorDesc;
@@ -78,26 +79,9 @@ public GroupByDesc() {
   }

   public GroupByDesc(
       final Mode mode,
-      final ArrayList<String> outputColumnNames,
-      final ArrayList<ExprNodeDesc> keys,
-      final ArrayList<AggregationDesc> aggregators,
-      final float groupByMemoryUsage,
-      final float memoryThreshold,
-      final List<Integer> listGroupingSets,
-      final boolean groupingSetsPresent,
-      final int groupingSetsPosition,
-      final boolean isDistinct) {
-    this(mode, outputColumnNames, keys, aggregators,
-        false, groupByMemoryUsage, memoryThreshold, listGroupingSets,
-        groupingSetsPresent, groupingSetsPosition, isDistinct);
-  }
-
-  public GroupByDesc(
-      final Mode mode,
-      final ArrayList<String> outputColumnNames,
+      final ArrayList<String> outputColumnNames,
       final ArrayList<ExprNodeDesc> keys,
-      final ArrayList<AggregationDesc> aggregators,
-      final boolean bucketGroup,
+      final ArrayList<AggregationDesc> aggregators,
       final float groupByMemoryUsage,
       final float memoryThreshold,
       final List<Integer> listGroupingSets,
@@ -109,7 +93,6 @@ public GroupByDesc(
     this.outputColumnNames = outputColumnNames;
     this.keys = keys;
     this.aggregators = aggregators;
-    this.bucketGroup = bucketGroup;
     this.groupByMemoryUsage = groupByMemoryUsage;
     this.memoryThreshold = memoryThreshold;
     this.listGroupingSets = listGroupingSets;
@@ -299,4 +282,11 @@ public void setDistinct(boolean isDistinct) {
     this.isDistinct = isDistinct;
   }

+  public int getHashInitCapacity() {
+    return hashInitCapacity;
+  }
+
+  public void setHashInitCapacity(int hashInitCapacity) {
+    this.hashInitCapacity = hashInitCapacity;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index b23baf3..f893c5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -257,13 +257,13 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa
       stats.addToColumnStats(emptyStats);
       stats.updateColumnStatsState(deriveStatType(emptyStats, referencedColumns));
     } else {
-      List<ColumnStatisticsObj> colStats = aggrStats.getColStats();
+      List<ColStatistics> colStats =
+          convertColStats(table.getTableName(), aggrStats.getColStats());
       if (colStats.size() != neededColumns.size()) {
         LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to" +
             " retrieve for " + colStats.size() + " columns");
       }
-      List<ColStatistics> columnStats = convertColStats(colStats, table.getTableName(),
-          colToTabAlias);
+      List<ColStatistics> columnStats = setTableAlias(colStats, colToTabAlias);

       addParitionColumnStats(conf, neededColumns, referencedColumns, schema, table, partList,
           columnStats);
@@ -611,32 +611,46 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String tab
    */
   public static List<ColStatistics> getTableColumnStats(
       Table table, List<ColumnInfo> schema, List<String> neededColumns) {
-    String dbName = table.getDbName();
-    String tabName = table.getTableName();
     Map<String, String> colToTabAlias = new HashMap<String, String>(schema.size());
     List<String> neededColsInTable = processNeededColumns(schema, neededColumns, colToTabAlias);
-    List<ColStatistics> stats = null;
+    List<ColStatistics> colStat = getTableColumnStats(table, neededColsInTable);
+    if (colStat != null) {
+      return setTableAlias(colStat, colToTabAlias);
+    }
+    return null;
+  }
+
+  public static List<ColStatistics> getTableColumnStats(
+      Table table, List<String> neededColumns) {
+    String dbName = table.getDbName();
+    String tabName = table.getTableName();
     try {
-      List<ColumnStatisticsObj> colStat = Hive.get().getTableColumnStatistics(
-          dbName, tabName, neededColsInTable);
-      stats = convertColStats(colStat, tabName, colToTabAlias);
+      List<ColumnStatisticsObj> colStats =
+          Hive.get().getTableColumnStatistics(dbName, tabName, neededColumns);
+      return convertColStats(tabName, colStats);
     } catch (HiveException e) {
       LOG.error("Failed to retrieve table statistics: ", e);
-      stats = null;
     }
-    return stats;
+    return null;
   }

-  private static List<ColStatistics> convertColStats(List<ColumnStatisticsObj> colStats, String tabName,
-      Map<String, String> colToTabAlias) {
+  private static List<ColStatistics> convertColStats(
+      String tabName, List<ColumnStatisticsObj> colStats) {
     List<ColStatistics> stats = new ArrayList<ColStatistics>(colStats.size());
     for (ColumnStatisticsObj statObj : colStats) {
-      ColStatistics cs = getColStatistics(statObj, tabName, statObj.getColName());
-      cs.setTableAlias(colToTabAlias.get(cs.getColumnName()));
-      stats.add(cs);
+      stats.add(getColStatistics(statObj, tabName, statObj.getColName()));
     }
     return stats;
   }
+
+  private static List<ColStatistics> setTableAlias(
+      List<ColStatistics> colStats, Map<String, String> colToTabAlias) {
+    for (ColStatistics cs : colStats) {
+      cs.setTableAlias(colToTabAlias.get(cs.getColumnName()));
+    }
+    return colStats;
+  }
+
   private static List<String> processNeededColumns(List<ColumnInfo> schema,
       List<String> neededColumns, Map<String, String> colToTabAlias) {
     for (ColumnInfo col : schema) {
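
Note on the sizing rule in getHashInitCapacity(): when the map-side group-by has exactly one key column and column statistics are available, the hash table now starts at NDV / 8 instead of the former hard-coded 256, clamped to the same [256, 256 << 10] range that the RangeValidator enforces on hive.map.aggr.init.capacity. (getCountDistint, sic, is the actual accessor name on ColStatistics.) Below is a minimal standalone sketch of that arithmetic; the class and method names (InitCapacitySketch, clampInitCapacity) are hypothetical and not part of the patch.

    // Standalone illustration of the capacity heuristic above; not patch code.
    final class InitCapacitySketch {

      private static final int MIN_CAP = 256;       // previous hard-coded capacity
      private static final int MAX_CAP = 256 << 10; // 262144 entries

      // configuredDefault mirrors hive.map.aggr.init.capacity; a negative ndv
      // stands for "no usable column statistics were found".
      static int clampInitCapacity(int configuredDefault, long ndv) {
        int capacity = configuredDefault;
        if (ndv >= 0) {
          capacity = (int) (ndv >> 3); // the patch's heuristic: NDV / 8
        }
        return Math.min(Math.max(capacity, MIN_CAP), MAX_CAP);
      }

      public static void main(String[] args) {
        System.out.println(clampInitCapacity(256, -1L));        // no stats -> 256
        System.out.println(clampInitCapacity(256, 10000L));     // 10000 / 8 -> 1250
        System.out.println(clampInitCapacity(256, 100000000L)); // clamped -> 262144
      }
    }

Sizing the HashMap near its eventual entry count avoids repeated rehashing while the operator fills it, and the clamp keeps a stale or wildly off NDV estimate from either shrinking the table below the old default or reserving an oversized one. Without statistics, or with multi-column keys, behavior falls back to hive.map.aggr.init.capacity, which can still be tuned per session, e.g. set hive.map.aggr.init.capacity=4096;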