diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index 924e233293..6336378b72 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; @@ -185,7 +186,25 @@ protected int getBucket(Object row) throws SerializationError { } ObjectInspector[] inspectors = getBucketObjectInspectors(); Object[] bucketFields = getBucketFields(row); - return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets); + String versionStr = tbl.getParameters().get(hive_metastoreConstants.TABLE_BUCKETING_VERSION); + int bucketingVersion = 1; + if (versionStr != null) { + try { + bucketingVersion = Integer.parseInt(versionStr); + } catch (NumberFormatException e) { + // Do nothing, should not happen. + } + } + + if (bucketingVersion == 2) { + try { + return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets); + } catch (Exception e) { + throw new SerializationError("Failed to get Bucket Number", e); + } + } else { + return ObjectInspectorUtils.getBucketNumberOld(bucketFields, inspectors, totalBuckets); + } } @Override diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java index 5dd0b8ea5b..ca1cfed2f8 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java @@ -17,12 +17,14 @@ */ package org.apache.hive.hcatalog.streaming.mutate.worker; +import org.apache.hadoop.hive.serde2.SerDeException; + /** Computes and appends bucket ids to records that are due to be inserted. */ public interface BucketIdResolver { - Object attachBucketIdToRecord(Object record); + Object attachBucketIdToRecord(Object record) throws SerDeException; /** See: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. 
*/ - int computeBucketId(Object record); + int computeBucketId(Object record) throws SerDeException; } diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java index 7c2cadefa7..23b094d5e7 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; @@ -68,7 +69,7 @@ public BucketIdResolverImpl(ObjectInspector objectInspector, int recordIdColumn, } @Override - public Object attachBucketIdToRecord(Object record) { + public Object attachBucketIdToRecord(Object record) throws SerDeException { int bucketId = computeBucketId(record); int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(bucketId)); @@ -78,7 +79,7 @@ public Object attachBucketIdToRecord(Object record) { } @Override - public int computeBucketId(Object record) { + public int computeBucketId(Object record) throws SerDeException { Object[] bucketFieldValues = new Object[bucketFields.length]; ObjectInspector[] bucketFiledInspectors = new ObjectInspector[bucketFields.length]; for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) { diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java index 5e804d7468..5e04a10d94 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable; import org.slf4j.Logger; @@ -212,7 +213,12 @@ private RecordIdentifier extractRecordIdentifier(OperationType operationType, Li RecordIdentifier recordIdentifier = recordInspector.extractRecordIdentifier(record); int bucketIdFromRecord = BucketCodec.determineVersion( recordIdentifier.getBucketProperty()).decodeWriterId(recordIdentifier.getBucketProperty()); - int computedBucketId = bucketIdResolver.computeBucketId(record); + int computedBucketId = -1; + try { + computedBucketId = bucketIdResolver.computeBucketId(record); + } catch (SerDeException e) { + throw new BucketIdException("SerDeException in fetching bucket Id"); + } if (operationType != OperationType.DELETE && bucketIdFromRecord != computedBucketId) { throw new BucketIdException("RecordIdentifier.bucketId != computed bucketId (" + computedBucketId + ") for record " + recordIdentifier + " in partition " + newPartitionValues + "."); diff 
--git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java index 03c28a33c8..5ad6372e94 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hive.hcatalog.streaming.mutate.MutableRecord; import org.junit.Test; @@ -41,7 +42,11 @@ @Test public void testAttachBucketIdToRecord() { MutableRecord record = new MutableRecord(1, "hello"); - capturingBucketIdResolver.attachBucketIdToRecord(record); + try { + capturingBucketIdResolver.attachBucketIdToRecord(record); + } catch (SerDeException e) { + assert false; + } assertThat(record.rowId, is(new RecordIdentifier(-1L, BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(1)), -1L))); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 395a5f450f..399d0298a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -40,8 +39,10 @@ import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; @@ -54,7 +55,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.util.hash.MurmurHash; +import org.apache.hive.common.util.Murmur3; /** * Reduce Sink Operator sends output to the reduce stage. @@ -70,7 +71,6 @@ } private static final long serialVersionUID = 1L; - private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance(); private transient ObjectInspector[] partitionObjectInspectors; private transient ObjectInspector[] bucketObjectInspectors; @@ -127,7 +127,7 @@ * This two dimensional array holds key data and a corresponding Union object * which contains the tag identifying the aggregate expression for distinct columns. * - * If there is no distict expression, cachedKeys is simply like this. + * If there is no distinct expression, cachedKeys is simply like this. 
* cachedKeys[0] = [col0][col1] * * with two distict expression, union(tag:key) is attatched for each distinct expression @@ -145,6 +145,8 @@ protected transient long logEveryNRows = 0; private final transient LongWritable recordCounter = new LongWritable(); + private int bucketingVersion; + /** Kryo ctor. */ protected ReduceSinkOperator() { super(); @@ -322,7 +324,7 @@ public void process(Object row, int tag) throws HiveException { } // Determine distKeyLength (w/o distincts), and then add the first if present. - populateCachedDistributionKeys(row, 0); + populateCachedDistributionKeys(row); // replace bucketing columns with hashcode % numBuckets int bucketNumber = -1; @@ -344,12 +346,16 @@ public void process(Object row, int tag) throws HiveException { final int hashCode; // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0] + LOG.debug("UniformHash = " + useUniformHash + " bucketing version = " + bucketingVersion); if (useUniformHash && partitionEval.length > 0) { - hashCode = computeMurmurHash(firstKey); + hashCode = computeMurmurHash(firstKey.getBytes()); } else { - hashCode = computeHashCode(row, bucketNumber); + if (partitionEval.length > 0 && bucketingVersion == 2) { + hashCode = computeMurmurHashForBucketing(row); + } else { + hashCode = computeHashCode(row, bucketNumber); + } } - firstKey.setHashCode(hashCode); /* @@ -390,21 +396,37 @@ public void process(Object row, int tag) throws HiveException { } } - private int computeBucketNumber(Object row, int numBuckets) throws HiveException { + private int computeBucketNumber(Object row, int numBuckets) + throws HiveException, SerDeException { Object[] bucketFieldValues = new Object[bucketEval.length]; for (int i = 0; i < bucketEval.length; i++) { bucketFieldValues[i] = bucketEval[i].evaluate(row); } - return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets); + if (bucketingVersion == 2) { + return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets); + } else { + return ObjectInspectorUtils.getBucketNumberOld(bucketFieldValues, bucketObjectInspectors, numBuckets); + } } - private void populateCachedDistributionKeys(Object row, int index) throws HiveException { + private void populateCachedDistributionKeys(Object row) throws HiveException { for (int i = 0; i < numDistributionKeys; i++) { - cachedKeys[index][i] = keyEval[i].evaluate(row); + cachedKeys[0][i] = keyEval[i].evaluate(row); } if (cachedKeys[0].length > numDistributionKeys) { - cachedKeys[index][numDistributionKeys] = null; + cachedKeys[0][numDistributionKeys] = null; + } + } + + private int computeMurmurHashForBucketing(Object row) throws HiveException, SerDeException { + ByteStream.Output output = new ByteStream.Output(); + for (int i = 0; i < partitionEval.length; i++) { + Object obj = partitionEval[i].evaluate(row); + ObjectInspectorUtils.getSerializedBucketKey(output, obj, + partitionObjectInspectors[i]); } + // Compute the hash + return computeMurmurHash(output.getData()); } /** @@ -424,8 +446,8 @@ private void populateCachedDistinctKeys(Object row, int index) throws HiveExcept union.setTag((byte) index); } - protected final int computeMurmurHash(HiveKey firstKey) { - return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0); + protected final int computeMurmurHash(byte[] keyBytes) { + return Murmur3.hash32(keyBytes); } /** @@ -597,4 +619,12 @@ public String getReduceOutputName() { public void setOutputCollector(OutputCollector _out) {
this.out = _out; } + + public void setBucketingVersion(int bucketingVersion) { + this.bucketingVersion = bucketingVersion; + } + + public int getBucketingVersion() { + return bucketingVersion; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSerializedImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSerializedImpl.java index 86f466fc4e..a3b076fa0a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSerializedImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSerializedImpl.java @@ -24,6 +24,7 @@ import org.apache.hive.common.util.HashCodeUtil; import com.google.common.base.Preconditions; +import org.apache.hive.common.util.Murmur3; /** * Implementation of base serialization interface. @@ -103,7 +104,7 @@ protected void computeSerializedHashCodes() { byte[] bytes = output.getData(); for (int i = 0; i < nonNullKeyCount; i++) { keyLength = serializedKeyLengths[i]; - hashCodes[i] = HashCodeUtil.murmurHash(bytes, offset, keyLength); + hashCodes[i] = Murmur3.hash32(bytes, offset, keyLength, 0); offset += keyLength; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java index 8dd7cfe58c..1f6fd44132 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java @@ -147,6 +147,9 @@ // Debug display. protected transient long batchCounter; + // Bucketing version + protected int bucketingVersion; + //--------------------------------------------------------------------------- /** Kryo ctor. */ @@ -455,4 +458,8 @@ public VectorizationContext getInputVectorizationContext() { public VectorDesc getVectorDesc() { return vectorDesc; } + + public void setBucketingVersion(int bucketingVersion) { + this.bucketingVersion = bucketingVersion; + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java index 1eb72ce4d9..0e1eacfd81 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; @@ -68,6 +69,7 @@ import org.apache.hive.common.util.HashCodeUtil; import com.google.common.base.Preconditions; +import org.apache.hive.common.util.Murmur3; /** * This class is the object hash (not Uniform Hash) operator class for native vectorized reduce sink. 
@@ -254,61 +256,121 @@ public void process(Object row, int tag) throws HiveException { int[] selected = batch.selected; final int size = batch.size; - for (int logical = 0; logical < size; logical++) { - final int batchIndex = (selectedInUse ? selected[logical] : logical); - - final int hashCode; - if (isEmptyBuckets) { - if (isEmptyPartitions) { - hashCode = nonPartitionRandom.nextInt(); - } else { - partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); - hashCode = - ObjectInspectorUtils.getBucketHashCode( - partitionFieldValues, partitionObjectInspectors); + + // EmptyBuckets = true + if (isEmptyBuckets) { + if (isEmptyPartitions) { + for (int logical = 0; logical< size; logical++) { + final int batchIndex = (selectedInUse ? selected[logical] : logical); + final int hashCode = nonPartitionRandom.nextInt(); + postProcess(batch, batchIndex, tag, hashCode); } - } else { - bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues); - final int bucketNum = - ObjectInspectorUtils.getBucketNumber( - bucketFieldValues, bucketObjectInspectors, numBuckets); - if (isEmptyPartitions) { - hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum; - } else { + } else { // isEmptyPartition = false + for (int logical = 0; logical< size; logical++) { + final int batchIndex = (selectedInUse ? selected[logical] : logical); partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); - hashCode = - ObjectInspectorUtils.getBucketHashCode( - partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum; + final int hashCode; + if (bucketingVersion == 2) { + ByteStream.Output out = new ByteStream.Output(); + for (int i = 0; i < partitionFieldValues.length; i++) { + ObjectInspectorUtils.getSerializedBucketKey(out, + partitionFieldValues[i], partitionObjectInspectors[i]); + } + hashCode = Murmur3.hash32(out.getData()); + } else { // old bucketing logic + hashCode = ObjectInspectorUtils.getBucketHashCode( + partitionFieldValues, partitionObjectInspectors); + } + postProcess(batch, batchIndex, tag, hashCode); } } - - if (!isEmptyKey) { - keyBinarySortableSerializeWrite.reset(); - keyVectorSerializeRow.serializeWrite(batch, batchIndex); - - // One serialized key for 1 or more rows for the duplicate keys. - final int keyLength = keyOutput.getLength(); - if (tag == -1 || reduceSkipTag) { - keyWritable.set(keyOutput.getData(), 0, keyLength); - } else { - keyWritable.setSize(keyLength + 1); - System.arraycopy(keyOutput.getData(), 0, keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = reduceTagByte; + } else { // EmptyBuckets = false + if (isEmptyPartitions) { + for (int logical = 0; logical< size; logical++) { + final int batchIndex = (selectedInUse ? selected[logical] : logical); + bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues); + final int bucketNum; + if (bucketingVersion == 2) { + bucketNum = ObjectInspectorUtils.getBucketNumber(bucketFieldValues, + bucketObjectInspectors, numBuckets); + } else { // old bucketing logic + bucketNum = ObjectInspectorUtils.getBucketNumberOld( + bucketFieldValues, bucketObjectInspectors, numBuckets); + } + final int hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum; + postProcess(batch, batchIndex, tag, hashCode); + } + } else { // isEmptyPartition = false + for (int logical = 0; logical< size; logical++) { + final int batchIndex = (selectedInUse ? 
selected[logical] : logical); + partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); + final int hashCode; + if (bucketingVersion == 2) { + ByteStream.Output out = new ByteStream.Output(); + for (int i = 0; i < partitionFieldValues.length; i++) { + ObjectInspectorUtils.getSerializedBucketKey(out, + partitionFieldValues[i], partitionObjectInspectors[i]); + } + hashCode = Murmur3.hash32(out.getData()); + } else { // old bucketing logic + bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues); + final int bucketNum = + ObjectInspectorUtils.getBucketNumberOld( + bucketFieldValues, bucketObjectInspectors, numBuckets); + hashCode = ObjectInspectorUtils.getBucketHashCode( + partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum; + } + postProcess(batch, batchIndex, tag, hashCode); } - keyWritable.setDistKeyLength(keyLength); } + } + } catch (Exception e) { + throw new HiveException(e); + } + } + + private void processKey(VectorizedRowBatch batch, int batchIndex, int tag) + throws HiveException{ + if (isEmptyKey) return; + + try { + keyBinarySortableSerializeWrite.reset(); + keyVectorSerializeRow.serializeWrite(batch, batchIndex); + + // One serialized key for 1 or more rows for the duplicate keys. + final int keyLength = keyOutput.getLength(); + if (tag == -1 || reduceSkipTag) { + keyWritable.set(keyOutput.getData(), 0, keyLength); + } else { + keyWritable.setSize(keyLength + 1); + System.arraycopy(keyOutput.getData(), 0, keyWritable.get(), 0, keyLength); + keyWritable.get()[keyLength] = reduceTagByte; + } + keyWritable.setDistKeyLength(keyLength); + } catch (Exception e) { + throw new HiveException(e); + } + } - keyWritable.setHashCode(hashCode); + private void processValue(VectorizedRowBatch batch, int batchIndex) throws HiveException { + if (isEmptyValue) return; - if (!isEmptyValue) { - valueLazyBinarySerializeWrite.reset(); - valueVectorSerializeRow.serializeWrite(batch, batchIndex); + try { + valueLazyBinarySerializeWrite.reset(); + valueVectorSerializeRow.serializeWrite(batch, batchIndex); - valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength()); - } + valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength()); + } catch (Exception e) { + throw new HiveException(e); + } + } - collect(keyWritable, valueBytesWritable); - } + private void postProcess(VectorizedRowBatch batch, int batchIndex, int tag, int hashCode) throws HiveException { + try { + processKey(batch, batchIndex, tag); + keyWritable.setHashCode(hashCode); + processValue(batch, batchIndex); + collect(keyWritable, valueBytesWritable); } catch (Exception e) { throw new HiveException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index 632a21390d..d8946f0cec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -189,6 +189,9 @@ public void setTTable(org.apache.hadoop.hive.metastore.api.Table tTable) { // set create time t.setCreateTime((int) (System.currentTimeMillis() / 1000)); } + // Explictly set the bucketing version + t.getParameters().put(hive_metastoreConstants.TABLE_BUCKETING_VERSION, + "2"); return t; } @@ -449,6 +452,19 @@ public StructField getField(String fld) { } } + public int getBucketingVersion() { + String versionStr = getProperty(hive_metastoreConstants.TABLE_BUCKETING_VERSION); + int bucketingVersion = 1; + if (versionStr != null) { + try { + 
bucketingVersion = Integer.parseInt(versionStr); + } catch (NumberFormatException e) { + // Do nothing + } + } + return bucketingVersion; + } + @Override public String toString() { return tTable.getTableName(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index a235f3fbf4..6db7593337 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -180,7 +180,8 @@ MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos, true); // map join operator by default has no bucket cols and num of reduce sinks // reduced by 1 - mapJoinOp.setOpTraits(new OpTraits(null, -1, null, joinOp.getOpTraits().getNumReduceSinks())); + mapJoinOp.setOpTraits(new OpTraits(null, -1, null, joinOp.getOpTraits().getNumReduceSinks(), + joinOp.getOpTraits().getBucketingVersion())); mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS for (Operator childOp : mapJoinOp.getChildOperators()) { @@ -378,7 +379,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont joinOp.getSchema()); int numReduceSinks = joinOp.getOpTraits().getNumReduceSinks(); OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, - joinOp.getOpTraits().getSortCols(), numReduceSinks); + joinOp.getOpTraits().getSortCols(), numReduceSinks, joinOp.getOpTraits().getBucketingVersion()); mergeJoinOp.setOpTraits(opTraits); mergeJoinOp.setStatistics(joinOp.getStatistics()); @@ -445,7 +446,8 @@ private void setAllChildrenTraits(Operator currentOp, Op return; } currentOp.setOpTraits(new OpTraits(opTraits.getBucketColNames(), - opTraits.getNumBuckets(), opTraits.getSortCols(), opTraits.getNumReduceSinks())); + opTraits.getNumBuckets(), opTraits.getSortCols(), opTraits.getNumReduceSinks(), + opTraits.getBucketingVersion())); for (Operator childOp : currentOp.getChildOperators()) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) { break; @@ -498,7 +500,8 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon // we can set the traits for this join operator opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), - tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits().getNumReduceSinks()); + tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits().getNumReduceSinks(), + joinOp.getOpTraits().getBucketingVersion()); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); @@ -1169,7 +1172,8 @@ private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, Optim joinOp.getOpTraits().getBucketColNames(), numReducers, null, - joinOp.getOpTraits().getNumReduceSinks()); + joinOp.getOpTraits().getNumReduceSinks(), + joinOp.getOpTraits().getBucketingVersion()); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java index 5f65f638ca..160efb2c44 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java @@ -46,6 
+46,7 @@ import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; @@ -84,8 +85,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, @Override protected void generatePredicate(NodeProcessorCtx procCtx, - FilterOperator fop, TableScanOperator top) throws SemanticException, - UDFArgumentException { + FilterOperator fop, TableScanOperator top) throws SemanticException { FixedBucketPruningOptimizerCtxt ctxt = ((FixedBucketPruningOptimizerCtxt) procCtx); Table tbl = top.getConf().getTableMetadata(); if (tbl.getNumBuckets() > 0) { @@ -122,8 +122,7 @@ protected void generatePredicate(NodeProcessorCtx procCtx, @Override protected void generatePredicate(NodeProcessorCtx procCtx, - FilterOperator fop, TableScanOperator top) throws SemanticException, - UDFArgumentException { + FilterOperator fop, TableScanOperator top) throws SemanticException { FixedBucketPruningOptimizerCtxt ctxt = ((FixedBucketPruningOptimizerCtxt) procCtx); if (ctxt.getNumBuckets() <= 0 || ctxt.getBucketCols().size() != 1) { // bucketing isn't consistent or there are >1 bucket columns @@ -225,6 +224,8 @@ protected void generatePredicate(NodeProcessorCtx procCtx, bs.clear(); PrimitiveObjectInspector bucketOI = (PrimitiveObjectInspector)bucketField.getFieldObjectInspector(); PrimitiveObjectInspector constOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(bucketOI.getPrimitiveCategory()); + int bucketingVersion = top.getOpTraits().getBucketingVersion(); + for (Object literal: literals) { PrimitiveObjectInspector origOI = PrimitiveObjectInspectorFactory.getPrimitiveObjectInspectorFromClass(literal.getClass()); Converter conv = ObjectInspectorConverters.getConverter(origOI, constOI); @@ -233,7 +234,16 @@ protected void generatePredicate(NodeProcessorCtx procCtx, return; } Object convCols[] = new Object[] {conv.convert(literal)}; - int n = ObjectInspectorUtils.getBucketNumber(convCols, new ObjectInspector[]{constOI}, ctxt.getNumBuckets()); + int n = 0; + if (bucketingVersion == 2) { + try { + n = ObjectInspectorUtils.getBucketNumber(convCols, new ObjectInspector[]{constOI}, ctxt.getNumBuckets()); + } catch (SerDeException e) { + throw new SemanticException("Get BucketNumber threw SerDeException", e); + } + } else { + n = ObjectInspectorUtils.getBucketNumberOld(convCols, new ObjectInspector[]{constOI}, ctxt.getNumBuckets()); + } bs.set(n); if (ctxt.isCompat()) { int h = ObjectInspectorUtils.getBucketHashCode(convCols, new ObjectInspector[]{constOI}); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java index 2be3c9b9a2..1626e26782 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.serde2.SerDeException; /** * Operator factory for pruning processing of
operator graph We find @@ -101,7 +102,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, * @throws UDFArgumentException */ protected abstract void generatePredicate(NodeProcessorCtx procCtx, FilterOperator fop, - TableScanOperator top) throws SemanticException, UDFArgumentException; + TableScanOperator top) throws SemanticException; /** * Add pruning predicate. * diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java index 69d9f3125a..095c6b756a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java @@ -92,10 +92,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, List> listBucketCols = new ArrayList>(); int numBuckets = -1; int numReduceSinks = 1; + int bucketingVersion = -1; OpTraits parentOpTraits = rs.getParentOperators().get(0).getOpTraits(); if (parentOpTraits != null) { numBuckets = parentOpTraits.getNumBuckets(); numReduceSinks += parentOpTraits.getNumReduceSinks(); + bucketingVersion = parentOpTraits.getBucketingVersion(); } List bucketCols = new ArrayList<>(); @@ -134,8 +136,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } listBucketCols.add(bucketCols); - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols, numReduceSinks); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, + listBucketCols, numReduceSinks, bucketingVersion); rs.setOpTraits(opTraits); + rs.setBucketingVersion(bucketingVersion); return null; } } @@ -213,7 +217,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, sortedColsList.add(sortCols); } // num reduce sinks hardcoded to 0 because TS has no parents - OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList, 0); + OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, + sortedColsList, 0, table.getBucketingVersion()); ts.setOpTraits(opTraits); return null; } @@ -239,12 +244,15 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, List> listBucketCols = new ArrayList>(); int numReduceSinks = 0; + int bucketingVersion = -1; OpTraits parentOpTraits = gbyOp.getParentOperators().get(0).getOpTraits(); if (parentOpTraits != null) { numReduceSinks = parentOpTraits.getNumReduceSinks(); + bucketingVersion = parentOpTraits.getBucketingVersion(); } listBucketCols.add(gbyKeys); - OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, numReduceSinks); + OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, + numReduceSinks, bucketingVersion); gbyOp.setOpTraits(opTraits); return null; } @@ -298,12 +306,15 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, int numBuckets = -1; int numReduceSinks = 0; + int bucketingVersion = -1; OpTraits parentOpTraits = selOp.getParentOperators().get(0).getOpTraits(); if (parentOpTraits != null) { numBuckets = parentOpTraits.getNumBuckets(); numReduceSinks = parentOpTraits.getNumReduceSinks(); + bucketingVersion = parentOpTraits.getBucketingVersion(); } - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols, numReduceSinks); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols, + numReduceSinks, bucketingVersion); selOp.setOpTraits(opTraits); return null; } @@ 
-319,6 +330,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, List> sortColsList = new ArrayList>(); byte pos = 0; int numReduceSinks = 0; // will be set to the larger of the parents + boolean bucketingVersionSeen = false; for (Operator parentOp : joinOp.getParentOperators()) { if (!(parentOp instanceof ReduceSinkOperator)) { // can be mux operator @@ -338,7 +350,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, pos++; } - joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks)); + joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks, -1)); return null; } @@ -392,6 +404,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Operator operator = (Operator) nd; int numReduceSinks = 0; + int bucketingVersion = -1; + boolean bucketingVersionSeen = false; for (Operator parentOp : operator.getParentOperators()) { if (parentOp.getOpTraits() == null) { continue; @@ -399,8 +413,17 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, if (parentOp.getOpTraits().getNumReduceSinks() > numReduceSinks) { numReduceSinks = parentOp.getOpTraits().getNumReduceSinks(); } + // If there is mismatch in bucketingVersion, then it should be set to + // -1, that way SMB will be disabled. + if (bucketingVersion == -1 && !bucketingVersionSeen) { + bucketingVersion = parentOp.getOpTraits().getBucketingVersion(); + bucketingVersionSeen = true; + } else if (bucketingVersion != parentOp.getOpTraits().getBucketingVersion()) { + bucketingVersion = -1; + } } - OpTraits opTraits = new OpTraits(null, -1, null, numReduceSinks); + OpTraits opTraits = new OpTraits(null, -1, + null, numReduceSinks, bucketingVersion); operator.setOpTraits(opTraits); return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 783a672c47..8bb4c85b81 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -44,6 +44,7 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.calcite.util.Pair; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.hive.ql.exec.vector.reducesink.*; import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,11 +73,6 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator; import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator; -import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkEmptyKeyOperator; -import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator; -import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator; -import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkObjectHashOperator; -import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator; import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; @@ -3591,6 +3587,9 @@ private boolean canSpecializeMapJoin(Operator op, MapJoi LOG.info("Vectorizer vectorizeOperator reduce sink class " + 
opClass.getSimpleName()); + // Get the bucketing version + int bucketingVersion = ((ReduceSinkOperator)op).getBucketingVersion(); + Operator vectorOp = null; try { vectorOp = OperatorFactory.getVectorOperator( @@ -3602,6 +3601,10 @@ private boolean canSpecializeMapJoin(Operator op, MapJoi throw new HiveException(e); } + // Set the bucketing version + Preconditions.checkArgument(vectorOp instanceof VectorReduceSinkCommonOperator); + ((VectorReduceSinkCommonOperator)vectorOp).setBucketingVersion(bucketingVersion); + return vectorOp; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index bacc44482a..39d2370435 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -121,7 +121,8 @@ } // we can set the traits for this join operator - OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null, joinOp.getOpTraits().getNumReduceSinks()); + OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null, + joinOp.getOpTraits().getNumReduceSinks(), joinOp.getOpTraits().getBucketingVersion()); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java index 9621c3be53..d3b62ce799 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java @@ -22,17 +22,20 @@ public class OpTraits { - List> bucketColNames; - List> sortColNames; - int numBuckets; - int numReduceSinks; + private List> bucketColNames; + private List> sortColNames; + private int numBuckets; + private int numReduceSinks; + private int bucketingVersion; public OpTraits(List> bucketColNames, int numBuckets, - List> sortColNames, int numReduceSinks) { + List> sortColNames, int numReduceSinks, + int bucketingVersion) { this.bucketColNames = bucketColNames; this.numBuckets = numBuckets; this.sortColNames = sortColNames; this.numReduceSinks = numReduceSinks; + this.bucketingVersion = bucketingVersion; } public List> getBucketColNames() { @@ -68,10 +71,17 @@ public int getNumReduceSinks() { return this.numReduceSinks; } - + public void setBucketingVersion(int bucketingVersion) { + this.bucketingVersion = bucketingVersion; + } + + public int getBucketingVersion() { + return bucketingVersion; + } + @Override public String toString() { return "{ bucket column names: " + bucketColNames + "; sort column names: " - + sortColNames + "; bucket count: " + numBuckets + " }"; + + sortColNames + "; bucket count: " + numBuckets + "; bucketing version: " + bucketingVersion + " }"; } } diff --git a/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_3.q b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_3.q index ff4cde2c30..3db1563f88 100644 --- a/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_3.q +++ b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_3.q @@ -14,8 +14,8 @@ limit 1; set hive.auto.convert.join=true; set hive.optimize.dynamic.partition.hashjoin=true; -set hive.auto.convert.join.noconditionaltask.size=200000; -set hive.exec.reducers.bytes.per.reducer=200000; +set hive.auto.convert.join.noconditionaltask.size=20000; +set hive.exec.reducers.bytes.per.reducer=2000; explain select a.* diff --git 
a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_2.q.out b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_2.q.out index 054b0d00be..942738dae1 100644 --- a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_2.q.out +++ b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_2.q.out @@ -147,6 +147,7 @@ STAGE PLANS: SORTBUCKETCOLSPREFIX TRUE bucket_count 4 bucket_field_name key + bucketing_version 2 column.name.delimiter , columns key,value columns.comments @@ -234,6 +235,7 @@ STAGE PLANS: SORTBUCKETCOLSPREFIX TRUE bucket_count 2 bucket_field_name key + bucketing_version 2 column.name.delimiter , columns key,value columns.comments @@ -282,6 +284,7 @@ STAGE PLANS: SORTBUCKETCOLSPREFIX TRUE bucket_count 2 bucket_field_name key + bucketing_version 2 column.name.delimiter , columns key,value columns.comments @@ -421,6 +424,7 @@ STAGE PLANS: SORTBUCKETCOLSPREFIX TRUE bucket_count 4 bucket_field_name key + bucketing_version 2 column.name.delimiter , columns key,value columns.comments @@ -508,6 +512,7 @@ STAGE PLANS: SORTBUCKETCOLSPREFIX TRUE bucket_count 2 bucket_field_name key + bucketing_version 2 column.name.delimiter , columns key,value columns.comments @@ -556,6 +561,7 @@ STAGE PLANS: SORTBUCKETCOLSPREFIX TRUE bucket_count 2 bucket_field_name key + bucketing_version 2 column.name.delimiter , columns key,value columns.comments diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java index e5b90c25e9..1dc9de7273 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java @@ -700,7 +700,7 @@ public static void writeByte(RandomAccessOutput buffer, byte b, boolean invert) buffer.write(b); } - static void serialize(ByteStream.Output buffer, Object o, ObjectInspector oi, + public static void serialize(ByteStream.Output buffer, Object o, ObjectInspector oi, boolean invert, byte nullMarker, byte notNullMarker) throws SerDeException { // Is this field a null? 
if (o == null) { diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java index efb1df6495..e0ca05f7e2 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java @@ -27,9 +27,12 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; import org.apache.hadoop.hive.serde2.io.TimestampLocalTZWritable; import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableTimestampLocalTZObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampLocalTZObjectInspector; +import org.apache.hive.common.util.Murmur3; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.serde.serdeConstants; @@ -615,7 +618,26 @@ public static String getObjectInspectorName(ObjectInspector oi) { * @param totalBuckets the number of buckets in the table * @return the bucket number */ - public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) { + public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) + throws SerDeException { + ByteStream.Output output = new ByteStream.Output(); + for (int i = 0; i < bucketFields.length; i++) { + getSerializedBucketKey(output, bucketFields[i], bucketFieldInspectors[i]); + } + + + return ((Murmur3.hash32(output.getData())) & Integer.MAX_VALUE) % totalBuckets; + } + + /** + * Computes the bucket number to which the bucketFields belong + * @param bucketFields the bucketed fields of the row + * @param bucketFieldInspectors the ObjectInspectors for each of the bucketed fields + * @param totalBuckets the number of buckets in the table + * @return the bucket number + */ + @Deprecated + public static int getBucketNumberOld(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) { return getBucketNumber(getBucketHashCode(bucketFields, bucketFieldInspectors), totalBuckets); } @@ -631,6 +653,13 @@ public static int getBucketNumber(int hashCode, int numberOfBuckets) { } return (hashCode & Integer.MAX_VALUE) % numberOfBuckets; } + + public static void getSerializedBucketKey(ByteStream.Output output, + Object field, ObjectInspector oi) + throws SerDeException { + BinarySortableSerDe.serialize(output, field, oi, false, (byte)0, (byte)1); + } + /** * Computes the hash code for the given bucketed fields * @param bucketFields diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java index eca74057a3..315106ff7a 100644 --- a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java +++ b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java @@ -152,7 +152,7 @@ public void testBucketIdGeneration() { int hashCode = ObjectInspectorUtils.getBucketHashCode(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()])); assertEquals("", 3574518, hashCode); - int bucketId = ObjectInspectorUtils.getBucketNumber(struct.toArray(), fieldObjectInspectors.toArray(new
ObjectInspector[fieldObjectInspectors.size()]), 16); + int bucketId = ObjectInspectorUtils.getBucketNumberOld(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]), 16); assertEquals("", 6, bucketId); assertEquals("", bucketId, ObjectInspectorUtils.getBucketNumber(hashCode, 16)); } diff --git a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java index cb1d40a4a8..6d4457945b 100644 --- a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java +++ b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java @@ -84,4 +84,7 @@ public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + public static final String TABLE_BUCKETING_VERSION = "bucketing_version"; + + public static final String BUCKETED_TABLE_LOADED_IN_EXPERT_MODE = "bucketed_table_loaded_in_expert_mode"; } diff --git a/storage-api/src/java/org/apache/hive/common/util/Murmur3.java b/storage-api/src/java/org/apache/hive/common/util/Murmur3.java index 6848a59563..1ceeeb3868 100644 --- a/storage-api/src/java/org/apache/hive/common/util/Murmur3.java +++ b/storage-api/src/java/org/apache/hive/common/util/Murmur3.java @@ -61,7 +61,7 @@ * @return - hashcode */ public static int hash32(byte[] data) { - return hash32(data, data.length, DEFAULT_SEED); + return hash32(data, 0, data.length, DEFAULT_SEED); } /** @@ -73,16 +73,29 @@ public static int hash32(byte[] data) { * @return - hashcode */ public static int hash32(byte[] data, int length, int seed) { + return hash32(data, 0, length, seed); + } + + /** + * Murmur3 32-bit variant. + * + * @param data - input byte array + * @param offset - offset of data + * @param length - length of array + * @param seed - seed. (default 0) + * @return - hashcode + */ + public static int hash32(byte[] data, int offset, int length, int seed) { int hash = seed; final int nblocks = length >> 2; // body for (int i = 0; i < nblocks; i++) { int i_4 = i << 2; - int k = (data[i_4] & 0xff) - | ((data[i_4 + 1] & 0xff) << 8) - | ((data[i_4 + 2] & 0xff) << 16) - | ((data[i_4 + 3] & 0xff) << 24); + int k = (data[offset + i_4] & 0xff) + | ((data[offset + i_4 + 1] & 0xff) << 8) + | ((data[offset + i_4 + 2] & 0xff) << 16) + | ((data[offset + i_4 + 3] & 0xff) << 24); // mix functions k *= C1_32; @@ -97,11 +110,11 @@ public static int hash32(byte[] data, int length, int seed) { int k1 = 0; switch (length - idx) { case 3: - k1 ^= data[idx + 2] << 16; + k1 ^= data[offset + idx + 2] << 16; case 2: - k1 ^= data[idx + 1] << 8; + k1 ^= data[offset + idx + 1] << 8; case 1: - k1 ^= data[idx]; + k1 ^= data[offset + idx]; // mix functions k1 *= C1_32;