From a91b8e2482db2a33ca7eec14befd11e7d02a66e2 Mon Sep 17 00:00:00 2001
From: stack
Date: Tue, 5 Aug 2014 14:16:32 -0700
Subject: [PATCH] When we failed to add an entry, failing with a
 CacheFullException -- a common occurrence when there is heavy read load and
 lots of cache misses -- we would drop the entry we were trying to add,
 failing to remove it from the on-heap Map. Over time the on-heap Map would
 fill with these orphans.

Add some doc.

Moved WriterThread config out of the constructor so we do all thread config in
one place -- name setting, daemon setting -- rather than split.

Some clean up of logging messages. Put building of log strings behind an
isDebugEnabled check. Add a log if we fail to add items to the BucketCache,
and say why free is running.

Minor formatting.
---
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 137 ++++++++++++---
 1 file changed, 76 insertions(+), 61 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 9a8cf5a..4d260b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -12,7 +12,7 @@
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
+ * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
@@ -77,14 +77,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * determine if a given element is in the cache. The bucket cache can use on-heap or
  * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to
  * store/read the block data.
- * 
+ *
  * <p>Eviction is via a similar algorithm as used in
  * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
- * 
+ *
  * <p>BucketCache can be used as mainly a block cache (see
  * {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS GC and
  * heap fragmentation.
- * 
+ *
  * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
  * blocks) to enlarge cache space via
  * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
@@ -123,7 +123,14 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private volatile boolean cacheEnabled;
 
-  private ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
+  /**
+   * A list of writer queues.  We have a queue per {@link WriterThread} we have running.
+   * In other words, the work adding blocks to the BucketCache is divided up amongst the
+   * running WriterThreads.  It's done by taking the hash of the cache key modulo queue count.
+   * WriterThread when it runs takes whatever has been recently added and 'drains' the entries
+   * to the BucketCache.  It then updates the ramCache and backingMap accordingly.
+   */
+  private ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
       new ArrayList<BlockingQueue<RAMQueueEntry>>();
   WriterThread writerThreads[];
 
@@ -169,7 +176,7 @@ public class BucketCache implements BlockCache, HeapSize {
    * A "sparse lock" implementation allowing to lock on a particular block
    * identified by offset. The purpose of this is to avoid freeing the block
    * which is being read.
-   * 
+   *
    * TODO:We could extend the IdLock to IdReadWriteLock for better.
    */
   private IdLock offsetLock = new IdLock();
@@ -200,7 +207,7 @@ public class BucketCache implements BlockCache, HeapSize {
     this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
       persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
   }
-  
+
   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
       int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
       throws FileNotFoundException, IOException {
@@ -245,6 +252,7 @@ public class BucketCache implements BlockCache, HeapSize {
     for (int i = 0; i < writerThreads.length; ++i) {
       writerThreads[i] = new WriterThread(writerQueues.get(i), i);
       writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
+      writerThreads[i].setDaemon(true);
       writerThreads[i].start();
     }
     // Run the statistics thread periodically to print the cache statistics log
@@ -483,11 +491,11 @@ public class BucketCache implements BlockCache, HeapSize {
         "hits=" + cacheStats.getHitCount() + ", " +
         "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
         "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
-        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
+        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
           (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
         "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
         "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
-        "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
+        "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
           (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
         "evictions=" + cacheStats.getEvictionCount() + ", " +
         "evicted=" + cacheStats.getEvictedCount() + ", " +
@@ -526,45 +534,46 @@ public class BucketCache implements BlockCache, HeapSize {
    * Free the space if the used size reaches acceptableSize() or one size block
    * couldn't be allocated. When freeing the space, we use the LRU algorithm and
    * ensure there must be some blocks evicted
+   * @param why Why we are being called
    */
-  private void freeSpace() {
+  private void freeSpace(final String why) {
     // Ensure only one freeSpace progress at a time
     if (!freeSpaceLock.tryLock()) return;
     try {
       freeInProgress = true;
       long bytesToFreeWithoutExtra = 0;
-      /*
-       * Calculate free byte for each bucketSizeinfo
-       */
-      StringBuffer msgBuffer = new StringBuffer();
+      // Calculate free byte for each bucketSizeinfo
+      StringBuffer msgBuffer = LOG.isDebugEnabled()? new StringBuffer(): null;
       BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
       long[] bytesToFreeForBucket = new long[stats.length];
       for (int i = 0; i < stats.length; i++) {
         bytesToFreeForBucket[i] = 0;
-        long freeGoal = (long) Math.floor(stats[i].totalCount()
-            * (1 - DEFAULT_MIN_FACTOR));
+        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
         freeGoal = Math.max(freeGoal, 1);
         if (stats[i].freeCount() < freeGoal) {
-          bytesToFreeForBucket[i] = stats[i].itemSize()
-              * (freeGoal - stats[i].freeCount());
+          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
           bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
-          msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
+          if (msgBuffer != null) {
+            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
               + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
+          }
         }
       }
-      msgBuffer.append("Free for total="
-          + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
+      if (msgBuffer != null) {
+        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
+      }
       if (bytesToFreeWithoutExtra <= 0) {
         return;
       }
       long currentSize = bucketAllocator.getUsedSize();
       long totalSize=bucketAllocator.getTotalSize();
-      LOG.debug("Bucket cache free space started; Attempting to " + msgBuffer.toString()
-          + " of current used=" + StringUtils.byteDesc(currentSize)
-          + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get())
-          + ",total=" + StringUtils.byteDesc(totalSize));
-
+      if (LOG.isDebugEnabled() && msgBuffer != null) {
+        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
+          " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
+          StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
+      }
+
       long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
           * (1 + DEFAULT_EXTRA_FREE_FACTOR));
@@ -622,8 +631,7 @@ public class BucketCache implements BlockCache, HeapSize {
         stats = bucketAllocator.getIndexStatistics();
         boolean needFreeForExtra = false;
         for (int i = 0; i < stats.length; i++) {
-          long freeGoal = (long) Math.floor(stats[i].totalCount()
-              * (1 - DEFAULT_MIN_FACTOR));
+          long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
           freeGoal = Math.max(freeGoal, 1);
           if (stats[i].freeCount() < freeGoal) {
             needFreeForExtra = true;
@@ -639,8 +647,7 @@ public class BucketCache implements BlockCache, HeapSize {
           bucketQueue.add(bucketMulti);
 
           while ((bucketGroup = bucketQueue.poll()) != null) {
-            long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed)
-                / remainingBuckets;
+            long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
             bytesFreed += bucketGroup.free(bucketBytesToFree);
             remainingBuckets--;
           }
@@ -650,12 +657,14 @@ public class BucketCache implements BlockCache, HeapSize {
         long single = bucketSingle.totalSize();
         long multi = bucketMulti.totalSize();
         long memory = bucketMemory.totalSize();
-        LOG.debug("Bucket cache free space completed; " + "freed="
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Bucket cache free space completed; " + "freed="
             + StringUtils.byteDesc(bytesFreed) + ", " + "total="
             + StringUtils.byteDesc(totalSize) + ", " + "single="
             + StringUtils.byteDesc(single) + ", " + "multi="
            + StringUtils.byteDesc(multi) + ", " + "memory="
             + StringUtils.byteDesc(memory));
+        }
       }
 
     } finally {
@@ -667,17 +676,16 @@ public class BucketCache implements BlockCache, HeapSize {
 
   // This handles flushing the RAM cache to IOEngine.
   private class WriterThread extends HasThread {
-    BlockingQueue<RAMQueueEntry> inputQueue;
-    final int threadNO;
-    boolean writerEnabled = true;
+    private final BlockingQueue<RAMQueueEntry> inputQueue;
+    private final int threadNO;
+    private boolean writerEnabled = true;
 
     WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
       super();
       this.inputQueue = queue;
       this.threadNO = threadNO;
-      setDaemon(true);
     }
-    
+
     // Used for test
     void disableWriter() {
       this.writerEnabled = false;
@@ -698,7 +706,12 @@ public class BucketCache implements BlockCache, HeapSize {
           } catch (InterruptedException ie) {
             if (!cacheEnabled) break;
           }
-          doDrain(entries);
+          int countBefore = entries.size();
+          int countAfter = doDrain(entries);
+          if (LOG.isDebugEnabled() && countBefore != countAfter) {
+            LOG.debug("Failed drain all: countBefore=" + countBefore + ", countAfter=" +
+              countAfter);
+          }
         } catch (Exception ioe) {
           LOG.error("WriterThread encountered error", ioe);
         }
@@ -712,36 +725,38 @@ public class BucketCache implements BlockCache, HeapSize {
     /**
     * Flush the entries in ramCache to IOEngine and add bucket entry to
      * backingMap
-     * @param entries
+     * @param entries Presumes list passed in here will not be modified by anyone but this method.
+     * @return Count of entries processed
      * @throws InterruptedException
      */
-    private void doDrain(List<RAMQueueEntry> entries)
-        throws InterruptedException {
+    private int doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
       BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
       RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
       int done = 0;
-      while (entries.size() > 0 && cacheEnabled) {
+      while (cacheEnabled && !entries.isEmpty()) { // Keep going in case we throw...
         RAMQueueEntry ramEntry = null;
         try {
-          ramEntry = entries.remove(entries.size() - 1);
-          if (ramEntry == null) {
-            LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
+          int index = entries.size() - 1;
+          RAMQueueEntry re = entries.get(index);
+          BucketEntry bucketEntry =
+            re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
+          // Successfully written to cache.  Can remove it now and do accounting.
+          ramEntry = entries.remove(index);
+          if (ramEntry == null || ramEntry != re) {
+            LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
             continue;
           }
-          BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
-              bucketAllocator, deserialiserMap, realCacheSize);
           ramEntries[done] = ramEntry;
           bucketEntries[done++] = bucketEntry;
           if (ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
           }
         } catch (BucketAllocatorException fle) {
-          LOG.warn("Failed allocating for block "
-              + (ramEntry == null ? "" : ramEntry.getKey()), fle);
+          LOG.warn("Failed allocating for " + (ramEntry == null ? "" : ramEntry.getKey()), fle);
         } catch (CacheFullException cfe) {
           if (!freeInProgress) {
-            freeSpace();
+            freeSpace("Full!");
           } else {
             Thread.sleep(50);
           }
@@ -756,7 +771,7 @@ public class BucketCache implements BlockCache, HeapSize {
       try {
         ioEngine.sync();
       } catch (IOException ioex) {
-        LOG.error("Faild syncing IO engine", ioex);
+        LOG.error("Failed syncing IO engine", ioex);
         checkIOErrorIsTolerated();
         // Since we failed sync, free the blocks in bucket allocator
         for (int i = 0; i < done; ++i) {
@@ -777,13 +792,15 @@ public class BucketCache implements BlockCache, HeapSize {
         }
       }
 
-      if (bucketAllocator.getUsedSize() > acceptableSize()) {
-        freeSpace();
+      long used = bucketAllocator.getUsedSize();
+      if (used > acceptableSize()) {
+        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
       }
+      return done;
     }
   }
-  
+
   private void persistToFile() throws IOException {
     assert !cacheEnabled;
@@ -861,11 +878,9 @@ public class BucketCache implements BlockCache, HeapSize {
   private void checkIOErrorIsTolerated() {
     long now = EnvironmentEdgeManager.currentTimeMillis();
     if (this.ioErrorStartTime > 0) {
-      if (cacheEnabled
-          && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
-        LOG.error("IO errors duration time has exceeded "
-            + ioErrorsTolerationDuration
-            + "ms, disabing cache, please check your IOEngine");
+      if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
+        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
+          "ms, disabling cache, please check your IOEngine");
         disableCache();
       }
     } else {
@@ -969,7 +984,7 @@ public class BucketCache implements BlockCache, HeapSize {
         ++numEvicted;
       }
     }
-    
+
     return numEvicted;
   }
@@ -1044,7 +1059,7 @@ public class BucketCache implements BlockCache, HeapSize {
         this.priority = BlockPriority.MULTI;
       }
     }
-    
+
     public BlockPriority getPriority() {
       return this.priority;
     }
@@ -1183,7 +1198,7 @@ public class BucketCache implements BlockCache, HeapSize {
         bucketAllocator.freeBlock(offset);
         throw ioe;
       }
-      
+
       realCacheSize.addAndGet(len);
       return bucketEntry;
     }
-- 
1.8.5.2 (Apple Git-48)