diff --git standalone-metastore/pom.xml standalone-metastore/pom.xml
index 5c536c90fa..1bbb434a58 100644
--- standalone-metastore/pom.xml
+++ standalone-metastore/pom.xml
@@ -76,6 +76,7 @@
2.9.4
5.5.1
4.11
+ 0.4.3
0.9.3
0.9.3
2.8.2
@@ -320,6 +321,12 @@
${junit.version}
test
+
+ net.jodah
+ concurrentunit
+ ${concurrentunit.version}
+ test
+
org.mockito
mockito-core
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
index 8bdb47b431..a14dab135f 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
@@ -211,34 +211,58 @@ public void addBytes(byte[] val) {
add(Murmur3.hash64(val));
}
- public void addShort(short val) {
- SHORT_BUFFER.putShort(0, val);
- add(Murmur3.hash64(SHORT_BUFFER.array()));
+ public long addShort(short val) {
+ long hash = 0;
+ synchronized (HyperLogLog.class) {
+ SHORT_BUFFER.putShort(0, val);
+ hash = Murmur3.hash64(SHORT_BUFFER.array());
+ }
+ return add(hash);
}
- public void addInt(int val) {
- INT_BUFFER.putInt(0, val);
- add(Murmur3.hash64(INT_BUFFER.array()));
+ public long addInt(int val) {
+ long hash = 0;
+ synchronized (HyperLogLog.class) {
+ INT_BUFFER.putInt(0, val);
+ hash = Murmur3.hash64(INT_BUFFER.array());
+ }
+ return add(hash);
}
- public void addLong(long val) {
- LONG_BUFFER.putLong(0, val);
- add(Murmur3.hash64(LONG_BUFFER.array()));
+ public long addLong(long val) {
+ long hash = 0;
+ synchronized (HyperLogLog.class) {
+ LONG_BUFFER.putLong(0, val);
+ hash = Murmur3.hash64(LONG_BUFFER.array());
+ }
+ return add(hash);
}
- public void addFloat(float val) {
- INT_BUFFER.putFloat(0, val);
- add(Murmur3.hash64(INT_BUFFER.array()));
+ public long addFloat(float val) {
+ long hash = 0;
+ synchronized (HyperLogLog.class) {
+ INT_BUFFER.putFloat(0, val);
+ hash = Murmur3.hash64(INT_BUFFER.array());
+ }
+ return add(hash);
}
- public void addDouble(double val) {
- LONG_BUFFER.putDouble(0, val);
- add(Murmur3.hash64(LONG_BUFFER.array()));
+ public long addDouble(double val) {
+ long hash = 0;
+ synchronized (HyperLogLog.class) {
+ LONG_BUFFER.putDouble(0, val);
+ hash = Murmur3.hash64(LONG_BUFFER.array());
+ }
+ return add(hash);
}
- public void addChar(char val) {
- SHORT_BUFFER.putChar(0, val);
- add(Murmur3.hash64(SHORT_BUFFER.array()));
+ public long addChar(char val) {
+ long hash = 0;
+ synchronized (HyperLogLog.class) {
+ SHORT_BUFFER.putChar(0, val);
+ hash = Murmur3.hash64(SHORT_BUFFER.array());
+ }
+ return add(hash);
}
/**
@@ -254,7 +278,7 @@ public void addString(String val, Charset charset) {
add(Murmur3.hash64(val.getBytes(charset)));
}
- public void add(long hashcode) {
+ public long add(long hashcode) {
if (encoding.equals(EncodingType.SPARSE)) {
if (sparseRegister.add(hashcode)) {
invalidateCount = true;
@@ -273,6 +297,7 @@ public void add(long hashcode) {
invalidateCount = true;
}
}
+ return hashcode;
}
public long estimateNumDistinctValues() {
diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java standalone-metastore/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java
index 617d9c3627..c1e799e29c 100644
--- standalone-metastore/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java
+++ standalone-metastore/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java
@@ -19,16 +19,36 @@
import static org.junit.Assert.assertEquals;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog.EncodingType;
import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hive.common.util.Murmur3;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import net.jodah.concurrentunit.Waiter;
+
+import org.junit.BeforeClass;
+
@Category(MetastoreUnitTest.class)
public class TestHyperLogLog {
// 5% tolerance for estimated count
private float longRangeTolerance = 5.0f;
private float shortRangeTolerance = 2.0f;
+ private static final int THREAD_SAFETY_CHECK_LOOP_COUNT = 100;
+ private static final long[] hashes = new long[THREAD_SAFETY_CHECK_LOOP_COUNT];
+
+ @BeforeClass
+ public static void setupHashes() {
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+ for (int i = 0; i < THREAD_SAFETY_CHECK_LOOP_COUNT; ++i) {
+ buffer.putInt(0, i);
+ hashes[i] = Murmur3.hash64(buffer.array());
+ }
+ }
@Test(expected = IllegalArgumentException.class)
public void testHLLDenseMerge() {
@@ -227,4 +247,33 @@ public void testHLLSparseMoreRegisterBits() {
double delta = threshold * size / 100;
assertEquals((double) size, (double) hll.count(), delta);
}
+
+ @Test
+ public void testBufferThreadSafety() throws TimeoutException {
+ HyperLogLog hll = HyperLogLog.builder().setEncoding(EncodingType.DENSE).build();
+
+ // to make sure all exceptions are propagated back to Main thread
+ final Waiter waiter = new Waiter();
+
+ // to make sure all threads are starting "at the same time"
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ for (int i = 0; i < THREAD_SAFETY_CHECK_LOOP_COUNT; ++i) {
+ final int index = i;
+ Runnable runner = new Runnable() {
+ public void run() {
+ try {
+ latch.await();
+ waiter.assertEquals(hashes[index], hll.addInt(index));
+ waiter.resume();
+ } catch (InterruptedException ie) {
+ // don't care
+ }
+ }
+ };
+ new Thread(runner, "TestThread" + i).start();
+ }
+ latch.countDown(); // release the latch
+ waiter.await(1000, THREAD_SAFETY_CHECK_LOOP_COUNT);
+ }
}