diff --git standalone-metastore/pom.xml standalone-metastore/pom.xml index 5c536c90fa..1bbb434a58 100644 --- standalone-metastore/pom.xml +++ standalone-metastore/pom.xml @@ -76,6 +76,7 @@ 2.9.4 5.5.1 4.11 + 0.4.3 0.9.3 0.9.3 2.8.2 @@ -320,6 +321,12 @@ ${junit.version} test + + net.jodah + concurrentunit + ${concurrentunit.version} + test + org.mockito mockito-core diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java index 8bdb47b431..a14dab135f 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java @@ -211,34 +211,58 @@ public void addBytes(byte[] val) { add(Murmur3.hash64(val)); } - public void addShort(short val) { - SHORT_BUFFER.putShort(0, val); - add(Murmur3.hash64(SHORT_BUFFER.array())); + public long addShort(short val) { + long hash = 0; + synchronized (HyperLogLog.class) { + SHORT_BUFFER.putShort(0, val); + hash = Murmur3.hash64(SHORT_BUFFER.array()); + } + return add(hash); } - public void addInt(int val) { - INT_BUFFER.putInt(0, val); - add(Murmur3.hash64(INT_BUFFER.array())); + public long addInt(int val) { + long hash = 0; + synchronized (HyperLogLog.class) { + INT_BUFFER.putInt(0, val); + hash = Murmur3.hash64(INT_BUFFER.array()); + } + return add(hash); } - public void addLong(long val) { - LONG_BUFFER.putLong(0, val); - add(Murmur3.hash64(LONG_BUFFER.array())); + public long addLong(long val) { + long hash = 0; + synchronized (HyperLogLog.class) { + LONG_BUFFER.putLong(0, val); + hash = Murmur3.hash64(LONG_BUFFER.array()); + } + return add(hash); } - public void addFloat(float val) { - INT_BUFFER.putFloat(0, val); - add(Murmur3.hash64(INT_BUFFER.array())); + public long addFloat(float val) { + long hash = 0; + synchronized (HyperLogLog.class) { + INT_BUFFER.putFloat(0, val); + hash = Murmur3.hash64(INT_BUFFER.array()); + } + return add(hash); } - public void addDouble(double val) { - LONG_BUFFER.putDouble(0, val); - add(Murmur3.hash64(LONG_BUFFER.array())); + public long addDouble(double val) { + long hash = 0; + synchronized (HyperLogLog.class) { + LONG_BUFFER.putDouble(0, val); + hash = Murmur3.hash64(LONG_BUFFER.array()); + } + return add(hash); } - public void addChar(char val) { - SHORT_BUFFER.putChar(0, val); - add(Murmur3.hash64(SHORT_BUFFER.array())); + public long addChar(char val) { + long hash = 0; + synchronized (HyperLogLog.class) { + SHORT_BUFFER.putChar(0, val); + hash = Murmur3.hash64(SHORT_BUFFER.array()); + } + return add(hash); } /** @@ -254,7 +278,7 @@ public void addString(String val, Charset charset) { add(Murmur3.hash64(val.getBytes(charset))); } - public void add(long hashcode) { + public long add(long hashcode) { if (encoding.equals(EncodingType.SPARSE)) { if (sparseRegister.add(hashcode)) { invalidateCount = true; @@ -273,6 +297,7 @@ public void add(long hashcode) { invalidateCount = true; } } + return hashcode; } public long estimateNumDistinctValues() { diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java standalone-metastore/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java index 617d9c3627..c1e799e29c 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java @@ -19,16 +19,36 @@ import static org.junit.Assert.assertEquals; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; + import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog.EncodingType; import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; +import org.apache.hive.common.util.Murmur3; import org.junit.Test; import org.junit.experimental.categories.Category; +import net.jodah.concurrentunit.Waiter; + +import org.junit.BeforeClass; + @Category(MetastoreUnitTest.class) public class TestHyperLogLog { // 5% tolerance for estimated count private float longRangeTolerance = 5.0f; private float shortRangeTolerance = 2.0f; + private static final int THREAD_SAFETY_CHECK_LOOP_COUNT = 100; + private static final long[] hashes = new long[THREAD_SAFETY_CHECK_LOOP_COUNT]; + + @BeforeClass + public static void setupHashes() { + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + for (int i = 0; i < THREAD_SAFETY_CHECK_LOOP_COUNT; ++i) { + buffer.putInt(0, i); + hashes[i] = Murmur3.hash64(buffer.array()); + } + } @Test(expected = IllegalArgumentException.class) public void testHLLDenseMerge() { @@ -227,4 +247,33 @@ public void testHLLSparseMoreRegisterBits() { double delta = threshold * size / 100; assertEquals((double) size, (double) hll.count(), delta); } + + @Test + public void testBufferThreadSafety() throws TimeoutException { + HyperLogLog hll = HyperLogLog.builder().setEncoding(EncodingType.DENSE).build(); + + // to make sure all exceptions are propagated back to Main thread + final Waiter waiter = new Waiter(); + + // to make sure all threads are starting "at the same time" + final CountDownLatch latch = new CountDownLatch(1); + + for (int i = 0; i < THREAD_SAFETY_CHECK_LOOP_COUNT; ++i) { + final int index = i; + Runnable runner = new Runnable() { + public void run() { + try { + latch.await(); + waiter.assertEquals(hashes[index], hll.addInt(index)); + waiter.resume(); + } catch (InterruptedException ie) { + // don't care + } + } + }; + new Thread(runner, "TestThread" + i).start(); + } + latch.countDown(); // release the latch + waiter.await(1000, THREAD_SAFETY_CHECK_LOOP_COUNT); + } }