Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1601544)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -190,8 +190,8 @@
     COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
     COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
     COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
-    BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)),
-    MAXREDUCERS("hive.exec.reducers.max", 999),
+    BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (256 * 1000 * 1000)),
+    MAXREDUCERS("hive.exec.reducers.max", 1009), // pick a prime
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
     ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
@@ -1028,6 +1028,9 @@
     HIVE_CHECK_CROSS_PRODUCT("hive.exec.check.crossproducts", true),
     HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", 5000L), // in ms
     HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS("hive.localize.resource.num.wait.attempts", 5),
+    TEZ_AUTO_REDUCER_PARALLELISM("hive.tez.auto.reducer.parallelism", false),
+    TEZ_MAX_PARTITION_FACTOR("hive.tez.max.partition.factor", 2f),
+    TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f)
     ;

     public final String varname;
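The two changed defaults feed Hive's compile-time reducer estimate: roughly one reducer per hive.exec.reducers.bytes.per.reducer bytes of input, capped at hive.exec.reducers.max (a prime cap tends to spread hash-partitioned keys more evenly across reducers). A minimal sketch of that arithmetic with the new defaults; the helper below is a simplified stand-in, not Hive's actual estimation code:

    public class ReducerEstimateSketch {
      // One reducer per bytesPerReducer of input, capped at maxReducers.
      static int estimateReducers(long totalInputBytes, long bytesPerReducer, int maxReducers) {
        int reducers = (int) ((totalInputBytes + bytesPerReducer - 1) / bytesPerReducer);
        return Math.max(1, Math.min(reducers, maxReducers));
      }

      public static void main(String[] args) {
        long bytesPerReducer = 256L * 1000 * 1000; // new default (was 1G)
        int maxReducers = 1009;                    // new default (was 999)
        // 10 GB of input now yields 40 reducers instead of 10.
        System.out.println(estimateReducers(10L * 1000 * 1000 * 1000, bytesPerReducer, maxReducers));
      }
    }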
Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template	(revision 1601544)
+++ conf/hive-default.xml.template	(working copy)
@@ -2779,4 +2779,22 @@
     joins unnecessary memory will be allocated and then trimmed.
   </description>
 </property>
+
+<property>
+  <name>hive.tez.auto.reducer.parallelism</name>
+  <value>false</value>
+  <description>Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes and set parallelism estimates. Tez will sample source vertices' output sizes and adjust the estimates at runtime as necessary.</description>
+</property>
+
+<property>
+  <name>hive.tez.max.partition.factor</name>
+  <value>2f</value>
+  <description>When auto reducer parallelism is enabled, this factor will be used to over-partition data in shuffle edges.</description>
+</property>
+
+<property>
+  <name>hive.tez.min.partition.factor</name>
+  <value>0.25f</value>
+  <description>When auto reducer parallelism is enabled, this factor will be used to put a lower limit on the number of reducers that Tez specifies.</description>
+</property>
+
 </configuration>
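For reference, these properties can also be set programmatically through the ConfVars entries added above. A minimal, hypothetical sketch (the wrapper class is illustrative; setBoolVar and Configuration.setFloat are existing APIs, and varname is the public field declared in HiveConf):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class AutoParallelismConfigSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Equivalent to "set hive.tez.auto.reducer.parallelism=true;" in a session.
        conf.setBoolVar(ConfVars.TEZ_AUTO_REDUCER_PARALLELISM, true);
        conf.setFloat(ConfVars.TEZ_MAX_PARTITION_FACTOR.varname, 2f);
        conf.setFloat(ConfVars.TEZ_MIN_PARTITION_FACTOR.varname, 0.25f);
        System.out.println(conf.getBoolVar(ConfVars.TEZ_AUTO_REDUCER_PARALLELISM));
      }
    }

Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java	(revision 1601544)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java	(working copy)
@@ -40,7 +40,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
@@ -94,9 +93,10 @@
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.api.VertexGroup;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -114,6 +114,7 @@
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;

 /**
  * DagUtils. DagUtils is a collection of helper methods to convert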
@@ -210,6 +211,7 @@
    * @param edgeProp the edge property of connection between the two
    * endpoints.
    */
+  @SuppressWarnings("rawtypes")
   public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
       Vertex w, TezEdgeProperty edgeProp)
       throws IOException {
@@ -221,27 +223,31 @@

     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
-    case BROADCAST_EDGE:
-      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
-      break;
-    case CUSTOM_EDGE:
-      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
-      int numBuckets = edgeProp.getNumBuckets();
-      VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
-          CustomPartitionVertex.class.getName());
-      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
-      desc.setUserPayload(userPayload);
-      w.setVertexManagerPlugin(desc);
-      break;
+      case BROADCAST_EDGE:
+        mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+        break;
+      case CUSTOM_EDGE: {
+        mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+        int numBuckets = edgeProp.getNumBuckets();
+        VertexManagerPluginDescriptor desc =
+            new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
+        byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
+        desc.setUserPayload(userPayload);
+        w.setVertexManagerPlugin(desc);
+        break;
+      }

-    case CUSTOM_SIMPLE_EDGE:
-      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
-      break;
+      case CUSTOM_SIMPLE_EDGE:
+        mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+        break;

-    case SIMPLE_EDGE:
-    default:
-      mergeInputClass = TezMergedLogicalInput.class;
-      break;
+      case SIMPLE_EDGE:
+        setupAutoReducerParallelism(edgeProp, w);
+        // fall through
+
+      default:
+        mergeInputClass = TezMergedLogicalInput.class;
+        break;
     }

     return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
@@ -278,7 +284,8 @@

     updateConfigurationForEdge(vConf, v, wConf, w);

-    if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) {
+    switch(edgeProp.getEdgeType()) {
+    case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
       byte[] userPayload =
           ByteBuffer.allocate(4).putInt(numBuckets).array();
       VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
@@ -285,7 +292,15 @@
           CustomPartitionVertex.class.getName());
       desc.setUserPayload(userPayload);
       w.setVertexManagerPlugin(desc);
+      break;
     }
+    case SIMPLE_EDGE: {
+      setupAutoReducerParallelism(edgeProp, w);
+      break;
+    }
+    default:
+      // nothing
+    }

     return new Edge(v, w, createEdgeProperty(edgeProp));
   }
@@ -293,6 +308,7 @@
   /*
    * Helper function to create an edge property from an edge type.
    */
+  @SuppressWarnings("rawtypes")
   private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException {
     DataMovementType dataMovementType;
     Class logicalInputClass;
@@ -301,45 +317,44 @@
     EdgeProperty edgeProperty = null;
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
-    case BROADCAST_EDGE:
-      dataMovementType = DataMovementType.BROADCAST;
-      logicalOutputClass = OnFileUnorderedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
+      case BROADCAST_EDGE:
+        dataMovementType = DataMovementType.BROADCAST;
+        logicalOutputClass = OnFileUnorderedKVOutput.class;
+        logicalInputClass = ShuffledUnorderedKVInput.class;
+        break;

-    case CUSTOM_EDGE:
-
-      dataMovementType = DataMovementType.CUSTOM;
-      logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
-          CustomPartitionEdge.class.getName());
-      CustomEdgeConfiguration edgeConf =
-          new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
-      DataOutputBuffer dob = new DataOutputBuffer();
-      edgeConf.write(dob);
-      byte[] userPayload = dob.getData();
-      edgeDesc.setUserPayload(userPayload);
-      edgeProperty =
+      case CUSTOM_EDGE:
+        dataMovementType = DataMovementType.CUSTOM;
+        logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
+        logicalInputClass = ShuffledUnorderedKVInput.class;
+        EdgeManagerDescriptor edgeDesc =
+            new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+        CustomEdgeConfiguration edgeConf =
+            new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
+        DataOutputBuffer dob = new DataOutputBuffer();
+        edgeConf.write(dob);
+        byte[] userPayload = dob.getData();
+        edgeDesc.setUserPayload(userPayload);
+        edgeProperty =
             new EdgeProperty(edgeDesc, DataSourceType.PERSISTED,
                 SchedulingType.SEQUENTIAL,
                 new OutputDescriptor(logicalOutputClass.getName()),
                 new InputDescriptor(logicalInputClass.getName()));
-      break;
+        break;

-    case CUSTOM_SIMPLE_EDGE:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
+      case CUSTOM_SIMPLE_EDGE:
+        dataMovementType = DataMovementType.SCATTER_GATHER;
+        logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
+        logicalInputClass = ShuffledUnorderedKVInput.class;
+        break;

-    case SIMPLE_EDGE:
-    default:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileSortedOutput.class;
-      logicalInputClass = ShuffledMergedInputLegacy.class;
-      break;
+      case SIMPLE_EDGE:
+      default:
+        dataMovementType = DataMovementType.SCATTER_GATHER;
+        logicalOutputClass = OnFileSortedOutput.class;
+        logicalInputClass = ShuffledMergedInputLegacy.class;
+        break;
     }

     if (edgeProperty == null) {
@@ -360,7 +375,6 @@
    * container size isn't set.
    */
   private Resource getContainerResource(Configuration conf) {
-    Resource containerResource;
    int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
        conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
@@ -414,7 +428,7 @@
     boolean useTezGroupedSplits = false;

     int numTasks = -1;
-    Class amSplitGeneratorClass = null;
+    Class amSplitGeneratorClass = null;
     InputSplitInfo inputSplitInfo = null;
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
@@ -533,7 +547,8 @@
     Vertex reducer = new Vertex(reduceWork.getName(),
         new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
         setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
-        reduceWork.getNumReduceTasks(), getContainerResource(conf));
+        reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
+            .getNumReduceTasks(), getContainerResource(conf));

     Map<String, String> environment = new HashMap<String, String>();
@@ -812,7 +827,7 @@
     for (int i = 0; i < waitAttempts; i++) {
       if (!checkPreExisting(src, dest, conf)) {
         try {
-          Thread.currentThread().sleep(sleepInterval);
+          Thread.sleep(sleepInterval);
         } catch (InterruptedException interruptedException) {
           throw new IOException(interruptedException);
         }
@@ -1001,6 +1016,25 @@
     return instance;
   }

+  private void setupAutoReducerParallelism(TezEdgeProperty edgeProp, Vertex v)
+      throws IOException {
+    if (edgeProp.isAutoReduce()) {
+      Configuration pluginConf = new Configuration(false);
+      VertexManagerPluginDescriptor desc =
+          new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+      pluginConf.setBoolean(
+          ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+      pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+          edgeProp.getMinReducer());
+      pluginConf.setLong(
+          ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+          edgeProp.getInputSizePerReducer());
+      ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
+      desc.setUserPayload(payload.toByteArray());
+      v.setVertexManagerPlugin(desc);
+    }
+  }
+
   private DagUtils() {
     // don't instantiate
   }
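setupAutoReducerParallelism() only forwards three knobs to Tez' ShuffleVertexManager; the actual repartitioning decision happens in the Tez AM at runtime. A rough model of that decision, under the assumption (based on ShuffleVertexManager's documented behavior, not on code in this patch) that it can shrink but never grow a vertex's scheduled parallelism:

    public class ShuffleParallelismModel {
      // Simplified model, not Tez source: pick enough tasks so each receives about
      // desiredTaskInputSize bytes, clamped to [minTaskParallelism, scheduledTasks].
      static int decideParallelism(long observedShuffleBytes, long desiredTaskInputSize,
          int scheduledTasks, int minTaskParallelism) {
        int desired = (int) Math.max(1, observedShuffleBytes / desiredTaskInputSize);
        desired = Math.max(desired, minTaskParallelism); // floor from getMinReducer()
        return Math.min(desired, scheduledTasks);        // ceiling: the vertex's initial task count
      }

      public static void main(String[] args) {
        // Hypothetical run: 5 GB actually shuffled, 256 MB desired per reducer,
        // vertex launched with 200 tasks, floor of 25.
        System.out.println(decideParallelism(5L << 30, 256L << 20, 200, 25)); // 20, raised to 25
      }
    }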
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java	(revision 1601544)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java	(working copy)
@@ -43,14 +43,14 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

 /**
  * GenTezUtils is a collection of shared helper methods to produce
@@ -89,6 +89,15 @@
   public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root, TezWork tezWork) {
     assert !root.getParentOperators().isEmpty();
+
+    boolean isAutoReduceParallelism =
+        context.conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM);
+
+    float maxPartitionFactor =
+        context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR);
+    float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
+    long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+
     ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
     LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
     reduceWork.setReducer(root);
@@ -103,10 +112,37 @@
     reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());

+    if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
+      reduceWork.setAutoReduceParallelism(true);
+
+      // configured limit for reducers
+      int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+      // min we allow tez to pick
+      int minPartition = (int) (reduceSink.getConf().getNumReducers() * minPartitionFactor);
+      minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
+
+      // max we allow tez to pick
+      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
+      maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
+
+      reduceWork.setMinReduceTasks(minPartition);
+      reduceWork.setMaxReduceTasks(maxPartition);
+    }
+
     setupReduceSink(context, reduceWork, reduceSink);

     tezWork.add(reduceWork);
-    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+
+    TezEdgeProperty edgeProp;
+    if (reduceWork.isAutoReduceParallelism()) {
+      edgeProp =
+          new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+              reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer);
+    } else {
+      edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+    }
+
     tezWork.connect(
         context.preceedingWork, reduceWork,
         edgeProp);
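To make the clamping above concrete, a self-contained restatement of the same arithmetic with hypothetical inputs (the class is illustrative only; the formulas are exactly the ones in createReduceWork()):

    public class PartitionFactorExample {
      public static void main(String[] args) {
        int estimatedReducers = 100;      // reduceSink.getConf().getNumReducers()
        float minPartitionFactor = 0.25f; // hive.tez.min.partition.factor
        float maxPartitionFactor = 2f;    // hive.tez.max.partition.factor
        int maxReducers = 1009;           // hive.exec.reducers.max

        int minPartition = (int) (estimatedReducers * minPartitionFactor);
        minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
        int maxPartition = (int) (estimatedReducers * maxPartitionFactor);
        maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;

        // The Tez vertex starts with 200 tasks and may be shrunk to as few as 25.
        System.out.println("min=" + minPartition + ", max=" + maxPartition); // min=25, max=200
      }
    }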
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java	(revision 1601544)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java	(working copy)
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.parse;

 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -28,28 +27,23 @@

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

 /**
  * GenTezWork separates the operator tree into tez tasks.
@@ -136,7 +130,7 @@
     if (!context.currentMapJoinOperators.isEmpty()) {
       for (MapJoinOperator mj: context.currentMapJoinOperators) {
         LOG.debug("Processing map join: " + mj);
-        // remember the mapping in case we scan another branch of the
+        // remember the mapping in case we scan another branch of the
         // mapjoin later
         if (!context.mapJoinWorkMap.containsKey(mj)) {
           List<BaseWork> workItems = new LinkedList<BaseWork>();
@@ -175,7 +169,7 @@
           LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
           TezEdgeProperty edgeProp = parentWorkMap.getValue();
           tezWork.connect(parentWork, work, edgeProp);
-
+
           // need to set up output name for reduce sink now that we know the name
           // of the downstream work
           for (ReduceSinkOperator r:
@@ -206,7 +200,7 @@
       root.removeParent(parent);
     }

-    if (!context.currentUnionOperators.isEmpty()) {
+    if (!context.currentUnionOperators.isEmpty()) {
       // if there are union all operators we need to add the work to the set
       // of union operators.
@@ -249,6 +243,7 @@

     if (context.leafOperatorToFollowingWork.containsKey(operator)) {

       BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator);
+      long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);

       LOG.debug("Second pass. Leaf operator: "+operator
         +" has common downstream work:"+followingWork);
@@ -268,7 +263,14 @@

         if (!context.connectedReduceSinks.contains(rs)) {
           // add dependency between the two work items
-          TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+          TezEdgeProperty edgeProp;
+          if (rWork.isAutoReduceParallelism()) {
+            edgeProp =
+                new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+                    rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+          } else {
+            edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+          }
           tezWork.connect(work, rWork, edgeProp);
           context.connectedReduceSinks.add(rs);
         }

Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java	(revision 1601544)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java	(working copy)
@@ -20,10 +20,10 @@

 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.LinkedHashSet;
-import java.util.Map;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,7 +42,7 @@
  * distributed on the cluster. The ExecReducer will ultimately deserialize this
  * class on the data nodes and setup it's operator pipeline accordingly.
  *
- * This class is also used in the explain command any property with the
+ * This class is also used in the explain command any property with the
  * appropriate annotation will be displayed in the explain output.
  */
 @SuppressWarnings({"serial", "deprecation"})
@@ -69,12 +69,21 @@
   // desired parallelism of the reduce task.
   private Integer numReduceTasks;

-  // boolean to signal whether tagging will be used (e.g.: join) or
+  // boolean to signal whether tagging will be used (e.g.: join) or
   // not (e.g.: group by)
   private boolean needsTagging;

   private Map<Integer, String> tagToInput = new HashMap<Integer, String>();

+  // boolean that says whether tez auto reduce parallelism should be used
+  private boolean isAutoReduceParallelism;
+
+  // for auto reduce parallelism - minimum reducers requested
+  private int minReduceTasks;
+
+  // for auto reduce parallelism - max reducers requested
+  private int maxReduceTasks;
+
   /**
    * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
    * to keySerializeInfo of the ReduceSink
@@ -157,6 +166,7 @@
     this.numReduceTasks = numReduceTasks;
   }

+  @Override
   public void configureJobConf(JobConf job) {
     if (reducer != null) {
       for (FileSinkOperator fs : OperatorUtils.findOperators(reducer, FileSinkOperator.class)) {
@@ -164,4 +174,28 @@
       }
     }
   }
+
+  public void setAutoReduceParallelism(boolean isAutoReduceParallelism) {
+    this.isAutoReduceParallelism = isAutoReduceParallelism;
+  }
+
+  public boolean isAutoReduceParallelism() {
+    return isAutoReduceParallelism;
+  }
+
+  public void setMinReduceTasks(int minReduceTasks) {
+    this.minReduceTasks = minReduceTasks;
+  }
+
+  public int getMinReduceTasks() {
+    return minReduceTasks;
+  }
+
+  public int getMaxReduceTasks() {
+    return maxReduceTasks;
+  }
+
+  public void setMaxReduceTasks(int maxReduceTasks) {
+    this.maxReduceTasks = maxReduceTasks;
+  }
 }
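The ReduceWork fields added above close the loop: the planner records the bounds here, and DagUtils.createVertex() (earlier in this patch) launches the reducer vertex at the maximum. A compact, hypothetical restatement of that wiring:

    import org.apache.hadoop.hive.ql.plan.ReduceWork;

    public class ReduceWorkWiringSketch {
      public static void main(String[] args) {
        ReduceWork rWork = new ReduceWork("Reducer 2");
        rWork.setNumReduceTasks(100);         // compile-time estimate
        rWork.setAutoReduceParallelism(true);
        rWork.setMinReduceTasks(25);          // estimate * hive.tez.min.partition.factor
        rWork.setMaxReduceTasks(200);         // estimate * hive.tez.max.partition.factor

        // Mirrors the vertex sizing in DagUtils.createVertex(): start at the max
        // and let ShuffleVertexManager shrink toward the min at runtime.
        int vertexParallelism = rWork.isAutoReduceParallelism()
            ? rWork.getMaxReduceTasks() : rWork.getNumReduceTasks();
        System.out.println(vertexParallelism); // 200
      }
    }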
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java	(revision 1601544)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java	(working copy)
@@ -1,16 +1,12 @@
 package org.apache.hadoop.hive.ql.plan;

-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

 public class TezEdgeProperty {
-
+
   public enum EdgeType {
     SIMPLE_EDGE,
-    BROADCAST_EDGE,
+    BROADCAST_EDGE,
     CONTAINS,
     CUSTOM_EDGE,
     CUSTOM_SIMPLE_EDGE,
@@ -20,7 +16,12 @@

   private EdgeType edgeType;
   private int numBuckets;

-  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
+  private boolean isAutoReduce;
+  private int minReducer;
+  private int maxReducer;
+  private long inputSizePerReducer;
+
+  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
       int buckets) {
     this.hiveConf = hiveConf;
     this.edgeType = edgeType;
@@ -27,6 +28,15 @@
     this.numBuckets = buckets;
   }

+  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce,
+      int minReducer, int maxReducer, long bytesPerReducer) {
+    this(hiveConf, edgeType, -1);
+    this.minReducer = minReducer;
+    this.maxReducer = maxReducer;
+    this.isAutoReduce = isAutoReduce;
+    this.inputSizePerReducer = bytesPerReducer;
+  }
+
   public TezEdgeProperty(EdgeType edgeType) {
     this(null, edgeType, -1);
   }
@@ -42,4 +52,20 @@
   public int getNumBuckets() {
     return numBuckets;
   }
+
+  public boolean isAutoReduce() {
+    return isAutoReduce;
+  }
+
+  public int getMinReducer() {
+    return minReducer;
+  }
+
+  public int getMaxReducer() {
+    return maxReducer;
+  }
+
+  public long getInputSizePerReducer() {
+    return inputSizePerReducer;
+  }
 }
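Finally, a minimal usage sketch of the new auto-reduce constructor and accessors (the values are hypothetical; this mirrors what GenTezUtils and GenTezWork construct for an auto-parallel shuffle edge):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

    public class TezEdgePropertySketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        TezEdgeProperty edge = new TezEdgeProperty(conf, EdgeType.SIMPLE_EDGE,
            true /* isAutoReduce */, 25 /* minReducer */, 200 /* maxReducer */,
            256L * 1000 * 1000 /* bytesPerReducer */);
        // These values end up in the ShuffleVertexManager payload built by
        // DagUtils.setupAutoReducerParallelism().
        System.out.println(edge.isAutoReduce() + ": " + edge.getMinReducer()
            + "-" + edge.getMaxReducer() + " @ " + edge.getInputSizePerReducer() + " bytes");
      }
    }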