diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
index dbed9e1..22a6354 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
@@ -56,21 +56,15 @@ public Object attachBucketIdToRecord(Object record) {
     return record;
   }
 
-  /** Based on: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. */
   @Override
   public int computeBucketId(Object record) {
-    int bucketId = 1;
-
+    Object[] bucketFieldValues = new Object[bucketFields.length];
+    ObjectInspector[] bucketFieldInspectors = new ObjectInspector[bucketFields.length];
     for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) {
-      Object columnValue = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
-      bucketId = bucketId * 31 + ObjectInspectorUtils.hashCode(columnValue, bucketFields[columnIndex].getFieldObjectInspector());
-    }
-
-    if (bucketId < 0) {
-      bucketId = -1 * bucketId;
+      bucketFieldValues[columnIndex] = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
+      bucketFieldInspectors[columnIndex] = bucketFields[columnIndex].getFieldObjectInspector();
     }
-
-    return bucketId % totalBuckets;
+    return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketFieldInspectors, totalBuckets);
   }
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 39944a9..e247673 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -791,12 +791,11 @@ private int findWriterOffset(Object row) throws HiveException {
     if (!multiFileSpray) {
       return 0;
     } else {
-      int keyHashCode = 0;
-      for (int i = 0; i < partitionEval.length; i++) {
-        Object o = partitionEval[i].evaluate(row);
-        keyHashCode = keyHashCode * 31
-            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      Object[] bucketFieldValues = new Object[partitionEval.length];
+      for(int i = 0; i < partitionEval.length; i++) {
+        bucketFieldValues[i] = partitionEval[i].evaluate(row);
       }
+      int keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
       key.setHashCode(keyHashCode);
       int bucketNum = prtner.getBucket(key, null, totalFiles);
       return bucketMap.get(bucketNum);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index f1df608..dd08210 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -405,27 +405,24 @@ public void process(Object row, int tag) throws HiveException {
   }
 
   private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
-    int buckNum = 0;
-
     if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
         conf.getWriteType() == AcidUtils.Operation.DELETE) {
-      // We don't need to evalute the hash code. Instead read the bucket number directly from
+      // We don't need to evaluate the hash code. Instead read the bucket number directly from
       // the row. I don't need to evaluate any expressions as I know I am reading the ROW__ID
       // column directly.
       Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
-      buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
+      int buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
       if (isLogTraceEnabled) {
         LOG.trace("Acid choosing bucket number " + buckNum);
       }
+      return buckNum;
     } else {
+      Object[] bucketFieldValues = new Object[bucketEval.length];
       for (int i = 0; i < bucketEval.length; i++) {
-        Object o = bucketEval[i].evaluate(row);
-        buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+        bucketFieldValues[i] = bucketEval[i].evaluate(row);
       }
+      return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets);
     }
-
-    // similar to hive's default partitioner, refer DefaultHivePartitioner
-    return (buckNum & Integer.MAX_VALUE) % numBuckets;
   }
 
   private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
@@ -476,11 +473,11 @@ private int computeHashCode(Object row, int buckNum) throws HiveException {
         keyHashCode = 1;
       }
     } else {
-      for (int i = 0; i < partitionEval.length; i++) {
-        Object o = partitionEval[i].evaluate(row);
-        keyHashCode = keyHashCode * 31
-            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      Object[] bucketFieldValues = new Object[partitionEval.length];
+      for(int i = 0; i < partitionEval.length; i++) {
+        bucketFieldValues[i] = partitionEval[i].evaluate(row);
       }
+      keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
     }
     int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
     if (isLogTraceEnabled) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
index 6a91cb8..6a14fb8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 
 /** Partition keys by their {@link Object#hashCode()}. */
@@ -26,7 +27,7 @@
   /** Use {@link Object#hashCode()} to partition. */
   @Override
   public int getBucket(K2 key, V2 value, int numBuckets) {
-    return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
+    return ObjectInspectorUtils.getBucketNumber(key.hashCode(), numBuckets);
   }
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
index 474f404..fd1fe92 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.udf.generic;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,13 +44,11 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
 
   @Override
   public Object evaluate(DeferredObject[] arguments) throws HiveException {
-    // See
-    // http://java.sun.com/j2se/1.5.0/docs/api/java/util/List.html#hashCode()
-    int r = 0;
-    for (int i = 0; i < arguments.length; i++) {
-      r = r * 31
-          + ObjectInspectorUtils.hashCode(arguments[i].get(), argumentOIs[i]);
+    Object[] fieldValues = new Object[arguments.length];
+    for(int i = 0; i < arguments.length; i++) {
+      fieldValues[i] = arguments[i].get();
     }
+    int r = ObjectInspectorUtils.getBucketHashCode(fieldValues, argumentOIs);
     result.set(r);
     return result;
   }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 54ae48e..09e9108 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -502,18 +502,23 @@ public static String getObjectInspectorName(ObjectInspector oi) {
    * @return the bucket number
    */
   public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) {
-    int hashCode = getBucketHashCode(bucketFields, bucketFieldInspectors);
-    int bucketID = (hashCode & Integer.MAX_VALUE) % totalBuckets;
-    return bucketID;
+    return getBucketNumber(getBucketHashCode(bucketFields, bucketFieldInspectors), totalBuckets);
   }
 
   /**
+   * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables
+   * @param hashCode as produced by {@link #getBucketHashCode(Object[], ObjectInspector[])}
+   */
+  public static int getBucketNumber(int hashCode, int numberOfBuckets) {
+    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
+  }
+  /**
    * Computes the hash code for the given bucketed fields
    * @param bucketFields
    * @param bucketFieldInspectors
    * @return
    */
-  private static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
+  public static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
     int hashCode = 0;
     for (int i = 0; i < bucketFields.length; i++) {
       int fieldHash = ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);