diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java index 174685b..5b673df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE; import java.util.HashSet; import java.util.Set; @@ -39,11 +40,13 @@ // only one reducer if this configuration does not prevents private final int minReducer; private final Set> removedOps; + private final boolean isMapAggr; public AbstractCorrelationProcCtx(ParseContext pctx) { removedOps = new HashSet>(); trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST); minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER); + isMapAggr = pctx.getConf().getBoolVar(HIVEMAPSIDEAGGREGATE); this.pctx = pctx; } @@ -70,4 +73,8 @@ public boolean hasBeenRemoved(Operator rsOp) { public boolean addRemovedOperator(Operator rsOp) { return removedOps.add(rsOp); } + + public boolean isMapAggr() { + return isMapAggr; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java index 64bef21..7bb49be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import java.util.Set; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.ForwardOperator; @@ -44,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.Utilities.ReduceField; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkDeDuplication.ReduceSinkDeduplicateProcCtx; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; @@ -163,10 +165,10 @@ protected static boolean hasGroupingSet(ReduceSinkOperator cRS) throws SemanticE return type.isInstance(parent) ? (T)parent : null; } - protected static Operator getStartForGroupBy(ReduceSinkOperator cRS) + protected static Operator getStartForGroupBy(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException { Operator parent = getSingleParent(cRS); - return parent instanceof GroupByOperator ? parent : cRS; // skip map-aggr GBY + return parent instanceof GroupByOperator && dedupCtx.isMapAggr() ? parent : cRS; // skip map-aggr GBY } @@ -240,6 +242,7 @@ protected static int indexOf(ExprNodeDesc cexpr, ExprNodeDesc[] pexprs, Operator || cursor instanceof FilterOperator || cursor instanceof ForwardOperator || cursor instanceof ScriptOperator + || cursor instanceof GroupByOperator || cursor instanceof ReduceSinkOperator)) { return null; } @@ -395,7 +398,7 @@ protected static void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupBy Operator parent = getSingleParent(cRS); - if (parent instanceof GroupByOperator) { + if ((parent instanceof GroupByOperator) && procCtx.isMapAggr()) { // pRS-cGBYm-cRS-cGBYr (map aggregation) --> pRS-cGBYr(COMPLETE) // copies desc of cGBYm to cGBYr and remove cGBYm and cRS GroupByOperator cGBYm = (GroupByOperator) parent; @@ -440,7 +443,7 @@ protected static void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupBy removeOperator(cRS, cGBYr, parent, context); procCtx.addRemovedOperator(cRS); - if (parent instanceof GroupByOperator) { + if ((parent instanceof GroupByOperator) && procCtx.isMapAggr()) { removeOperator(parent, cGBYr, getSingleParent(parent), context); procCtx.addRemovedOperator(cGBYr); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java index 7b5f9b2..56334ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java @@ -500,7 +500,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException { - Operator start = CorrelationUtilities.getStartForGroupBy(cRS); + Operator start = CorrelationUtilities.getStartForGroupBy(cRS, dedupCtx); GroupByOperator pGBY = CorrelationUtilities.findPossibleParent( start, GroupByOperator.class, dedupCtx.trustScript()); @@ -547,7 +547,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException { - Operator start = CorrelationUtilities.getStartForGroupBy(cRS); + Operator start = CorrelationUtilities.getStartForGroupBy(cRS, dedupCtx); JoinOperator pJoin = CorrelationUtilities.findPossibleParent( start, JoinOperator.class, dedupCtx.trustScript()); @@ -590,7 +590,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException { - Operator start = CorrelationUtilities.getStartForGroupBy(cRS); + Operator start = CorrelationUtilities.getStartForGroupBy(cRS, dedupCtx); ReduceSinkOperator pRS = CorrelationUtilities.findPossibleParent( start, ReduceSinkOperator.class, dedupCtx.trustScript());