diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index aa2dfc7..8e72b00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -336,6 +336,11 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgePr setupAutoReducerParallelism(edgeProp, w); break; } + case CUSTOM_SIMPLE_EDGE: { + setupFastStart(edgeProp, v); + break; + } + default: // nothing } @@ -1265,6 +1270,20 @@ private void setupAutoReducerParallelism(TezEdgeProperty edgeProp, Vertex v) } } + private void setupFastStart(TezEdgeProperty edgeProp, Vertex v) + throws IOException { + if (!edgeProp.isSlowStart()) { + Configuration pluginConf = new Configuration(false); + VertexManagerPluginDescriptor desc = + VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName()); + pluginConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 0); + pluginConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 0); + UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf); + desc.setUserPayload(payload); + v.setVertexManagerPlugin(desc); + } + } + public String createDagName(Configuration conf, QueryPlan plan) { String name = getUserSpecifiedDagName(conf); if (name == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java index b9f5912..2956fd6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java @@ -18,13 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Stack; +import java.util.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -560,6 +554,8 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex Map columnExprMap = new HashMap(); rsOp.setColumnExprMap(columnExprMap); + rsOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART)); + // Create the final Group By Operator ArrayList aggsFinal = new ArrayList(); try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 905431f..d58f447 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -95,6 +95,7 @@ public static ReduceWork createReduceWork( ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot; reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers()); + reduceWork.setSlowStart(reduceSink.getConf().isSlowStart()); if (isAutoReduceParallelism && reduceSink.getConf().getReducerTraits().contains(AUTOPARALLEL)) { @@ -132,10 +133,11 @@ public static ReduceWork createReduceWork( EdgeType edgeType = determineEdgeType(context.preceedingWork, reduceWork, reduceSink); if (reduceWork.isAutoReduceParallelism()) { edgeProp = - new TezEdgeProperty(context.conf, edgeType, true, + new TezEdgeProperty(context.conf, edgeType, true, reduceWork.isSlowStart(), reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer); } else { edgeProp = new TezEdgeProperty(edgeType); + edgeProp.setSlowStart(reduceWork.isSlowStart()); } tezWork.connect( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 97f3300..c87de16 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -455,10 +455,11 @@ public Object process(Node nd, Stack stack, EdgeType edgeType = GenTezUtils.determineEdgeType(work, followingWork, rs); if (rWork.isAutoReduceParallelism()) { edgeProp = - new TezEdgeProperty(context.conf, edgeType, true, + new TezEdgeProperty(context.conf, edgeType, true, rWork.isSlowStart(), rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer); } else { edgeProp = new TezEdgeProperty(edgeType); + edgeProp.setSlowStart(rWork.isSlowStart()); } tezWork.connect(work, followingWork, edgeProp); context.connectedReduceSinks.add(rs); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 38461d5..e30298b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -109,7 +109,8 @@ UNSET(0), // unset FIXED(1), // distribution of keys is fixed AUTOPARALLEL(2), // can change reducer count (ORDER BY can concat adjacent buckets) - UNIFORM(3); // can redistribute into buckets uniformly (GROUP BY can) + UNIFORM(3), // can redistribute into buckets uniformly (GROUP BY can) + QUICKSTART(4); // do not wait for downstream tasks private final int trait; @@ -441,6 +442,11 @@ public final boolean isAutoParallel() { return (this.reduceTraits.contains(ReducerTraits.AUTOPARALLEL)); } + @Explain(displayName = "slow start", explainLevels = {Level.EXTENDED }) + public final boolean isSlowStart() { + return !(this.reduceTraits.contains(ReducerTraits.QUICKSTART)); + } + public final EnumSet getReducerTraits() { return this.reduceTraits; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java index ee784dc..dfed017 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java @@ -86,6 +86,9 @@ public ReduceWork(String name) { // boolean that says whether tez auto reduce parallelism should be used private boolean isAutoReduceParallelism; + // boolean that says whether to slow start or not + private boolean isSlowStart = true; + // for auto reduce parallelism - minimum reducers requested private int minReduceTasks; @@ -217,6 +220,14 @@ public boolean isAutoReduceParallelism() { return isAutoReduceParallelism; } + public boolean isSlowStart() { + return isSlowStart; + } + + public void setSlowStart(boolean isSlowStart) { + this.isSlowStart = isSlowStart; + } + public void setMinReduceTasks(int minReduceTasks) { this.minReduceTasks = minReduceTasks; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index a3aa12f..d87bee3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -35,6 +35,7 @@ private int numBuckets; private boolean isAutoReduce; + private boolean isSlowStart = true; private int minReducer; private int maxReducer; private long inputSizePerReducer; @@ -47,12 +48,13 @@ public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, } public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce, - int minReducer, int maxReducer, long bytesPerReducer) { + boolean isSlowStart, int minReducer, int maxReducer, long bytesPerReducer) { this(hiveConf, edgeType, -1); this.minReducer = minReducer; this.maxReducer = maxReducer; this.isAutoReduce = isAutoReduce; this.inputSizePerReducer = bytesPerReducer; + this.isSlowStart = isSlowStart; } public TezEdgeProperty(EdgeType edgeType) { @@ -86,4 +88,12 @@ public int getMaxReducer() { public long getInputSizePerReducer() { return inputSizePerReducer; } + + public boolean isSlowStart() { + return isSlowStart; + } + + public void setSlowStart(boolean slowStart) { + this.isSlowStart = slowStart; + } }