diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index dcfe29a..c1d0e9c 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -190,8 +190,8 @@
COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
- BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)),
- MAXREDUCERS("hive.exec.reducers.max", 999),
+ BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (256 * 1000 * 1000)),
+   MAXREDUCERS("hive.exec.reducers.max", 1009), // 1009 is prime, to spread partitions more evenly under hash % reducers
PREEXECHOOKS("hive.exec.pre.hooks", ""),
POSTEXECHOOKS("hive.exec.post.hooks", ""),
ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
@@ -1028,6 +1028,9 @@
HIVE_CHECK_CROSS_PRODUCT("hive.exec.check.crossproducts", true),
HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", 5000L), // in ms
HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS("hive.localize.resource.num.wait.attempts", 5),
+ TEZ_AUTO_REDUCER_PARALLELISM("hive.tez.auto.reducer.parallelism", false),
+ TEZ_OVER_PARTITION_FACTOR("hive.tez.over.partition.factor", 2f),
+ TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f)
;
public final String varname;
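
Editorial context for the two default changes above, not part of the patch: hive.exec.reducers.bytes.per.reducer and hive.exec.reducers.max cap the compile-time reducer estimate that SetReducerParallelism produces, and with auto reducer parallelism that estimate becomes a starting point for Tez rather than a final answer. A minimal sketch of the arithmetic, assuming a hypothetical statistics-based input size; the real estimate lives in Hive's estimator:

    // Simplified stand-in for the reducer estimate; totalInputSize is hypothetical.
    class ReducerEstimateSketch {
      static int estimate(long totalInputSize) {
        long bytesPerReducer = 256L * 1000 * 1000; // new BYTESPERREDUCER default
        int maxReducers = 1009;                    // new MAXREDUCERS default
        long needed = (totalInputSize + bytesPerReducer - 1) / bytesPerReducer;
        return (int) Math.min(maxReducers, Math.max(1, needed));
      }
      // estimate(100L * 1000 * 1000 * 1000) == 391 under the new defaults,
      // versus 100 under the old 1 GB-per-reducer default.
    }
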
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index a75f569..f8c3594 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -2755,4 +2755,22 @@
Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to store data. This is one buffer size. HT may be slightly faster if this is larger, but for small joins unnecessary memory will be allocated and then trimmed.
+<property>
+  <name>hive.tez.auto.reducer.parallelism</name>
+  <value>false</value>
+  <description>Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes and set parallelism, but Tez will sample the vertices' output and adjust the estimates at runtime as necessary.</description>
+</property>
+
+<property>
+  <name>hive.tez.over.partition.factor</name>
+  <value>2</value>
+  <description>When auto reducer parallelism is enabled, this factor will be used to over-partition data in shuffle edges.</description>
+</property>
+
+<property>
+  <name>hive.tez.min.partition.factor</name>
+  <value>0.25</value>
+  <description>When auto reducer parallelism is enabled, this factor will be used to put a lower limit on the number of reducers that Tez specifies.</description>
+</property>
+
diff --git data/conf/tez/hive-site.xml data/conf/tez/hive-site.xml
index 0c99bb6..f7f408d 100644
--- data/conf/tez/hive-site.xml
+++ data/conf/tez/hive-site.xml
@@ -233,4 +233,9 @@
+<property>
+  <name>hive.tez.auto.reducer.parallelism</name>
+  <value>true</value>
+</property>
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 6368548..9fc671f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.hash.MurmurHash;
/**
* Reduce Sink Operator sends output to the reduce stage.
@@ -95,6 +96,9 @@
transient protected int numDistinctExprs;
transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
private boolean skipTag = false;
+ protected transient boolean autoParallel = false;
+
+ protected static final MurmurHash hash = (MurmurHash)MurmurHash.getInstance();
public void setInputAliases(String[] inputAliases) {
this.inputAliases = inputAliases;
@@ -172,6 +176,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
}
+ autoParallel = conf.isAutoParallel();
+
firstRow = true;
initializeChildren(hconf);
} catch (Exception e) {
@@ -295,24 +301,30 @@ public void processOp(Object row, int tag) throws HiveException {
firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
}
+ final int hashCode;
+
+ if(autoParallel && partitionEval.length > 0) {
+ // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
+ hashCode = hash.hash(firstKey.getBytes(), distKeyLength, 0);
+ } else if(bucketEval != null && bucketEval.length > 0) {
+ hashCode = computeHashCode(row, buckNum);
+ } else {
+ hashCode = computeHashCode(row);
+ }
+
+ firstKey.setHashCode(hashCode);
+
// Try to store the first key. If it's not excluded, we will proceed.
int firstIndex = reducerHash.tryStoreKey(firstKey);
if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
// Compute value and hashcode - we'd either store or forward them.
BytesWritable value = makeValueWritable(row);
- int hashCode = 0;
- if (bucketEval == null) {
- hashCode = computeHashCode(row);
- } else {
- hashCode = computeHashCode(row, buckNum);
- }
if (firstIndex == TopNHash.FORWARD) {
- firstKey.setHashCode(hashCode);
collect(firstKey, value);
} else {
assert firstIndex >= 0;
- reducerHash.storeValue(firstIndex, value, hashCode, false);
+ reducerHash.storeValue(firstIndex, value, false);
}
// All other distinct keys will just be forwarded. This could be optimized...
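
When autoParallel is set, the hash above is a MurmurHash over the serialized distribution key (up to distKeyLength) rather than the object hash of the row, so a key's partition depends only on its bytes and tends to spread evenly across the over-partitioned range that Tez will later collapse. A minimal sketch of that idea, assuming the usual hash-mod partitioning; the helper name and numPartitions are hypothetical:

    import org.apache.hadoop.hive.ql.io.HiveKey;
    import org.apache.hadoop.util.hash.MurmurHash;

    class AutoParallelHashSketch {
      private static final MurmurHash MURMUR = (MurmurHash) MurmurHash.getInstance();

      // Derive a partition from the serialized key bytes only, as the reduce sink does.
      static int partitionFor(HiveKey key, int numPartitions) {
        int hashCode = MURMUR.hash(key.getBytes(), key.getDistKeyLength(), 0);
        key.setHashCode(hashCode);
        return (hashCode & Integer.MAX_VALUE) % numPartitions;
      }
    }
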
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index 978a749..bc81467 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -198,6 +198,7 @@ public void tryStoreVectorizedKey(HiveKey key, int batchIndex)
int index = size < topN ? size : evicted;
keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
distKeyLengths[index] = key.getDistKeyLength();
+ hashes[index] = key.hashCode();
Integer collisionIndex = indexes.store(index);
if (null != collisionIndex) {
// forward conditional on the survival of the corresponding key currently in indexes.
@@ -256,6 +257,7 @@ public HiveKey getVectorizedKeyToForward(int batchIndex) {
int index = MAY_FORWARD - batchIndexToResult[batchIndex];
HiveKey hk = new HiveKey();
hk.set(keys[index], 0, keys[index].length);
+ hk.setHashCode(hashes[index]);
hk.setDistKeyLength(distKeyLengths[index]);
return hk;
}
@@ -270,15 +272,23 @@ public int getVectorizedKeyDistLength(int batchIndex) {
}
/**
+ * After vectorized batch is processed, can return hashCode of a key.
+ * @param batchIndex index of the key in the batch.
+ * @return The hashCode corresponding to the key.
+ */
+ public int getVectorizedKeyHashCode(int batchIndex) {
+ return hashes[batchIndexToResult[batchIndex]];
+ }
+
+ /**
* Stores the value for the key in the heap.
* @param index The index, either from tryStoreKey or from tryStoreVectorizedKey result.
* @param value The value to store.
-   * @param keyHash The key hash to store.
* @param vectorized Whether the result is coming from a vectorized batch.
*/
- public void storeValue(int index, BytesWritable value, int keyHash, boolean vectorized) {
+ public void storeValue(int index, BytesWritable value, boolean vectorized) {
values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
- hashes[index] = keyHash;
// Vectorized doesn't adjust usage for the keys while processing the batch
usage += values[index].length + (vectorized ? keys[index].length : 0);
}
@@ -317,6 +327,7 @@ private int insertKeyIntoHeap(HiveKey key) throws IOException, HiveException {
int index = size < topN ? size : evicted;
keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
distKeyLengths[index] = key.getDistKeyLength();
+ hashes[index] = key.hashCode();
if (null != indexes.store(index)) {
// it's only for GBY which should forward all values associated with the key in the range
// of limit. new value should be attatched with the key but in current implementation,
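
The TopNHash change records a key's hash at the moment the key is stored (tryStoreKey / tryStoreVectorizedKey) and hands it back later, so storeValue no longer needs a keyHash argument. A caller sketch under that assumption; the helper is hypothetical and mirrors the ReduceSinkOperator change above, with collect() elided:

    import org.apache.hadoop.hive.ql.exec.TopNHash;
    import org.apache.hadoop.hive.ql.io.HiveKey;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.io.BytesWritable;

    class TopNCallerSketch {
      static void storeOrForward(TopNHash reducerHash, HiveKey key, BytesWritable value)
          throws HiveException, java.io.IOException {
        int index = reducerHash.tryStoreKey(key);      // hash captured from the key here
        if (index == TopNHash.EXCLUDE) {
          return;                                      // key did not survive the top-N cut
        }
        if (index == TopNHash.FORWARD) {
          // forward directly: the key already carries the hash set by the reduce sink
        } else {
          reducerHash.storeValue(index, value, false); // no separate keyHash argument
        }
      }
    }
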
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index f27bde8..75c5f30 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -40,7 +40,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
@@ -94,9 +93,10 @@
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.api.VertexGroup;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -114,6 +114,7 @@
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
/**
* DagUtils. DagUtils is a collection of helper methods to convert
@@ -210,9 +211,10 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
* @param edgeProp the edge property of connection between the two
* endpoints.
*/
+ @SuppressWarnings("rawtypes")
public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
Vertex w, TezEdgeProperty edgeProp)
- throws IOException {
+ throws IOException {
Class mergeInputClass;
@@ -221,27 +223,44 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
- case BROADCAST_EDGE:
- mergeInputClass = ConcatenatedMergedKeyValueInput.class;
- break;
- case CUSTOM_EDGE:
- mergeInputClass = ConcatenatedMergedKeyValueInput.class;
- int numBuckets = edgeProp.getNumBuckets();
- VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
- CustomPartitionVertex.class.getName());
- byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
- desc.setUserPayload(userPayload);
- w.setVertexManagerPlugin(desc);
- break;
-
- case CUSTOM_SIMPLE_EDGE:
- mergeInputClass = ConcatenatedMergedKeyValueInput.class;
- break;
+ case BROADCAST_EDGE:
+ mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+ break;
+ case CUSTOM_EDGE: {
+ mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+ int numBuckets = edgeProp.getNumBuckets();
+ VertexManagerPluginDescriptor desc =
+ new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
+ byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
+ desc.setUserPayload(userPayload);
+ w.setVertexManagerPlugin(desc);
+ break;
+ }
- case SIMPLE_EDGE:
- default:
- mergeInputClass = TezMergedLogicalInput.class;
- break;
+ case CUSTOM_SIMPLE_EDGE:
+ mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+ break;
+
+ case SIMPLE_EDGE: {
+ if (edgeProp.isAutoReduce()) {
+ Configuration pluginConf = new Configuration(false);
+ VertexManagerPluginDescriptor desc =
+ new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+ pluginConf.setBoolean(
+ ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+ pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+ edgeProp.getMinReducer());
+ pluginConf.setLong(
+ ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ edgeProp.getInputSizePerReducer());
+ ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
+ desc.setUserPayload(payload.toByteArray());
+ w.setVertexManagerPlugin(desc);
+ }
+ }
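+      // fall through: SIMPLE_EDGE picks up the default merged input class below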
+ default:
+ mergeInputClass = TezMergedLogicalInput.class;
+ break;
}
return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
@@ -278,13 +297,35 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
updateConfigurationForEdge(vConf, v, wConf, w);
- if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) {
+ switch(edgeProp.getEdgeType()) {
+ case CUSTOM_EDGE: {
int numBuckets = edgeProp.getNumBuckets();
byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
CustomPartitionVertex.class.getName());
desc.setUserPayload(userPayload);
w.setVertexManagerPlugin(desc);
+ break;
+ }
+ case SIMPLE_EDGE: {
+ if (edgeProp.isAutoReduce()) {
+ Configuration pluginConf = new Configuration(false);
+ VertexManagerPluginDescriptor desc =
+ new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+ pluginConf.setBoolean(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+ pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+ edgeProp.getMinReducer());
+ pluginConf.setLong(
+ ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ edgeProp.getInputSizePerReducer());
+ ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
+ desc.setUserPayload(payload.toByteArray());
+ w.setVertexManagerPlugin(desc);
+ }
+ break;
+ }
+ default:
+ // nothing
}
return new Edge(v, w, createEdgeProperty(edgeProp));
@@ -293,6 +334,7 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
/*
* Helper function to create an edge property from an edge type.
*/
+ @SuppressWarnings("rawtypes")
private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException {
DataMovementType dataMovementType;
Class logicalInputClass;
@@ -301,45 +343,44 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOExcep
EdgeProperty edgeProperty = null;
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
- case BROADCAST_EDGE:
- dataMovementType = DataMovementType.BROADCAST;
- logicalOutputClass = OnFileUnorderedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- break;
-
- case CUSTOM_EDGE:
-
- dataMovementType = DataMovementType.CUSTOM;
- logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
- CustomPartitionEdge.class.getName());
- CustomEdgeConfiguration edgeConf =
- new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
- DataOutputBuffer dob = new DataOutputBuffer();
- edgeConf.write(dob);
- byte[] userPayload = dob.getData();
- edgeDesc.setUserPayload(userPayload);
- edgeProperty =
+ case BROADCAST_EDGE:
+ dataMovementType = DataMovementType.BROADCAST;
+ logicalOutputClass = OnFileUnorderedKVOutput.class;
+ logicalInputClass = ShuffledUnorderedKVInput.class;
+ break;
+
+ case CUSTOM_EDGE:
+ dataMovementType = DataMovementType.CUSTOM;
+ logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
+ logicalInputClass = ShuffledUnorderedKVInput.class;
+ EdgeManagerDescriptor edgeDesc =
+ new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+ CustomEdgeConfiguration edgeConf =
+ new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ edgeConf.write(dob);
+ byte[] userPayload = dob.getData();
+ edgeDesc.setUserPayload(userPayload);
+ edgeProperty =
new EdgeProperty(edgeDesc,
DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
new OutputDescriptor(logicalOutputClass.getName()),
new InputDescriptor(logicalInputClass.getName()));
- break;
-
- case CUSTOM_SIMPLE_EDGE:
- dataMovementType = DataMovementType.SCATTER_GATHER;
- logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- break;
-
- case SIMPLE_EDGE:
- default:
- dataMovementType = DataMovementType.SCATTER_GATHER;
- logicalOutputClass = OnFileSortedOutput.class;
- logicalInputClass = ShuffledMergedInputLegacy.class;
- break;
+ break;
+
+ case CUSTOM_SIMPLE_EDGE:
+ dataMovementType = DataMovementType.SCATTER_GATHER;
+ logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
+ logicalInputClass = ShuffledUnorderedKVInput.class;
+ break;
+
+ case SIMPLE_EDGE:
+ default:
+ dataMovementType = DataMovementType.SCATTER_GATHER;
+ logicalOutputClass = OnFileSortedOutput.class;
+ logicalInputClass = ShuffledMergedInputLegacy.class;
+ break;
}
if (edgeProperty == null) {
@@ -360,7 +401,6 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOExcep
* container size isn't set.
*/
private Resource getContainerResource(Configuration conf) {
- Resource containerResource;
int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
@@ -414,7 +454,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
boolean useTezGroupedSplits = false;
int numTasks = -1;
- Class amSplitGeneratorClass = null;
+ Class amSplitGeneratorClass = null;
InputSplitInfo inputSplitInfo = null;
Class inputFormatClass = conf.getClass("mapred.input.format.class",
InputFormat.class);
@@ -533,7 +573,8 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
Vertex reducer = new Vertex(reduceWork.getName(),
new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
- reduceWork.getNumReduceTasks(), getContainerResource(conf));
+        reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks()
+            : reduceWork.getNumReduceTasks(), getContainerResource(conf));
Map<String, String> environment = new HashMap<String, String>();
@@ -812,7 +853,7 @@ public LocalResource localizeResource(Path src, Path dest, Configuration conf)
for (int i = 0; i < waitAttempts; i++) {
if (!checkPreExisting(src, dest, conf)) {
try {
- Thread.currentThread().sleep(sleepInterval);
+ Thread.sleep(sleepInterval);
} catch (InterruptedException interruptedException) {
throw new IOException(interruptedException);
}
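
For a SIMPLE_EDGE with auto reduce enabled, both createEdge paths above serialize a small Configuration of ShuffleVertexManager settings into the vertex manager payload, and createVertex sizes the reducer vertex at maxReduceTasks so Tez can only scale it down. A hypothetical illustration of the resulting behaviour; the grouping itself happens inside Tez's ShuffleVertexManager and the numbers are made up:

    // Not Tez code: just the clamp implied by the payload above.
    class AutoReduceIllustration {
      public static void main(String[] args) {
        long desiredTaskInputSize = 256L * 1000 * 1000; // hive.exec.reducers.bytes.per.reducer
        int maxTasks = 40;        // vertex parallelism at submit time (getMaxReduceTasks())
        int minTasks = 6;         // MIN_TASK_PARALLELISM carried in the edge payload
        long sampledOutput = 1500L * 1000 * 1000; // what the upstream vertex actually produced

        int wanted = (int) Math.max(1, sampledOutput / desiredTaskInputSize); // 5
        int finalTasks = Math.min(maxTasks, Math.max(minTasks, wanted));      // clamped to 6
        System.out.println("Tez would run ~" + finalTasks + " reduce tasks instead of " + maxTasks);
      }
    }
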
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
index e234465..11024da 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
@@ -95,6 +95,8 @@
private transient VectorExpressionWriter[] partitionWriters;
private transient VectorExpressionWriter[] bucketWriters = null;
+ private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+
public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
throws HiveException {
this();
@@ -147,10 +149,12 @@ public void assign(VectorExpressionWriter[] writers,
colNames = String.format("%s %s", colNames, colName);
}
- LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
+ if (isDebugEnabled) {
+ LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
keyObjectInspector.getClass(),
keyObjectInspector,
colNames));
+ }
partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
@@ -177,15 +181,19 @@ public void assign(VectorExpressionWriter[] writers,
}
});
- colNames = "";
- for(String colName : conf.getOutputValueColumnNames()) {
- colNames = String.format("%s %s", colNames, colName);
+ if (isDebugEnabled) {
+ colNames = "";
+ for(String colName : conf.getOutputValueColumnNames()) {
+ colNames = String.format("%s %s", colNames, colName);
+ }
}
- LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
- valueObjectInspector.getClass(),
- valueObjectInspector,
- colNames));
+ if (isDebugEnabled) {
+ LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
+ valueObjectInspector.getClass(),
+ valueObjectInspector,
+ colNames));
+ }
int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
@@ -211,11 +219,13 @@ public void assign(VectorExpressionWriter[] writers,
public void processOp(Object row, int tag) throws HiveException {
VectorizedRowBatch vrg = (VectorizedRowBatch) row;
- LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
- vrg.size,
- valueEval.length,
- keyEval.length,
- partitionEval.length));
+ if (isDebugEnabled) {
+ LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
+ vrg.size,
+ valueEval.length,
+ keyEval.length,
+ partitionEval.length));
+ }
try {
// Evaluate the keys
@@ -268,17 +278,22 @@ public void processOp(Object row, int tag) throws HiveException {
firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
}
+ final int hashCode;
+
+ if(autoParallel && partitionEval.length > 0) {
+ hashCode = hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0);
+ } else if(bucketEval != null && bucketEval.length > 0) {
+ hashCode = computeHashCode(vrg, rowIndex, buckNum);
+ } else {
+ hashCode = computeHashCode(vrg, rowIndex);
+ }
+
+ firstKey.setHashCode(hashCode);
+
if (useTopN) {
reducerHash.tryStoreVectorizedKey(firstKey, batchIndex);
} else {
- // No TopN, just forward the first key and all others.
- int hashCode = 0;
- if (bucketEval != null && bucketEval.length != 0) {
- hashCode = computeHashCode(vrg, rowIndex, buckNum);
- } else {
- hashCode = computeHashCode(vrg, rowIndex);
- }
- firstKey.setHashCode(hashCode);
+ // No TopN, just forward the first key and all others.
BytesWritable value = makeValueWritable(vrg, rowIndex);
collect(firstKey, value);
forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
@@ -296,17 +311,18 @@ public void processOp(Object row, int tag) throws HiveException {
rowIndex = vrg.selected[batchIndex];
}
// Compute value and hashcode - we'd either store or forward them.
- int hashCode = computeHashCode(vrg, rowIndex);
BytesWritable value = makeValueWritable(vrg, rowIndex);
int distKeyLength = -1;
+ int hashCode;
if (result == TopNHash.FORWARD) {
HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
- firstKey.setHashCode(hashCode);
distKeyLength = firstKey.getDistKeyLength();
+ hashCode = firstKey.hashCode();
collect(firstKey, value);
} else {
- reducerHash.storeValue(result, value, hashCode, true);
+ reducerHash.storeValue(result, value, true);
distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
+ hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex);
}
// Now forward other the rows if there's multi-distinct (but see TODO in forward...).
// Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
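
As in the non-vectorized operator, the hash is no longer recomputed at forward time; it is read back from the stored key (firstKey.hashCode()) or from the heap (getVectorizedKeyHashCode). This relies on HiveKey returning the hash that was set on it rather than hashing its bytes, which is how the reduce sinks use it; a tiny sketch of that assumption:

    import org.apache.hadoop.hive.ql.io.HiveKey;

    class HiveKeyHashSketch {
      static int roundTrip(byte[] keyBytes, int hash) {
        HiveKey k = new HiveKey();
        k.set(keyBytes, 0, keyBytes.length);
        k.setHashCode(hash);
        return k.hashCode(); // the stored hash, not one recomputed from the bytes
      }
    }
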
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
index b522963..625af85 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
@@ -86,6 +86,7 @@ public Object process(Node nd, Stack stack,
maxReducers, false);
LOG.info("Set parallelism for reduce sink "+sink+" to: "+numReducers);
desc.setNumReducers(numReducers);
+ desc.setAutoParallel(true);
}
} else {
LOG.info("Number of reducers determined to be: "+desc.getNumReducers());
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 3f70092..11e4855 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -23,33 +23,33 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.fs.Path;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
/**
* GenTezUtils is a collection of shared helper methods to produce
@@ -88,6 +88,15 @@ public UnionWork createUnionWork(GenTezProcContext context, Operator> operator
public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root, TezWork tezWork) {
assert !root.getParentOperators().isEmpty();
+
+ boolean isAutoReduceParallelism =
+ context.conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM);
+
+ float overPartitionFactor =
+ context.conf.getFloatVar(HiveConf.ConfVars.TEZ_OVER_PARTITION_FACTOR);
+ float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
+ long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+
ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
reduceWork.setReducer(root);
@@ -102,10 +111,27 @@ public ReduceWork createReduceWork(GenTezProcContext context, Operator> root,
reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
+ if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
+ reduceWork.setAutoReduceParallelism(isAutoReduceParallelism);
+ reduceWork.setMinReduceTasks((int) (reduceSink.getConf().getNumReducers()
+ * minPartitionFactor) + 1);
+ reduceWork
+ .setMaxReduceTasks((int) (reduceSink.getConf().getNumReducers() * overPartitionFactor));
+ }
+
setupReduceSink(context, reduceWork, reduceSink);
tezWork.add(reduceWork);
- TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+
+ TezEdgeProperty edgeProp;
+ if (isAutoReduceParallelism) {
+ edgeProp =
+ new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+ reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer);
+ } else {
+ edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ }
+
tezWork.connect(
context.preceedingWork,
reduceWork, edgeProp);
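
With the default factors (hive.tez.min.partition.factor = 0.25, hive.tez.over.partition.factor = 2), the bounds computed above work out as follows for a hypothetical compile-time estimate of 20 reducers:

    class ReduceTaskBoundsSketch {
      static int[] bounds(int estimatedReducers) {                  // e.g. 20 from the reduce sink
        int minReduceTasks = (int) (estimatedReducers * 0.25f) + 1; // 20 -> 6
        int maxReduceTasks = (int) (estimatedReducers * 2f);        // 20 -> 40
        return new int[] { minReduceTasks, maxReduceTasks };
      }
      // The reducer vertex is then submitted with 40 tasks, and the SIMPLE_EDGE carries
      // min = 6 plus bytesPerReducer, so Tez can shrink the vertex but not below 6.
    }
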
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 41aeb4c..b304fd3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -28,28 +27,23 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
/**
* GenTezWork separates the operator tree into tez tasks.
@@ -136,7 +130,7 @@ public Object process(Node nd, Stack stack,
if (!context.currentMapJoinOperators.isEmpty()) {
for (MapJoinOperator mj: context.currentMapJoinOperators) {
LOG.debug("Processing map join: " + mj);
- // remember the mapping in case we scan another branch of the
+ // remember the mapping in case we scan another branch of the
// mapjoin later
if (!context.mapJoinWorkMap.containsKey(mj)) {
List<BaseWork> workItems = new LinkedList<BaseWork>();
@@ -175,7 +169,7 @@ public Object process(Node nd, Stack stack,
LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
TezEdgeProperty edgeProp = parentWorkMap.getValue();
tezWork.connect(parentWork, work, edgeProp);
-
+
// need to set up output name for reduce sink now that we know the name
// of the downstream work
for (ReduceSinkOperator r:
@@ -206,7 +200,7 @@ public Object process(Node nd, Stack stack,
root.removeParent(parent);
}
- if (!context.currentUnionOperators.isEmpty()) {
+ if (!context.currentUnionOperators.isEmpty()) {
// if there are union all operators we need to add the work to the set
// of union operators.
@@ -249,6 +243,7 @@ public Object process(Node nd, Stack stack,
if (context.leafOperatorToFollowingWork.containsKey(operator)) {
BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator);
+ long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
LOG.debug("Second pass. Leaf operator: "+operator
+" has common downstream work:"+followingWork);
@@ -268,7 +263,14 @@ public Object process(Node nd, Stack stack,
if (!context.connectedReduceSinks.contains(rs)) {
// add dependency between the two work items
- TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ TezEdgeProperty edgeProp;
+ if (rWork.isAutoReduceParallelism()) {
+ edgeProp =
+ new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+ rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+ } else {
+ edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ }
tezWork.connect(work, rWork, edgeProp);
context.connectedReduceSinks.add(rs);
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 44b318e..8c1d336 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -87,6 +87,7 @@
private float topNMemoryUsage = -1;
private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded
private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable
+ private boolean autoParallel = false; // Is reducer parallelism automatic or fixed
private static transient Log LOG = LogFactory.getLog(ReduceSinkDesc.class);
public ReduceSinkDesc() {
@@ -139,6 +140,7 @@ public Object clone() {
desc.setBucketCols(bucketCols);
desc.setStatistics(this.getStatistics());
desc.setSkipTag(skipTag);
+ desc.setAutoParallel(autoParallel);
return desc;
}
@@ -340,4 +342,12 @@ public void setSkipTag(boolean value) {
public boolean getSkipTag() {
return skipTag;
}
+
+ public final boolean isAutoParallel() {
+ return autoParallel;
+ }
+
+ public final void setAutoParallel(final boolean autoParallel) {
+ this.autoParallel = autoParallel;
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index a68374e..0cef12b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -20,10 +20,10 @@
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,7 +42,7 @@
* distributed on the cluster. The ExecReducer will ultimately deserialize this
* class on the data nodes and setup it's operator pipeline accordingly.
*
- * This class is also used in the explain command any property with the
+ * This class is also used in the explain command any property with the
* appropriate annotation will be displayed in the explain output.
*/
@SuppressWarnings({"serial", "deprecation"})
@@ -69,12 +69,21 @@ public ReduceWork(String name) {
// desired parallelism of the reduce task.
private Integer numReduceTasks;
- // boolean to signal whether tagging will be used (e.g.: join) or
+ // boolean to signal whether tagging will be used (e.g.: join) or
// not (e.g.: group by)
private boolean needsTagging;
private Map<Integer, String> tagToInput = new HashMap<Integer, String>();
+ // boolean that says whether tez auto reduce parallelism should be used
+ private boolean isAutoReduceParallelism;
+
+ // for auto reduce parallelism - minimum reducers requested
+ private int minReduceTasks;
+
+ // for auto reduce parallelism - max reducers requested
+ private int maxReduceTasks;
+
/**
* If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
* to keySerializeInfo of the ReduceSink
@@ -157,6 +166,7 @@ public void setNumReduceTasks(final Integer numReduceTasks) {
this.numReduceTasks = numReduceTasks;
}
+ @Override
public void configureJobConf(JobConf job) {
if (reducer != null) {
for (FileSinkOperator fs : OperatorUtils.findOperators(reducer, FileSinkOperator.class)) {
@@ -164,4 +174,28 @@ public void configureJobConf(JobConf job) {
}
}
}
+
+ public void setAutoReduceParallelism(boolean isAutoReduceParallelism) {
+ this.isAutoReduceParallelism = isAutoReduceParallelism;
+ }
+
+ public boolean isAutoReduceParallelism() {
+ return isAutoReduceParallelism;
+ }
+
+ public void setMinReduceTasks(int minReduceTasks) {
+ this.minReduceTasks = minReduceTasks;
+ }
+
+ public int getMinReduceTasks() {
+ return minReduceTasks;
+ }
+
+ public int getMaxReduceTasks() {
+ return maxReduceTasks;
+ }
+
+ public void setMaxReduceTasks(int maxReduceTasks) {
+ this.maxReduceTasks = maxReduceTasks;
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index 96adb84..d17ca8c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -1,16 +1,12 @@
package org.apache.hadoop.hive.ql.plan;
-import java.util.List;
-import java.util.Map;
-
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
public class TezEdgeProperty {
-
+
public enum EdgeType {
SIMPLE_EDGE,
- BROADCAST_EDGE,
+ BROADCAST_EDGE,
CONTAINS,
CUSTOM_EDGE,
CUSTOM_SIMPLE_EDGE,
@@ -20,13 +16,27 @@
private EdgeType edgeType;
private int numBuckets;
- public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
+ private boolean isAutoReduce;
+ private int minReducer;
+ private int maxReducer;
+ private long inputSizePerReducer;
+
+ public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
int buckets) {
this.hiveConf = hiveConf;
this.edgeType = edgeType;
this.numBuckets = buckets;
}
+ public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce,
+ int minReducer, int maxReducer, long bytesPerReducer) {
+ this(hiveConf, edgeType, -1);
+ this.minReducer = minReducer;
+ this.maxReducer = maxReducer;
+ this.isAutoReduce = isAutoReduce;
+ this.inputSizePerReducer = bytesPerReducer;
+ }
+
public TezEdgeProperty(EdgeType edgeType) {
this(null, edgeType, -1);
}
@@ -42,4 +52,20 @@ public HiveConf getHiveConf () {
public int getNumBuckets() {
return numBuckets;
}
+
+ public boolean isAutoReduce() {
+ return isAutoReduce;
+ }
+
+ public int getMinReducer() {
+ return minReducer;
+ }
+
+ public int getMaxReducer() {
+ return maxReducer;
+ }
+
+ public long getInputSizePerReducer() {
+ return inputSizePerReducer;
+ }
}
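
The new constructor is how the compile-time layer hands the auto-reduce bounds to DagUtils. A usage sketch mirroring GenTezUtils/GenTezWork above; the numbers are hypothetical:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

    class EdgePropertySketch {
      static TezEdgeProperty autoReduceEdge(HiveConf conf) {
        return new TezEdgeProperty(conf, EdgeType.SIMPLE_EDGE, true /* isAutoReduce */,
            6 /* minReducer */, 40 /* maxReducer */, 256L * 1000 * 1000 /* bytesPerReducer */);
      }
    }
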