diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index cce79725e4..482394d396 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -200,55 +200,20 @@ public class BlockCacheUtil {
    */
   public static int validateBlockAddition(Cacheable existing, Cacheable newBlock,
       BlockCacheKey cacheKey) {
-    int comparison = compareCacheBlock(existing, newBlock, false);
+    int comparison = compareCacheBlock(existing, newBlock, true);
     if (comparison != 0) {
-      throw new RuntimeException("Cached block contents differ, which should not have happened."
-        + "cacheKey:" + cacheKey);
-    }
-    if ((existing instanceof HFileBlock) && (newBlock instanceof HFileBlock)) {
-      comparison = ((HFileBlock) existing).getNextBlockOnDiskSize()
-          - ((HFileBlock) newBlock).getNextBlockOnDiskSize();
-    }
-    return comparison;
-  }
+      LOG.warn("Cached block contents differ, trying to just compare the block contents "
+          + "without the next block. CacheKey: " + cacheKey);

-  /**
-   * Because of the region splitting, it's possible that the split key locate in the middle of a
-   * block. So it's possible that both the daughter regions load the same block from their parent
-   * HFile. When pread, we don't force the read to read all of the next block header. So when two
-   * threads try to cache the same block, it's possible that one thread read all of the next block
-   * header but the other one didn't. if the already cached block hasn't next block header but the
-   * new block to cache has, then we can replace the existing block with the new block for better
-   * performance.(HBASE-20447)
-   * @param blockCache BlockCache to check
-   * @param cacheKey the block cache key
-   * @param newBlock the new block which try to put into the block cache.
-   * @return true means need to replace existing block with new block for the same block cache key.
-   *         false means just keep the existing block.
-   */
-  public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache,
-      BlockCacheKey cacheKey, Cacheable newBlock) {
-    Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
-    try {
-      int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
-      if (comparison < 0) {
-        LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the new block has "
-            + "nextBlockOnDiskSize set. Caching new block.");
-        return true;
-      } else if (comparison > 0) {
-        LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the existing block has "
-            + "nextBlockOnDiskSize set, Keeping cached block.");
-        return false;
-      } else {
-        LOG.warn("Caching an already cached block: {}. This is harmless and can happen in rare "
-            + "cases (see HBASE-8547)",
-          cacheKey);
-        return false;
+      // compare the contents; if they are not equal, we are in big trouble
+      int comparisonWithoutNextBlockMetadata = compareCacheBlock(existing, newBlock, false);
+
+      if (comparisonWithoutNextBlockMetadata != 0) {
+        throw new RuntimeException("Cached block contents differ, which should not have happened. "
+            + "cacheKey:" + cacheKey);
       }
-    } finally {
-      // return the block since we need to decrement the count
-      blockCache.returnBlock(cacheKey, existingBlock);
     }
+    return comparison;
   }

   /**
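Note on the new contract: validateBlockAddition now folds what shouldReplaceExistingCacheBlock used to decide into its return value. A return of 0 means the two blocks are byte-for-byte identical including next-block metadata; a non-zero return means only the next-block metadata differs; a payload mismatch throws. A caller-side sketch of that contract (the helper below is hypothetical and not part of this patch):

    // Hypothetical helper restating the new validateBlockAddition contract.
    // 0        -> byte-for-byte identical, including next-block metadata
    // non-zero -> same payload, differing next-block metadata (sign picks a side)
    // throws   -> the payloads themselves differ, i.e. real cache corruption
    private static boolean isExactDuplicate(Cacheable existing, Cacheable newBlock,
        BlockCacheKey cacheKey) {
      return BlockCacheUtil.validateBlockAddition(existing, newBlock, cacheKey) == 0;
    }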
+ "cacheKey:" + cacheKey); } - } finally { - // return the block since we need to decrement the count - blockCache.returnBlock(cacheKey, existingBlock); } + return comparison; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index ecbf37c78b..3eef3ebaf8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -383,8 +383,21 @@ public class LruBlockCache implements FirstLevelBlockCache { } LruCachedBlock cb = map.get(cacheKey); - if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) { - return; + if (cb != null) { + int comparison = BlockCacheUtil.validateBlockAddition(cb.getBuffer(), buf, cacheKey); + if (comparison != 0) { + if (comparison < 0) { + LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block."); + return; + } else { + LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Caching new block."); + } + } else { + String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey(); + msg += ". This is harmless and can happen in rare cases (see HBASE-8547)"; + LOG.debug(msg); + return; + } } long currentSize = size.get(); long currentAcceptableSize = acceptableSize(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 1c418f5a3b..218a808156 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -423,23 +423,34 @@ public class BucketCache implements BlockCache, HeapSize { */ public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) { - if (cacheEnabled) { - if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { - if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) { - cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); - } - } else { - cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); - } - } - } - - private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, - boolean inMemory, boolean wait) { if (!cacheEnabled) { return; } LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); + + if (backingMap.containsKey(cacheKey)) { + Cacheable existingBlock = getBlockFromBackingMap(cacheKey, false, false, false); + + if (existingBlock != null) { + try { + int comparison = + BlockCacheUtil.validateBlockAddition(existingBlock, cachedItem, cacheKey); + if (comparison != 0) { + LOG.debug("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block."); + return; + } else { + String msg = "Caching an already cached block: " + cacheKey; + msg += ". 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 1c418f5a3b..218a808156 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -423,23 +423,34 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
       boolean wait) {
-    if (cacheEnabled) {
-      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
-        if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
-          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
-        }
-      } else {
-        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
-      }
-    }
-  }
-
-  private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
-      boolean inMemory, boolean wait) {
     if (!cacheEnabled) {
       return;
     }
     LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
+
+    if (backingMap.containsKey(cacheKey)) {
+      Cacheable existingBlock = getBlockFromBackingMap(cacheKey, false, false, false);
+
+      if (existingBlock != null) {
+        try {
+          int comparison =
+              BlockCacheUtil.validateBlockAddition(existingBlock, cachedItem, cacheKey);
+          if (comparison != 0) {
+            LOG.debug("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block.");
+            return;
+          } else {
+            String msg = "Caching an already cached block: " + cacheKey;
+            msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
+            LOG.debug(msg);
+            return;
+          }
+        } finally {
+          // return the block since we need to decrement the count
+          returnBlock(cacheKey, existingBlock);
+        }
+      }
+    }
+
     // Stuff the entry into the RAM cache so it can get drained to the persistent store
     RAMQueueEntry re =
         new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
@@ -453,6 +464,11 @@ public class BucketCache implements BlockCache, HeapSize {
     if (ramCache.putIfAbsent(cacheKey, re) != null) {
       return;
     }
+    // There is no lock between checking ramCache and backingMap, so check again
+    if (backingMap.containsKey(cacheKey)) {
+      ramCache.remove(cacheKey);
+      return;
+    }
     int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
     BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
     boolean successfulAddition = false;
@@ -489,6 +505,15 @@ public class BucketCache implements BlockCache, HeapSize {
     if (!cacheEnabled) {
       return null;
     }
+    Cacheable cachedBlock = getBlockFromRamCache(key, caching, repeat, updateCacheMetrics);
+    if (cachedBlock != null) {
+      return cachedBlock;
+    }
+    return getBlockFromBackingMap(key, caching, repeat, updateCacheMetrics);
+  }
+
+  private Cacheable getBlockFromRamCache(BlockCacheKey key, boolean caching, boolean repeat,
+      boolean updateCacheMetrics) {
     RAMQueueEntry re = ramCache.get(key);
     if (re != null) {
       if (updateCacheMetrics) {
@@ -497,6 +522,11 @@ public class BucketCache implements BlockCache, HeapSize {
       re.access(accessCount.incrementAndGet());
       return re.getData();
     }
+    return null;
+  }
+
+  private Cacheable getBlockFromBackingMap(BlockCacheKey key, boolean caching, boolean repeat,
+      boolean updateCacheMetrics) {
     BucketEntry bucketEntry = backingMap.get(key);
     if (bucketEntry != null) {
       long start = System.nanoTime();
@@ -557,37 +587,6 @@ public class BucketCache implements BlockCache, HeapSize {
     return evictBlock(cacheKey, true);
   }

-  // does not check for the ref count. Just tries to evict it if found in the
-  // bucket map
-  private boolean forceEvict(BlockCacheKey cacheKey) {
-    if (!cacheEnabled) {
-      return false;
-    }
-    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
-    BucketEntry bucketEntry = backingMap.get(cacheKey);
-    if (bucketEntry == null) {
-      if (removedBlock != null) {
-        cacheStats.evicted(0, cacheKey.isPrimary());
-        return true;
-      } else {
-        return false;
-      }
-    }
-    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
-    try {
-      lock.writeLock().lock();
-      if (backingMap.remove(cacheKey, bucketEntry)) {
-        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
-      } else {
-        return false;
-      }
-    } finally {
-      lock.writeLock().unlock();
-    }
-    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
-    return true;
-  }
-
   private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
     RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
     if (removedBlock != null) {
@@ -932,31 +931,6 @@ public class BucketCache implements BlockCache, HeapSize {
     LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
   }

-  /**
-   * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
-   * cache with a new block for the same cache key. there's a corner case: one thread cache a
-   * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
-   * new block with the same cache key do the same thing for the same cache key, so if not evict
-   * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
-   * but the bucketAllocator do not free its memory.
-   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
-   *      cacheKey, Cacheable newBlock)
-   * @param key Block cache key
-   * @param bucketEntry Bucket entry to put into backingMap.
-   */
-  private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
-    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
-    if (previousEntry != null && previousEntry != bucketEntry) {
-      ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset());
-      lock.writeLock().lock();
-      try {
-        blockEvicted(key, previousEntry, false);
-      } finally {
-        lock.writeLock().unlock();
-      }
-    }
-  }
-
   /**
    * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
    * Process all that are passed in even if failure being sure to remove from ramCache else we'll
@@ -1038,7 +1012,7 @@ public class BucketCache implements BlockCache, HeapSize {
         BlockCacheKey key = entries.get(i).getKey();
         // Only add if non-null entry.
         if (bucketEntries[i] != null) {
-          putIntoBackingMap(key, bucketEntries[i]);
+          backingMap.put(key, bucketEntries[i]);
         }
         // Always remove from ramCache even if we failed adding it to the block cache above.
         RAMQueueEntry ramCacheEntry = ramCache.remove(key);
@@ -1049,8 +1023,13 @@ public class BucketCache implements BlockCache, HeapSize {
           ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
           try {
             lock.writeLock().lock();
-            if (backingMap.remove(key, bucketEntries[i])) {
-              blockEvicted(key, bucketEntries[i], false);
+            int refCount = bucketEntries[i].getRefCount();
+            if (refCount == 0) {
+              if (backingMap.remove(key, bucketEntries[i])) {
+                blockEvicted(key, bucketEntries[i], false);
+              }
+            } else {
+              bucketEntries[i].markForEvict();
             }
           } finally {
             lock.writeLock().unlock();
           }
@@ -1693,7 +1672,7 @@ public class BucketCache implements BlockCache, HeapSize {
       if (bucketEntry != null) {
         int refCount = bucketEntry.decrementRefCountAndGet();
         if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
-          forceEvict(cacheKey);
+          evictBlock(cacheKey, true);
         }
       }
     }
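Taken together, the doDrain and returnBlock hunks replace forceEvict with a deferred-eviction handshake: the writer evicts an entry only when no reader holds it, otherwise it flags the entry and the last reader finishes the job. A condensed restatement of the two sides (a sketch, using only methods that appear in this patch):

    // Writer side (doDrain), holding the offset write lock: evict immediately
    // only when the entry has no readers; otherwise defer.
    if (bucketEntries[i].getRefCount() == 0) {
      if (backingMap.remove(key, bucketEntries[i])) {
        blockEvicted(key, bucketEntries[i], false);
      }
    } else {
      bucketEntries[i].markForEvict();
    }

    // Reader side (returnBlock): the last reader of a flagged entry performs
    // the eviction the writer deferred.
    int refCount = bucketEntry.decrementRefCountAndGet();
    if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
      evictBlock(cacheKey, true);
    }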
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 19c1d66b09..73ac1275c3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -203,19 +203,14 @@ public class TestBucketCache {
     CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
   }

-  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
-      throws InterruptedException {
-    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
-      Thread.sleep(100);
-    }
-  }
-
   // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
   // threads will flush it to the bucket and put reference entry in backingMap.
   private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
       Cacheable block) throws InterruptedException {
     cache.cacheBlock(cacheKey, block);
-    waitUntilFlushedToBucket(cache, cacheKey);
+    while (!cache.backingMap.containsKey(cacheKey)) {
+      Thread.sleep(100);
+    }
   }

   @Test
@@ -424,7 +419,7 @@ public class TestBucketCache {
   }

   @Test
-  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
+  public void testCacheBlockNextBlockMetadataMissing() {
     int size = 100;
     int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
     byte[] byteArr = new byte[length];
@@ -446,8 +441,6 @@ public class TestBucketCache {
     CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
       block1Buffer);

-    waitUntilFlushedToBucket(cache, key);
-
     // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
     CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
       block1Buffer);
@@ -458,8 +451,6 @@ public class TestBucketCache {
     CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
       block2Buffer);

-    waitUntilFlushedToBucket(cache, key);
-
     // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
     CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
       block1Buffer);
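One caveat on the simplified wait in cacheAndWaitUntilFlushedToBucket: if a writer thread never flushes the block (for example because the duplicate check dropped it), the loop spins forever and the test hangs rather than fails. A bounded variant (a sketch, not part of this patch) would surface that as a test failure instead:

    // Hypothetical bounded wait: fail the test after a deadline instead of
    // spinning indefinitely on a block that will never be flushed.
    private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
        throws InterruptedException {
      long deadline = System.currentTimeMillis() + 10_000;
      while (!cache.backingMap.containsKey(cacheKey)) {
        if (System.currentTimeMillis() > deadline) {
          throw new AssertionError("Block " + cacheKey + " was never flushed to the bucket");
        }
        Thread.sleep(100);
      }
    }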