diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8104e84..d26573e 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -190,8 +190,8 @@
COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
- BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)),
- MAXREDUCERS("hive.exec.reducers.max", 999),
+ BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (256 * 1000 * 1000)),
+ MAXREDUCERS("hive.exec.reducers.max", 1009), // pick a prime
PREEXECHOOKS("hive.exec.pre.hooks", ""),
POSTEXECHOOKS("hive.exec.post.hooks", ""),
ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
@@ -1028,6 +1028,9 @@
HIVE_CHECK_CROSS_PRODUCT("hive.exec.check.crossproducts", true),
HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", 5000L), // in ms
HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS("hive.localize.resource.num.wait.attempts", 5),
+ TEZ_AUTO_REDUCER_PARALLELISM("hive.tez.auto.reducer.parallelism", false),
+ TEZ_MAX_PARTITION_FACTOR("hive.tez.max.partition.factor", 2f),
+ TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f)
;
public final String varname;
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 49a099b..8a74e4e 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -2779,4 +2779,26 @@
   joins unnecessary memory will be allocated and then trimmed.</description>
 </property>
 
+<property>
+  <name>hive.tez.auto.reducer.parallelism</name>
+  <value>false</value>
+  <description>Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes
+  and set parallelism estimates. Tez will sample source vertices' output sizes and adjust the estimates at runtime as
+  necessary.</description>
+</property>
+
+<property>
+  <name>hive.tez.max.partition.factor</name>
+  <value>2f</value>
+  <description>When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle
+  edges.</description>
+</property>
+
+<property>
+  <name>hive.tez.min.partition.factor</name>
+  <value>0.25f</value>
+  <description>When auto reducer parallelism is enabled this factor will be used to put a lower limit to the number
+  of reducers that tez specifies.</description>
+</property>
+
 </configuration>
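Note: the three template entries above mirror the ConfVars added to HiveConf.java in the first hunk. As a sanity check, here is a minimal sketch of how code reads them; HiveConf, ConfVars, and the getter methods are real Hive APIs, while the class and main() scaffolding are illustrative only:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class AutoParallelismConfCheck {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Defaults come straight from the ConfVars entries added above.
        boolean enabled = conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM); // false
        float maxFactor = conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR);    // 2.0f
        float minFactor = conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);    // 0.25f
        long perReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);              // 256 * 1000 * 1000
        System.out.println(enabled + " " + maxFactor + " " + minFactor + " " + perReducer);
      }
    }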
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index f27bde8..8c76c17 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -40,7 +40,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
@@ -94,9 +93,10 @@
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.api.VertexGroup;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -114,6 +114,7 @@
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
/**
* DagUtils. DagUtils is a collection of helper methods to convert
@@ -210,6 +211,7 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
* @param edgeProp the edge property of connection between the two
* endpoints.
*/
+ @SuppressWarnings("rawtypes")
public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
Vertex w, TezEdgeProperty edgeProp)
throws IOException {
@@ -221,27 +223,31 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
- case BROADCAST_EDGE:
- mergeInputClass = ConcatenatedMergedKeyValueInput.class;
- break;
- case CUSTOM_EDGE:
- mergeInputClass = ConcatenatedMergedKeyValueInput.class;
- int numBuckets = edgeProp.getNumBuckets();
- VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
- CustomPartitionVertex.class.getName());
- byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
- desc.setUserPayload(userPayload);
- w.setVertexManagerPlugin(desc);
- break;
-
- case CUSTOM_SIMPLE_EDGE:
- mergeInputClass = ConcatenatedMergedKeyValueInput.class;
- break;
-
- case SIMPLE_EDGE:
- default:
- mergeInputClass = TezMergedLogicalInput.class;
- break;
+ case BROADCAST_EDGE:
+ mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+ break;
+ case CUSTOM_EDGE: {
+ mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+ int numBuckets = edgeProp.getNumBuckets();
+ VertexManagerPluginDescriptor desc =
+ new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
+ byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
+ desc.setUserPayload(userPayload);
+ w.setVertexManagerPlugin(desc);
+ break;
+ }
+
+ case CUSTOM_SIMPLE_EDGE:
+ mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+ break;
+
+ case SIMPLE_EDGE:
+ setupAutoReducerParallelism(edgeProp, w);
+ // fall through
+
+ default:
+ mergeInputClass = TezMergedLogicalInput.class;
+ break;
}
return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
@@ -278,13 +284,22 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
updateConfigurationForEdge(vConf, v, wConf, w);
- if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) {
+ switch(edgeProp.getEdgeType()) {
+ case CUSTOM_EDGE: {
int numBuckets = edgeProp.getNumBuckets();
byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
CustomPartitionVertex.class.getName());
desc.setUserPayload(userPayload);
w.setVertexManagerPlugin(desc);
+ break;
+ }
+ case SIMPLE_EDGE: {
+ setupAutoReducerParallelism(edgeProp, w);
+ break;
+ }
+ default:
+ // nothing
}
return new Edge(v, w, createEdgeProperty(edgeProp));
@@ -293,6 +308,7 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
/*
* Helper function to create an edge property from an edge type.
*/
+ @SuppressWarnings("rawtypes")
private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException {
DataMovementType dataMovementType;
Class logicalInputClass;
@@ -301,45 +317,44 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOExcep
EdgeProperty edgeProperty = null;
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
- case BROADCAST_EDGE:
- dataMovementType = DataMovementType.BROADCAST;
- logicalOutputClass = OnFileUnorderedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- break;
-
- case CUSTOM_EDGE:
-
- dataMovementType = DataMovementType.CUSTOM;
- logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
- CustomPartitionEdge.class.getName());
- CustomEdgeConfiguration edgeConf =
- new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
- DataOutputBuffer dob = new DataOutputBuffer();
- edgeConf.write(dob);
- byte[] userPayload = dob.getData();
- edgeDesc.setUserPayload(userPayload);
- edgeProperty =
+ case BROADCAST_EDGE:
+ dataMovementType = DataMovementType.BROADCAST;
+ logicalOutputClass = OnFileUnorderedKVOutput.class;
+ logicalInputClass = ShuffledUnorderedKVInput.class;
+ break;
+
+ case CUSTOM_EDGE:
+ dataMovementType = DataMovementType.CUSTOM;
+ logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
+ logicalInputClass = ShuffledUnorderedKVInput.class;
+ EdgeManagerDescriptor edgeDesc =
+ new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+ CustomEdgeConfiguration edgeConf =
+ new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ edgeConf.write(dob);
+ byte[] userPayload = dob.getData();
+ edgeDesc.setUserPayload(userPayload);
+ edgeProperty =
new EdgeProperty(edgeDesc,
DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
new OutputDescriptor(logicalOutputClass.getName()),
new InputDescriptor(logicalInputClass.getName()));
- break;
-
- case CUSTOM_SIMPLE_EDGE:
- dataMovementType = DataMovementType.SCATTER_GATHER;
- logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- break;
-
- case SIMPLE_EDGE:
- default:
- dataMovementType = DataMovementType.SCATTER_GATHER;
- logicalOutputClass = OnFileSortedOutput.class;
- logicalInputClass = ShuffledMergedInputLegacy.class;
- break;
+ break;
+
+ case CUSTOM_SIMPLE_EDGE:
+ dataMovementType = DataMovementType.SCATTER_GATHER;
+ logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
+ logicalInputClass = ShuffledUnorderedKVInput.class;
+ break;
+
+ case SIMPLE_EDGE:
+ default:
+ dataMovementType = DataMovementType.SCATTER_GATHER;
+ logicalOutputClass = OnFileSortedOutput.class;
+ logicalInputClass = ShuffledMergedInputLegacy.class;
+ break;
}
if (edgeProperty == null) {
@@ -360,7 +375,6 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOExcep
* container size isn't set.
*/
private Resource getContainerResource(Configuration conf) {
- Resource containerResource;
int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
@@ -414,7 +428,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
boolean useTezGroupedSplits = false;
int numTasks = -1;
- Class amSplitGeneratorClass = null;
+ Class amSplitGeneratorClass = null;
InputSplitInfo inputSplitInfo = null;
Class inputFormatClass = conf.getClass("mapred.input.format.class",
InputFormat.class);
@@ -533,7 +547,8 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
Vertex reducer = new Vertex(reduceWork.getName(),
new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
- reduceWork.getNumReduceTasks(), getContainerResource(conf));
+ reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
+ .getNumReduceTasks(), getContainerResource(conf));
     Map<String, String> environment = new HashMap<String, String>();
@@ -812,7 +827,7 @@ public LocalResource localizeResource(Path src, Path dest, Configuration conf)
for (int i = 0; i < waitAttempts; i++) {
if (!checkPreExisting(src, dest, conf)) {
try {
- Thread.currentThread().sleep(sleepInterval);
+ Thread.sleep(sleepInterval);
} catch (InterruptedException interruptedException) {
throw new IOException(interruptedException);
}
@@ -1001,6 +1016,25 @@ public static DagUtils getInstance() {
return instance;
}
+ private void setupAutoReducerParallelism(TezEdgeProperty edgeProp, Vertex v)
+ throws IOException {
+ if (edgeProp.isAutoReduce()) {
+ Configuration pluginConf = new Configuration(false);
+ VertexManagerPluginDescriptor desc =
+ new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+ pluginConf.setBoolean(
+ ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+ pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+ edgeProp.getMinReducer());
+ pluginConf.setLong(
+ ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ edgeProp.getInputSizePerReducer());
+ ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
+ desc.setUserPayload(payload.toByteArray());
+ v.setVertexManagerPlugin(desc);
+ }
+ }
+
private DagUtils() {
// don't instantiate
}
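Note: setupAutoReducerParallelism() above serializes a throwaway Configuration into the ShuffleVertexManager plugin payload, so the parallelism limits travel with the edge. A hedged caller-side fragment (not part of the patch; it assumes vertices v and w plus their JobConfs already exist, and the concrete numbers are illustrative):

    // Build an auto-reduce shuffle edge; Tez may later settle on any reducer
    // count between minReducer and maxReducer based on sampled output sizes.
    TezEdgeProperty edgeProp = new TezEdgeProperty(
        hiveConf, EdgeType.SIMPLE_EDGE,
        true,                 // isAutoReduce
        25,                   // minReducer (illustrative)
        200,                  // maxReducer (illustrative)
        256L * 1000 * 1000);  // desired input size per reducer
    Edge e = dagUtils.createEdge(vConf, v, wConf, w, edgeProp);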
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 169171c..ac34345 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -43,14 +43,14 @@
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
/**
* GenTezUtils is a collection of shared helper methods to produce
@@ -89,6 +89,15 @@ public UnionWork createUnionWork(GenTezProcContext context, Operator<?> operator
   public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root, TezWork tezWork) {
assert !root.getParentOperators().isEmpty();
+
+ boolean isAutoReduceParallelism =
+ context.conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM);
+
+ float maxPartitionFactor =
+ context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR);
+ float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
+ long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+
ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
reduceWork.setReducer(root);
@@ -103,10 +112,38 @@ public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
+ if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
+ reduceWork.setAutoReduceParallelism(true);
+
+ // configured limit for reducers
+ int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+ // min we allow tez to pick
+ int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
+ * minPartitionFactor));
+ minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
+
+ // max we allow tez to pick
+ int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
+ maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
+
+ reduceWork.setMinReduceTasks(minPartition);
+ reduceWork.setMaxReduceTasks(maxPartition);
+ }
+
setupReduceSink(context, reduceWork, reduceSink);
tezWork.add(reduceWork);
- TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+
+ TezEdgeProperty edgeProp;
+ if (reduceWork.isAutoReduceParallelism()) {
+ edgeProp =
+ new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+ reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer);
+ } else {
+ edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ }
+
tezWork.connect(
context.preceedingWork,
reduceWork, edgeProp);
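Note: a worked example of the clamping logic in createReduceWork() above. Assume Hive's compile-time estimate is 100 reducers, the default factors (0.25f and 2f), and hive.exec.reducers.max at its new default of 1009; the numbers are illustrative but the arithmetic mirrors the hunk exactly:

    int numReducers = 100;   // reduceSink.getConf().getNumReducers()
    int maxReducers = 1009;  // hive.exec.reducers.max

    int minPartition = Math.max(1, (int) (numReducers * 0.25f));              // 25
    minPartition = (minPartition > maxReducers) ? maxReducers : minPartition; // still 25

    int maxPartition = (int) (numReducers * 2f);                              // 200
    maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition; // still 200

    // Tez is then free to run anywhere between 25 and 200 reduce tasks.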
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 41aeb4c..b304fd3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -28,28 +27,23 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
/**
* GenTezWork separates the operator tree into tez tasks.
@@ -136,7 +130,7 @@ public Object process(Node nd, Stack<Node> stack,
if (!context.currentMapJoinOperators.isEmpty()) {
for (MapJoinOperator mj: context.currentMapJoinOperators) {
LOG.debug("Processing map join: " + mj);
- // remember the mapping in case we scan another branch of the
+ // remember the mapping in case we scan another branch of the
// mapjoin later
if (!context.mapJoinWorkMap.containsKey(mj)) {
          List<BaseWork> workItems = new LinkedList<BaseWork>();
@@ -175,7 +169,7 @@ public Object process(Node nd, Stack<Node> stack,
LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
TezEdgeProperty edgeProp = parentWorkMap.getValue();
tezWork.connect(parentWork, work, edgeProp);
-
+
// need to set up output name for reduce sink now that we know the name
// of the downstream work
for (ReduceSinkOperator r:
@@ -206,7 +200,7 @@ public Object process(Node nd, Stack<Node> stack,
root.removeParent(parent);
}
- if (!context.currentUnionOperators.isEmpty()) {
+ if (!context.currentUnionOperators.isEmpty()) {
// if there are union all operators we need to add the work to the set
// of union operators.
@@ -249,6 +243,7 @@ public Object process(Node nd, Stack<Node> stack,
if (context.leafOperatorToFollowingWork.containsKey(operator)) {
BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator);
+ long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
LOG.debug("Second pass. Leaf operator: "+operator
+" has common downstream work:"+followingWork);
@@ -268,7 +263,14 @@ public Object process(Node nd, Stack<Node> stack,
if (!context.connectedReduceSinks.contains(rs)) {
// add dependency between the two work items
- TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ TezEdgeProperty edgeProp;
+ if (rWork.isAutoReduceParallelism()) {
+ edgeProp =
+ new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+ rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+ } else {
+ edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ }
tezWork.connect(work, rWork, edgeProp);
context.connectedReduceSinks.add(rs);
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index a68374e..0cef12b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -20,10 +20,10 @@
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,7 +42,7 @@
* distributed on the cluster. The ExecReducer will ultimately deserialize this
* class on the data nodes and setup it's operator pipeline accordingly.
*
- * This class is also used in the explain command any property with the
+ * This class is also used in the explain command any property with the
* appropriate annotation will be displayed in the explain output.
*/
@SuppressWarnings({"serial", "deprecation"})
@@ -69,12 +69,21 @@ public ReduceWork(String name) {
// desired parallelism of the reduce task.
private Integer numReduceTasks;
- // boolean to signal whether tagging will be used (e.g.: join) or
+ // boolean to signal whether tagging will be used (e.g.: join) or
// not (e.g.: group by)
private boolean needsTagging;
  private Map<Integer, String> tagToInput = new HashMap<Integer, String>();
+ // boolean that says whether tez auto reduce parallelism should be used
+ private boolean isAutoReduceParallelism;
+
+ // for auto reduce parallelism - minimum reducers requested
+ private int minReduceTasks;
+
+ // for auto reduce parallelism - max reducers requested
+ private int maxReduceTasks;
+
/**
* If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
* to keySerializeInfo of the ReduceSink
@@ -157,6 +166,7 @@ public void setNumReduceTasks(final Integer numReduceTasks) {
this.numReduceTasks = numReduceTasks;
}
+ @Override
public void configureJobConf(JobConf job) {
if (reducer != null) {
for (FileSinkOperator fs : OperatorUtils.findOperators(reducer, FileSinkOperator.class)) {
@@ -164,4 +174,28 @@ public void configureJobConf(JobConf job) {
}
}
}
+
+ public void setAutoReduceParallelism(boolean isAutoReduceParallelism) {
+ this.isAutoReduceParallelism = isAutoReduceParallelism;
+ }
+
+ public boolean isAutoReduceParallelism() {
+ return isAutoReduceParallelism;
+ }
+
+ public void setMinReduceTasks(int minReduceTasks) {
+ this.minReduceTasks = minReduceTasks;
+ }
+
+ public int getMinReduceTasks() {
+ return minReduceTasks;
+ }
+
+ public int getMaxReduceTasks() {
+ return maxReduceTasks;
+ }
+
+ public void setMaxReduceTasks(int maxReduceTasks) {
+ this.maxReduceTasks = maxReduceTasks;
+ }
}
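Note: the min/max fields above interact with the createVertex() change in DagUtils.java. Tez's ShuffleVertexManager can shrink a vertex's task count at runtime but never grow it, so the reducer vertex is sized at the maximum and the minimum becomes a floor. A minimal fragment mirroring that selection (assumes a populated ReduceWork):

    int parallelism = reduceWork.isAutoReduceParallelism()
        ? reduceWork.getMaxReduceTasks()   // start wide; Tez trims at runtime
        : reduceWork.getNumReduceTasks();  // fixed compile-time estimate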
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index 96adb84..d17ca8c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -1,16 +1,12 @@
package org.apache.hadoop.hive.ql.plan;
-import java.util.List;
-import java.util.Map;
-
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
public class TezEdgeProperty {
-
+
public enum EdgeType {
SIMPLE_EDGE,
- BROADCAST_EDGE,
+ BROADCAST_EDGE,
CONTAINS,
CUSTOM_EDGE,
CUSTOM_SIMPLE_EDGE,
@@ -20,13 +16,27 @@
private EdgeType edgeType;
private int numBuckets;
- public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
+ private boolean isAutoReduce;
+ private int minReducer;
+ private int maxReducer;
+ private long inputSizePerReducer;
+
+ public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
int buckets) {
this.hiveConf = hiveConf;
this.edgeType = edgeType;
this.numBuckets = buckets;
}
+ public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce,
+ int minReducer, int maxReducer, long bytesPerReducer) {
+ this(hiveConf, edgeType, -1);
+ this.minReducer = minReducer;
+ this.maxReducer = maxReducer;
+ this.isAutoReduce = isAutoReduce;
+ this.inputSizePerReducer = bytesPerReducer;
+ }
+
public TezEdgeProperty(EdgeType edgeType) {
this(null, edgeType, -1);
}
@@ -42,4 +52,20 @@ public HiveConf getHiveConf () {
public int getNumBuckets() {
return numBuckets;
}
+
+ public boolean isAutoReduce() {
+ return isAutoReduce;
+ }
+
+ public int getMinReducer() {
+ return minReducer;
+ }
+
+ public int getMaxReducer() {
+ return maxReducer;
+ }
+
+ public long getInputSizePerReducer() {
+ return inputSizePerReducer;
+ }
}
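Note: putting the pieces together with a back-of-the-envelope example (all numbers illustrative, and the ShuffleVertexManager internals are paraphrased rather than quoted): if the source vertex actually produces about 1 GB of shuffle output while hive.exec.reducers.bytes.per.reducer sits at its new 256 MB default, Tez aims for roughly four reduce tasks and then respects the window carried by this class:

    long sampledOutput = 1000L * 1000 * 1000;                 // observed by Tez at runtime
    long perReducer = edgeProp.getInputSizePerReducer();      // 256 * 1000 * 1000
    int desired = (int) Math.ceil((double) sampledOutput / perReducer); // 4
    int picked = Math.max(edgeProp.getMinReducer(), desired); // floor from min.partition.factor
    // The ceiling is implicit: the vertex was created with maxReduceTasks tasks,
    // and auto-parallelism can only reduce that number.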