diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 6368548..74b22f9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.hash.MurmurHash;
 
 /**
  * Reduce Sink Operator sends output to the reduce stage.
@@ -95,6 +96,8 @@
   transient protected int numDistinctExprs;
   transient String[] inputAliases;  // input aliases of this RS for join (used for PPD)
   private boolean skipTag = false;
+
+  protected static final MurmurHash hash = (MurmurHash)MurmurHash.getInstance();
 
   public void setInputAliases(String[] inputAliases) {
     this.inputAliases = inputAliases;
@@ -294,25 +297,22 @@ public void processOp(Object row, int tag) throws HiveException {
         populateCachedDistinctKeys(row, 0);
         firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
       }
-
+      // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
+      int hashCode = hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength());
+
+      firstKey.setHashCode(hashCode);
+
       // Try to store the first key. If it's not excluded, we will proceed.
       int firstIndex = reducerHash.tryStoreKey(firstKey);
       if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
       // Compute value and hashcode - we'd either store or forward them.
       BytesWritable value = makeValueWritable(row);
-      int hashCode = 0;
-      if (bucketEval == null) {
-        hashCode = computeHashCode(row);
-      } else {
-        hashCode = computeHashCode(row, buckNum);
-      }
-
+
       if (firstIndex == TopNHash.FORWARD) {
-        firstKey.setHashCode(hashCode);
         collect(firstKey, value);
       } else {
         assert firstIndex >= 0;
-        reducerHash.storeValue(firstIndex, value, hashCode, false);
+        reducerHash.storeValue(firstIndex, value, false);
       }
 
       // All other distinct keys will just be forwarded. This could be optimized...
@@ -370,34 +370,6 @@ private void populateCachedDistinctKeys(Object row, int index) throws HiveExcept
     union.setTag((byte) index);
   }
 
-  private int computeHashCode(Object row) throws HiveException {
-    // Evaluate the HashCode
-    int keyHashCode = 0;
-    if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide better
-      // load balance. If the requirement is to have a single reducer, we should set
-      // the number of reducers to 1.
-      // Use a constant seed to make the code deterministic.
-      if (random == null) {
-        random = new Random(12345);
-      }
-      keyHashCode = random.nextInt();
-    } else {
-      for (int i = 0; i < partitionEval.length; i++) {
-        Object o = partitionEval[i].evaluate(row);
-        keyHashCode = keyHashCode * 31
-            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
-      }
-    }
-    return keyHashCode;
-  }
-
-  private int computeHashCode(Object row, int buckNum) throws HiveException {
-    int keyHashCode = computeHashCode(row);
-    keyHashCode = keyHashCode * 31 + buckNum;
-    return keyHashCode;
-  }
-
   // Serialize the keys and append the tag
   protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException {
     BinaryComparable key = (BinaryComparable)keySerializer.serialize(obj, keyObjectInspector);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index 978a749..bc81467 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -198,6 +198,7 @@ public void tryStoreVectorizedKey(HiveKey key, int batchIndex)
     int index = size < topN ? size : evicted;
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
     distKeyLengths[index] = key.getDistKeyLength();
+    hashes[index] = key.hashCode();
     Integer collisionIndex = indexes.store(index);
     if (null != collisionIndex) {
       // forward conditional on the survival of the corresponding key currently in indexes.
@@ -256,6 +257,7 @@ public HiveKey getVectorizedKeyToForward(int batchIndex) {
     int index = MAY_FORWARD - batchIndexToResult[batchIndex];
     HiveKey hk = new HiveKey();
     hk.set(keys[index], 0, keys[index].length);
+    hk.setHashCode(hashes[index]);
     hk.setDistKeyLength(distKeyLengths[index]);
     return hk;
   }
@@ -270,15 +272,23 @@ public int getVectorizedKeyDistLength(int batchIndex) {
   }
 
   /**
+   * After vectorized batch is processed, can return hashCode of a key.
+   * @param batchIndex index of the key in the batch.
+   * @return The hashCode corresponding to the key.
+   */
+  public int getVectorizedKeyHashCode(int batchIndex) {
+    return hashes[batchIndexToResult[batchIndex]];
+  }
+
+  /**
    * Stores the value for the key in the heap.
    * @param index The index, either from tryStoreKey or from tryStoreVectorizedKey result.
    * @param value The value to store.
    * @param keyHash The key hash to store.
    * @param vectorized Whether the result is coming from a vectorized batch.
    */
-  public void storeValue(int index, BytesWritable value, int keyHash, boolean vectorized) {
+  public void storeValue(int index, BytesWritable value, boolean vectorized) {
     values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
-    hashes[index] = keyHash;
     // Vectorized doesn't adjust usage for the keys while processing the batch
     usage += values[index].length + (vectorized ? keys[index].length : 0);
   }
@@ -317,6 +327,7 @@ private int insertKeyIntoHeap(HiveKey key) throws IOException, HiveException {
     int index = size < topN ? size : evicted;
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
     distKeyLengths[index] = key.getDistKeyLength();
+    hashes[index] = key.hashCode();
     if (null != indexes.store(index)) {
       // it's only for GBY which should forward all values associated with the key in the range
       // of limit. new value should be attatched with the key but in current implementation,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
index e234465..65518cf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
@@ -95,6 +95,8 @@
   private transient VectorExpressionWriter[] partitionWriters;
   private transient VectorExpressionWriter[] bucketWriters = null;
+
+  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
     this();
@@ -142,15 +144,15 @@ public void assign(VectorExpressionWriter[] writers,
         }
       });
 
-    String colNames = "";
-    for(String colName : conf.getOutputKeyColumnNames()) {
-      colNames = String.format("%s %s", colNames, colName);
-    }
+    if (isDebugEnabled) {
+      String colNames = "";
+      for (String colName : conf.getOutputKeyColumnNames()) {
+        colNames = String.format("%s %s", colNames, colName);
+      }
 
-    LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
-        keyObjectInspector.getClass(),
-        keyObjectInspector,
-        colNames));
+      LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
+          keyObjectInspector.getClass(), keyObjectInspector, colNames));
+    }
 
     partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
     if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
@@ -177,15 +179,15 @@ public void assign(VectorExpressionWriter[] writers,
         }
       });
 
-    colNames = "";
-    for(String colName : conf.getOutputValueColumnNames()) {
-      colNames = String.format("%s %s", colNames, colName);
-    }
+    if (isDebugEnabled) {
+      String colNames = "";
+      for (String colName : conf.getOutputValueColumnNames()) {
+        colNames = String.format("%s %s", colNames, colName);
+      }
 
-    LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
-        valueObjectInspector.getClass(),
-        valueObjectInspector,
-        colNames));
+      LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
+          valueObjectInspector.getClass(), valueObjectInspector, colNames));
+    }
 
     int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
     int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
@@ -211,11 +213,10 @@ public void assign(VectorExpressionWriter[] writers,
   public void processOp(Object row, int tag) throws HiveException {
     VectorizedRowBatch vrg = (VectorizedRowBatch) row;
 
-    LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
-        vrg.size,
-        valueEval.length,
-        keyEval.length,
-        partitionEval.length));
+    if (isDebugEnabled) {
+      LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
+          vrg.size, valueEval.length, keyEval.length, partitionEval.length));
+    }
 
     try {
       // Evaluate the keys
@@ -267,18 +268,13 @@ public void processOp(Object row, int tag) throws HiveException {
             populateCachedDistinctKeys(vrg, rowIndex, 0);
             firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
           }
-
+          int hashCode = hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength());
+          firstKey.setHashCode(hashCode);
+
           if (useTopN) {
             reducerHash.tryStoreVectorizedKey(firstKey, batchIndex);
           } else {
             // No TopN, just forward the first key and all others.
-            int hashCode = 0;
-            if (bucketEval != null && bucketEval.length != 0) {
-              hashCode = computeHashCode(vrg, rowIndex, buckNum);
-            } else {
-              hashCode = computeHashCode(vrg, rowIndex);
-            }
-            firstKey.setHashCode(hashCode);
             BytesWritable value = makeValueWritable(vrg, rowIndex);
             collect(firstKey, value);
             forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
@@ -296,17 +292,18 @@ public void processOp(Object row, int tag) throws HiveException {
             rowIndex = vrg.selected[batchIndex];
           }
           // Compute value and hashcode - we'd either store or forward them.
-          int hashCode = computeHashCode(vrg, rowIndex);
           BytesWritable value = makeValueWritable(vrg, rowIndex);
           int distKeyLength = -1;
+          int hashCode;
           if (result == TopNHash.FORWARD) {
             HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
-            firstKey.setHashCode(hashCode);
             distKeyLength = firstKey.getDistKeyLength();
+            hashCode = firstKey.hashCode();
             collect(firstKey, value);
           } else {
-            reducerHash.storeValue(result, value, hashCode, true);
+            reducerHash.storeValue(result, value, true);
             distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
+            hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex);
           }
           // Now forward other the rows if there's multi-distinct (but see TODO in forward...).
           // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
@@ -401,38 +398,6 @@ private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
     return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
   }
 
-  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) throws HiveException {
-    // Evaluate the HashCode
-    int keyHashCode = 0;
-    if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide better
-      // load balance. If the requirement is to have a single reducer, we should set
-      // the number of reducers to 1.
-      // Use a constant seed to make the code deterministic.
-      if (random == null) {
-        random = new Random(12345);
-      }
-      keyHashCode = random.nextInt();
-    } else {
-      for (int p = 0; p < partitionEval.length; p++) {
-        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
-        Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
-        keyHashCode = keyHashCode
-            * 31
-            + ObjectInspectorUtils.hashCode(
-                partitionValue,
-                partitionWriters[p].getObjectInspector());
-      }
-    }
-    return keyHashCode;
-  }
-
-  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex, int buckNum) throws HiveException {
-    int keyHashCode = computeHashCode(vrg, rowIndex);
-    keyHashCode = keyHashCode * 31 + buckNum;
-    return keyHashCode;
-  }
-
   private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException {
     int bucketNum = 0;
     for (int p = 0; p < bucketEval.length; p++) {
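
Note (aside, not part of the patch): the sketch below illustrates the hashing scheme this change moves to: the serialized distribution-key prefix of a HiveKey is hashed once with Hadoop's org.apache.hadoop.util.hash.MurmurHash and the result is carried on the key, instead of being recomputed from the partition columns at each use. The class name, the key bytes, distKeyLength, and the reducer count are invented for illustration, and the explicit three-argument hash(bytes, length, seed) overload is used so the hashed length is visible.

// Standalone sketch: hash a serialized key's distribution-key prefix with
// Hadoop's MurmurHash and map it to a reducer. Values below are made up.
import org.apache.hadoop.util.hash.MurmurHash;

public class MurmurKeyHashSketch {
  // Same singleton lookup the patch adds to ReduceSinkOperator.
  private static final MurmurHash HASH = (MurmurHash) MurmurHash.getInstance();

  // Hash only the first distKeyLength bytes (the distribution key), seed 0.
  static int hashDistributionKey(byte[] serializedKey, int distKeyLength) {
    return HASH.hash(serializedKey, distKeyLength, 0);
  }

  // Map a hash code to a reducer the way Hadoop's default HashPartitioner does.
  static int toReducer(int hashCode, int numReducers) {
    return (hashCode & Integer.MAX_VALUE) % numReducers;
  }

  public static void main(String[] args) {
    byte[] serializedKey = {12, 34, 56, 78, 90, 1, 2, 3}; // pretend serialized key bytes
    int distKeyLength = 5;                                // prefix forming the distribution key
    int hashCode = hashDistributionKey(serializedKey, distKeyLength);
    System.out.println("hash=" + hashCode + " -> reducer " + toReducer(hashCode, 32));
  }
}

Hashing only the first distKeyLength bytes keeps rows with the same distribution key on the same reducer even when the full serialized key also carries distinct columns or a tag, which is why the patch stores the hash alongside the key in TopNHash and restores it when the key is forwarded.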