Add a seed-parameterized overload of ObjectInspectorUtils.getBucketHashCode.

Review notes on the originally proposed patch:
 * The ReduceSinkOperator changes were dropped entirely. The conditional
   `if (hconf.getInt(JobContext.NUM_REDUCES, -1) == 31) hashSeed = 131;`
   silently changed the partition-hash multiplier only for jobs with exactly
   31 reducers. The multiplier 31 is the bucketing hash contract used by the
   existing getBucketHashCode, so rows would be routed inconsistently with
   bucket pruning and bucket map joins for that reducer count. The wildcard
   import of org.apache.hadoop.mapred.* (replacing the explicit
   OutputCollector import) is also reverted by omission.
 * The new overload is kept as a backward-compatible generalization, with
   real Javadoc instead of empty @param/@return tags, and an explicit warning
   that any seed other than 31 is not bucketing-compatible.

diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 1ac72c6..77a0a8c 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -634,5 +634,24 @@
     return hashCode;
   }
 
+  /**
+   * Computes the hash code for the given bucketed fields using the supplied
+   * polynomial multiplier. With a seed of 31 this is identical to
+   * {@link #getBucketHashCode(Object[], ObjectInspector[])}; any other seed
+   * produces a partitioning that is NOT compatible with Hive bucketing and
+   * must not be used for bucketed-table writes or bucket map joins.
+   *
+   * @param bucketFields field values to hash, parallel to {@code bucketFieldInspectors}
+   * @param bucketFieldInspectors inspectors used to hash each corresponding field
+   * @param hashSeed multiplier used to fold successive field hashes together
+   * @return the combined hash code of all fields
+   */
+  public static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int hashSeed) {
+    int hashCode = 0;
+    for (int i = 0; i < bucketFields.length; i++) {
+      hashCode = hashSeed * hashCode + ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);
+    }
+    return hashCode;
+  }
+
   public static int hashCode(Object o, ObjectInspector objIns) {
     if (o == null) {