diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0dea0996c9..255663cb15 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1748,7 +1748,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
     HIVEMAPAGGRMEMORYTHRESHOLD("hive.map.aggr.hash.force.flush.memory.threshold", (float) 0.9,
         "The max memory to be used by map-side group aggregation hash table.\n" +
         "If the memory usage is higher than this number, force to flush data"),
-    HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5,
+    HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.75,
         "Hash aggregation will be turned off if the ratio between hash table size and input rows is bigger than this number. \n" +
         "Set to 1 to make sure hash aggregation is never turned off."),
     HIVEMULTIGROUPBYSINGLEREDUCER("hive.multigroupby.singlereducer", true,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 583460f2c4..4cd21780cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -390,8 +390,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       // compare every groupbyMapAggrInterval rows
       numRowsCompareHashAggr = groupbyMapAggrInterval;
-      minReductionHashAggr = HiveConf.getFloatVar(hconf,
-          HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
+      minReductionHashAggr = getConf().getMinReductionHashAggr();
     }
 
     List fieldNames = new ArrayList(conf.getOutputColumnNames());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 0d80c9edce..b5f5933670 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -363,11 +363,9 @@ public void initialize(Configuration hconf) throws HiveException {
       this.checkInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL);
       this.maxHtEntries = HiveConf.getIntVar(hconf,
-          HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES);
-      this.minReductionHashAggr = HiveConf.getFloatVar(hconf,
-          HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
-      this.numRowsCompareHashAggr = HiveConf.getIntVar(hconf,
-          HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
+          HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES);
+      this.numRowsCompareHashAggr = HiveConf.getIntVar(hconf,
+          HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
     } else {
       this.percentEntriesToFlush =
@@ -376,12 +374,12 @@
           HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL.defaultIntVal;
       this.maxHtEntries =
           HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES.defaultIntVal;
-      this.minReductionHashAggr =
-          HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION.defaultFloatVal;
-      this.numRowsCompareHashAggr =
+      this.numRowsCompareHashAggr =
           HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL.defaultIntVal;
     }
 
+    minReductionHashAggr = getConf().getMinReductionHashAggr();
+
     sumBatchSize = 0;
 
     mapKeysAggregationBuffers = new HashMap();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index 439fb75aca..605fbba87f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -469,6 +469,9 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
     float memoryThreshold = HiveConf.getFloatVar(parseContext.getConf(),
         HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr =
+        HiveConf.getFloatVar(parseContext.getConf(),
+            ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
 
     ArrayList groupByExprs = new ArrayList();
     ExprNodeDesc groupByExpr =
@@ -478,7 +481,7 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
     GroupByDesc groupBy = new GroupByDesc(GroupByDesc.Mode.HASH, outputNames, groupByExprs,
         new ArrayList(), false, groupByMemoryUsage, memoryThreshold,
-        null, false, -1, true);
+        minReductionHashAggr, null, false, -1, true);
 
     GroupByOperator groupByOp = (GroupByOperator) OperatorFactory.getAndMakeChild(
         groupBy, selectOp);
@@ -612,6 +615,9 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
     float memoryThreshold = HiveConf.getFloatVar(parseContext.getConf(),
         HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr =
+        HiveConf.getFloatVar(parseContext.getConf(),
+            ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
 
     // Add min/max and bloom filter aggregations
     List aggFnOIs = new ArrayList();
@@ -658,8 +664,8 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
     gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1));
     gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2));
     GroupByDesc groupBy = new GroupByDesc(GroupByDesc.Mode.HASH,
-        gbOutputNames, new ArrayList(), aggs, false,
-        groupByMemoryUsage, memoryThreshold, null, false, -1, false);
+        gbOutputNames, new ArrayList(), aggs, false,
+        groupByMemoryUsage, memoryThreshold, minReductionHashAggr, null, false, -1, false);
 
     ArrayList groupbyColInfos = new ArrayList();
     groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(0), key.getTypeInfo(), "", false));
@@ -758,7 +764,7 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
     GroupByDesc groupByDescFinal = new GroupByDesc(GroupByDesc.Mode.FINAL,
         gbOutputNames, new ArrayList(), aggsFinal, false,
-        groupByMemoryUsage, memoryThreshold, null, false, 0, false);
+        groupByMemoryUsage, memoryThreshold, minReductionHashAggr, null, false, 0, false);
     GroupByOperator groupByOpFinal = (GroupByOperator)OperatorFactory.getAndMakeChild(
         groupByDescFinal, new RowSchema(rsOp.getSchema()), rsOp);
     groupByOpFinal.setColumnExprMap(new HashMap());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetHashGroupByMinReduction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetHashGroupByMinReduction.java
new file mode 100644
index 0000000000..dcd989be48
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetHashGroupByMinReduction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc.Mode;
+import org.apache.hadoop.hive.ql.plan.Statistics.State;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SetHashGroupByMinReduction determines the min reduction to perform
+ * a hash aggregation for a group by.
+ */
+public class SetHashGroupByMinReduction implements NodeProcessor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SetHashGroupByMinReduction.class.getName());
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object process(Node nd, Stack stack,
+      NodeProcessorCtx procContext, Object... nodeOutputs)
+      throws SemanticException {
+
+    GroupByOperator groupByOperator = (GroupByOperator) nd;
+    GroupByDesc desc = groupByOperator.getConf();
+
+    if (desc.getMode() != Mode.HASH || groupByOperator.getStatistics().getBasicStatsState() != State.COMPLETE
+        || groupByOperator.getStatistics().getColumnStatsState() != State.COMPLETE) {
+      return null;
+    }
+
+    // compute product of distinct values of grouping columns
+    List colStats = new ArrayList<>();
+    for (int i = 0; i < desc.getKeys().size(); i++) {
+      ColumnInfo ci = groupByOperator.getSchema().getSignature().get(i);
+      colStats.add(
+          groupByOperator.getStatistics().getColumnStatisticsFromColName(ci.getInternalName()));
+    }
+    long ndvProduct = StatsUtils.computeNDVGroupingColumns(
+        colStats, groupByOperator.getParentOperators().get(0).getStatistics());
+    // if ndvProduct is 0 then column stats state must be partial and we are missing
+    if (ndvProduct == 0) {
+      return null;
+    }
+
+    long numRows = groupByOperator.getStatistics().getNumRows();
+    if (ndvProduct > numRows) {
+      ndvProduct = numRows;
+    }
+
+    // change the min reduction for hash group by
+    float defaultMinReductionHashAggrFactor = desc.getMinReductionHashAggr();
+    float minReductionHashAggrFactor = 1f - ((float) ndvProduct / numRows);
+    if (minReductionHashAggrFactor > defaultMinReductionHashAggrFactor) {
+      desc.setMinReductionHashAggr(minReductionHashAggrFactor);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Minimum reduction for hash group by operator {} set to {}", groupByOperator, minReductionHashAggrFactor);
+      }
+    }
+
+    return null;
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
index 70f83433d8..cc6d57794a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
@@ -117,6 +117,7 @@
     float groupByMemoryUsage;
     float memoryThreshold;
+    float minReductionHashAggr;
     private HIVEGBPHYSICALMODE gbPhysicalPipelineMode;
   };
@@ -277,6 +278,7 @@ private static GBInfo getGBInfo(HiveAggregate aggRel, OpAttr inputOpAf, HiveConf
     // 4. Gather GB Memory threshold
     gbInfo.groupByMemoryUsage = HiveConf.getFloatVar(hc, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
     gbInfo.memoryThreshold = HiveConf.getFloatVar(hc, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    gbInfo.minReductionHashAggr = HiveConf.getFloatVar(hc, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
 
     // 5. Gather GB Physical pipeline (based on user config & Grping Sets size)
     gbInfo.gbPhysicalPipelineMode = getAggOPMode(hc, gbInfo);
@@ -821,7 +823,7 @@ private static OpAttr genReduceSideGB2(OpAttr inputOpAf, GBInfo gbInfo) throws S
     Operator rsGBOp2 = OperatorFactory.getAndMakeChild(new GroupByDesc(GroupByDesc.Mode.FINAL,
         outputColNames, gbKeys, aggregations, false, gbInfo.groupByMemoryUsage,
-        gbInfo.memoryThreshold, null, false, groupingSetsPosition, gbInfo.containsDistinctAggr),
+        gbInfo.memoryThreshold, gbInfo.minReductionHashAggr, null, false, groupingSetsPosition, gbInfo.containsDistinctAggr),
         new RowSchema(colInfoLst), rs);
 
     rsGBOp2.setColumnExprMap(colExprMap);
@@ -959,7 +961,7 @@ private static OpAttr genReduceSideGB1(OpAttr inputOpAf, GBInfo gbInfo, boolean
         && !finalGB && !(gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT);
     Operator rsGBOp = OperatorFactory.getAndMakeChild(new GroupByDesc(gbMode, outputColNames,
-        gbKeys, aggregations, gbInfo.groupByMemoryUsage, gbInfo.memoryThreshold, gbInfo.grpSets,
+        gbKeys, aggregations, gbInfo.groupByMemoryUsage, gbInfo.memoryThreshold, gbInfo.minReductionHashAggr, gbInfo.grpSets,
         includeGrpSetInGBDesc, groupingSetsColPosition, gbInfo.containsDistinctAggr),
         new RowSchema(colInfoLst), rs);
@@ -1071,7 +1073,7 @@ private static OpAttr genReduceSideGB1NoMapGB(OpAttr inputOpAf, GBInfo gbInfo,
     }
 
     Operator rsGB1 = OperatorFactory.getAndMakeChild(new GroupByDesc(gbMode, outputColNames,
-        gbKeys, aggregations, false, gbInfo.groupByMemoryUsage, gbInfo.memoryThreshold, null,
+        gbKeys, aggregations, false, gbInfo.groupByMemoryUsage, gbInfo.memoryThreshold, gbInfo.minReductionHashAggr, null,
         false, -1, numDistinctUDFs > 0), new RowSchema(colInfoLst), rs);
 
     rsGB1.setColumnExprMap(colExprMap);
@@ -1167,7 +1169,7 @@ private static OpAttr genMapSideGB(OpAttr inputOpAf, GBInfo gbAttrs) throws Sema
     @SuppressWarnings("rawtypes")
     Operator gbOp = OperatorFactory.getAndMakeChild(new GroupByDesc(GroupByDesc.Mode.HASH,
         outputColNames, gbKeys, aggregations, false, gbAttrs.groupByMemoryUsage,
-        gbAttrs.memoryThreshold, gbAttrs.grpSets, inclGrpID, groupingSetsPosition,
+        gbAttrs.memoryThreshold, gbAttrs.minReductionHashAggr, gbAttrs.grpSets, inclGrpID, groupingSetsPosition,
         gbAttrs.containsDistinctAggr), new RowSchema(colInfoLst), inputOpAf.inputs.get(0));
 
     // 5. Setup Expr Col Map
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index 6aeb2a856f..3b0b038a1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -1325,34 +1325,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         stats = parentStats.clone();
         stats.setColumnStats(colStats);
-        long ndvProduct = 1;
         final long parentNumRows = stats.getNumRows();
 
         // compute product of distinct values of grouping columns
-        for (ColStatistics cs : colStats) {
-          if (cs != null) {
-            long ndv = cs.getCountDistint();
-            if (cs.getNumNulls() > 0) {
-              ndv = StatsUtils.safeAdd(ndv, 1);
-            }
-            ndvProduct = StatsUtils.safeMult(ndvProduct, ndv);
-          } else {
-            if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) {
-              // the column must be an aggregate column inserted by GBY. We
-              // don't have to account for this column when computing product
-              // of NDVs
-              continue;
-            } else {
-              // partial column statistics on grouping attributes case.
-              // if column statistics on grouping attribute is missing, then
-              // assume worst case.
-              // GBY rule will emit half the number of rows if ndvProduct is 0
-              ndvProduct = 0;
-            }
-            break;
-          }
-        }
-
+        long ndvProduct =
+            StatsUtils.computeNDVGroupingColumns(colStats, parentStats);
         // if ndvProduct is 0 then column stats state must be partial and we are missing
         // column stats for a group by column
         if (ndvProduct == 0) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 05257c9aa7..a5edab6d56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -5034,10 +5034,12 @@ private Operator genGroupByPlanGroupByOperator(QBParseInfo parseInfo,
     float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
     float memoryThreshold = HiveConf
         .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr = HiveConf
+        .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false, -1, numDistinctUDFs > 0),
+            false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, null, false, -1, numDistinctUDFs > 0),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()), input),
         groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
@@ -5296,6 +5298,8 @@ private Operator genGroupByPlanGroupByOperator1(QBParseInfo parseInfo,
     float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
     float memoryThreshold = HiveConf
         .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr = HiveConf
+        .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
 
     // Nothing special needs to be done for grouping sets if
     // this is the final group by operator, and multiple rows corresponding to the
@@ -5304,7 +5308,7 @@ private Operator genGroupByPlanGroupByOperator1(QBParseInfo parseInfo,
     // additional rows corresponding to grouping sets need to be created here.
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            groupByMemoryUsage, memoryThreshold,
+            groupByMemoryUsage, memoryThreshold, minReductionHashAggr,
             groupingSets,
             groupingSetsPresent && groupingSetsNeedAdditionalMRJob,
             groupingSetsPosition, containsDistinctAggr),
@@ -5477,9 +5481,11 @@ private Operator genGroupByPlanMapGroupByOperator(QB qb,
     float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
     float memoryThreshold = HiveConf
         .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr = HiveConf
+        .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold,
+            false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr,
             groupingSetKeys, groupingSetsPresent, groupingSetsPosition, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         inputOperatorInfo), groupByOutputRowResolver);
@@ -6014,10 +6020,12 @@ private Operator genGroupByPlanGroupByOperator2MR(QBParseInfo parseInfo,
     float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
     float memoryThreshold = HiveConf
         .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr = HiveConf
+        .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false,
+            false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, null, false,
             groupingSetsPosition, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
         reduceSinkOperatorInfo2), groupByOutputRowResolver2);
@@ -9270,9 +9278,11 @@ private Operator genMapGroupByForSemijoin(QB qb, ArrayList fields,
     float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
     float memoryThreshold = HiveConf
        .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr = HiveConf
+        .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false, -1, false),
+            false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, null, false, -1, false),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         input), groupByOutputRowResolver);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index a917617fb8..fd5293a0b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -84,6 +84,7 @@
 import org.apache.hadoop.hive.ql.optimizer.NonBlockingOpDeDupProc;
 import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize;
+import org.apache.hadoop.hive.ql.optimizer.SetHashGroupByMinReduction;
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
 import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer;
@@ -451,7 +452,9 @@ private void runStatsDependentOptimizations(OptimizeTezProcContext procCtx,
     opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"),
         new SetReducerParallelism());
-
+    opRules.put(new RuleRegExp("Set min reduction - GBy (Hash)",
+        GroupByOperator.getOperatorName() + "%"),
+        new SetHashGroupByMinReduction());
     opRules.put(new RuleRegExp("Convert Join to Map-join",
         JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
index 31237c8289..ef5f7c5907 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
@@ -73,6 +73,7 @@
   private ArrayList outputColumnNames;
   private float groupByMemoryUsage;
   private float memoryThreshold;
+  private float minReductionHashAggr;
   transient private boolean isDistinct;
   private boolean dontResetAggrsDistinct;
@@ -86,12 +87,13 @@ public GroupByDesc(
       final ArrayList aggregators,
       final float groupByMemoryUsage,
       final float memoryThreshold,
+      final float minReductionHashAggr,
      final List listGroupingSets,
      final boolean groupingSetsPresent,
      final int groupingSetsPosition,
      final boolean isDistinct) {
    this(mode, outputColumnNames, keys, aggregators,
-        false, groupByMemoryUsage, memoryThreshold, listGroupingSets,
+        false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, listGroupingSets,
        groupingSetsPresent, groupingSetsPosition, isDistinct);
  }
@@ -103,6 +105,7 @@ public GroupByDesc(
       final boolean bucketGroup,
       final float groupByMemoryUsage,
       final float memoryThreshold,
+      final float minReductionHashAggr,
       final List listGroupingSets,
       final boolean groupingSetsPresent,
       final int groupingSetsPosition,
@@ -114,6 +117,7 @@ public GroupByDesc(
     this.bucketGroup = bucketGroup;
     this.groupByMemoryUsage = groupByMemoryUsage;
     this.memoryThreshold = memoryThreshold;
+    this.minReductionHashAggr = minReductionHashAggr;
     this.listGroupingSets = listGroupingSets;
     this.groupingSetsPresent = groupingSetsPresent;
     this.groupingSetPosition = groupingSetsPosition;
@@ -209,6 +213,15 @@ public void setMemoryThreshold(float memoryThreshold) {
     this.memoryThreshold = memoryThreshold;
   }
 
+  @Explain(displayName = "minReductionHashAggr")
+  public Float getMinReductionHashAggr() {
+    return mode == Mode.HASH ? minReductionHashAggr : null;
+  }
+
+  public void setMinReductionHashAggr(float minReductionHashAggr) {
+    this.minReductionHashAggr = minReductionHashAggr;
+  }
+
   @Explain(displayName = "aggregations", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   @Signature
   public List getAggregatorStrings() {
@@ -320,7 +333,8 @@ public Object clone() {
     List listGroupingSets = new ArrayList<>();
     listGroupingSets.addAll(this.listGroupingSets);
     return new GroupByDesc(this.mode, outputColumnNames, keys, aggregators,
-        this.groupByMemoryUsage, this.memoryThreshold, listGroupingSets, this.groupingSetsPresent,
+        this.groupByMemoryUsage, this.memoryThreshold, this.minReductionHashAggr,
+        listGroupingSets, this.groupingSetsPresent,
         this.groupingSetPosition, this.isDistinct);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index 2a7cf8c897..97853d5276 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -1996,4 +1996,35 @@ public static void updateStats(Statistics stats, long newNumRows,
       stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
     }
   }
+
+  public static long computeNDVGroupingColumns(List colStats, Statistics parentStats) {
+    long ndvProduct = 1;
+
+    // compute product of distinct values of grouping columns
+    for (ColStatistics cs : colStats) {
+      if (cs != null) {
+        long ndv = cs.getCountDistint();
+        if (cs.getNumNulls() > 0) {
+          ndv = StatsUtils.safeAdd(ndv, 1);
+        }
+        ndvProduct = StatsUtils.safeMult(ndvProduct, ndv);
+      } else {
+        if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) {
+          // the column must be an aggregate column inserted by GBY. We
+          // don't have to account for this column when computing product
+          // of NDVs
+          continue;
+        } else {
+          // partial column statistics on grouping attributes case.
+          // if column statistics on grouping attribute is missing, then
+          // assume worst case.
+          // GBY rule will emit half the number of rows if ndvProduct is 0
+          ndvProduct = 0;
+        }
+        break;
+      }
+    }
+
+    return ndvProduct;
+  }
 }
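
Note for reviewers: the heart of this patch is the new SetHashGroupByMinReduction rule, which raises hive.map.aggr.hash.min.reduction per operator when complete column statistics predict a strong reduction (threshold = 1 - ndvProduct / numRows, never lowered below the configured default, which the patch also bumps from 0.5 to 0.75). The standalone sketch below restates that computation outside of the optimizer plumbing; the class and method names here are illustrative only and are not part of the patch.

public final class MinReductionHashAggrSketch {

  /**
   * Mirrors the core of SetHashGroupByMinReduction#process under simplified inputs.
   *
   * @param ndvProduct product of the NDVs of the grouping columns (0 if stats are missing)
   * @param numRows    estimated number of input rows of the GROUP BY
   * @param configured value of hive.map.aggr.hash.min.reduction (0.75 by default after this patch)
   * @return the threshold that would be stored in GroupByDesc#minReductionHashAggr
   */
  static float chooseMinReduction(long ndvProduct, long numRows, float configured) {
    if (ndvProduct <= 0 || numRows <= 0) {
      // missing or partial statistics: keep the configured default
      return configured;
    }
    if (ndvProduct > numRows) {
      // there cannot be more groups than input rows
      ndvProduct = numRows;
    }
    // expected fraction of rows eliminated by grouping: 1 - (#groups / #rows)
    float expected = 1f - ((float) ndvProduct / numRows);
    // raise the threshold (making it harder to turn off hash aggregation) when the
    // statistics predict a strong reduction; never drop below the configured default
    return Math.max(expected, configured);
  }

  public static void main(String[] args) {
    // e.g. 1,000,000 input rows collapsing into 10,000 groups -> threshold 0.99:
    // per the config description, hash aggregation is turned off only if the observed
    // hash-table-size / input-rows ratio grows bigger than this value
    System.out.println(chooseMinReduction(10_000L, 1_000_000L, 0.75f));
  }
}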