- private TreeMap<Integer, Byte> sparseMap;
-
- // for a better insertion performance values are added to temporary unsorted
- // list which will be merged to sparse map after a threshold
- private int[] tempList;
- private int tempListIdx;
+ private Int2ByteMap sparseMap;
// number of register bits
private final int p;
@@ -47,9 +44,7 @@
public HLLSparseRegister(int p, int pp, int qp) {
this.p = p;
- this.sparseMap = new TreeMap<>();
- this.tempList = new int[HLLConstants.TEMP_LIST_DEFAULT_SIZE];
- this.tempListIdx = 0;
+ this.sparseMap = new Int2ByteOpenHashMap();
this.pPrime = pp;
this.qPrime = qp;
this.mask = ((1 << pPrime) - 1) ^ ((1 << p) - 1);
@@ -60,65 +55,44 @@
public boolean add(long hashcode) {
boolean updated = false;
- // fill the temp list before merging to sparse map
- if (tempListIdx < tempList.length) {
- int encodedHash = encodeHash(hashcode);
- tempList[tempListIdx++] = encodedHash;
- updated = true;
+ int encodedHash = encodeHash(hashcode);
+
+ int key = encodedHash & pPrimeMask;
+ byte value = (byte) (encodedHash >>> pPrime);
+ byte nr = 0;
+ // if MSB is set to 1 then the next qPrime MSB bits contain the number of
+ // zeroes.
+ // if MSB is set to 0 then the number of zeroes is contained within the
+ // pPrime - p bits.
+ if (encodedHash < 0) {
+ nr = (byte) (value & qPrimeMask);
} else {
- updated = mergeTempListToSparseMap();
+ nr = (byte) (Integer.numberOfTrailingZeros(encodedHash >>> p) + 1);
}
+ updated = set(key, nr);
return updated;
}
/**
- * Adds temp list to sparse map. The key for sparse map entry is the register
- * index determined by pPrime and value is the number of trailing zeroes.
- * @return
- */
- private boolean mergeTempListToSparseMap() {
- boolean updated = false;
- for (int i = 0; i < tempListIdx; i++) {
- int encodedHash = tempList[i];
- int key = encodedHash & pPrimeMask;
- byte value = (byte) (encodedHash >>> pPrime);
- byte nr = 0;
- // if MSB is set to 1 then next qPrime MSB bits contains the value of
- // number of zeroes.
- // if MSB is set to 0 then number of zeroes is contained within pPrime - p
- // bits.
- if (encodedHash < 0) {
- nr = (byte) (value & qPrimeMask);
- } else {
- nr = (byte) (Integer.numberOfTrailingZeros(encodedHash >>> p) + 1);
- }
- updated = set(key, nr);
- }
-
- // reset temp list index
- tempListIdx = 0;
- return updated;
- }
-
- /**
*
* Input: 64 bit hashcode
- *
+ *
* |---------w-------------| |------------p'----------|
* 10101101.......1010101010 10101010101 01010101010101
* |------p-----|
- *
+ *
* Output: 32 bit int
- *
+ *
* |b| |-q'-| |------------p'----------|
* 1 010101 01010101010 10101010101010
* |------p-----|
- *
- *
+ *
+ *
* The default values of p', q' and b are 25, 6, 1 (total 32 bits) respectively.
* This function will return an int encoded in the following format
- *
+ *
* p - LSB p bits represent the register index
* p' - LSB p' bits are used for increased accuracy in estimation
* q' - q' bits after p' are left as such from the hashcode if b = 0 else
@@ -148,8 +122,8 @@
}
}
- public int getSize() {
- return sparseMap.size() + tempListIdx;
+ public boolean isSizeGreaterThan(int s) {
+ return sparseMap.size() > s;
}
public void merge(HLLRegister hllRegister) {
@@ -177,14 +151,11 @@
return false;
}
- public TreeMap<Integer, Byte> getSparseMap() {
+ public Map<Integer, Byte> getSparseMap() {
return getMergedSparseMap();
}
- private TreeMap<Integer, Byte> getMergedSparseMap() {
- if (tempListIdx != 0) {
- mergeTempListToSparseMap();
- }
+ private Map<Integer, Byte> getMergedSparseMap() {
return sparseMap;
}
@@ -195,7 +166,7 @@
byte lr = entry.getValue(); // this can be a max of 65, never > 127
if (lr != 0) {
// should be a no-op for sparse
- dest.add((long) ((1 << (p + lr - 1)) | idx));
+ dest.add((1L << (p + lr - 1)) | idx);
}
}
}
@@ -231,15 +202,8 @@
return false;
}
HLLSparseRegister other = (HLLSparseRegister) obj;
- boolean result = p == other.p && pPrime == other.pPrime && qPrime == other.qPrime
- && tempListIdx == other.tempListIdx;
+ boolean result = p == other.p && pPrime == other.pPrime && qPrime == other.qPrime;
if (result) {
- for (int i = 0; i < tempListIdx; i++) {
- if (tempList[i] != other.tempList[i]) {
- return false;
- }
- }
-
result = result && sparseMap.equals(other.sparseMap);
}
return result;
@@ -251,9 +215,6 @@
hashcode += 31 * p;
hashcode += 31 * pPrime;
hashcode += 31 * qPrime;
- for (int i = 0; i < tempListIdx; i++) {
- hashcode += 31 * tempList[tempListIdx];
- }
hashcode += sparseMap.hashCode();
return hashcode;
}
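
For readers following the new add() path above, the same decode logic written out as a standalone helper may make the bit manipulation easier to follow. This is an illustrative sketch, not a class in the patch; pPrimeMask and qPrimeMask are assumed to be the low-bit masks (1 << pPrime) - 1 and (1 << qPrime) - 1, which is consistent with how they are used in add().

```java
// Illustrative sketch (not part of the patch): decoding one encoded 32-bit hash
// into the (key, run length) pair that HLLSparseRegister.add() now stores
// directly in the Int2ByteOpenHashMap.
final class SparseDecodeSketch {

  // Assumption: low-bit masks, matching how pPrimeMask/qPrimeMask are used in add().
  static int pPrimeMask(int pPrime) { return (1 << pPrime) - 1; }
  static int qPrimeMask(int qPrime) { return (1 << qPrime) - 1; }

  /** Sparse register index: the low pPrime bits of the encoded hash. */
  static int key(int encodedHash, int pPrime) {
    return encodedHash & pPrimeMask(pPrime);
  }

  /** Run length stored against that index. */
  static byte runLength(int encodedHash, int p, int pPrime, int qPrime) {
    if (encodedHash < 0) {
      // b (the MSB) is 1: the qPrime bits right after pPrime already hold the count
      return (byte) ((encodedHash >>> pPrime) & qPrimeMask(qPrime));
    }
    // b is 0: recompute the count from the bits above the p register-index bits
    return (byte) (Integer.numberOfTrailingZeros(encodedHash >>> p) + 1);
  }
}
```

With the default layout described in the encodeHash javadoc above (p' = 25, q' = 6, b = 1 bit), the key fits in 25 bits and the run length in a single byte, which is why an int-to-byte map such as Int2ByteOpenHashMap is sufficient.
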
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
index 91a6865..edf587f 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
@@ -20,7 +20,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.TreeMap;
@@ -30,17 +29,19 @@
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hive.common.util.Murmur3;
+import com.google.common.annotations.VisibleForTesting;
+
/**
*
* This is an implementation of the following variants of hyperloglog (HLL)
- * algorithm
+ * algorithm
* Original - Original HLL algorithm from Flajolet et. al from
* http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
* HLLNoBias - Google's implementation of bias correction based on lookup table
* http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
* HLL++ - Google's implementation of HLL++ algorithm that uses SPARSE registers
* http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
- *
+ *
* Following are the constructor parameters that determines which algorithm is
* used
* numRegisterIndexBits - number of LSB hashcode bits to be used as register index.
@@ -194,7 +195,7 @@
} else if (hashBits <= 64) {
alphaMM = 0.709f;
} else {
- alphaMM = 0.7213f / (float) (1 + 1.079f / m);
+ alphaMM = 0.7213f / (1 + 1.079f / m);
}
// For efficiency alpha is multiplied by m^2
@@ -258,7 +259,7 @@
// if size of sparse map excess the threshold convert the sparse map to
// dense register and switch to DENSE encoding
- if (sparseRegister.getSize() > encodingSwitchThreshold) {
+ if (sparseRegister.isSizeGreaterThan(encodingSwitchThreshold)) {
encoding = EncodingType.DENSE;
denseRegister = sparseToDenseRegister(sparseRegister);
sparseRegister = null;
@@ -386,7 +387,7 @@
}
private long linearCount(int mVal, long numZeros) {
- return (long) (Math.round(mVal * Math.log(mVal / ((double) numZeros))));
+ return Math.round(mVal * Math.log(mVal / ((double) numZeros)));
}
// refer paper
@@ -459,7 +460,7 @@
sparseRegister.merge(hll.getHLLSparseRegister());
// if after merge the sparse switching threshold is exceeded then change
// to dense encoding
- if (sparseRegister.getSize() > encodingSwitchThreshold) {
+ if (sparseRegister.isSizeGreaterThan(encodingSwitchThreshold)) {
encoding = EncodingType.DENSE;
denseRegister = sparseToDenseRegister(sparseRegister);
sparseRegister = null;
@@ -481,7 +482,7 @@
/**
* Reduces the accuracy of the HLL provided to a smaller size
- * @param p0
+ * @param p0
* - new p size for the new HyperLogLog (smaller or no change)
* @return reduced (or same) HyperLogLog instance
*/
@@ -661,4 +662,9 @@
return o instanceof HyperLogLog;
}
+ @VisibleForTesting
+ public int getEncodingSwitchThreshold() {
+ return encodingSwitchThreshold;
+ }
+
}
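
The two call sites changed above gate the SPARSE-to-DENSE conversion on the sparse register growing past encodingSwitchThreshold, and the new getEncodingSwitchThreshold() accessor makes that boundary observable. A minimal usage sketch, using only API that appears in this patch (builder(), setSizeOptimized(), addLong(), getEncoding(), getEncodingSwitchThreshold(), estimateNumDistinctValues()):

```java
import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;

public class EncodingSwitchSketch {
  public static void main(String[] args) {
    HyperLogLog hll = HyperLogLog.builder().setSizeOptimized().build();
    int threshold = hll.getEncodingSwitchThreshold();

    // At or below the threshold the sparse map holds at most `threshold`
    // entries, so the encoding stays SPARSE and, as the new test asserts,
    // the estimate is exact.
    for (long i = 1; i <= threshold; i++) {
      hll.addLong(i);
    }
    System.out.println(hll.getEncoding());               // SPARSE
    System.out.println(hll.estimateNumDistinctValues()); // == threshold

    // Adding many more distinct values grows the sparse map past the
    // threshold, after which the register is converted to DENSE.
    for (long i = threshold + 1; i <= 2L * threshold; i++) {
      hll.addLong(i);
    }
    System.out.println(hll.getEncoding());               // DENSE
  }
}
```

Exposing the threshold via @VisibleForTesting keeps the field private while letting tests derive their input sizes from it instead of hard-coding the sparse/dense boundary.
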
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
index aeba2e9..703129b 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
@@ -25,8 +25,6 @@
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;
-import java.util.TreeMap;
-
import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog.EncodingType;
/**
@@ -38,24 +36,24 @@
/**
* HyperLogLog is serialized using the following format
- *
+ *
*
- * |-4 byte-|------varlong----|varint (optional)|----------|
+ * |-4 byte-|------varlong----|varint (optional)|----------|
* ---------------------------------------------------------
* | header | estimated-count | register-length | register |
* ---------------------------------------------------------
- *
+ *
* 4 byte header is encoded like below
* 3 bytes - HLL magic string to identify serialized stream
* 4 bits - p (number of bits to be used as register index)
* 1 - spare bit (not used)
* 3 bits - encoding (000 - sparse, 001..110 - n bit packing, 111 - no bit packing)
- *
+ *
* Followed by header are 3 fields that are required for reconstruction
* of hyperloglog
* Estimated count - variable length long to store last computed estimated count.
* This is just for quick lookup without deserializing registers
- * Register length - number of entries in the register (required only for
+ * Register length - number of entries in the register (required only for
* for sparse representation. For bit-packing, the register
* length can be found from p)
*
@@ -104,7 +102,7 @@
byte[] register = hll.getHLLDenseRegister().getRegister();
bitpackHLLRegister(out, register, bitWidth);
} else if (enc.equals(EncodingType.SPARSE)) {
- TreeMap<Integer, Byte> sparseMap = hll.getHLLSparseRegister().getSparseMap();
+ Map<Integer, Byte> sparseMap = hll.getHLLSparseRegister().getSparseMap();
// write the number of elements in sparse map (required for
// reconstruction)
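
The format comment above pins down what the 4-byte header contains (a 3-byte magic string, then one byte carrying p, a spare bit and the encoding) but not the bit positions inside that fourth byte. Purely as an illustration, assuming a hypothetical layout with p in the high nibble and the encoding in the low three bits (the real placement is whatever HyperLogLogUtils writes, not this sketch), a header-byte decoder could look like:

```java
// Hypothetical layout, for illustration only: p in bits 7..4, spare in bit 3,
// encoding in bits 2..0 of the fourth header byte. The authoritative layout is
// defined by HyperLogLogUtils' serializer, not by this sketch.
final class HeaderByteSketch {

  static int registerIndexBits(byte fourth) {
    return (fourth >>> 4) & 0x0F;   // 4 bits - p
  }

  static int encoding(byte fourth) {
    return fourth & 0x07;           // 3 bits - 000 sparse, 001..110 bit widths, 111 unpacked
  }

  public static void main(String[] args) {
    byte fourth = (byte) ((14 << 4) | 0x7); // p = 14, "no bit packing"
    System.out.println(registerIndexBits(fourth)); // 14
    System.out.println(encoding(fourth));          // 7
  }
}
```
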
diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java
index e014fb5..e720ec8 100644
--- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java
+++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/common/ndv/hll/TestHyperLogLog.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.common.ndv.hll;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog.EncodingType;
import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
@@ -49,27 +50,27 @@
double threshold = size > 40000 ? longRangeTolerance : shortRangeTolerance;
double delta = threshold * size / 100;
double delta4 = threshold * (4*size) / 100;
- assertEquals((double) size, (double) hll.count(), delta);
- assertEquals((double) size, (double) hll2.count(), delta);
+ assertEquals(size, hll.count(), delta);
+ assertEquals(size, hll2.count(), delta);
// merge
hll.merge(hll2);
- assertEquals((double) 2 * size, (double) hll.count(), delta);
+ assertEquals((double) 2 * size, hll.count(), delta);
assertEquals(EncodingType.DENSE, hll.getEncoding());
// merge should update registers and hence the count
hll.merge(hll2);
- assertEquals((double) 2 * size, (double) hll.count(), delta);
+ assertEquals((double) 2 * size, hll.count(), delta);
assertEquals(EncodingType.DENSE, hll.getEncoding());
// new merge
hll.merge(hll3);
- assertEquals((double) 3 * size, (double) hll.count(), delta);
+ assertEquals((double) 3 * size, hll.count(), delta);
assertEquals(EncodingType.DENSE, hll.getEncoding());
- // valid merge -- register set size gets bigger (also 4k items
+ // valid merge -- register set size gets bigger (also 4k items
hll.merge(hll4);
- assertEquals((double) 4 * size, (double) hll.count(), delta4);
+ assertEquals((double) 4 * size, hll.count(), delta4);
assertEquals(EncodingType.DENSE, hll.getEncoding());
// invalid merge -- smaller register merge to bigger
@@ -95,27 +96,27 @@
double threshold = size > 40000 ? longRangeTolerance : shortRangeTolerance;
double delta = threshold * size / 100;
double delta4 = threshold * (4*size) / 100;
- assertEquals((double) size, (double) hll.count(), delta);
- assertEquals((double) size, (double) hll2.count(), delta);
+ assertEquals(size, hll.count(), delta);
+ assertEquals(size, hll2.count(), delta);
// merge
hll.merge(hll2);
- assertEquals((double) 2 * size, (double) hll.count(), delta);
+ assertEquals((double) 2 * size, hll.count(), delta);
assertEquals(EncodingType.SPARSE, hll.getEncoding());
// merge should update registers and hence the count
hll.merge(hll2);
- assertEquals((double) 2 * size, (double) hll.count(), delta);
+ assertEquals((double) 2 * size, hll.count(), delta);
assertEquals(EncodingType.SPARSE, hll.getEncoding());
// new merge
hll.merge(hll3);
- assertEquals((double) 3 * size, (double) hll.count(), delta);
+ assertEquals((double) 3 * size, hll.count(), delta);
assertEquals(EncodingType.SPARSE, hll.getEncoding());
// valid merge -- register set size gets bigger & dense automatically
hll.merge(hll4);
- assertEquals((double) 4 * size, (double) hll.count(), delta4);
+ assertEquals((double) 4 * size, hll.count(), delta4);
assertEquals(EncodingType.DENSE, hll.getEncoding());
// invalid merge -- smaller register merge to bigger
@@ -140,27 +141,27 @@
}
double threshold = size > 40000 ? longRangeTolerance : shortRangeTolerance;
double delta = threshold * size / 100;
- assertEquals((double) size, (double) hll.count(), delta);
- assertEquals((double) size, (double) hll2.count(), delta);
+ assertEquals(size, hll.count(), delta);
+ assertEquals(size, hll2.count(), delta);
// sparse-sparse merge
hll.merge(hll2);
- assertEquals((double) 2 * size, (double) hll.count(), delta);
+ assertEquals((double) 2 * size, hll.count(), delta);
assertEquals(EncodingType.SPARSE, hll.getEncoding());
// merge should update registers and hence the count
hll.merge(hll2);
- assertEquals((double) 2 * size, (double) hll.count(), delta);
+ assertEquals((double) 2 * size, hll.count(), delta);
assertEquals(EncodingType.SPARSE, hll.getEncoding());
// sparse-dense merge
hll.merge(hll3);
- assertEquals((double) 3 * size, (double) hll.count(), delta);
+ assertEquals((double) 3 * size, hll.count(), delta);
assertEquals(EncodingType.DENSE, hll.getEncoding());
// merge should convert hll2 to DENSE
hll2.merge(hll4);
- assertEquals((double) 2 * size, (double) hll2.count(), delta);
+ assertEquals((double) 2 * size, hll2.count(), delta);
assertEquals(EncodingType.DENSE, hll2.getEncoding());
// invalid merge -- smaller register merge to bigger
@@ -185,27 +186,27 @@
}
double threshold = size > 40000 ? longRangeTolerance : shortRangeTolerance;
double delta = threshold * size / 100;
- assertEquals((double) size, (double) hll.count(), delta);
- assertEquals((double) size, (double) hll2.count(), delta);
+ assertEquals(size, hll.count(), delta);
+ assertEquals(size, hll2.count(), delta);
// sparse-sparse merge
hll.merge(hll2);
- assertEquals((double) 2 * size, (double) hll.count(), delta);
+ assertEquals((double) 2 * size, hll.count(), delta);
assertEquals(EncodingType.DENSE, hll.getEncoding());
// merge should update registers and hence the count
hll.merge(hll2);
- assertEquals((double) 2 * size, (double) hll.count(), delta);
+ assertEquals((double) 2 * size, hll.count(), delta);
assertEquals(EncodingType.DENSE, hll.getEncoding());
// sparse-dense merge
hll.merge(hll3);
- assertEquals((double) 3 * size, (double) hll.count(), delta);
+ assertEquals((double) 3 * size, hll.count(), delta);
assertEquals(EncodingType.DENSE, hll.getEncoding());
// merge should convert hll3 to DENSE
hll3.merge(hll4);
- assertEquals((double) 2 * size, (double) hll3.count(), delta);
+ assertEquals((double) 2 * size, hll3.count(), delta);
assertEquals(EncodingType.DENSE, hll3.getEncoding());
// invalid merge -- smaller register merge to bigger
@@ -231,27 +232,27 @@
}
double threshold = size > 40000 ? longRangeTolerance : shortRangeTolerance;
double delta = threshold * size / 100;
- assertEquals((double) size, (double) hll.count(), delta);
- assertEquals((double) size, (double) hll2.count(), delta);
+ assertEquals(size, hll.count(), delta);
+ assertEquals(size, hll2.count(), delta);
// sparse-sparse merge
hll.merge(hll2);
- assertEquals((double) 2 * size, (double) hll.count(), delta);
+ assertEquals((double) 2 * size, hll.count(), delta);
assertEquals(EncodingType.SPARSE, hll.getEncoding());
// merge should update registers and hence the count
hll.merge(hll2);
- assertEquals((double) 2 * size, (double) hll.count(), delta);
+ assertEquals((double) 2 * size, hll.count(), delta);
assertEquals(EncodingType.SPARSE, hll.getEncoding());
// sparse-sparse overload to dense
hll.merge(hll3);
- assertEquals((double) 3 * size, (double) hll.count(), delta);
+ assertEquals((double) 3 * size, hll.count(), delta);
assertEquals(EncodingType.DENSE, hll.getEncoding());
// merge should convert hll2 to DENSE
hll2.merge(hll4);
- assertEquals((double) 2 * size, (double) hll2.count(), delta);
+ assertEquals((double) 2 * size, hll2.count(), delta);
assertEquals(EncodingType.DENSE, hll2.getEncoding());
// invalid merge -- smaller register merge to bigger
@@ -268,7 +269,7 @@
}
double threshold = size > 40000 ? longRangeTolerance : shortRangeTolerance;
double delta = threshold * size / 100;
- assertEquals((double) size, (double) hll.count(), delta);
+ assertEquals(size, hll.count(), delta);
}
@Test
@@ -296,7 +297,7 @@
.squash(small.getNumRegisterIndexBits());
assertEquals(small.count(), mush.count(), 0);
double delta = Math.ceil(small.getStandardError()*size);
- assertEquals((double) size, (double) mush.count(), delta);
+ assertEquals(size, mush.count(), delta);
}
}
}
@@ -316,7 +317,7 @@
}
p14HLL.squash(p10HLL.getNumRegisterIndexBits());
- assertEquals((double) size, p14HLL.count(), longRangeTolerance * size / 100.0);
+ assertEquals(size, p14HLL.count(), longRangeTolerance * size / 100.0);
}
@Test
@@ -333,6 +334,26 @@
}
p14HLL.squash(p10HLL.getNumRegisterIndexBits());
- assertEquals((double) size, p14HLL.count(), longRangeTolerance * size / 100.0);
+ assertEquals(size, p14HLL.count(), longRangeTolerance * size / 100.0);
}
+
+ @Test
+ public void testAbleToRetainAccuracyUpToSwitchThreshold() {
+ int maxThreshold = HyperLogLog.builder().setSizeOptimized().build().getEncodingSwitchThreshold();
+ testRetainAccuracy(70);
+ testRetainAccuracy(maxThreshold / 2);
+ testRetainAccuracy(maxThreshold);
+ }
+
+ private void testRetainAccuracy(int numElements) {
+ HyperLogLog h = HyperLogLog.builder().setSizeOptimized().build();
+ assertTrue(numElements <= h.getEncodingSwitchThreshold());
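+ // insert the same values several times: duplicate adds must not change the sparse estimate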
+ for (int ia = 0; ia <= 10; ia++) {
+ for (int i = 1; i <= numElements; i++) {
+ h.addLong(i);
+ }
+ }
+ assertEquals(numElements, h.estimateNumDistinctValues());
+ }
+
}
diff --git standalone-metastore/pom.xml standalone-metastore/pom.xml
index 0fa6389..054788a 100644
--- standalone-metastore/pom.xml
+++ standalone-metastore/pom.xml
@@ -104,6 +104,7 @@
4.2.0
3.5.5
8.1.1
+ <fastutil.version>8.3.1</fastutil.version>
<thrift.home>you-must-set-this-to-run-thrift</thrift.home>
@@ -315,6 +316,11 @@
<artifactId>cron-utils</artifactId>
<version>${cron-utils.version}</version>