Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1598829) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -190,8 +190,8 @@ COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false), COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""), COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""), - BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)), - MAXREDUCERS("hive.exec.reducers.max", 999), + BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (256 * 1000 * 1000)), + MAXREDUCERS("hive.exec.reducers.max", 1009), // pick a prime PREEXECHOOKS("hive.exec.pre.hooks", ""), POSTEXECHOOKS("hive.exec.post.hooks", ""), ONFAILUREHOOKS("hive.exec.failure.hooks", ""), @@ -1028,6 +1028,9 @@ HIVE_CHECK_CROSS_PRODUCT("hive.exec.check.crossproducts", true), HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", 5000L), // in ms HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS("hive.localize.resource.num.wait.attempts", 5), + TEZ_AUTO_REDUCER_PARALLELISM("hive.tez.auto.reducer.parallelism", false), + TEZ_MAX_PARTITION_FACTOR("hive.tez.max.partition.factor", 2f), + TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f) ; public final String varname;
Index: conf/hive-default.xml.template =================================================================== --- conf/hive-default.xml.template (revision 1598829) +++ conf/hive-default.xml.template (working copy) @@ -2755,4 +2755,22 @@ Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to store data. This is one buffer size. HT may be slightly faster if this is larger, but for small joins unnecessary memory will be allocated and then trimmed. + + <property> + <name>hive.tez.auto.reducer.parallelism</name> + <value>false</value> + <description>Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes and set parallelism, but Tez will sample the output of all vertices and adjust the estimates at runtime as necessary.</description> + </property> + + <property> + <name>hive.tez.max.partition.factor</name> + <value>2f</value> + <description>When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle edges.</description> + </property> + + <property> + <name>hive.tez.min.partition.factor</name> + <value>0.25f</value> + <description>When auto reducer parallelism is enabled this factor will be used to put a lower limit on the number of reducers that Tez specifies.</description> + </property>
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (revision 1598829) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (working copy) @@ -49,6 +49,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.util.hash.MurmurHash; /** * Reduce Sink Operator sends output to the reduce stage.
@@ -95,6 +96,9 @@ transient protected int numDistinctExprs; transient String[] inputAliases; // input aliases of this RS for join (used for PPD) private boolean skipTag = false; + protected transient boolean autoParallel = false; + + protected static final MurmurHash hash = (MurmurHash)MurmurHash.getInstance(); public void setInputAliases(String[] inputAliases) { this.inputAliases = inputAliases; @@ -172,6 +176,8 @@ reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this); } + autoParallel = conf.isAutoParallel(); + firstRow = true; initializeChildren(hconf); } catch (Exception e) { @@ -295,24 +301,30 @@ firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength); } + final int hashCode; + + if(autoParallel && partitionEval.length > 0) { + // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0] + hashCode = hash.hash(firstKey.getBytes(), distKeyLength, 0); + } else if(bucketEval != null && bucketEval.length > 0) { + hashCode = computeHashCode(row, buckNum); + } else { + hashCode = computeHashCode(row); + } + + firstKey.setHashCode(hashCode); + // Try to store the first key. If it's not excluded, we will proceed. int firstIndex = reducerHash.tryStoreKey(firstKey); if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do. // Compute value and hashcode - we'd either store or forward them. BytesWritable value = makeValueWritable(row); - int hashCode = 0; - if (bucketEval == null) { - hashCode = computeHashCode(row); - } else { - hashCode = computeHashCode(row, buckNum); - } if (firstIndex == TopNHash.FORWARD) { - firstKey.setHashCode(hashCode); collect(firstKey, value); } else { assert firstIndex >= 0; - reducerHash.storeValue(firstIndex, value, hashCode, false); + reducerHash.storeValue(firstIndex, value, false); } // All other distinct keys will just be forwarded. This could be optimized... Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (revision 1598829) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (working copy) @@ -198,6 +198,7 @@ int index = size < topN ? size : evicted; keys[index] = Arrays.copyOf(key.getBytes(), key.getLength()); distKeyLengths[index] = key.getDistKeyLength(); + hashes[index] = key.hashCode(); Integer collisionIndex = indexes.store(index); if (null != collisionIndex) { // forward conditional on the survival of the corresponding key currently in indexes. @@ -256,6 +257,7 @@ int index = MAY_FORWARD - batchIndexToResult[batchIndex]; HiveKey hk = new HiveKey(); hk.set(keys[index], 0, keys[index].length); + hk.setHashCode(hashes[index]); hk.setDistKeyLength(distKeyLengths[index]); return hk; } @@ -270,6 +272,15 @@ } /** + * After vectorized batch is processed, can return hashCode of a key. + * @param batchIndex index of the key in the batch. + * @return The hashCode corresponding to the key. + */ + public int getVectorizedKeyHashCode(int batchIndex) { + return hashes[batchIndexToResult[batchIndex]]; + } + + /** * Stores the value for the key in the heap. * @param index The index, either from tryStoreKey or from tryStoreVectorizedKey result. * @param value The value to store. @@ -276,9 +287,8 @@ * @param keyHash The key hash to store. * @param vectorized Whether the result is coming from a vectorized batch. 
*/ - public void storeValue(int index, BytesWritable value, int keyHash, boolean vectorized) { + public void storeValue(int index, BytesWritable value, boolean vectorized) { values[index] = Arrays.copyOf(value.getBytes(), value.getLength()); - hashes[index] = keyHash; // Vectorized doesn't adjust usage for the keys while processing the batch usage += values[index].length + (vectorized ? keys[index].length : 0); } @@ -317,6 +327,7 @@ int index = size < topN ? size : evicted; keys[index] = Arrays.copyOf(key.getBytes(), key.getLength()); distKeyLengths[index] = key.getDistKeyLength(); + hashes[index] = key.hashCode(); if (null != indexes.store(index)) { // it's only for GBY which should forward all values associated with the key in the range // of limit. new value should be attatched with the key but in current implementation, Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (revision 1598829) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (working copy) @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; @@ -94,9 +93,10 @@ import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; -import org.apache.tez.dag.api.VertexGroup; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; @@ -114,6 +114,7 @@ import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; /** * DagUtils. DagUtils is a collection of helper methods to convert @@ -210,9 +211,10 @@ * @param edgeProp the edge property of connection between the two * endpoints. 
*/ + @SuppressWarnings("rawtypes") public GroupInputEdge createEdge(VertexGroup group, JobConf wConf, Vertex w, TezEdgeProperty edgeProp) - throws IOException { + throws IOException { Class mergeInputClass; @@ -221,27 +223,44 @@ EdgeType edgeType = edgeProp.getEdgeType(); switch (edgeType) { - case BROADCAST_EDGE: - mergeInputClass = ConcatenatedMergedKeyValueInput.class; - break; - case CUSTOM_EDGE: - mergeInputClass = ConcatenatedMergedKeyValueInput.class; - int numBuckets = edgeProp.getNumBuckets(); - VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor( - CustomPartitionVertex.class.getName()); - byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array(); - desc.setUserPayload(userPayload); - w.setVertexManagerPlugin(desc); - break; + case BROADCAST_EDGE: + mergeInputClass = ConcatenatedMergedKeyValueInput.class; + break; + case CUSTOM_EDGE: { + mergeInputClass = ConcatenatedMergedKeyValueInput.class; + int numBuckets = edgeProp.getNumBuckets(); + VertexManagerPluginDescriptor desc = + new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName()); + byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array(); + desc.setUserPayload(userPayload); + w.setVertexManagerPlugin(desc); + break; + } - case CUSTOM_SIMPLE_EDGE: - mergeInputClass = ConcatenatedMergedKeyValueInput.class; - break; + case CUSTOM_SIMPLE_EDGE: + mergeInputClass = ConcatenatedMergedKeyValueInput.class; + break; - case SIMPLE_EDGE: - default: - mergeInputClass = TezMergedLogicalInput.class; - break; + case SIMPLE_EDGE: { + if (edgeProp.isAutoReduce()) { + Configuration pluginConf = new Configuration(false); + VertexManagerPluginDescriptor desc = + new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName()); + pluginConf.setBoolean( + ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); + pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM, + edgeProp.getMinReducer()); + pluginConf.setLong( + ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, + edgeProp.getInputSizePerReducer()); + ByteString payload = MRHelpers.createByteStringFromConf(pluginConf); + desc.setUserPayload(payload.toByteArray()); + w.setVertexManagerPlugin(desc); + } + } // fall through + default: + mergeInputClass = TezMergedLogicalInput.class; + break; } return new GroupInputEdge(group, w, createEdgeProperty(edgeProp), @@ -278,7 +297,8 @@ updateConfigurationForEdge(vConf, v, wConf, w); - if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) { + switch(edgeProp.getEdgeType()) { + case CUSTOM_EDGE: { int numBuckets = edgeProp.getNumBuckets(); byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array(); VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor( @@ -285,7 +305,28 @@ CustomPartitionVertex.class.getName()); desc.setUserPayload(userPayload); w.setVertexManagerPlugin(desc); + break; } + case SIMPLE_EDGE: { + if (edgeProp.isAutoReduce()) { + Configuration pluginConf = new Configuration(false); + VertexManagerPluginDescriptor desc = + new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName()); + pluginConf.setBoolean(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); + pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM, + edgeProp.getMinReducer()); + pluginConf.setLong( + ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, + edgeProp.getInputSizePerReducer()); + ByteString 
payload = MRHelpers.createByteStringFromConf(pluginConf); + desc.setUserPayload(payload.toByteArray()); + w.setVertexManagerPlugin(desc); + } + break; + } + default: + // nothing + } return new Edge(v, w, createEdgeProperty(edgeProp)); } @@ -293,6 +334,7 @@ /* * Helper function to create an edge property from an edge type. */ + @SuppressWarnings("rawtypes") private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException { DataMovementType dataMovementType; Class logicalInputClass; @@ -301,45 +343,44 @@ EdgeProperty edgeProperty = null; EdgeType edgeType = edgeProp.getEdgeType(); switch (edgeType) { - case BROADCAST_EDGE: - dataMovementType = DataMovementType.BROADCAST; - logicalOutputClass = OnFileUnorderedKVOutput.class; - logicalInputClass = ShuffledUnorderedKVInput.class; - break; + case BROADCAST_EDGE: + dataMovementType = DataMovementType.BROADCAST; + logicalOutputClass = OnFileUnorderedKVOutput.class; + logicalInputClass = ShuffledUnorderedKVInput.class; + break; - case CUSTOM_EDGE: - - dataMovementType = DataMovementType.CUSTOM; - logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class; - logicalInputClass = ShuffledUnorderedKVInput.class; - EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor( - CustomPartitionEdge.class.getName()); - CustomEdgeConfiguration edgeConf = - new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null); - DataOutputBuffer dob = new DataOutputBuffer(); - edgeConf.write(dob); - byte[] userPayload = dob.getData(); - edgeDesc.setUserPayload(userPayload); - edgeProperty = + case CUSTOM_EDGE: + dataMovementType = DataMovementType.CUSTOM; + logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class; + logicalInputClass = ShuffledUnorderedKVInput.class; + EdgeManagerDescriptor edgeDesc = + new EdgeManagerDescriptor(CustomPartitionEdge.class.getName()); + CustomEdgeConfiguration edgeConf = + new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null); + DataOutputBuffer dob = new DataOutputBuffer(); + edgeConf.write(dob); + byte[] userPayload = dob.getData(); + edgeDesc.setUserPayload(userPayload); + edgeProperty = new EdgeProperty(edgeDesc, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(logicalOutputClass.getName()), new InputDescriptor(logicalInputClass.getName())); - break; + break; - case CUSTOM_SIMPLE_EDGE: - dataMovementType = DataMovementType.SCATTER_GATHER; - logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class; - logicalInputClass = ShuffledUnorderedKVInput.class; - break; + case CUSTOM_SIMPLE_EDGE: + dataMovementType = DataMovementType.SCATTER_GATHER; + logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class; + logicalInputClass = ShuffledUnorderedKVInput.class; + break; - case SIMPLE_EDGE: - default: - dataMovementType = DataMovementType.SCATTER_GATHER; - logicalOutputClass = OnFileSortedOutput.class; - logicalInputClass = ShuffledMergedInputLegacy.class; - break; + case SIMPLE_EDGE: + default: + dataMovementType = DataMovementType.SCATTER_GATHER; + logicalOutputClass = OnFileSortedOutput.class; + logicalInputClass = ShuffledMergedInputLegacy.class; + break; } if (edgeProperty == null) { @@ -360,7 +401,6 @@ * container size isn't set. */ private Resource getContainerResource(Configuration conf) { - Resource containerResource; int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ? 
HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) : conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB); @@ -414,7 +454,7 @@ boolean useTezGroupedSplits = false; int numTasks = -1; - Class amSplitGeneratorClass = null; + Class amSplitGeneratorClass = null; InputSplitInfo inputSplitInfo = null; Class inputFormatClass = conf.getClass("mapred.input.format.class", InputFormat.class); @@ -533,7 +573,8 @@ Vertex reducer = new Vertex(reduceWork.getName(), new ProcessorDescriptor(ReduceTezProcessor.class.getName()). setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), - reduceWork.getNumReduceTasks(), getContainerResource(conf)); + reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork + .getNumReduceTasks(), getContainerResource(conf)); Map environment = new HashMap(); @@ -812,7 +853,7 @@ for (int i = 0; i < waitAttempts; i++) { if (!checkPreExisting(src, dest, conf)) { try { - Thread.currentThread().sleep(sleepInterval); + Thread.sleep(sleepInterval); } catch (InterruptedException interruptedException) { throw new IOException(interruptedException); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (revision 1598829) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (working copy) @@ -95,6 +95,8 @@ private transient VectorExpressionWriter[] partitionWriters; private transient VectorExpressionWriter[] bucketWriters = null; + private static final boolean isDebugEnabled = LOG.isDebugEnabled(); + public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf) throws HiveException { this(); @@ -147,10 +149,12 @@ colNames = String.format("%s %s", colNames, colName); } - LOG.debug(String.format("keyObjectInspector [%s]%s => %s", + if (isDebugEnabled) { + LOG.debug(String.format("keyObjectInspector [%s]%s => %s", keyObjectInspector.getClass(), keyObjectInspector, colNames)); + } partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols()); if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) { @@ -177,15 +181,19 @@ } }); - colNames = ""; - for(String colName : conf.getOutputValueColumnNames()) { - colNames = String.format("%s %s", colNames, colName); + if (isDebugEnabled) { + colNames = ""; + for(String colName : conf.getOutputValueColumnNames()) { + colNames = String.format("%s %s", colNames, colName); + } } - LOG.debug(String.format("valueObjectInspector [%s]%s => %s", - valueObjectInspector.getClass(), - valueObjectInspector, - colNames)); + if (isDebugEnabled) { + LOG.debug(String.format("valueObjectInspector [%s]%s => %s", + valueObjectInspector.getClass(), + valueObjectInspector, + colNames)); + } int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; int keyLen = numDistinctExprs > 0 ? 
numDistributionKeys + 1 : @@ -211,11 +219,13 @@ public void processOp(Object row, int tag) throws HiveException { VectorizedRowBatch vrg = (VectorizedRowBatch) row; - LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts", - vrg.size, - valueEval.length, - keyEval.length, - partitionEval.length)); + if (isDebugEnabled) { + LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts", + vrg.size, + valueEval.length, + keyEval.length, + partitionEval.length)); + } try { // Evaluate the keys @@ -268,17 +278,22 @@ firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength); } + final int hashCode; + + if(autoParallel && partitionEval.length > 0) { + hashCode = hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0); + } else if(bucketEval != null && bucketEval.length > 0) { + hashCode = computeHashCode(vrg, rowIndex, buckNum); + } else { + hashCode = computeHashCode(vrg, rowIndex); + } + + firstKey.setHashCode(hashCode); + if (useTopN) { reducerHash.tryStoreVectorizedKey(firstKey, batchIndex); } else { - // No TopN, just forward the first key and all others. - int hashCode = 0; - if (bucketEval != null && bucketEval.length != 0) { - hashCode = computeHashCode(vrg, rowIndex, buckNum); - } else { - hashCode = computeHashCode(vrg, rowIndex); - } - firstKey.setHashCode(hashCode); + // No TopN, just forward the first key and all others. BytesWritable value = makeValueWritable(vrg, rowIndex); collect(firstKey, value); forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0); @@ -296,17 +311,18 @@ rowIndex = vrg.selected[batchIndex]; } // Compute value and hashcode - we'd either store or forward them. - int hashCode = computeHashCode(vrg, rowIndex); BytesWritable value = makeValueWritable(vrg, rowIndex); int distKeyLength = -1; + int hashCode; if (result == TopNHash.FORWARD) { HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex); - firstKey.setHashCode(hashCode); distKeyLength = firstKey.getDistKeyLength(); + hashCode = firstKey.hashCode(); collect(firstKey, value); } else { - reducerHash.storeValue(result, value, hashCode, true); + reducerHash.storeValue(result, value, true); distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex); + hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex); } // Now forward other the rows if there's multi-distinct (but see TODO in forward...). // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1. 
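
Reviewer note, not part of the patch: a minimal sketch of the hashing behavior the ReduceSinkOperator/VectorReduceSinkOperator changes above rely on. With auto parallelism enabled, the operator hashes the serialized distribution-key prefix with Hadoop's MurmurHash (as in hash.hash(firstKey.getBytes(), distKeyLength, 0)) instead of computing a Java hashCode over the row objects, so the value that reaches the partitioner depends only on the key bytes. The class and method names below are illustrative, and the final modulo step is an assumption about the usual HashPartitioner-style mapping from hash to partition.

import org.apache.hadoop.util.hash.MurmurHash;

// Sketch only: 'serializedKey' stands in for firstKey.getBytes() and
// 'distKeyLength' for the distribution-key prefix length (tag excluded).
public final class KeyHashSketch {
  private static final MurmurHash HASH = (MurmurHash) MurmurHash.getInstance();

  static int partition(byte[] serializedKey, int distKeyLength, int numPartitions) {
    // hash only the distribution-key prefix, as the patch does
    int hashCode = HASH.hash(serializedKey, distKeyLength, 0);
    // assumed HashPartitioner-style mapping: mask the sign bit, then take the modulo
    return (hashCode & Integer.MAX_VALUE) % numPartitions;
  }
}
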
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java (revision 1598829) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java (working copy) @@ -86,6 +86,7 @@ maxReducers, false); LOG.info("Set parallelism for reduce sink "+sink+" to: "+numReducers); desc.setNumReducers(numReducers); + desc.setAutoParallel(true); } } else { LOG.info("Number of reducers determined to be: "+desc.getNumReducers()); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (revision 1598829) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (working copy) @@ -23,19 +23,19 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; -import java.util.LinkedList; import java.util.Map; import java.util.Set; -import org.apache.hadoop.fs.Path; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.UnionOperator; @@ -42,14 +42,14 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.UnionWork; -import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; /** * GenTezUtils is a collection of shared helper methods to produce @@ -88,6 +88,15 @@ public ReduceWork createReduceWork(GenTezProcContext context, Operator root, TezWork tezWork) { assert !root.getParentOperators().isEmpty(); + + boolean isAutoReduceParallelism = + context.conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM); + + float maxPartitionFactor = + context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR); + float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR); + long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER); + ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber)); LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root); reduceWork.setReducer(root); @@ -102,10 +111,37 @@ reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers()); + if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) { + reduceWork.setAutoReduceParallelism(true); + + // configured limit for reducers + int maxReducers = 
context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); + + // min we allow tez to pick + int minPartition = (int) (reduceSink.getConf().getNumReducers() * minPartitionFactor + 1); + minPartition = (minPartition > maxReducers) ? maxReducers : minPartition; + + // max we allow tez to pick + int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor); + maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition; + + reduceWork.setMinReduceTasks(minPartition); + reduceWork.setMaxReduceTasks(maxPartition); + } + setupReduceSink(context, reduceWork, reduceSink); tezWork.add(reduceWork); - TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + + TezEdgeProperty edgeProp; + if (reduceWork.isAutoReduceParallelism()) { + edgeProp = + new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, + reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer); + } else { + edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + } + tezWork.connect( context.preceedingWork, reduceWork, edgeProp); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (revision 1598829) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (working copy) @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.parse; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -28,28 +27,23 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.exec.UnionOperator; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.UnionWork; -import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; /** * GenTezWork separates the operator tree into tez tasks. 
@@ -136,7 +130,7 @@ if (!context.currentMapJoinOperators.isEmpty()) { for (MapJoinOperator mj: context.currentMapJoinOperators) { LOG.debug("Processing map join: " + mj); - // remember the mapping in case we scan another branch of the + // remember the mapping in case we scan another branch of the // mapjoin later if (!context.mapJoinWorkMap.containsKey(mj)) { List workItems = new LinkedList(); @@ -175,7 +169,7 @@ LOG.debug("connecting "+parentWork.getName()+" with "+work.getName()); TezEdgeProperty edgeProp = parentWorkMap.getValue(); tezWork.connect(parentWork, work, edgeProp); - + // need to set up output name for reduce sink now that we know the name // of the downstream work for (ReduceSinkOperator r: @@ -206,7 +200,7 @@ root.removeParent(parent); } - if (!context.currentUnionOperators.isEmpty()) { + if (!context.currentUnionOperators.isEmpty()) { // if there are union all operators we need to add the work to the set // of union operators. @@ -249,6 +243,7 @@ if (context.leafOperatorToFollowingWork.containsKey(operator)) { BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator); + long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER); LOG.debug("Second pass. Leaf operator: "+operator +" has common downstream work:"+followingWork); @@ -268,7 +263,14 @@ if (!context.connectedReduceSinks.contains(rs)) { // add dependency between the two work items - TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + TezEdgeProperty edgeProp; + if (rWork.isAutoReduceParallelism()) { + edgeProp = + new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, + rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer); + } else { + edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + } tezWork.connect(work, rWork, edgeProp); context.connectedReduceSinks.add(rs); } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (revision 1598829) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (working copy) @@ -87,6 +87,7 @@ private float topNMemoryUsage = -1; private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable + private boolean autoParallel = false; // Is reducer parallelism automatic or fixed private static transient Log LOG = LogFactory.getLog(ReduceSinkDesc.class); public ReduceSinkDesc() { @@ -139,6 +140,7 @@ desc.setBucketCols(bucketCols); desc.setStatistics(this.getStatistics()); desc.setSkipTag(skipTag); + desc.setAutoParallel(autoParallel); return desc; } @@ -340,4 +342,12 @@ public boolean getSkipTag() { return skipTag; } + + public final boolean isAutoParallel() { + return autoParallel; + } + + public final void setAutoParallel(final boolean autoParallel) { + this.autoParallel = autoParallel; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (revision 1598829) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (working copy) @@ -20,10 +20,10 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.LinkedHashSet; -import java.util.Map; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,7 +42,7 @@ * distributed on the cluster. The ExecReducer will ultimately deserialize this * class on the data nodes and setup it's operator pipeline accordingly. * - * This class is also used in the explain command any property with the + * This class is also used in the explain command any property with the * appropriate annotation will be displayed in the explain output. */ @SuppressWarnings({"serial", "deprecation"}) @@ -69,12 +69,21 @@ // desired parallelism of the reduce task. private Integer numReduceTasks; - // boolean to signal whether tagging will be used (e.g.: join) or + // boolean to signal whether tagging will be used (e.g.: join) or // not (e.g.: group by) private boolean needsTagging; private Map tagToInput = new HashMap(); + // boolean that says whether tez auto reduce parallelism should be used + private boolean isAutoReduceParallelism; + + // for auto reduce parallelism - minimum reducers requested + private int minReduceTasks; + + // for auto reduce parallelism - max reducers requested + private int maxReduceTasks; + /** * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing * to keySerializeInfo of the ReduceSink @@ -157,6 +166,7 @@ this.numReduceTasks = numReduceTasks; } + @Override public void configureJobConf(JobConf job) { if (reducer != null) { for (FileSinkOperator fs : OperatorUtils.findOperators(reducer, FileSinkOperator.class)) { @@ -164,4 +174,28 @@ } } } + + public void setAutoReduceParallelism(boolean isAutoReduceParallelism) { + this.isAutoReduceParallelism = isAutoReduceParallelism; + } + + public boolean isAutoReduceParallelism() { + return isAutoReduceParallelism; + } + + public void setMinReduceTasks(int minReduceTasks) { + this.minReduceTasks = minReduceTasks; + } + + public int getMinReduceTasks() { + return minReduceTasks; + } + + public int getMaxReduceTasks() { + return maxReduceTasks; + } + + public void setMaxReduceTasks(int maxReduceTasks) { + this.maxReduceTasks = maxReduceTasks; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java (revision 1598829) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java (working copy) @@ -1,16 +1,12 @@ package org.apache.hadoop.hive.ql.plan; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; public class TezEdgeProperty { - + public enum EdgeType { SIMPLE_EDGE, - BROADCAST_EDGE, + BROADCAST_EDGE, CONTAINS, CUSTOM_EDGE, CUSTOM_SIMPLE_EDGE, @@ -20,7 +16,12 @@ private EdgeType edgeType; private int numBuckets; - public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, + private boolean isAutoReduce; + private int minReducer; + private int maxReducer; + private long inputSizePerReducer; + + public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, int buckets) { this.hiveConf = hiveConf; this.edgeType = edgeType; @@ -27,6 +28,15 @@ this.numBuckets = buckets; } + public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce, + int minReducer, int maxReducer, long bytesPerReducer) { + this(hiveConf, edgeType, -1); + this.minReducer = minReducer; + this.maxReducer = maxReducer; + this.isAutoReduce = isAutoReduce; + this.inputSizePerReducer = bytesPerReducer; + } + public 
TezEdgeProperty(EdgeType edgeType) { this(null, edgeType, -1); } @@ -42,4 +52,20 @@ public int getNumBuckets() { return numBuckets; } + + public boolean isAutoReduce() { + return isAutoReduce; + } + + public int getMinReducer() { + return minReducer; + } + + public int getMaxReducer() { + return maxReducer; + } + + public long getInputSizePerReducer() { + return inputSizePerReducer; + } }
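
Reviewer note, not part of the patch: a minimal sketch of the reducer-bound arithmetic introduced in GenTezUtils.createReduceWork, using the new defaults (hive.tez.min.partition.factor=0.25, hive.tez.max.partition.factor=2, hive.exec.reducers.max=1009). The class and method names are illustrative only. Hive still sizes the Tez vertex with the upper bound (DagUtils passes getMaxReduceTasks() when auto parallelism is on); ShuffleVertexManager can then shrink the partition count at runtime, but not below the lower bound.

// Sketch only: mirrors the minPartition/maxPartition computation in GenTezUtils.
public final class AutoParallelismBounds {

  static int[] bounds(int estimatedReducers, float minFactor, float maxFactor, int maxReducers) {
    // lower bound Tez may shrink to: a fraction of the estimate, capped by hive.exec.reducers.max
    int min = Math.min((int) (estimatedReducers * minFactor + 1), maxReducers);
    // upper bound used to over-partition the shuffle edge, also capped by hive.exec.reducers.max
    int max = Math.min((int) (estimatedReducers * maxFactor), maxReducers);
    return new int[] { min, max };
  }

  public static void main(String[] args) {
    int[] b = bounds(100, 0.25f, 2f, 1009);
    // with an estimate of 100 reducers this prints: 26 200
    System.out.println(b[0] + " " + b[1]);
  }
}

To try the feature end to end, only the switch needs to be turned on (the other knobs default to the values above): set hive.tez.auto.reducer.parallelism=true;
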