diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/BloomFilter.java b/java/kudu-client/src/main/java/org/apache/kudu/util/BloomFilter.java new file mode 100644 index 000000000..bfbef6464 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/util/BloomFilter.java @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.kudu.util; + +import org.jboss.netty.util.CharsetUtil; + +public class BloomFilter { + + private int nBits; + private byte[] bitmap; + private int nHashes; + + public BloomFilter(BloomFilterSizing sizing) { + nBits = sizing.nBytes * 8; + bitmap = new byte[sizing.nBytes]; + nHashes = computeOptimalHashCount(nBits, sizing.getExpectedCount()); + } + + public static BloomKeyProbe genBloomKeyProbe(byte[] data) { + return new BloomKeyProbe(data); + } + + public static BloomKeyProbe genBloomKeyProbe(boolean data) { + byte[] bytes; + if (data) { + bytes = new byte[]{1}; + } else { + bytes = new byte[]{0}; + } + return new BloomKeyProbe(bytes); + } + + public static BloomKeyProbe genBloomKeyProbe(byte data) { + byte[] bytes = new byte[]{data}; + return new BloomKeyProbe(bytes); + } + + public static BloomKeyProbe genBloomKeyProbe(short data) { + byte[] bytes = new byte[]{(byte) (data >>> 0), (byte) (data >>> 8)}; + return new BloomKeyProbe(bytes); + } + + public static BloomKeyProbe genBloomKeyProbe(int data) { + byte[] bytes = new byte[]{(byte) (data >>> 0), (byte) (data >>> 8), + (byte) (data >>> 16), (byte) (data >>> 24)}; + return new BloomKeyProbe(bytes); + } + + public static BloomKeyProbe genBloomKeyProbe(Long data) { + byte[] bytes = new byte[]{ + (byte) (data >>> 0), (byte) (data >>> 8), (byte) (data >>> 16), (byte) (data >>> 24), + (byte) (data >>> 32), (byte) (data >>> 40), (byte) (data >>> 48), (byte) (data >>> 56)}; + return new BloomKeyProbe(bytes); + } + + public static BloomKeyProbe genBloomKeyProbe(float data) { + return genBloomKeyProbe(Float.floatToIntBits(data)); + } + + public static BloomKeyProbe genBloomKeyProbe(double data) { + return genBloomKeyProbe(Double.doubleToLongBits(data)); + } + + public static BloomKeyProbe genBloomKeyProbe(String data) { + return genBloomKeyProbe(data.getBytes(CharsetUtil.UTF_8)); + } + + public void addKey(BloomKeyProbe probe) { + long h = probe.initialHash(); + for (int i =0; i < nHashes; i++) { + 
long bitPos = pickBit(h, nBits); + bitmapSet(bitmap, bitPos); + h = probe.mixHash(h); + } + } + + public boolean mayContainKey(BloomKeyProbe probe) { + long h = probe.initialHash(); + int remHashes = nHashes; + while (remHashes >= 2) { + long bitPos1 = pickBit(h, nBits); + h = probe.mixHash(h); + long bitPos2 = pickBit(h, nBits); + h = probe.mixHash(h); + + if (!bitmapTest(bitmap, bitPos1) || !bitmapTest(bitmap, bitPos2)) { + return false; + } + + remHashes -= 2; + } + + while (remHashes != 0) { + long bitpos = pickBit(h, nBits); + if (!bitmapTest(bitmap, bitpos)) { + return false; + } + h = probe.mixHash(h); + remHashes--; + } + return true; + } + + public static class BloomKeyProbe { + + private long h1 = 0; + private long h2 = 0; + + BloomKeyProbe(byte[] key) { + long h = CityHash.cityHash64(key, 0, key.length); + + h1 = (0xFFFFFFFFL & h); + h2 = (h >>> 32); + } + + public long initialHash() { + return h1; + } + + public long mixHash(long h) { + return h + h2; + } + } + + public static class BloomFilterSizing { + + private int nBytes; + private int expectedCount; + + public BloomFilterSizing(int nBytes, double fpRate) { + this.nBytes = nBytes; + int nBits = nBytes * 8; + this.expectedCount = + (int) (Math.ceil(nBits * kNaturalLog2 * kNaturalLog2 / Math.log(fpRate))); + } + + public int getExpectedCount() { + return this.expectedCount; + } + + } + + private static double kNaturalLog2= 0.69314; + + private int computeOptimalHashCount(int n_bits, int elems) { + int nHashes = (int)(n_bits * kNaturalLog2 / elems); + if (nHashes < 1) nHashes = 1; + return nHashes; + } + + private void bitmapSet(byte[] bitmap, long idx) { + bitmap[(int)(idx >> 3)] |= 1 << (idx & 7); + } + + private boolean bitmapTest(byte[] bitmap, long idx) { + return (bitmap[(int)(idx >> 3)] & (1 << (idx & 7))) != 0; + } + + private long pickBit(long hash, int nBits) { + return hash % nBits; + } +} diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/CityHash.java 
b/java/kudu-client/src/main/java/org/apache/kudu/util/CityHash.java new file mode 100644 index 000000000..e86e1a641 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/util/CityHash.java @@ -0,0 +1,309 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
/**
 * CityHash-style 64-bit and 128-bit hash functions over a slice of a byte array.
 *
 * <p>Every public method hashes the {@code len} bytes of {@code s} starting at index
 * {@code pos}. Multi-byte words are read in little-endian order. 128-bit results are
 * returned as a two-element {@code long[]}.
 */
public class CityHash {

  private static final long K0 = 0xa5b85c5e198ed849L;
  private static final long K1 = 0x8d58ac26afe12e47L;
  private static final long K2 = 0xc47b6e9e3a970ed3L;
  private static final long K3 = 0xc70f6907e782aa0bL;
  private static final long K_MUL = 0xc6a4a7935bd1e995L;

  /** Reads eight bytes starting at {@code at} as a little-endian long. */
  private static long load64(byte[] buf, int at) {
    long value = 0L;
    for (int k = 7; k >= 0; k--) {
      value = (value << 8) | (buf[at + k] & 0xFFL);
    }
    return value;
  }

  /** Reads four bytes starting at {@code at} as an unsigned little-endian value. */
  private static long loadUnsigned32(byte[] buf, int at) {
    long value = 0L;
    for (int k = 3; k >= 0; k--) {
      value = (value << 8) | (buf[at + k] & 0xFFL);
    }
    return value;
  }

  /** Rotates {@code val} right by {@code shift} bits. */
  private static long rot(long val, int shift) {
    return Long.rotateRight(val, shift);
  }

  /** Folds the top bits of {@code val} into its low bits. */
  private static long mix(long val) {
    return val ^ (val >>> 47);
  }

  /** Reduces the 128-bit value (u, v) to 64 bits. */
  private static long hash16(long u, long v) {
    long first = mix((u ^ v) * K_MUL);
    long second = mix((u ^ first) * K_MUL);
    return second * K_MUL;
  }

  /** Hashes inputs of at most 16 bytes; an empty input hashes to {@code K2}. */
  private static long hashUpTo16(byte[] s, int pos, int len) {
    if (len > 8) {
      long head = load64(s, pos);
      long tail = load64(s, pos + len - 8);
      return hash16(head, rot(tail + len, len)) ^ tail;
    }
    if (len >= 4) {
      long head = loadUnsigned32(s, pos);
      long tail = loadUnsigned32(s, pos + len - 4);
      return hash16((head << 3) + len, tail);
    }
    if (len > 0) {
      int first = s[pos] & 0xFF;
      int middle = s[pos + (len >>> 1)] & 0xFF;
      int last = s[pos + len - 1] & 0xFF;
      int y = first + (middle << 8);
      int z = len + (last << 2);
      return mix(y * K2 ^ z * K3) * K2;
    }
    return K2;
  }

  /** Hashes inputs of 17 to 32 bytes. */
  private static long hash17to32(byte[] s, int pos, int len) {
    long a = load64(s, pos) * K1;
    long b = load64(s, pos + 8);
    long c = load64(s, pos + len - 8) * K2;
    long d = load64(s, pos + len - 16) * K0;
    long u = rot(a - b, 43) + rot(c, 30) + d;
    long v = a + rot(b ^ K3, 20) - c + len;
    return hash16(u, v);
  }

  /** Hashes inputs of 33 to 64 bytes. */
  private static long hash33to64(byte[] s, int pos, int len) {
    long z = load64(s, pos + 24);
    long a = load64(s, pos) + (load64(s, pos + len - 16) + len) * K0;
    long b = rot(a + z, 52);
    long c = rot(a, 37);
    a += load64(s, pos + 8);
    c += rot(a, 7);
    a += load64(s, pos + 16);
    final long headFirst = a + z;
    final long headSecond = b + rot(a, 31) + c;

    a = load64(s, pos + 16) + load64(s, pos + len - 32);
    z += load64(s, pos + len - 8);
    b = rot(a + z, 52);
    c = rot(a, 37);
    a += load64(s, pos + len - 24);
    c += rot(a, 7);
    a += load64(s, pos + len - 16);
    final long tailFirst = a + z;
    final long tailSecond = b + rot(a, 31) + c;

    long r = mix((headFirst + tailSecond) * K2 + (tailFirst + headSecond) * K0);
    return mix(r * K0 + headSecond) * K2;
  }

  /** Mixes four words with two seeds into a pair of 64-bit values. */
  private static long[] weakHash32(long w, long x, long y, long z, long a, long b) {
    long base = a + w;
    long sum = base + x + y;
    long rolled = rot(b + base + z, 51) + rot(sum, 23);
    return new long[] {sum + z, rolled + base};
  }

  /** Same as above, taking the four words from the 32 bytes starting at {@code pos}. */
  private static long[] weakHash32(byte[] s, int pos, long a, long b) {
    return weakHash32(
        load64(s, pos), load64(s, pos + 8), load64(s, pos + 16), load64(s, pos + 24), a, b);
  }

  /** Returns the 64-bit hash of {@code s[pos .. pos + len)}. */
  public static long cityHash64(byte[] s, int pos, int len) {
    if (len <= 16) {
      return hashUpTo16(s, pos, len);
    }
    if (len <= 32) {
      return hash17to32(s, pos, len);
    }
    if (len <= 64) {
      return hash33to64(s, pos, len);
    }

    // Seed the running state from the last 64 bytes of the input.
    long x = load64(s, pos + len - 40);
    long y = load64(s, pos + len - 16) + load64(s, pos + len - 56);
    long z = hash16(load64(s, pos + len - 48) + len, load64(s, pos + len - 24));
    long[] v = weakHash32(s, pos + len - 64, len, z);
    long[] w = weakHash32(s, pos + len - 32, y + K1, x);
    x = x * K1 + load64(s, pos);

    // Consume the input in 64-byte chunks; the byte count is (len - 1) rounded down to a
    // multiple of 64, which is at least 64 here because len > 64.
    int cursor = pos;
    for (int remaining = (len - 1) & ~63; remaining != 0; remaining -= 64) {
      x = rot(x + y + v[0] + load64(s, cursor + 8), 37) * K1;
      y = rot(y + v[1] + load64(s, cursor + 48), 42) * K1;
      x ^= w[1];
      y += v[0] + load64(s, cursor + 40);
      z = rot(z + w[0], 33) * K1;
      v = weakHash32(s, cursor, v[1] * K1, x + w[0]);
      w = weakHash32(s, cursor + 32, z + w[1], y + load64(s, cursor + 16));
      long previousZ = z;
      z = x;
      x = previousZ;
      cursor += 64;
    }

    return hash16(
        hash16(v[0], w[0]) + mix(y) * K1 + z,
        hash16(v[1], w[1]) + x);
  }

  /** Returns the 64-bit hash combined with one seed (the other seed is {@code K2}). */
  public static long cityHash64WithSeed(byte[] s, int pos, int len, long seed) {
    return cityHash64WithSeeds(s, pos, len, K2, seed);
  }

  /** Returns the 64-bit hash combined with two seeds. */
  public static long cityHash64WithSeeds(byte[] s, int pos, int len, long seed0, long seed1) {
    long unseeded = cityHash64(s, pos, len);
    return hash16(unseeded - seed0, seed1);
  }

  /** 128-bit seeded hash used by {@link #cityHash128WithSeed} for inputs under 128 bytes. */
  public static long[] cityMurmur(byte[] s, int pos, int len, long seed0, long seed1) {
    long a = seed0;
    long b = seed1;
    long c;
    long d;

    if (len - 16 <= 0) {
      c = b * K1 + hashUpTo16(s, pos, len);
      d = rot(a + (len >= 8 ? load64(s, pos) : c), 32);
    } else {
      c = hash16(load64(s, pos + len - 8) + K1, a);
      d = hash16(b + len, c + load64(s, pos + len - 16));
      a += d;
      int cursor = pos;
      // At least one 16-byte step runs because len - 16 is positive on this branch.
      for (int remaining = len - 16; remaining > 0; remaining -= 16) {
        a ^= mix(load64(s, cursor) * K1) * K1;
        a *= K1;
        b ^= a;
        c ^= mix(load64(s, cursor + 8) * K1) * K1;
        c *= K1;
        d ^= c;
        cursor += 16;
      }
    }
    a = hash16(a, c);
    b = hash16(d, b);
    return new long[] {a ^ b, hash16(b, a)};
  }

  /** Returns the 128-bit hash of {@code s[pos .. pos + len)} combined with two seeds. */
  public static long[] cityHash128WithSeed(byte[] s, int pos, int len, long seed0, long seed1) {
    if (len < 128) {
      return cityMurmur(s, pos, len, seed0, seed1);
    }

    long x = seed0;
    long y = seed1;
    long z = len * K1;
    long[] v = new long[2];
    long[] w = new long[2];
    v[0] = rot(y ^ K1, 49) * K1 + load64(s, pos);
    v[1] = rot(v[0], 42) * K1 + load64(s, pos + 8);
    w[0] = rot(y + z, 35) * K1 + x;
    w[1] = rot(x + load64(s, pos + 88), 53) * K1;

    int cursor = pos;
    int remaining = len;
    do {
      // Each 128-byte step is two identical 64-byte rounds.
      for (int round = 0; round < 2; round++) {
        x = rot(x + y + v[0] + load64(s, cursor + 16), 37) * K1;
        y = rot(y + v[1] + load64(s, cursor + 48), 42) * K1;
        x ^= w[1];
        y ^= v[0];
        z = rot(z ^ w[0], 33);
        v = weakHash32(s, cursor, v[1] * K1, x + w[0]);
        w = weakHash32(s, cursor + 32, z + w[1], y);
        long previousZ = z;
        z = x;
        x = previousZ;
        cursor += 64;
      }
      remaining -= 128;
    } while (remaining >= 128);

    y += rot(w[0], 37) * K0 + z;
    x += rot(v[0] + z, 49) * K0;

    // Fold in the unprocessed tail, 32 bytes at a time, walking backwards from the end.
    int tailDone = 0;
    while (tailDone < remaining) {
      tailDone += 32;
      y = rot(y - x, 42) * K0 + v[1];
      w[0] += load64(s, cursor + remaining - tailDone + 16);
      x = rot(x, 49) * K0 + w[0];
      w[0] += v[0];
      v = weakHash32(s, cursor + remaining - tailDone, v[0], v[1]);
    }

    x = hash16(x, v[0]);
    y = hash16(y, w[0]);

    return new long[] {
      hash16(x + v[1], w[1]) + y,
      hash16(x + w[1], y + v[1])
    };
  }

  /** Returns the 128-bit hash of {@code s[pos .. pos + len)}. */
  public static long[] cityHash128(byte[] s, int pos, int len) {
    if (len >= 16) {
      // The first 16 bytes become the seeds for hashing the rest.
      return cityHash128WithSeed(
          s, pos + 16, len - 16, load64(s, pos) ^ K3, load64(s, pos + 8));
    }
    if (len >= 8) {
      return cityHash128WithSeed(
          new byte[0], 0, 0, load64(s, pos) ^ (len * K0), load64(s, pos + len - 8) ^ K1);
    }
    return cityHash128WithSeed(s, pos, len, K0, K1);
  }
}
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.kudu.client; + + +import static org.junit.Assert.assertTrue; + +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import org.apache.kudu.util.BloomFilter; +import org.junit.Test; + +public class TestBloomFilter { + + @Test + public void testInt() { + int kRandomSeed = 0xdeadbeef; + int nKeys = 2000; + double targetFPRate = 0.01; + BloomFilter bf = + new BloomFilter(new BloomFilter.BloomFilterSizing(32 * 1024, targetFPRate)); + addRandomKeys(kRandomSeed,nKeys, bf, "INT"); + checkRandomKeys(kRandomSeed, nKeys, bf, "INT"); + checkFP(kRandomSeed, nKeys, 10000, bf, targetFPRate, "INT"); + } + + @Test + public void testLong() { + int kRandomSeed = 0xdeadbeef; + int nKeys = 2000; + double targetFPRate = 0.01; + BloomFilter bf = + new BloomFilter(new BloomFilter.BloomFilterSizing(32 * 1024, targetFPRate)); + addRandomKeys(kRandomSeed,nKeys, bf, "LONG"); + checkRandomKeys(kRandomSeed, nKeys, bf, "LONG"); + checkFP(kRandomSeed, nKeys, 10000, bf, targetFPRate, "LONG"); + } + + @Test + public void testFloat() { + int kRandomSeed = 0xdeadbeef; + int nKeys = 2000; + double targetFPRate = 0.01; + BloomFilter bf = + new BloomFilter(new BloomFilter.BloomFilterSizing(32 * 1024, targetFPRate)); + addRandomKeys(kRandomSeed,nKeys, bf, "FLOAT"); + 
checkRandomKeys(kRandomSeed, nKeys, bf, "FLOAT"); + + // We multiply the fp rate by 10 because there can be precision lost when get probe with float + checkFP(kRandomSeed, nKeys, 10000, bf, targetFPRate * 10, "FLOAT"); + } + + @Test + public void testDouble() { + int kRandomSeed = 0xdeadbeef; + int nKeys = 2000; + double targetFPRate = 0.01; + BloomFilter bf = + new BloomFilter(new BloomFilter.BloomFilterSizing(32 * 1024, targetFPRate)); + addRandomKeys(kRandomSeed,nKeys, bf, "DOUBLE"); + checkRandomKeys(kRandomSeed, nKeys, bf, "DOUBLE"); + + // We multiply the fp rate by 10 because there can be precision lost when get probe with double + checkFP(kRandomSeed, nKeys, 10000, bf, targetFPRate * 10, "DOUBLE"); + } + + private void addRandomKeys(int seed, int nKeys, BloomFilter bf, String tag) { + Random rand = new Random(seed); + for (int i = 0; i < nKeys; i++) { + Object num = genObjByTypeTag(tag, rand); + BloomFilter.BloomKeyProbe probe = genProbeFromObj(num); + bf.addKey(probe); + } + } + + private void checkRandomKeys(int seed, int nKeys, BloomFilter bf, String tag) { + Random rand = new Random(seed); + for (int i = 0; i< nKeys; i++) { + Object num = genObjByTypeTag(tag, rand); + BloomFilter.BloomKeyProbe probe = genProbeFromObj(num); + assertTrue(bf.mayContainKey(probe)); + } + } + + private void checkFP( + int seed, int nKeys, int nQueries, BloomFilter bf, double targetFPRate, String tag) { + Set previousKeys = new HashSet<>(); + Random previousRand = new Random(seed); + for (int i = 0; i< nKeys; i++) { + previousKeys.add(genObjByTypeTag(tag, previousRand)); + } + + Random rand = new Random(); + int numPositive = 0; + for (int i = 0; i < nQueries;) { + Object num = genObjByTypeTag(tag, rand); + if (previousKeys.contains(num)) { + continue; + } else { + if (bf.mayContainKey(genProbeFromObj(num))) + { + numPositive++; + } + i++; + } + } + double fpRate = ((double) numPositive) / ((double) nQueries); + assertTrue(fpRate < targetFPRate); + } + + private Object 
genObjByTypeTag(String tag, Random rand) { + Object ret = null; + if (tag.equals("BOOLEAN")) { + ret = rand.nextBoolean(); + } else if (tag.equals("BYTE")) { + byte[] bytes = new byte[1]; + rand.nextBytes(bytes); + ret = bytes[0]; + } else if (tag.equals("SHORT")) { + ret = (short) (rand.nextInt()); + } else if (tag.equals("INT")) { + ret = rand.nextInt(); + } else if (tag.equals("LONG")) { + ret = rand.nextLong(); + } else if (tag.equals("FLOAT")) { + ret = rand.nextFloat(); + } else if (tag.equals("DOUBLE")) { + ret = rand.nextDouble(); + } + return ret; + } + + private BloomFilter.BloomKeyProbe genProbeFromObj(Object obj) { + if (obj instanceof Boolean) { + return BloomFilter.genBloomKeyProbe((Boolean) obj); + } + if (obj instanceof Byte) { + return BloomFilter.genBloomKeyProbe((Byte) obj); + } + if (obj instanceof Short) { + return BloomFilter.genBloomKeyProbe((Short) obj); + } + if (obj instanceof Integer) { + return BloomFilter.genBloomKeyProbe((Integer) obj); + } + if (obj instanceof Long) { + return BloomFilter.genBloomKeyProbe((Long) obj); + } + if (obj instanceof Float) { + return BloomFilter.genBloomKeyProbe((Float) obj); + } + if (obj instanceof Double) { + return BloomFilter.genBloomKeyProbe((Double) obj); + } + return null; + } +}