diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
index dbed9e1..bb9462d 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
@@ -56,21 +56,15 @@ public Object attachBucketIdToRecord(Object record) {
     return record;
   }
 
-  /** Based on: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. */
   @Override
   public int computeBucketId(Object record) {
-    int bucketId = 1;
-
+    Object[] bucketFieldValues = new Object[bucketFields.length];
+    ObjectInspector[] bucketFiledInspectors = new ObjectInspector[bucketFields.length];
     for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) {
-      Object columnValue = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
-      bucketId = bucketId * 31 + ObjectInspectorUtils.hashCode(columnValue, bucketFields[columnIndex].getFieldObjectInspector());
-    }
-
-    if (bucketId < 0) {
-      bucketId = -1 * bucketId;
+      bucketFieldValues[columnIndex] = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
+      bucketFiledInspectors[columnIndex] = bucketFields[columnIndex].getFieldObjectInspector();
     }
-
-    return bucketId % totalBuckets;
+    return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketFiledInspectors, totalBuckets);
   }
 
 }
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
index f81373e..5297c5d 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
@@ -23,7 +23,7 @@ public void testAttachBucketIdToRecord() {
     MutableRecord record = new MutableRecord(1, "hello");
     capturingBucketIdResolver.attachBucketIdToRecord(record);
 
-    assertThat(record.rowId, is(new RecordIdentifier(-1L, 8, -1L)));
+    assertThat(record.rowId, is(new RecordIdentifier(-1L, 1, -1L)));
     assertThat(record.id, is(1));
     assertThat(record.msg.toString(), is("hello"));
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 39944a9..e247673 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -791,12 +791,11 @@ private int findWriterOffset(Object row) throws HiveException {
     if (!multiFileSpray) {
       return 0;
     } else {
-      int keyHashCode = 0;
-      for (int i = 0; i < partitionEval.length; i++) {
-        Object o = partitionEval[i].evaluate(row);
-        keyHashCode = keyHashCode * 31
-            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      Object[] bucketFieldValues = new Object[partitionEval.length];
+      for(int i = 0; i < partitionEval.length; i++) {
+        bucketFieldValues[i] = partitionEval[i].evaluate(row);
       }
+      int keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
       key.setHashCode(keyHashCode);
       int bucketNum = prtner.getBucket(key, null, totalFiles);
       return bucketMap.get(bucketNum);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index f1df608..dd08210 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -405,27 +405,24 @@ public void process(Object row, int tag) throws HiveException {
   }
 
   private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
-    int buckNum = 0;
-
     if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
         conf.getWriteType() == AcidUtils.Operation.DELETE) {
-      // We don't need to evalute the hash code. Instead read the bucket number directly from
+      // We don't need to evaluate the hash code. Instead read the bucket number directly from
       // the row. I don't need to evaluate any expressions as I know I am reading the ROW__ID
       // column directly.
       Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
-      buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
+      int buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
       if (isLogTraceEnabled) {
         LOG.trace("Acid choosing bucket number " + buckNum);
       }
+      return buckNum;
     } else {
+      Object[] bucketFieldValues = new Object[bucketEval.length];
       for (int i = 0; i < bucketEval.length; i++) {
-        Object o = bucketEval[i].evaluate(row);
-        buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+        bucketFieldValues[i] = bucketEval[i].evaluate(row);
       }
+      return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets);
     }
-
-    // similar to hive's default partitioner, refer DefaultHivePartitioner
-    return (buckNum & Integer.MAX_VALUE) % numBuckets;
   }
 
   private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
@@ -476,11 +473,11 @@ private int computeHashCode(Object row, int buckNum) throws HiveException {
         keyHashCode = 1;
       }
     } else {
-      for (int i = 0; i < partitionEval.length; i++) {
-        Object o = partitionEval[i].evaluate(row);
-        keyHashCode = keyHashCode * 31
-            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      Object[] bucketFieldValues = new Object[partitionEval.length];
+      for(int i = 0; i < partitionEval.length; i++) {
+        bucketFieldValues[i] = partitionEval[i].evaluate(row);
       }
+      keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
     }
     int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
     if (isLogTraceEnabled) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
index 6a91cb8..6a14fb8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 
 /** Partition keys by their {@link Object#hashCode()}. */
@@ -26,7 +27,7 @@
   /** Use {@link Object#hashCode()} to partition. */
   @Override
   public int getBucket(K2 key, V2 value, int numBuckets) {
-    return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
+    return ObjectInspectorUtils.getBucketNumber(key.hashCode(), numBuckets);
   }
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
index 474f404..fd1fe92 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.udf.generic;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,13 +44,11 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
 
   @Override
   public Object evaluate(DeferredObject[] arguments) throws HiveException {
-    // See
-    // http://java.sun.com/j2se/1.5.0/docs/api/java/util/List.html#hashCode()
-    int r = 0;
-    for (int i = 0; i < arguments.length; i++) {
-      r = r * 31
-          + ObjectInspectorUtils.hashCode(arguments[i].get(), argumentOIs[i]);
+    Object[] fieldValues = new Object[arguments.length];
+    for(int i = 0; i < arguments.length; i++) {
+      fieldValues[i] = arguments[i].get();
     }
+    int r = ObjectInspectorUtils.getBucketHashCode(fieldValues, argumentOIs);
     result.set(r);
     return result;
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index db119e1..525117d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -20,13 +20,15 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.DummyPartition;import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 54ae48e..09e9108 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -502,18 +502,23 @@ public static String getObjectInspectorName(ObjectInspector oi) {
    * @return the bucket number
    */
   public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) {
-    int hashCode = getBucketHashCode(bucketFields, bucketFieldInspectors);
-    int bucketID = (hashCode & Integer.MAX_VALUE) % totalBuckets;
-    return bucketID;
+    return getBucketNumber(getBucketHashCode(bucketFields, bucketFieldInspectors), totalBuckets);
   }
 
   /**
+   * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables
+   * @param hashCode as produced by {@link #getBucketHashCode(Object[], ObjectInspector[])}
+   */
+  public static int getBucketNumber(int hashCode, int numberOfBuckets) {
+    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
+  }
+  /**
    * Computes the hash code for the given bucketed fields
    * @param bucketFields
    * @param bucketFieldInspectors
   * @return
   */
-  private static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
+  public static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
     int hashCode = 0;
     for (int i = 0; i < bucketFields.length; i++) {
       int fieldHash = ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);
diff --git serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
index ade0ef7..cf73b28 100644
--- serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
+++ serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
@@ -131,4 +131,29 @@ public void testObjectInspectorUtils() throws Throwable {
     }
   }
 
+  public void testBucketIdGeneration() {
+    ArrayList fieldNames = new ArrayList();
+    fieldNames.add("firstInteger");
+    fieldNames.add("secondString");
+    fieldNames.add("thirdBoolean");
+    ArrayList fieldObjectInspectors = new ArrayList();
+    fieldObjectInspectors
+        .add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
+    fieldObjectInspectors
+        .add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    fieldObjectInspectors
+        .add(PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
+
+    StandardStructObjectInspector soi1 = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldObjectInspectors);
+    ArrayList struct = new ArrayList(3);
+    struct.add(1);
+    struct.add("two");
+    struct.add(true);
+
+    int hashCode = ObjectInspectorUtils.getBucketHashCode(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]));
+    assertEquals("", 3574518, hashCode);
+    int bucketId = ObjectInspectorUtils.getBucketNumber(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]), 16);
+    assertEquals("", 6, bucketId);
+    assertEquals("", bucketId, ObjectInspectorUtils.getBucketNumber(hashCode, 16));
+  }
 }
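
Note (reviewer addition, not part of the patch): a minimal usage sketch of the two ObjectInspectorUtils helpers this change makes public, wired together the same way the callers above now use them. The field values, inspectors, and bucket count mirror the new testBucketIdGeneration test (hash 3574518, bucket 6 of 16); the wrapper class BucketNumberExample is hypothetical, while the ObjectInspectorUtils and PrimitiveObjectInspectorFactory calls are the ones introduced or made public in the patch.

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class BucketNumberExample {
  public static void main(String[] args) {
    // Bucketed column values and their matching ObjectInspectors.
    Object[] values = new Object[] { 1, "two", true };
    ObjectInspector[] inspectors = new ObjectInspector[] {
        PrimitiveObjectInspectorFactory.javaIntObjectInspector,
        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
        PrimitiveObjectInspectorFactory.javaBooleanObjectInspector };

    // Hash the bucketed field values once with the now-public helper...
    int hashCode = ObjectInspectorUtils.getBucketHashCode(values, inspectors);
    // ...then map the hash to a bucket, either from the hash alone...
    int fromHash = ObjectInspectorUtils.getBucketNumber(hashCode, 16);
    // ...or in a single call from the values and inspectors.
    int fromValues = ObjectInspectorUtils.getBucketNumber(values, inspectors, 16);

    // Expected per the test above: 3574518 -> bucket 6, with both paths agreeing.
    System.out.println(hashCode + " -> " + fromHash + " / " + fromValues);
  }
}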