diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 6368548..a0f13be 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.hash.MurmurHash;
 
 /**
  * Reduce Sink Operator sends output to the reduce stage.
@@ -95,6 +96,9 @@ transient protected int numDistinctExprs;
   transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
   private boolean skipTag = false;
 
+  protected transient boolean autoParallel = false;
+
+  protected static final MurmurHash hash = (MurmurHash)MurmurHash.getInstance();
 
   public void setInputAliases(String[] inputAliases) {
     this.inputAliases = inputAliases;
@@ -172,6 +176,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
         reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
       }
 
+      autoParallel = conf.isAutoParallel();
+
       firstRow = true;
       initializeChildren(hconf);
     } catch (Exception e) {
@@ -295,24 +301,30 @@ public void processOp(Object row, int tag) throws HiveException {
         firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
       }
 
+      final int hashCode;
+
+      if(autoParallel && partitionEval.length > 0) {
+        // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
+        hashCode = hash.hash(firstKey.getBytes(), distKeyLength, 0);
+      } else if(bucketEval != null && bucketEval.length > 0) {
+        hashCode = computeHashCode(row, buckNum);
+      } else {
+        hashCode = computeHashCode(row);
+      }
+
+      firstKey.setHashCode(hashCode);
+
      // Try to store the first key. If it's not excluded, we will proceed.
      int firstIndex = reducerHash.tryStoreKey(firstKey);
      if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
      // Compute value and hashcode - we'd either store or forward them.
      BytesWritable value = makeValueWritable(row);
-      int hashCode = 0;
-      if (bucketEval == null) {
-        hashCode = computeHashCode(row);
-      } else {
-        hashCode = computeHashCode(row, buckNum);
-      }
 
      if (firstIndex == TopNHash.FORWARD) {
-        firstKey.setHashCode(hashCode);
        collect(firstKey, value);
      } else {
        assert firstIndex >= 0;
-        reducerHash.storeValue(firstIndex, value, hashCode, false);
+        reducerHash.storeValue(firstIndex, value, false);
      }
 
      // All other distinct keys will just be forwarded. This could be optimized...
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index 978a749..bc81467 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -198,6 +198,7 @@ public void tryStoreVectorizedKey(HiveKey key, int batchIndex)
     int index = size < topN ? size : evicted;
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
     distKeyLengths[index] = key.getDistKeyLength();
+    hashes[index] = key.hashCode();
     Integer collisionIndex = indexes.store(index);
     if (null != collisionIndex) {
       // forward conditional on the survival of the corresponding key currently in indexes.
@@ -256,6 +257,7 @@ public HiveKey getVectorizedKeyToForward(int batchIndex) {
     int index = MAY_FORWARD - batchIndexToResult[batchIndex];
     HiveKey hk = new HiveKey();
     hk.set(keys[index], 0, keys[index].length);
+    hk.setHashCode(hashes[index]);
     hk.setDistKeyLength(distKeyLengths[index]);
     return hk;
   }
@@ -270,15 +272,23 @@ public int getVectorizedKeyDistLength(int batchIndex) {
   }
 
   /**
+   * After vectorized batch is processed, can return hashCode of a key.
+   * @param batchIndex index of the key in the batch.
+   * @return The hashCode corresponding to the key.
+   */
+  public int getVectorizedKeyHashCode(int batchIndex) {
+    return hashes[batchIndexToResult[batchIndex]];
+  }
+
+  /**
    * Stores the value for the key in the heap.
    * @param index The index, either from tryStoreKey or from tryStoreVectorizedKey result.
    * @param value The value to store.
    * @param keyHash The key hash to store.
    * @param vectorized Whether the result is coming from a vectorized batch.
    */
-  public void storeValue(int index, BytesWritable value, int keyHash, boolean vectorized) {
+  public void storeValue(int index, BytesWritable value, boolean vectorized) {
     values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
-    hashes[index] = keyHash;
     // Vectorized doesn't adjust usage for the keys while processing the batch
     usage += values[index].length + (vectorized ? keys[index].length : 0);
   }
@@ -317,6 +327,7 @@ private int insertKeyIntoHeap(HiveKey key) throws IOException, HiveException {
     int index = size < topN ? size : evicted;
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
     distKeyLengths[index] = key.getDistKeyLength();
+    hashes[index] = key.hashCode();
     if (null != indexes.store(index)) {
       // it's only for GBY which should forward all values associated with the key in the range
      // of limit. new value should be attatched with the key but in current implementation,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
index e234465..0b97890 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
@@ -95,6 +95,8 @@ private transient VectorExpressionWriter[] partitionWriters;
 
   private transient VectorExpressionWriter[] bucketWriters = null;
 
+  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
     this();
@@ -142,15 +144,15 @@ public void assign(VectorExpressionWriter[] writers,
          }
        });
 
+    if (isDebugEnabled) {
    String colNames = "";
    for (String colName : conf.getOutputKeyColumnNames()) {
      colNames = String.format("%s %s", colNames, colName);
    }
 
    LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
-        keyObjectInspector.getClass(),
-        keyObjectInspector,
-        colNames));
+        keyObjectInspector.getClass(), keyObjectInspector, colNames));
+    }
 
    partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
    if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
@@ -177,15 +179,15 @@ public void assign(VectorExpressionWriter[] writers,
          }
        });
 
-    colNames = "";
+    if (isDebugEnabled) {
+      String colNames = "";
    for (String colName : conf.getOutputValueColumnNames()) {
      colNames = String.format("%s %s", colNames, colName);
    }
 
    LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
-        valueObjectInspector.getClass(),
-        valueObjectInspector,
-        colNames));
+        valueObjectInspector.getClass(), valueObjectInspector, colNames));
+    }
 
    int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
    int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
@@ -211,11 +213,10 @@ public void assign(VectorExpressionWriter[] writers,
  public void processOp(Object row, int tag) throws HiveException {
    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
 
+    if (isDebugEnabled) {
    LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
-        vrg.size,
-        valueEval.length,
-        keyEval.length,
-        partitionEval.length));
+        vrg.size, valueEval.length, keyEval.length, partitionEval.length));
+    }
 
    try {
      // Evaluate the keys
@@ -268,17 +269,22 @@ public void processOp(Object row, int tag) throws HiveException {
          firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
        }
 
-        if (useTopN) {
-          reducerHash.tryStoreVectorizedKey(firstKey, batchIndex);
-        } else {
-          // No TopN, just forward the first key and all others.
-          int hashCode = 0;
-          if (bucketEval != null && bucketEval.length != 0) {
+        final int hashCode;
+
+        if(autoParallel && partitionEval.length > 0) {
+          hashCode = hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0);
+        } else if(bucketEval != null && bucketEval.length > 0) {
          hashCode = computeHashCode(vrg, rowIndex, buckNum);
        } else {
          hashCode = computeHashCode(vrg, rowIndex);
        }
+        firstKey.setHashCode(hashCode);
+
+        if (useTopN) {
+          reducerHash.tryStoreVectorizedKey(firstKey, batchIndex);
+        } else {
+          // No TopN, just forward the first key and all others.
          BytesWritable value = makeValueWritable(vrg, rowIndex);
          collect(firstKey, value);
          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
@@ -296,17 +302,18 @@ public void processOp(Object row, int tag) throws HiveException {
          rowIndex = vrg.selected[batchIndex];
        }
        // Compute value and hashcode - we'd either store or forward them.
-        int hashCode = computeHashCode(vrg, rowIndex);
        BytesWritable value = makeValueWritable(vrg, rowIndex);
        int distKeyLength = -1;
+        int hashCode;
        if (result == TopNHash.FORWARD) {
          HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
-          firstKey.setHashCode(hashCode);
          distKeyLength = firstKey.getDistKeyLength();
+          hashCode = firstKey.hashCode();
          collect(firstKey, value);
        } else {
-          reducerHash.storeValue(result, value, hashCode, true);
+          reducerHash.storeValue(result, value, true);
          distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
+          hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex);
        }
        // Now forward other the rows if there's multi-distinct (but see TODO in forward...).
        // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
index b522963..625af85 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
@@ -86,6 +86,7 @@ public Object process(Node nd, Stack stack,
            maxReducers, false);
        LOG.info("Set parallelism for reduce sink "+sink+" to: "+numReducers);
        desc.setNumReducers(numReducers);
+        desc.setAutoParallel(true);
      }
    } else {
      LOG.info("Number of reducers determined to be: "+desc.getNumReducers());
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 44b318e..8c1d336 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -87,6 +87,7 @@ private float topNMemoryUsage = -1;
  private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded
  private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable
+  private boolean autoParallel = false; // Is reducer parallelism automatic or fixed
 
  private static transient Log LOG = LogFactory.getLog(ReduceSinkDesc.class);
 
  public ReduceSinkDesc() {
@@ -139,6 +140,7 @@ public Object clone() {
    desc.setBucketCols(bucketCols);
    desc.setStatistics(this.getStatistics());
    desc.setSkipTag(skipTag);
+    desc.setAutoParallel(autoParallel);
    return desc;
  }
 
@@ -340,4 +342,12 @@ public void setSkipTag(boolean value) {
  public boolean getSkipTag() {
    return skipTag;
  }
+
+  public final boolean isAutoParallel() {
+    return autoParallel;
+  }
+
+  public final void setAutoParallel(final boolean autoParallel) {
+    this.autoParallel = autoParallel;
+  }
 }
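
Illustrative sketch of the hashing change above, not part of the patch: once SetReducerParallelism calls desc.setAutoParallel(true), ReduceSinkOperator and VectorReduceSinkOperator derive the partition hash by running MurmurHash over the already-serialized key bytes (up to distKeyLength, seed 0) instead of computeHashCode() over the partition columns, which should spread keys more evenly when the reducer count is tuned automatically. The class and method names below (AutoParallelHashSketch, partitionHash) are hypothetical, and the fallback branch only approximates computeHashCode(row); the only real API used is org.apache.hadoop.util.hash.MurmurHash#hash(byte[], int, int).

import org.apache.hadoop.util.hash.MurmurHash;

/** Standalone sketch; AutoParallelHashSketch and partitionHash are made-up names. */
public class AutoParallelHashSketch {
  // Same singleton/cast pattern as the field added to ReduceSinkOperator above.
  private static final MurmurHash HASH = (MurmurHash) MurmurHash.getInstance();

  /** keyBytes and distKeyLength stand in for HiveKey#getBytes() and HiveKey#getDistKeyLength(). */
  static int partitionHash(boolean autoParallel, byte[] keyBytes, int distKeyLength,
      Object[] partitionCols) {
    if (autoParallel && partitionCols.length > 0) {
      // Auto-parallel path from the patch: MurmurHash over the serialized key prefix, seed 0.
      return HASH.hash(keyBytes, distKeyLength, 0);
    }
    // Rough stand-in for computeHashCode(row): combine Java hashCodes of the partition columns.
    int hashCode = 0;
    for (Object col : partitionCols) {
      hashCode = hashCode * 31 + (col == null ? 0 : col.hashCode());
    }
    return hashCode;
  }

  public static void main(String[] args) {
    byte[] key = "example-key".getBytes();
    System.out.println(partitionHash(true, key, key.length, new Object[] { "example-key" }));
    System.out.println(partitionHash(false, key, key.length, new Object[] { "example-key" }));
  }
}

With the hash fixed on the HiveKey before it reaches TopNHash, the hash table only needs to remember key.hashCode() (hashes[index] = key.hashCode()), which is why storeValue no longer takes a keyHash argument in the patch.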