diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index d8698da..ceba28d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -55,6 +55,8 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.hash.MurmurHash;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+
 /**
  * Reduce Sink Operator sends output to the reduce stage.
  **/
@@ -110,7 +112,7 @@
   protected transient int numDistributionKeys;
   protected transient int numDistinctExprs;
   protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
-  protected transient boolean autoParallel = false;
+  protected transient boolean useUniformHash = false;
   // picks topN K:V pairs from input.
   protected transient TopNHash reducerHash = new TopNHash();
   protected transient HiveKey keyWritable = new HiveKey();
@@ -217,7 +219,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
     }
 
-    autoParallel = conf.isAutoParallel();
+    useUniformHash = conf.getReducerTraits().contains(UNIFORM);
 
     firstRow = true;
     initializeChildren(hconf);
@@ -339,7 +341,7 @@ public void processOp(Object row, int tag) throws HiveException {
       final int hashCode;
 
       // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
-      if (autoParallel && partitionEval.length > 0) {
+      if (useUniformHash && partitionEval.length > 0) {
         hashCode = computeMurmurHash(firstKey);
       } else {
         hashCode = computeHashCode(row);
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index 65fb66e..ed6f713 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +56,8 @@
 import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.FIXED;
+
 public class ReduceSinkMapJoinProc implements NodeProcessor {
 
   protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
@@ -167,7 +170,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
     if (joinConf.isBucketMapJoin()) {
 
       // disable auto parallelism for bucket map joins
-      parentRS.getConf().setAutoParallel(false);
+      parentRS.getConf().setReducerTraits(EnumSet.of(FIXED));
 
       numBuckets = (Integer) joinConf.getBigTableBucketNumMapping().values().toArray()[0];
       if (joinConf.getCustomBucketMapJoin()) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
index 625af85..fef2c29 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.Collection;
+import java.util.EnumSet;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -31,9 +33,13 @@
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+
 /**
  * SetReducerParallelism determines how many reducers should
  * be run for a given reduce sink.
@@ -86,7 +92,14 @@ public Object process(Node nd, Stack<Node> stack,
           maxReducers, false);
       LOG.info("Set parallelism for reduce sink "+sink+" to: "+numReducers);
       desc.setNumReducers(numReducers);
-      desc.setAutoParallel(true);
+
+      final Collection<ExprNodeDescEqualityWrapper> keyCols = ExprNodeDescEqualityWrapper.transform(desc.getKeyCols());
+      final Collection<ExprNodeDescEqualityWrapper> partCols = ExprNodeDescEqualityWrapper.transform(desc.getPartitionCols());
+      if (keyCols != null && keyCols.equals(partCols)) {
+        desc.setReducerTraits(EnumSet.of(UNIFORM, AUTOPARALLEL));
+      } else {
+        desc.setReducerTraits(EnumSet.of(AUTOPARALLEL));
+      }
     } else {
       LOG.info("Number of reducers determined to be: "+desc.getNumReducers());
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index f2723ec..627ca7b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -57,6 +57,8 @@
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
+
 /**
  * GenTezUtils is a collection of shared helper methods to produce
  * TezWork
@@ -117,7 +119,7 @@ public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
 
     reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
 
-    if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
+    if (isAutoReduceParallelism && reduceSink.getConf().getReducerTraits().contains(AUTOPARALLEL)) {
       reduceWork.setAutoReduceParallelism(true);
 
       // configured limit for reducers
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
index 82d86ee..0fe9eda 100755
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -125,5 +127,19 @@ public boolean equals(Object other) {
     public int hashCode() {
       return exprNodeDesc == null ? 0 : exprNodeDesc.hashCode();
     }
+
+    /* helper function to allow Set/Collection operations with ExprNodeDesc */
+    public static Collection<ExprNodeDescEqualityWrapper> transform(
+        Collection<ExprNodeDesc> descs) {
+      if (descs == null) {
+        return null;
+      }
+      final Collection<ExprNodeDescEqualityWrapper> wrapped = new ArrayList<ExprNodeDescEqualityWrapper>(
+          descs.size());
+      for (ExprNodeDesc desc : descs) {
+        wrapped.add(new ExprNodeDescEqualityWrapper(desc));
+      }
+      return wrapped;
+    }
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 2c4175a..57beb69 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -90,7 +91,22 @@
   //flag used to control how TopN handled for PTF/Windowing partitions.
   private boolean isPTFReduceSink = false;
   private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable
-  private Boolean autoParallel = null; // Is reducer auto-parallelism enabled, disabled or unset
+
+  public static enum ReducerTraits {
+    UNSET(0), // unset
+    FIXED(1), // distribution of keys is fixed
+    AUTOPARALLEL(2), // can change reducer count (ORDER BY can concat adjacent buckets)
+    UNIFORM(3); // can redistribute into buckets uniformly (GROUP BY can)
+
+    private final int trait;
+
+    private ReducerTraits(int trait) {
+      this.trait = trait;
+    }
+  };
+
+  // Traits of this reducer: starts as UNSET; may become FIXED, or UNIFORM and/or AUTOPARALLEL
+  private EnumSet<ReducerTraits> reduceTraits = EnumSet.of(ReducerTraits.UNSET);
 
   // Write type, since this needs to calculate buckets differently for updates and deletes
   private AcidUtils.Operation writeType;
@@ -148,7 +164,7 @@ public Object clone() {
     desc.setBucketCols(bucketCols);
     desc.setStatistics(this.getStatistics());
     desc.setSkipTag(skipTag);
-    desc.autoParallel = autoParallel;
+    desc.reduceTraits = reduceTraits.clone();
     return desc;
   }
@@ -361,16 +377,30 @@ public boolean getSkipTag() {
 
   @Explain(displayName = "auto parallelism", normalExplain = false)
   public final boolean isAutoParallel() {
-    return (autoParallel != null) && autoParallel;
+    return (this.reduceTraits.contains(ReducerTraits.AUTOPARALLEL));
+  }
+
+  public final EnumSet<ReducerTraits> getReducerTraits() {
+    return this.reduceTraits;
   }
 
-  public final void setAutoParallel(final boolean autoParallel) {
+  public final void setReducerTraits(EnumSet<ReducerTraits> traits) {
     // we don't allow turning on auto parallel once it has been
     // explicitly turned off. That is to avoid scenarios where
     // auto parallelism could break assumptions about number of
     // reducers or hash function.
-    if (this.autoParallel == null || this.autoParallel == true) {
-      this.autoParallel = autoParallel;
+
+    boolean wasUnset = this.reduceTraits.remove(ReducerTraits.UNSET);
+
+    if (this.reduceTraits.contains(ReducerTraits.FIXED)) {
+      return;
+    } else if (traits.contains(ReducerTraits.FIXED)) {
+      this.reduceTraits.removeAll(EnumSet.of(
+          ReducerTraits.AUTOPARALLEL,
+          ReducerTraits.UNIFORM));
+      this.reduceTraits.addAll(traits);
+    } else {
+      this.reduceTraits.addAll(traits);
     }
   }
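
Reviewer note: the merge rules in setReducerTraits() are easier to see in isolation. The sketch below is a self-contained mirror of that logic (TraitsDemo is a made-up class for illustration, not part of Hive): UNSET is dropped the first time traits are set, FIXED is sticky once present, and an incoming FIXED revokes any previously granted AUTOPARALLEL/UNIFORM.

    import java.util.EnumSet;

    // Standalone mirror of ReduceSinkDesc.setReducerTraits(); illustrative only.
    public class TraitsDemo {
      enum ReducerTraits { UNSET, FIXED, AUTOPARALLEL, UNIFORM }

      private EnumSet<ReducerTraits> reduceTraits = EnumSet.of(ReducerTraits.UNSET);

      public void setReducerTraits(EnumSet<ReducerTraits> traits) {
        // the first real assignment clears the UNSET placeholder
        this.reduceTraits.remove(ReducerTraits.UNSET);
        if (this.reduceTraits.contains(ReducerTraits.FIXED)) {
          return; // FIXED is sticky: later requests cannot re-enable flexibility
        }
        if (traits.contains(ReducerTraits.FIXED)) {
          // pinning the distribution revokes previously granted flexibility
          this.reduceTraits.removeAll(
              EnumSet.of(ReducerTraits.AUTOPARALLEL, ReducerTraits.UNIFORM));
        }
        this.reduceTraits.addAll(traits);
      }

      public static void main(String[] args) {
        TraitsDemo a = new TraitsDemo();
        a.setReducerTraits(EnumSet.of(ReducerTraits.UNIFORM, ReducerTraits.AUTOPARALLEL));
        a.setReducerTraits(EnumSet.of(ReducerTraits.FIXED));
        System.out.println(a.reduceTraits); // [FIXED] -- the bucket map join wins

        TraitsDemo b = new TraitsDemo();
        b.setReducerTraits(EnumSet.of(ReducerTraits.FIXED));
        b.setReducerTraits(EnumSet.of(ReducerTraits.AUTOPARALLEL));
        System.out.println(b.reduceTraits); // [FIXED] -- and stays won
      }
    }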
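The UNIFORM trait is only granted in SetReducerParallelism when the sink's key columns are semantically equal to its partition columns. ExprNodeDescEqualityWrapper exists because raw ExprNodeDesc objects carry their semantic comparison in isSame() rather than equals(); wrapping each element lets a plain Collection.equals() do that comparison element-wise. A toy version of the same pattern (Expr, Wrapper and UniformCheckDemo are stand-ins, not the Hive classes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    public class UniformCheckDemo {
      // Stand-in for ExprNodeDesc: semantic comparison lives in isSame(), not equals().
      static class Expr {
        final String column;
        Expr(String column) { this.column = column; }
        boolean isSame(Expr other) { return other != null && column.equals(other.column); }
      }

      // Stand-in for ExprNodeDescEqualityWrapper: adapts isSame() to equals()/hashCode()
      // so collections of expressions can be compared element-wise.
      static class Wrapper {
        final Expr expr;
        Wrapper(Expr expr) { this.expr = expr; }
        @Override public boolean equals(Object o) {
          return o instanceof Wrapper && expr.isSame(((Wrapper) o).expr);
        }
        @Override public int hashCode() { return expr.column.hashCode(); }
      }

      // same shape as the transform() helper added to ExprNodeDesc above
      static Collection<Wrapper> transform(Collection<Expr> exprs) {
        if (exprs == null) {
          return null;
        }
        Collection<Wrapper> wrapped = new ArrayList<Wrapper>(exprs.size());
        for (Expr e : exprs) {
          wrapped.add(new Wrapper(e));
        }
        return wrapped;
      }

      public static void main(String[] args) {
        List<Expr> keyCols = Arrays.asList(new Expr("a"), new Expr("b"));
        List<Expr> partCols = Arrays.asList(new Expr("a"), new Expr("b"));
        // Distinct instances, but semantically equal lists: such a sink would be
        // marked UNIFORM + AUTOPARALLEL instead of just AUTOPARALLEL.
        System.out.println(transform(keyCols).equals(transform(partCols))); // true
      }
    }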
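Why the operator cares: with UNIFORM set, ReduceSinkOperator hashes the full key with the murmur hash, so every row for a given key lands in one stable bucket. That is what makes it safe for auto reduce parallelism to later fold adjacent buckets onto fewer reducers (the ORDER BY / GROUP BY notes on the enum above). A rough model of that folding; the bucket and range arithmetic here is an illustrative assumption, not Tez's actual ShuffleVertexManager code:

    import java.util.Arrays;

    // Toy model: rows are hashed into maxPartitions buckets at the sender; if the
    // reducer count is later lowered, contiguous buckets are folded together.
    public class FoldDemo {
      static int bucketOf(String key, int maxPartitions) {
        // stable, non-negative hash -> bucket (stand-in for the murmur hash)
        return (key.hashCode() & Integer.MAX_VALUE) % maxPartitions;
      }

      static int reducerOf(int bucket, int maxPartitions, int numReducers) {
        // assign contiguous bucket ranges to the reduced reducer count
        return bucket * numReducers / maxPartitions;
      }

      public static void main(String[] args) {
        int maxPartitions = 12;
        for (int numReducers : new int[] {12, 4, 3}) {
          for (String key : Arrays.asList("a", "b", "c", "a")) {
            // the duplicate "a" always lands on the same reducer as the first,
            // whatever numReducers is, because its bucket never changes
            System.out.printf("reducers=%d key=%s -> reducer %d%n", numReducers, key,
                reducerOf(bucketOf(key, maxPartitions), maxPartitions, numReducers));
          }
        }
      }
    }

This also shows why ReduceSinkMapJoinProc forces FIXED for bucket map joins: there the bucket-to-reducer mapping is part of the plan's correctness, so folding buckets is not allowed.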