diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2bd2eea..39c2b02 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1051,6 +1051,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to\n" +
         "store data. This is one buffer size. HT may be slightly faster if this is larger, but for small\n" +
         "joins unnecessary memory will be allocated and then trimmed."),
+    HIVEHYBRIDGRACEHASHJOINBLOOMFILTER("hive.mapjoin.hybridgrace.bloomfilter", true, "Whether to " +
+        "use BloomFilter in Hybrid grace hash join to minimize unnecessary spilling."),
     HIVESMBJOINCACHEROWS("hive.smbjoin.cache.rows", 10000,
         "How many rows with the same key value should be cached in memory per smb joined table."),
diff --git common/src/test/org/apache/hive/common/util/TestBloomFilter.java common/src/test/org/apache/hive/common/util/TestBloomFilter.java
index 7c2a941..63c7050 100644
--- common/src/test/org/apache/hive/common/util/TestBloomFilter.java
+++ common/src/test/org/apache/hive/common/util/TestBloomFilter.java
@@ -70,6 +70,12 @@ public void testBloomNumBits() {
     assertEquals(729844, BloomFilter.optimalNumOfBits(100000, 0.03));
     assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03));
     assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05));
+    assertEquals(1870567268, BloomFilter.optimalNumOfBits(300000000, 0.05));
+    assertEquals(1437758756, BloomFilter.optimalNumOfBits(300000000, 0.1));
+    assertEquals(432808512, BloomFilter.optimalNumOfBits(300000000, 0.5));
+    assertEquals(1393332198, BloomFilter.optimalNumOfBits(3000000000L, 0.8));
+    assertEquals(657882327, BloomFilter.optimalNumOfBits(3000000000L, 0.9));
+    assertEquals(0, BloomFilter.optimalNumOfBits(3000000000L, 1));
   }
 
   @Test
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index e4a2b35..4776f4b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -111,7 +111,9 @@
    * This is a cheap exit option to prevent spilling the big-table in such a
    * scenario.
    */
-  private transient final BloomFilter bloom1;
+  private transient BloomFilter bloom1 = null;
+  private final boolean useBloomFilter;
+  private final int BLOOM_FILTER_MAX_SIZE = 300000000;
 
   private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
@@ -276,16 +278,18 @@ public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryA
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
         HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT),
+        HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINBLOOMFILTER),
         estimatedTableSize, keyCount, memoryAvailable, nwayConf,
         HiveUtils.getLocalDirList(hconf));
   }
 
   private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor,
       int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, float probePercent,
-      long estimatedTableSize, long keyCount, long memoryAvailable, HybridHashTableConf nwayConf,
-      String spillLocalDirs)
+      boolean useBloomFilter, long estimatedTableSize, long keyCount, long memoryAvailable,
+      HybridHashTableConf nwayConf, String spillLocalDirs)
       throws SerDeException, IOException {
     directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter();
+    this.useBloomFilter = useBloomFilter;
 
     int newKeyCount = HashMapWrapper.calculateTableSize(
         keyCountAdj, threshold, loadFactor, keyCount);
@@ -331,18 +335,29 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
     // We also want to limit the size of writeBuffer, because we normally have 16 partitions, that
     // makes spilling prediction (isMemoryFull) to be too defensive which results in unnecessary spilling
     writeBufferSize = writeBufferSize < minWbSize ? minWbSize : Math.min(maxWbSize / numPartitions, writeBufferSize);
+    LOG.info("Write buffer size: " + writeBufferSize);
+    memoryUsed = 0;
 
-    this.bloom1 = new BloomFilter(newKeyCount);
-
-    if (LOG.isInfoEnabled()) {
+    if (useBloomFilter) {
+      if (newKeyCount <= BLOOM_FILTER_MAX_SIZE) {
+        this.bloom1 = new BloomFilter(newKeyCount);
+      } else {
+        // To avoid having a huge BloomFilter we need to scale up False Positive Probability
+        double fpp = calcFPP(newKeyCount);
+        assert fpp < 1 : "Too many keys! BloomFilter False Positive Probability is 1!";
+        if (fpp >= 0.5) {
+          LOG.warn("BloomFilter FPP is greater than 0.5!");
+        }
+        LOG.info("BloomFilter is using FPP: " + fpp);
+        this.bloom1 = new BloomFilter(newKeyCount, fpp);
+      }
       LOG.info(String.format("Using a bloom-1 filter %d keys of size %d bytes",
-        newKeyCount, bloom1.sizeInBytes()));
-    LOG.info("Write buffer size: " + writeBufferSize);
+          newKeyCount, bloom1.sizeInBytes()));
+      memoryUsed = bloom1.sizeInBytes();
     }
 
     hashPartitions = new HashPartition[numPartitions];
     int numPartitionsSpilledOnCreation = 0;
-    memoryUsed = bloom1.sizeInBytes();
     int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
     // maxCapacity should be calculated based on a percentage of memoryThreshold, which is to divide
     // row size using long size
@@ -406,6 +421,22 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
     }
   }
 
+  /**
+   * Calculate the proper False Positive Probability so that the BloomFilter won't grow too big
+   * @param keyCount number of keys
+   * @return FPP
+   */
+  private double calcFPP(int keyCount) {
+    int n = keyCount;
+    double p = 0.05;
+
+    // Calculation below is consistent with BloomFilter.optimalNumOfBits()
+    // We are capping the BloomFilter size below 100 MB
+    while ((-n * Math.log(p) / (Math.log(2) * Math.log(2))) > 800000000) {
+      p += 0.05;
+    }
+    return p;
+  }
 
   public MapJoinBytesTableContainer.KeyValueHelper getWriteHelper() {
     return writeHelper;
@@ -424,7 +455,7 @@ public long getMemoryThreshold() {
    * @return current memory usage
    */
   private long refreshMemoryUsed() {
-    long memUsed = bloom1.sizeInBytes();
+    long memUsed = useBloomFilter ? bloom1.sizeInBytes() : 0;
     for (HashPartition hp : hashPartitions) {
       if (hp.hashMap != null) {
         memUsed += hp.hashMap.memorySize();
@@ -477,7 +508,9 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
     int partitionId = keyHash & (hashPartitions.length - 1);
     HashPartition hashPartition = hashPartitions[partitionId];
 
-    bloom1.addLong(keyHash);
+    if (useBloomFilter) {
+      bloom1.addLong(keyHash);
+    }
 
     if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { // destination on disk
       putToSidefile = true;
@@ -909,7 +942,7 @@ public ReusableRowContainer() {
     public JoinUtil.JoinResult setFromOutput(Output output) throws HiveException {
       int keyHash = HashCodeUtil.murmurHash(output.getData(), 0, output.getLength());
 
-      if (!bloom1.testLong(keyHash)) {
+      if (useBloomFilter && !bloom1.testLong(keyHash)) {
        /*
         * if the keyHash is missing in the bloom filter, then the value cannot
         * exist in any of the spilled partition - return NOMATCH
@@ -1063,7 +1096,7 @@ public void write(MapJoinObjectSerDeContext valueContext, ObjectOutputStream out
       int keyHash = HashCodeUtil.murmurHash(bytes, offset, length);
       partitionId = keyHash & (hashPartitions.length - 1);
 
-      if (!bloom1.testLong(keyHash)) {
+      if (useBloomFilter && !bloom1.testLong(keyHash)) {
        /*
         * if the keyHash is missing in the bloom filter, then the value cannot exist in any of the
         * spilled partition - return NOMATCH
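
Note on the sizing logic (not part of the patch): calcFPP() relaxes the false positive probability in 0.05 steps until the bit count given by the optimalNumOfBits() formula, bits = -n * ln(p) / (ln 2)^2, drops under 800,000,000 bits (~100 MB). The minimal standalone sketch below reproduces that loop so it can be run in isolation; the class name FppScalingSketch, the method scaleFpp, and the long parameter are illustrative assumptions, not code from the patch or the Hive sources.

// Standalone sketch (illustrative names): shows how relaxing the FPP in 0.05
// steps keeps the bloom-1 filter under the ~100 MB cap (800,000,000 bits).
// The bit-count formula mirrors the one quoted in the patch comment.
public class FppScalingSketch {

  // Same loop shape as calcFPP() in the patch: start at 5% FPP and relax it
  // until the optimal bit count fits under the cap.
  static double scaleFpp(long keyCount) {
    double p = 0.05;
    while ((-keyCount * Math.log(p) / (Math.log(2) * Math.log(2))) > 800000000L) {
      p += 0.05;
    }
    return p;
  }

  public static void main(String[] args) {
    // For 300 million keys, the default 5% FPP already needs ~1.87e9 bits
    // (~223 MB, matching the new TestBloomFilter assertion), so the FPP is
    // relaxed until the filter fits the ~100 MB budget.
    long n = 300_000_000L;
    double p = scaleFpp(n);
    long bits = (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    System.out.printf("keys=%d fpp=%.2f bits=%d (~%d MB)%n",
        n, p, bits, bits / 8 / 1024 / 1024);
  }
}

Run as-is, this should report an FPP of roughly 0.30 and a filter of roughly 750 million bits (about 90 MB) for 300 million keys, which is the regime where the patch switches from the fixed 5% FPP to the scaled one.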