diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8104e84..d26573e 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -190,8 +190,8 @@
     COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
     COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
     COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
-    BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)),
-    MAXREDUCERS("hive.exec.reducers.max", 999),
+    BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (256 * 1000 * 1000)),
+    MAXREDUCERS("hive.exec.reducers.max", 1009), // pick a prime
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
     ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
@@ -1028,6 +1028,9 @@
     HIVE_CHECK_CROSS_PRODUCT("hive.exec.check.crossproducts", true),
     HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", 5000L), // in ms
     HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS("hive.localize.resource.num.wait.attempts", 5),
+    TEZ_AUTO_REDUCER_PARALLELISM("hive.tez.auto.reducer.parallelism", false),
+    TEZ_MAX_PARTITION_FACTOR("hive.tez.max.partition.factor", 2f),
+    TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f)
     ;

     public final String varname;
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 49a099b..8a74e4e 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -2779,4 +2779,26 @@
   joins unnecessary memory will be allocated and then trimmed.
   </description>
 </property>

+<property>
+  <name>hive.tez.auto.reducer.parallelism</name>
+  <value>false</value>
+  <description>Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes
+  and set parallelism estimates. Tez will sample source vertices' output sizes and adjust the estimates at runtime as
+  necessary.</description>
+</property>
+
+<property>
+  <name>hive.tez.max.partition.factor</name>
+  <value>2f</value>
+  <description>When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle
+  edges.</description>
+</property>
+
+<property>
+  <name>hive.tez.min.partition.factor</name>
+  <value>0.25f</value>
+  <description>When auto reducer parallelism is enabled this factor will be used to put a lower limit to the number
+  of reducers that tez specifies.</description>
+</property>
+
 </configuration>
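[Editor's note: a minimal sketch, not part of the patch, of how the new settings are read through HiveConf -- the same accessors the GenTezUtils changes below use. The class name and main() scaffolding are illustrative only.]

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class AutoParallelismConf {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Off by default; turn it on per session or in hive-site.xml.
    conf.setBoolVar(ConfVars.TEZ_AUTO_REDUCER_PARALLELISM, true);

    boolean auto = conf.getBoolVar(ConfVars.TEZ_AUTO_REDUCER_PARALLELISM); // true
    float maxFactor = conf.getFloatVar(ConfVars.TEZ_MAX_PARTITION_FACTOR); // 2.0
    float minFactor = conf.getFloatVar(ConfVars.TEZ_MIN_PARTITION_FACTOR); // 0.25
    long bytesPerReducer = conf.getLongVar(ConfVars.BYTESPERREDUCER);      // now 256000000

    System.out.println(auto + " " + maxFactor + " " + minFactor + " " + bytesPerReducer);
  }
}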
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index f27bde8..8c76c17 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -40,7 +40,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
@@ -94,9 +93,10 @@
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.api.VertexGroup;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -114,6 +114,7 @@
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;

 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
@@ -210,6 +211,7 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
    * @param edgeProp the edge property of connection between the two
    * endpoints.
    */
+  @SuppressWarnings("rawtypes")
   public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
       Vertex w, TezEdgeProperty edgeProp)
     throws IOException {
@@ -221,27 +223,31 @@
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
-    case BROADCAST_EDGE:
-      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
-      break;
-    case CUSTOM_EDGE:
-      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
-      int numBuckets = edgeProp.getNumBuckets();
-      VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
-          CustomPartitionVertex.class.getName());
-      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
-      desc.setUserPayload(userPayload);
-      w.setVertexManagerPlugin(desc);
-      break;
-
-    case CUSTOM_SIMPLE_EDGE:
-      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
-      break;
-
-    case SIMPLE_EDGE:
-    default:
-      mergeInputClass = TezMergedLogicalInput.class;
-      break;
+      case BROADCAST_EDGE:
+        mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+        break;
+      case CUSTOM_EDGE: {
+        mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+        int numBuckets = edgeProp.getNumBuckets();
+        VertexManagerPluginDescriptor desc =
+            new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
+        byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
+        desc.setUserPayload(userPayload);
+        w.setVertexManagerPlugin(desc);
+        break;
+      }
+
+      case CUSTOM_SIMPLE_EDGE:
+        mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+        break;
+
+      case SIMPLE_EDGE:
+        setupAutoReducerParallelism(edgeProp, w);
+        // fall through
+
+      default:
+        mergeInputClass = TezMergedLogicalInput.class;
+        break;
     }

     return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
@@ -278,13 +284,22 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
     updateConfigurationForEdge(vConf, v, wConf, w);

-    if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) {
+    switch(edgeProp.getEdgeType()) {
+    case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
       byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
       VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
           CustomPartitionVertex.class.getName());
       desc.setUserPayload(userPayload);
       w.setVertexManagerPlugin(desc);
+      break;
+    }
+    case SIMPLE_EDGE: {
+      setupAutoReducerParallelism(edgeProp, w);
+      break;
+    }
+    default:
+      // nothing
     }

     return new Edge(v, w, createEdgeProperty(edgeProp));
@@ -293,6 +308,7 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
   /*
    * Helper function to create an edge property from an edge type.
    */
+  @SuppressWarnings("rawtypes")
   private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException {
     DataMovementType dataMovementType;
     Class logicalInputClass;
@@ -301,45 +317,44 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOExcep
     EdgeProperty edgeProperty = null;
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
-    case BROADCAST_EDGE:
-      dataMovementType = DataMovementType.BROADCAST;
-      logicalOutputClass = OnFileUnorderedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
-
-    case CUSTOM_EDGE:
-
-      dataMovementType = DataMovementType.CUSTOM;
-      logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
-          CustomPartitionEdge.class.getName());
-      CustomEdgeConfiguration edgeConf =
-          new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
-      DataOutputBuffer dob = new DataOutputBuffer();
-      edgeConf.write(dob);
-      byte[] userPayload = dob.getData();
-      edgeDesc.setUserPayload(userPayload);
-      edgeProperty =
+      case BROADCAST_EDGE:
+        dataMovementType = DataMovementType.BROADCAST;
+        logicalOutputClass = OnFileUnorderedKVOutput.class;
+        logicalInputClass = ShuffledUnorderedKVInput.class;
+        break;
+
+      case CUSTOM_EDGE:
+        dataMovementType = DataMovementType.CUSTOM;
+        logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
+        logicalInputClass = ShuffledUnorderedKVInput.class;
+        EdgeManagerDescriptor edgeDesc =
+            new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+        CustomEdgeConfiguration edgeConf =
+            new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
+        DataOutputBuffer dob = new DataOutputBuffer();
+        edgeConf.write(dob);
+        byte[] userPayload = dob.getData();
+        edgeDesc.setUserPayload(userPayload);
+        edgeProperty =
           new EdgeProperty(edgeDesc, DataSourceType.PERSISTED,
               SchedulingType.SEQUENTIAL,
               new OutputDescriptor(logicalOutputClass.getName()),
               new InputDescriptor(logicalInputClass.getName()));
-      break;
-
-    case CUSTOM_SIMPLE_EDGE:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
-
-    case SIMPLE_EDGE:
-    default:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileSortedOutput.class;
-      logicalInputClass = ShuffledMergedInputLegacy.class;
-      break;
+        break;
+
+      case CUSTOM_SIMPLE_EDGE:
+        dataMovementType = DataMovementType.SCATTER_GATHER;
+        logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
+        logicalInputClass = ShuffledUnorderedKVInput.class;
+        break;
+
+      case SIMPLE_EDGE:
+      default:
+        dataMovementType = DataMovementType.SCATTER_GATHER;
+        logicalOutputClass = OnFileSortedOutput.class;
+        logicalInputClass = ShuffledMergedInputLegacy.class;
+        break;
     }

     if (edgeProperty == null) {
@@ -360,7 +375,6 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOExcep
    * container size isn't set.
    */
   private Resource getContainerResource(Configuration conf) {
-    Resource containerResource;
     int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
         HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
         conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
@@ -414,7 +428,7 @@
     boolean useTezGroupedSplits = false;

     int numTasks = -1;
-    Class amSplitGeneratorClass = null;
+    Class amSplitGeneratorClass = null;
     InputSplitInfo inputSplitInfo = null;
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
@@ -533,7 +547,8 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
     Vertex reducer = new Vertex(reduceWork.getName(),
         new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
             setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
-        reduceWork.getNumReduceTasks(), getContainerResource(conf));
+        reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
+            .getNumReduceTasks(), getContainerResource(conf));

     Map<String, String> environment = new HashMap<String, String>();
@@ -812,7 +827,7 @@ public LocalResource localizeResource(Path src, Path dest, Configuration conf)
     for (int i = 0; i < waitAttempts; i++) {
       if (!checkPreExisting(src, dest, conf)) {
         try {
-          Thread.currentThread().sleep(sleepInterval);
+          Thread.sleep(sleepInterval);
         } catch (InterruptedException interruptedException) {
           throw new IOException(interruptedException);
         }
@@ -1001,6 +1016,25 @@ public static DagUtils getInstance() {
     return instance;
   }

+  private void setupAutoReducerParallelism(TezEdgeProperty edgeProp, Vertex v)
+    throws IOException {
+    if (edgeProp.isAutoReduce()) {
+      Configuration pluginConf = new Configuration(false);
+      VertexManagerPluginDescriptor desc =
+          new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+      pluginConf.setBoolean(
+          ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+      pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+          edgeProp.getMinReducer());
+      pluginConf.setLong(
+          ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+          edgeProp.getInputSizePerReducer());
+      ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
+      desc.setUserPayload(payload.toByteArray());
+      v.setVertexManagerPlugin(desc);
+    }
+  }
+
   private DagUtils() {
     // don't instantiate
   }
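[Editor's note: at runtime Tez's ShuffleVertexManager starts the reducer vertex at the max task count and may shrink it once source vertices report output sizes. Below is a rough, illustrative model of that decision, assuming Tez aims at the desired per-task input size while honoring the configured floor -- these names are hypothetical and this is not Tez's actual code.]

public class AutoParallelModel {
  static int pickParallelism(long sampledOutputBytes, long desiredBytesPerTask,
      int minTasks, int maxTasks) {
    // Shrink toward the desired per-task input size, clamped to [minTasks, maxTasks].
    int needed = (int) Math.ceil((double) sampledOutputBytes / desiredBytesPerTask);
    return Math.max(minTasks, Math.min(maxTasks, needed));
  }

  public static void main(String[] args) {
    // Planner range 12..100 tasks at ~256MB per reducer, but only 1GB shuffles:
    // ~5 tasks would suffice, so the configured floor of 12 wins.
    System.out.println(pickParallelism(1L << 30, 256L * 1000 * 1000, 12, 100)); // 12
  }
}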
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 169171c..ac34345 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -43,14 +43,14 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

 /**
  * GenTezUtils is a collection of shared helper methods to produce
@@ -89,6 +89,15 @@ public UnionWork createUnionWork(GenTezProcContext context, Operator operator
   public ReduceWork createReduceWork(GenTezProcContext context, Operator root,
       TezWork tezWork) {
     assert !root.getParentOperators().isEmpty();
+
+    boolean isAutoReduceParallelism =
+        context.conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM);
+
+    float maxPartitionFactor =
+        context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR);
+    float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
+    long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+
     ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
     LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
     reduceWork.setReducer(root);
@@ -103,10 +112,38 @@
     reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());

+    if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
+      reduceWork.setAutoReduceParallelism(true);
+
+      // configured limit for reducers
+      int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+      // min we allow tez to pick
+      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
+          * minPartitionFactor));
+      minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
+
+      // max we allow tez to pick
+      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
+      maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
+
+      reduceWork.setMinReduceTasks(minPartition);
+      reduceWork.setMaxReduceTasks(maxPartition);
+    }
+
     setupReduceSink(context, reduceWork, reduceSink);

     tezWork.add(reduceWork);
-    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+
+    TezEdgeProperty edgeProp;
+    if (reduceWork.isAutoReduceParallelism()) {
+      edgeProp =
+          new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+              reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer);
+    } else {
+      edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+    }
+
     tezWork.connect(
         context.preceedingWork, reduceWork, edgeProp);
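[Editor's note: a standalone worked example of the clamping added above. The values are illustrative: a ReduceSink estimated at 50 reducers, with the new default factors and reducer limit.]

public class PartitionRangeExample {
  public static void main(String[] args) {
    int numReducers = 50;              // reduceSink.getConf().getNumReducers()
    int maxReducers = 1009;            // hive.exec.reducers.max
    float minPartitionFactor = 0.25f;  // hive.tez.min.partition.factor
    float maxPartitionFactor = 2f;     // hive.tez.max.partition.factor

    int minPartition = Math.max(1, (int) (numReducers * minPartitionFactor)); // 12
    minPartition = (minPartition > maxReducers) ? maxReducers : minPartition; // still 12

    int maxPartition = (int) (numReducers * maxPartitionFactor);              // 100
    maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition; // still 100

    // Tez launches the vertex with 100 tasks and may shrink it, but never below 12.
    System.out.println("min=" + minPartition + " max=" + maxPartition);
  }
}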
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 41aeb4c..b304fd3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.parse;

 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -28,28 +27,23 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

 /**
  * GenTezWork separates the operator tree into tez tasks.
@@ -136,7 +130,7 @@ public Object process(Node nd, Stack stack,
     if (!context.currentMapJoinOperators.isEmpty()) {
       for (MapJoinOperator mj: context.currentMapJoinOperators) {
         LOG.debug("Processing map join: " + mj);
-        // remember the mapping in case we scan another branch of the 
+        // remember the mapping in case we scan another branch of the
         // mapjoin later
         if (!context.mapJoinWorkMap.containsKey(mj)) {
           List<BaseWork> workItems = new LinkedList<BaseWork>();
@@ -175,7 +169,7 @@ public Object process(Node nd, Stack stack,
           LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
           TezEdgeProperty edgeProp = parentWorkMap.getValue();
           tezWork.connect(parentWork, work, edgeProp);
-          
+
           // need to set up output name for reduce sink now that we know the name
           // of the downstream work
           for (ReduceSinkOperator r:
@@ -206,7 +200,7 @@ public Object process(Node nd, Stack stack,
       root.removeParent(parent);
     }

-    if (!context.currentUnionOperators.isEmpty()) {      
+    if (!context.currentUnionOperators.isEmpty()) {
       // if there are union all operators we need to add the work to the set
       // of union operators.
@@ -249,6 +243,7 @@ public Object process(Node nd, Stack stack,
     if (context.leafOperatorToFollowingWork.containsKey(operator)) {

       BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator);
+      long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);

       LOG.debug("Second pass. Leaf operator: "+operator
         +" has common downstream work:"+followingWork);
@@ -268,7 +263,14 @@ public Object process(Node nd, Stack stack,

         if (!context.connectedReduceSinks.contains(rs)) {
           // add dependency between the two work items
-          TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+          TezEdgeProperty edgeProp;
+          if (rWork.isAutoReduceParallelism()) {
+            edgeProp =
+                new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+                    rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+          } else {
+            edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+          }
           tezWork.connect(work, rWork, edgeProp);
           context.connectedReduceSinks.add(rs);
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index a68374e..0cef12b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -20,10 +20,10 @@

 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,7 +42,7 @@
  * distributed on the cluster. The ExecReducer will ultimately deserialize this
  * class on the data nodes and setup it's operator pipeline accordingly.
  *
- * This class is also used in the explain command any property with the 
+ * This class is also used in the explain command any property with the
  * appropriate annotation will be displayed in the explain output.
  */
 @SuppressWarnings({"serial", "deprecation"})
@@ -69,12 +69,21 @@ public ReduceWork(String name) {

   // desired parallelism of the reduce task.
   private Integer numReduceTasks;

-  // boolean to signal whether tagging will be used (e.g.: join) or 
+  // boolean to signal whether tagging will be used (e.g.: join) or
   // not (e.g.: group by)
   private boolean needsTagging;

   private Map<Integer, String> tagToInput = new HashMap<Integer, String>();

+  // boolean that says whether tez auto reduce parallelism should be used
+  private boolean isAutoReduceParallelism;
+
+  // for auto reduce parallelism - minimum reducers requested
+  private int minReduceTasks;
+
+  // for auto reduce parallelism - max reducers requested
+  private int maxReduceTasks;
+
   /**
    * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
    * to keySerializeInfo of the ReduceSink
@@ -157,6 +166,7 @@ public void setNumReduceTasks(final Integer numReduceTasks) {
     this.numReduceTasks = numReduceTasks;
   }

+  @Override
   public void configureJobConf(JobConf job) {
     if (reducer != null) {
       for (FileSinkOperator fs : OperatorUtils.findOperators(reducer, FileSinkOperator.class)) {
@@ -164,4 +174,28 @@
       }
     }
   }
+
+  public void setAutoReduceParallelism(boolean isAutoReduceParallelism) {
+    this.isAutoReduceParallelism = isAutoReduceParallelism;
+  }
+
+  public boolean isAutoReduceParallelism() {
+    return isAutoReduceParallelism;
+  }
+
+  public void setMinReduceTasks(int minReduceTasks) {
+    this.minReduceTasks = minReduceTasks;
+  }
+
+  public int getMinReduceTasks() {
+    return minReduceTasks;
+  }
+
+  public int getMaxReduceTasks() {
+    return maxReduceTasks;
+  }
+
+  public void setMaxReduceTasks(int maxReduceTasks) {
+    this.maxReduceTasks = maxReduceTasks;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index 96adb84..d17ca8c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -1,16 +1,12 @@
 package org.apache.hadoop.hive.ql.plan;

-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

 public class TezEdgeProperty {
-  
+
   public enum EdgeType {
     SIMPLE_EDGE,
-    BROADCAST_EDGE, 
+    BROADCAST_EDGE,
     CONTAINS,
     CUSTOM_EDGE,
     CUSTOM_SIMPLE_EDGE,
@@ -20,13 +16,27 @@
   private EdgeType edgeType;
   private int numBuckets;

-  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, 
+  private boolean isAutoReduce;
+  private int minReducer;
+  private int maxReducer;
+  private long inputSizePerReducer;
+
+  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
       int buckets) {
     this.hiveConf = hiveConf;
     this.edgeType = edgeType;
     this.numBuckets = buckets;
   }

+  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce,
+      int minReducer, int maxReducer, long bytesPerReducer) {
+    this(hiveConf, edgeType, -1);
+    this.minReducer = minReducer;
+    this.maxReducer = maxReducer;
+    this.isAutoReduce = isAutoReduce;
+    this.inputSizePerReducer = bytesPerReducer;
+  }
+
   public TezEdgeProperty(EdgeType edgeType) {
     this(null, edgeType, -1);
   }
@@ -42,4 +52,20 @@ public HiveConf getHiveConf () {
   public int getNumBuckets() {
     return numBuckets;
   }
+
+  public boolean isAutoReduce() {
+    return isAutoReduce;
+  }
+
+  public int getMinReducer() {
+    return minReducer;
+  }
+
+  public int getMaxReducer() {
+    return maxReducer;
+  }
+
+  public long getInputSizePerReducer() {
+    return inputSizePerReducer;
+  }
 }
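[Editor's note: a minimal sketch, not part of the patch, contrasting the two TezEdgeProperty shapes the planner now builds. The 12/100/256MB values reuse the worked example above and are not defaults.]

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

public class EdgePropertyExample {
  public static void main(String[] args) {
    // Auto-reduce shuffle edge: Tez may settle anywhere between 12 and 100
    // reducers, aiming at roughly 256MB of input per reducer.
    TezEdgeProperty auto = new TezEdgeProperty(new HiveConf(), EdgeType.SIMPLE_EDGE,
        true, 12, 100, 256L * 1000 * 1000);

    // Plain shuffle edge, exactly as before the patch.
    TezEdgeProperty plain = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);

    System.out.println(auto.isAutoReduce() + " " + plain.isAutoReduce()); // true false
  }
}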