diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
index 2c51882..3e1fcdd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import org.apache.hive.common.util.Murmur3;
+
 import java.sql.Timestamp;
 import java.util.Arrays;
 
@@ -41,6 +43,25 @@
  */
 public class VectorHashKeyWrapper extends KeyWrapper {
 
+  /**
+   * Holds a reusable {@link Murmur3.IncrementalHash32} so that
+   * {@link VectorHashKeyWrapper#setHashKey()} can hash byte keys without allocating a hasher
+   * for every key. NOTE(review): the hasher is mutable shared state - presumably a context
+   * must not be used by several threads at once; confirm against callers.
+   */
+  public static final class HashContext {
+    private final Murmur3.IncrementalHash32 bytesHash = new Murmur3.IncrementalHash32();
+
+    /**
+     * Returns the hasher owned by {@code ctx}, or a new hasher when {@code ctx} is null.
+     */
+    public static Murmur3.IncrementalHash32 getBytesHash(HashContext ctx) {
+      if (ctx == null) {
+        return new Murmur3.IncrementalHash32();
+      }
+      return ctx.bytesHash;
+    }
+  }
+
   private static final int[] EMPTY_INT_ARRAY = new int[0];
   private static final long[] EMPTY_LONG_ARRAY = new long[0];
   private static final double[] EMPTY_DOUBLE_ARRAY = new double[0];
@@ -73,10 +94,13 @@
 
   private int hashcode;
 
-  private VectorHashKeyWrapper(int longValuesCount, int doubleValuesCount,
+  private HashContext hashCtx;
+
+  private VectorHashKeyWrapper(HashContext ctx, int longValuesCount, int doubleValuesCount,
           int byteValuesCount, int decimalValuesCount, int timestampValuesCount,
           int intervalDayTimeValuesCount,
           int keyCount) {
+    hashCtx = ctx;
     longValues = longValuesCount > 0 ? new long[longValuesCount] : EMPTY_LONG_ARRAY;
     doubleValues = doubleValuesCount > 0 ? new double[doubleValuesCount] : EMPTY_DOUBLE_ARRAY;
     decimalValues = decimalValuesCount > 0 ? new HiveDecimalWritable[decimalValuesCount] : EMPTY_DECIMAL_ARRAY;
@@ -107,14 +131,14 @@ private VectorHashKeyWrapper(int longValuesCount, int doubleValuesCount,
   private VectorHashKeyWrapper() {
   }
 
-  public static VectorHashKeyWrapper allocate(int longValuesCount, int doubleValuesCount,
+  public static VectorHashKeyWrapper allocate(HashContext ctx, int longValuesCount, int doubleValuesCount,
       int byteValuesCount, int decimalValuesCount, int timestampValuesCount,
       int intervalDayTimeValuesCount, int keyCount) {
     if ((longValuesCount + doubleValuesCount + byteValuesCount + decimalValuesCount
         + timestampValuesCount + intervalDayTimeValuesCount) == 0) {
       return EMPTY_KEY_WRAPPER;
     }
-    return new VectorHashKeyWrapper(longValuesCount, doubleValuesCount, byteValuesCount,
+    return new VectorHashKeyWrapper(ctx, longValuesCount, doubleValuesCount, byteValuesCount,
         decimalValuesCount, timestampValuesCount, intervalDayTimeValuesCount,
         keyCount);
   }
@@ -126,40 +150,44 @@ public void getNewKey(Object row, ObjectInspector rowInspector) throws HiveExcep
 
   @Override
   public void setHashKey() {
-    hashcode = Arrays.hashCode(longValues) ^
+    // compute locally and assign
+    int hash = Arrays.hashCode(longValues) ^
         Arrays.hashCode(doubleValues) ^
         Arrays.hashCode(isNull);
 
     for (int i = 0; i < decimalValues.length; i++) {
       // Use the new faster hash code since we are hashing memory objects.
-      hashcode ^= decimalValues[i].newFasterHashCode();
+      hash ^= decimalValues[i].newFasterHashCode();
     }
 
     for (int i = 0; i < timestampValues.length; i++) {
-      hashcode ^= timestampValues[i].hashCode();
+      hash ^= timestampValues[i].hashCode();
     }
 
     for (int i = 0; i < intervalDayTimeValues.length; i++) {
-      hashcode ^= intervalDayTimeValues[i].hashCode();
+      hash ^= intervalDayTimeValues[i].hashCode();
     }
 
     // This code, with branches and all, is not executed if there are no string keys
+    Murmur3.IncrementalHash32 bytesHash = null;
     for (int i = 0; i < byteValues.length; ++i) {
       /*
        * Hashing the string is potentially expensive so is better to branch.
        * Additionally not looking at values for nulls allows us not reset the values.
        */
-      if (byteLengths[i] != -1) {
-        byte[] bytes = byteValues[i];
-        int start = byteStarts[i];
-        int length = byteLengths[i];
-        // Unfortunately there is no Arrays.hashCode(byte[], start, length)
-        for(int j = start; j < start + length; ++j) {
-          // use 461 as is a (sexy!) prime.
-          hashcode ^= 461 * bytes[j];
-        }
+      if (byteLengths[i] == -1) {
+        continue;
       }
+      if (bytesHash == null) {
+        bytesHash = HashContext.getBytesHash(hashCtx);
+        bytesHash.start(hash);
+      }
+      bytesHash.add(byteValues[i], byteStarts[i], byteLengths[i]);
+    }
+    if (bytesHash != null) {
+      hash = bytesHash.end();
     }
+    this.hashcode = hash;
   }
 
   @Override
@@ -171,6 +199,7 @@ public int hashCode() {
   public boolean equals(Object that) {
     if (that instanceof VectorHashKeyWrapper) {
       VectorHashKeyWrapper keyThat = (VectorHashKeyWrapper)that;
+      // not comparing hashCtx - irrelevant
       return hashcode == keyThat.hashcode &&
           Arrays.equals(longValues, keyThat.longValues) &&
           Arrays.equals(doubleValues, keyThat.doubleValues) &&
@@ -211,6 +240,7 @@ protected Object clone() {
   }
 
   public void duplicateTo(VectorHashKeyWrapper clone) {
+    clone.hashCtx = hashCtx;
     clone.longValues = (longValues.length > 0) ? longValues.clone() : EMPTY_LONG_ARRAY;
     clone.doubleValues = (doubleValues.length > 0) ? doubleValues.clone() : EMPTY_DOUBLE_ARRAY;
     clone.isNull = isNull.clone();
@@ -464,7 +494,7 @@ public boolean isNull(int keyIndex) {
   public static final class EmptyVectorHashKeyWrapper extends VectorHashKeyWrapper {
 
     private EmptyVectorHashKeyWrapper() {
-      super(0, 0, 0, 0, 0, 0, /* keyCount */ 0);
+      super(null, 0, 0, 0, 0, 0, 0, /* keyCount */ 0);
       // no need to override assigns - all assign ops will fail due to 0 size
     }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
index b235a3e..82e8748 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
@@ -63,6 +63,11 @@ public VectorHashKeyWrapperBatch(int keyCount) {
    */
   private int keysFixedSize;
 
+  /**
+   * Shared hashcontext for all keys in this batch
+   */
+  private final VectorHashKeyWrapper.HashContext hashCtx = new VectorHashKeyWrapper.HashContext();
+
   /**
    * Returns the compiled fixed size for the key wrappers.
    * @return
@@ -966,7 +971,7 @@ public static VectorHashKeyWrapperBatch compileKeyWrapperBatch(VectorExpression[
   }
 
   public VectorHashKeyWrapper allocateKeyWrapper() {
-    return VectorHashKeyWrapper.allocate(
+    return VectorHashKeyWrapper.allocate(hashCtx,
         longIndices.length,
         doubleIndices.length,
         stringIndices.length,
diff --git storage-api/src/java/org/apache/hive/common/util/Murmur3.java storage-api/src/java/org/apache/hive/common/util/Murmur3.java
index 88c3514..1c56765 100644
--- storage-api/src/java/org/apache/hive/common/util/Murmur3.java
+++ storage-api/src/java/org/apache/hive/common/util/Murmur3.java
@@ -332,4 +332,133 @@ private static long fmix64(long h) {
     h ^= (h >>> 33);
     return h;
   }
+
+  /**
+   * Incremental variant of the 32-bit hash: input is supplied through any number of
+   * {@link #add(byte[], int, int)} calls and the result is obtained from {@link #end()}.
+   * Up to three bytes that do not yet fill a 4-byte block are buffered between calls.
+   * An instance can be reused by calling {@link #start(int)} again.
+   */
+  public static class IncrementalHash32 {
+    // Bytes left over from earlier add() calls that do not yet form a 4-byte block.
+    byte[] tail = new byte[3];
+    // Number of valid bytes in tail.
+    int tailLen;
+    // Total number of bytes passed to add() since start().
+    int totalLen;
+    // Running hash state.
+    int hash;
+
+    /**
+     * Resets the buffered tail and byte count and sets the initial hash state.
+     * @param hash initial hash state
+     */
+    public final void start(int hash) {
+      tailLen = totalLen = 0;
+      this.hash = hash;
+    }
+
+    /**
+     * Feeds {@code length} bytes of {@code data}, beginning at {@code offset}, into the hash.
+     * @param data source array
+     * @param offset index of the first byte to hash
+     * @param length number of bytes to hash; 0 is a no-op
+     */
+    public final void add(byte[] data, int offset, int length) {
+      if (length == 0) return;
+      totalLen += length;
+      if (tailLen + length < 4) {
+        // Still less than one full block: only buffer the bytes.
+        System.arraycopy(data, offset, tail, tailLen, length);
+        tailLen += length;
+        return;
+      }
+      int offset2 = 0;
+      if (tailLen > 0) {
+        // Complete the pending block with the first (4 - tailLen) bytes of this input.
+        // Those bytes start at data[offset], not at data[0].
+        offset2 = (4 - tailLen);
+        int k = -1;
+        switch (tailLen) {
+        case 1:
+          k = orBytes(tail[0], data[offset], data[offset + 1], data[offset + 2]);
+          break;
+        case 2:
+          k = orBytes(tail[0], tail[1], data[offset], data[offset + 1]);
+          break;
+        case 3:
+          k = orBytes(tail[0], tail[1], tail[2], data[offset]);
+          break;
+        default: throw new AssertionError(tailLen);
+        }
+        // mix functions
+        k *= C1_32;
+        k = Integer.rotateLeft(k, R1_32);
+        k *= C2_32;
+        hash ^= k;
+        hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
+      }
+      int length2 = length - offset2;
+      offset += offset2;
+      final int nblocks = length2 >> 2;
+
+      // Process every complete 4-byte block of the remaining input.
+      for (int i = 0; i < nblocks; i++) {
+        int i_4 = (i << 2) + offset;
+        int k = orBytes(data[i_4], data[i_4 + 1], data[i_4 + 2], data[i_4 + 3]);
+
+        // mix functions
+        k *= C1_32;
+        k = Integer.rotateLeft(k, R1_32);
+        k *= C2_32;
+        hash ^= k;
+        hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
+      }
+
+      int consumed = (nblocks << 2);
+      tailLen = length2 - consumed;
+      if (consumed == length2) return;
+      // Buffer the leftover bytes for the next add() or end().
+      System.arraycopy(data, offset + consumed, tail, 0, tailLen);
+    }
+
+    /**
+     * Mixes in any buffered tail bytes, applies the finalization steps and returns the hash.
+     * @return hash of all bytes added since {@link #start(int)}
+     */
+    public final int end() {
+      int k1 = 0;
+      // Cases intentionally fall through.
+      // NOTE(review): tail bytes are not masked with 0xff here (unlike orBytes), so negative
+      // bytes sign-extend; presumably this mirrors the one-shot hash32 - confirm before changing.
+      switch (tailLen) {
+      case 3:
+        k1 ^= tail[2] << 16;
+      case 2:
+        k1 ^= tail[1] << 8;
+      case 1:
+        k1 ^= tail[0];
+
+        // mix functions
+        k1 *= C1_32;
+        k1 = Integer.rotateLeft(k1, R1_32);
+        k1 *= C2_32;
+        hash ^= k1;
+      }
+
+      // finalization
+      hash ^= totalLen;
+      hash ^= (hash >>> 16);
+      hash *= 0x85ebca6b;
+      hash ^= (hash >>> 13);
+      hash *= 0xc2b2ae35;
+      hash ^= (hash >>> 16);
+      return hash;
+    }
+  }
+
+  /** Packs four bytes into an int, with b1 as the least significant byte. */
+  private static int orBytes(byte b1, byte b2, byte b3, byte b4) {
+    return (b1 & 0xff) | ((b2 & 0xff) << 8) | ((b3 & 0xff) << 16) | ((b4 & 0xff) << 24);
+  }
 }