diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index a5ffaa2..1fc2ecb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -503,12 +503,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
     if(bytesToFree <= 0) return;
 
     // Instantiate priority buckets
-    BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
-        singleSize());
-    BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
-        multiSize());
-    BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
-        memorySize());
+    BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize());
+    BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize());
+    BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize());
 
     // Scan entire map putting into appropriate buckets
     for(LruCachedBlock cachedBlock : map.values()) {
@@ -606,7 +603,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
    * to configuration parameters and their relatives sizes.
    */
   private class BlockBucket implements Comparable<BlockBucket> {
-
     private LruCachedBlockQueue queue;
     private long totalSize = 0;
     private long bucketSize;
@@ -652,10 +648,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
       if (that == null || !(that instanceof BlockBucket)){
         return false;
       }
-
-      return compareTo(( BlockBucket)that) == 0;
+      return compareTo((BlockBucket)that) == 0;
     }
 
+    @Override
+    public int hashCode() {
+      // Nothing distinguishing about each instance unless I pass in a 'name' or something
+      return super.hashCode();
+    }
   }
 
   /**
@@ -714,18 +714,20 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
       while (this.go) {
         synchronized(this) {
           try {
-            this.wait();
+            this.wait(1000 * 10/*Don't wait forever*/);
           } catch(InterruptedException e) {}
         }
         LruBlockCache cache = this.cache.get();
-        if(cache == null) break;
+        if (cache == null) break;
         cache.evict();
       }
     }
 
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
+        justification="This is what we want")
     public void evict() {
       synchronized(this) {
-        this.notifyAll(); // FindBugs NN_NAKED_NOTIFY
+        this.notifyAll();
       }
     }
@@ -873,6 +875,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
   }
 
   @Override
+  public int hashCode() {
+    return b.hashCode();
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (obj instanceof CachedBlock) {
       CachedBlock cb = (CachedBlock)obj;
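The hashCode() overrides added above (and again below for the BucketCache CachedBlock) close out the equals/hashCode contract: both classes already override equals(), and Java requires that objects which compare equal also hash equal or they silently misbehave in hashed collections; this is what the FindBugs HE_EQUALS_USE_HASHCODE warning is about. A minimal standalone sketch of the contract, with an invented Key class:

    import java.util.HashSet;
    import java.util.Set;

    public class HashCodeContract {
      static final class Key {
        private final String name;
        Key(String name) { this.name = name; }
        @Override
        public boolean equals(Object o) {
          return o instanceof Key && ((Key) o).name.equals(this.name);
        }
        // Without this override, two equal Keys would land in different
        // HashSet buckets and contains() below would return false.
        @Override
        public int hashCode() { return name.hashCode(); }
      }

      public static void main(String[] args) {
        Set<Key> set = new HashSet<Key>();
        set.add(new Key("block-1"));
        // Prints true only because hashCode() is consistent with equals().
        System.out.println(set.contains(new Key("block-1")));
      }
    }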
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index 9deca1a..831cd66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
@@ -415,7 +416,9 @@ public final class BucketAllocator {
     assert blockSize > 0;
     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
     if (bsi == null) {
-      throw new BucketAllocatorException("Allocation too big size=" + blockSize);
+      throw new BucketAllocatorException("Allocation too big size=" + blockSize +
+        "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
+        " to accommodate if size seems reasonable and you want it cached.");
     }
     long offset = bsi.allocateBlock();
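The enriched exception message points operators at CacheConfig.BUCKET_CACHE_BUCKETS_KEY, which (assuming the constant keeps its usual value) is the hbase.bucketcache.bucket.sizes property: a comma-separated list of bucket sizes the allocator can round an allocation up to. A block bigger than the largest configured bucket simply cannot be cached. A hedged sketch of widening the sizes programmatically, with made-up byte values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class BucketSizesExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Hypothetical tuning: include larger buckets (values in bytes) so
        // bigger blocks fit; otherwise BucketAllocator throws
        // "Allocation too big" for anything over the largest size.
        conf.set("hbase.bucketcache.bucket.sizes",
            "5120,9216,17408,33792,66560,132096");
      }
    }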
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index e756d59..2ad1c71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -12,7 +12,7 @@
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
-
+ * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -77,14 +78,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 * determine if a given element is in the cache. The bucket cache can use on-heap or
 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to
 * store/read the block data.
- * 
+ *
 * <p>Eviction is via a similar algorithm as used in
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
- * 
+ *
 * <p>BucketCache can be used as mainly a block cache (see
 * {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS GC and
 * heap fragmentation.
- * 
+ *
 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
 * blocks) to enlarge cache space via
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
@@ -112,7 +113,8 @@ public class BucketCache implements BlockCache, HeapSize {
   IOEngine ioEngine;
 
   // Store the block in this map before writing it to cache
-  private Map<BlockCacheKey, RAMQueueEntry> ramCache;
+  @VisibleForTesting
+  Map<BlockCacheKey, RAMQueueEntry> ramCache;
   // In this map, store the block's meta data like offset, length
   private Map<BlockCacheKey, BucketEntry> backingMap;
 
@@ -123,8 +125,17 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private volatile boolean cacheEnabled;
 
-  private ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
+  /**
+   * A list of writer queues.  We have a queue per {@link WriterThread} we have running.
+   * In other words, the work adding blocks to the BucketCache is divided up amongst the
+   * running WriterThreads.  It's done by taking the hash of the cache key modulo the queue count.
+   * When a WriterThread runs, it takes whatever has been recently added and 'drains' the entries
+   * to the BucketCache.  It then updates the ramCache and backingMap accordingly.
+   */
+  @VisibleForTesting
+  ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
       new ArrayList<BlockingQueue<RAMQueueEntry>>();
+  @VisibleForTesting
   WriterThread writerThreads[];
 
   /** Volatile boolean to track if free space is in process or not */
@@ -161,6 +172,7 @@ public class BucketCache implements BlockCache, HeapSize {
   private final int ioErrorsTolerationDuration;
   // 1 min
   public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
+
   // Start time of first IO error when reading or writing IO Engine, it will be
   // reset after a successful read/write.
   private volatile long ioErrorStartTime = -1;
@@ -169,7 +181,7 @@ public class BucketCache implements BlockCache, HeapSize {
    * A "sparse lock" implementation allowing to lock on a particular block
    * identified by offset. The purpose of this is to avoid freeing the block
    * which is being read.
-   * 
+   *
    * TODO:We could extend the IdLock to IdReadWriteLock for better.
    */
   private IdLock offsetLock = new IdLock();
@@ -200,7 +212,7 @@ public class BucketCache implements BlockCache, HeapSize {
     this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
       persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
   }
-  
+
   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
       int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
       throws FileNotFoundException, IOException {
@@ -245,6 +257,7 @@ public class BucketCache implements BlockCache, HeapSize {
     for (int i = 0; i < writerThreads.length; ++i) {
       writerThreads[i] = new WriterThread(writerQueues.get(i), i);
       writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
+      writerThreads[i].setDaemon(true);
       writerThreads[i].start();
     }
     // Run the statistics thread periodically to print the cache statistics log
@@ -259,6 +272,11 @@ public class BucketCache implements BlockCache, HeapSize {
       persistencePath + ", bucketAllocator=" + this.bucketAllocator);
   }
 
+  @VisibleForTesting
+  boolean isCacheEnabled() {
+    return this.cacheEnabled;
+  }
+
   public long getMaxSize() {
     return this.cacheCapacity;
   }
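The new writerQueues javadoc above describes the dispatch scheme: the cache key's hash, taken modulo the number of queues, picks which writer thread a block is handed to. A standalone sketch of that dispatch, with names invented for illustration:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class QueueDispatchSketch {
      public static void main(String[] args) throws InterruptedException {
        final int writerThreads = 3;
        List<BlockingQueue<String>> queues = new ArrayList<BlockingQueue<String>>();
        for (int i = 0; i < writerThreads; i++) {
          queues.add(new ArrayBlockingQueue<String>(64));
        }
        String cacheKey = "hfile-abc,offset=0";
        // Mask off the sign bit so the modulo result is a valid index.
        int index = (cacheKey.hashCode() & 0x7FFFFFFF) % queues.size();
        queues.get(index).put(cacheKey);
      }
    }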
"0," : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" + cacheStats.getHitCachingCount() + ", " + - "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : + "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" + cacheStats.getEvictedCount() + ", " + @@ -526,45 +544,46 @@ public class BucketCache implements BlockCache, HeapSize { * Free the space if the used size reaches acceptableSize() or one size block * couldn't be allocated. When freeing the space, we use the LRU algorithm and * ensure there must be some blocks evicted + * @param why Why we are being called */ - private void freeSpace() { + private void freeSpace(final String why) { // Ensure only one freeSpace progress at a time if (!freeSpaceLock.tryLock()) return; try { freeInProgress = true; long bytesToFreeWithoutExtra = 0; - /* - * Calculate free byte for each bucketSizeinfo - */ - StringBuffer msgBuffer = new StringBuffer(); + // Calculate free byte for each bucketSizeinfo + StringBuffer msgBuffer = LOG.isDebugEnabled()? new StringBuffer(): null; BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); long[] bytesToFreeForBucket = new long[stats.length]; for (int i = 0; i < stats.length; i++) { bytesToFreeForBucket[i] = 0; - long freeGoal = (long) Math.floor(stats[i].totalCount() - * (1 - DEFAULT_MIN_FACTOR)); + long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR)); freeGoal = Math.max(freeGoal, 1); if (stats[i].freeCount() < freeGoal) { - bytesToFreeForBucket[i] = stats[i].itemSize() - * (freeGoal - stats[i].freeCount()); + bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; - msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" + if (msgBuffer != null) { + msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); + } } } - msgBuffer.append("Free for total=" - + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); + if (msgBuffer != null) { + msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); + } if (bytesToFreeWithoutExtra <= 0) { return; } long currentSize = bucketAllocator.getUsedSize(); long totalSize=bucketAllocator.getTotalSize(); - LOG.debug("Bucket cache free space started; Attempting to " + msgBuffer.toString() - + " of current used=" + StringUtils.byteDesc(currentSize) - + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get()) - + ",total=" + StringUtils.byteDesc(totalSize)); - + if (LOG.isDebugEnabled() && msgBuffer != null) { + LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + + StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize)); + } + long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra * (1 + DEFAULT_EXTRA_FREE_FACTOR)); @@ -622,8 +641,7 @@ public class BucketCache implements BlockCache, HeapSize { stats = bucketAllocator.getIndexStatistics(); boolean needFreeForExtra = false; for (int i = 0; i < stats.length; i++) { - long freeGoal = (long) Math.floor(stats[i].totalCount() - * (1 - DEFAULT_MIN_FACTOR)); + long freeGoal = 
@@ -622,8 +641,7 @@ public class BucketCache implements BlockCache, HeapSize {
       stats = bucketAllocator.getIndexStatistics();
       boolean needFreeForExtra = false;
       for (int i = 0; i < stats.length; i++) {
-        long freeGoal = (long) Math.floor(stats[i].totalCount()
-            * (1 - DEFAULT_MIN_FACTOR));
+        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
         freeGoal = Math.max(freeGoal, 1);
         if (stats[i].freeCount() < freeGoal) {
           needFreeForExtra = true;
@@ -639,8 +657,7 @@ public class BucketCache implements BlockCache, HeapSize {
         bucketQueue.add(bucketMulti);
 
         while ((bucketGroup = bucketQueue.poll()) != null) {
-          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed)
-              / remainingBuckets;
+          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
           bytesFreed += bucketGroup.free(bucketBytesToFree);
           remainingBuckets--;
         }
@@ -650,12 +667,14 @@ public class BucketCache implements BlockCache, HeapSize {
       long single = bucketSingle.totalSize();
       long multi = bucketMulti.totalSize();
       long memory = bucketMemory.totalSize();
-      LOG.debug("Bucket cache free space completed; " + "freed="
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Bucket cache free space completed; " + "freed="
           + StringUtils.byteDesc(bytesFreed) + ", " + "total="
           + StringUtils.byteDesc(totalSize) + ", " + "single="
           + StringUtils.byteDesc(single) + ", " + "multi="
           + StringUtils.byteDesc(multi) + ", " + "memory="
           + StringUtils.byteDesc(memory));
+      }
     }
 
   } catch (Throwable t) {
@@ -668,19 +687,20 @@ public class BucketCache implements BlockCache, HeapSize {
   }
 
   // This handles flushing the RAM cache to IOEngine.
-  private class WriterThread extends HasThread {
-    BlockingQueue<RAMQueueEntry> inputQueue;
-    final int threadNO;
-    boolean writerEnabled = true;
+  @VisibleForTesting
+  class WriterThread extends HasThread {
+    private final BlockingQueue<RAMQueueEntry> inputQueue;
+    private final int threadNO;
+    private volatile boolean writerEnabled = true;
 
     WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
       super();
       this.inputQueue = queue;
       this.threadNO = threadNO;
-      setDaemon(true);
     }
-
+
+    // Used for test
+    @VisibleForTesting
     void disableWriter() {
       this.writerEnabled = false;
     }
@@ -692,8 +712,7 @@ public class BucketCache implements BlockCache, HeapSize {
         try {
           try {
             // Blocks
-            entries.add(inputQueue.take());
-            inputQueue.drainTo(entries);
+            entries = getRAMQueueEntries(inputQueue, entries);
             synchronized (cacheWaitSignals[threadNO]) {
               cacheWaitSignals[threadNO].notifyAll();
             }
@@ -712,80 +731,120 @@ public class BucketCache implements BlockCache, HeapSize {
       }
     }
 
     /**
-     * Flush the entries in ramCache to IOEngine and add bucket entry to
-     * backingMap
-     * @param entries
+     * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
+     * Process all that are passed in even on failure, being sure to remove them from ramCache;
+     * otherwise we'll never undo the references and we'll OOME.
+     * @param entries Presumes list passed in here will be processed by this invocation only. No
+     *   interference expected.
      * @throws InterruptedException
      */
-    private void doDrain(List<RAMQueueEntry> entries)
-        throws InterruptedException {
-      BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
-      RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
-      int done = 0;
-      while (entries.size() > 0 && cacheEnabled) {
-        // Keep going in case we throw...
-        RAMQueueEntry ramEntry = null;
+    @VisibleForTesting
+    void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
+      if (entries.isEmpty()) return;
+      // This method is a little hard to follow. We run through the passed-in entries and for each
+      // successful add, we add a non-null BucketEntry to the below bucketEntries.  Later we must
+      // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
+      // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
+      // filling ramCache.  We do the clean up by again running through the passed-in entries
+      // doing extra work when we find a non-null bucketEntries corresponding entry.
+      final int size = entries.size();
+      BucketEntry[] bucketEntries = new BucketEntry[size];
+      // Index updated inside loop if success or if we can't succeed. We retry if cache is full
+      // when we go to add an entry by going around the loop again without upping the index.
+      int index = 0;
+      while (cacheEnabled && index < size) {
+        RAMQueueEntry re = null;
        try {
-          ramEntry = entries.remove(entries.size() - 1);
-          if (ramEntry == null) {
-            LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
+          re = entries.get(index);
+          if (re == null) {
+            LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
+            index++;
            continue;
          }
-          BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
-              bucketAllocator, deserialiserMap, realCacheSize);
-          ramEntries[done] = ramEntry;
-          bucketEntries[done++] = bucketEntry;
+          BucketEntry bucketEntry =
+            re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
+          // Successfully added.  Up index and add bucketEntry. Clear io exceptions.
+          bucketEntries[index] = bucketEntry;
          if (ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
+          index++;
        } catch (BucketAllocatorException fle) {
-          LOG.warn("Failed allocating for block "
-              + (ramEntry == null ? "" : ramEntry.getKey()), fle);
+          LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
+          // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
+          bucketEntries[index] = null;
+          index++;
        } catch (CacheFullException cfe) {
+          // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
          if (!freeInProgress) {
-            freeSpace();
+            freeSpace("Full!");
          } else {
            Thread.sleep(50);
          }
        } catch (IOException ioex) {
+          // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
          LOG.error("Failed writing to bucket cache", ioex);
          checkIOErrorIsTolerated();
        }
      }
 
-      // Make sure that the data pages we have written are on the media before
-      // we update the map.
+      // Make sure data pages are written to media before we update maps.
      try {
        ioEngine.sync();
      } catch (IOException ioex) {
-        LOG.error("Faild syncing IO engine", ioex);
+        LOG.error("Failed syncing IO engine", ioex);
        checkIOErrorIsTolerated();
        // Since we failed sync, free the blocks in bucket allocator
-        for (int i = 0; i < done; ++i) {
+        for (int i = 0; i < entries.size(); ++i) {
          if (bucketEntries[i] != null) {
            bucketAllocator.freeBlock(bucketEntries[i].offset());
+            bucketEntries[i] = null;
          }
        }
-        done = 0;
      }
 
-      for (int i = 0; i < done; ++i) {
+      // Now add to backingMap if successfully added to bucket cache.  Remove from ramCache if
+      // success or error.
+      for (int i = 0; i < size; ++i) {
+        BlockCacheKey key = entries.get(i).getKey();
+        // Only add if non-null entry.
        if (bucketEntries[i] != null) {
-          backingMap.put(ramEntries[i].getKey(), bucketEntries[i]);
+          backingMap.put(key, bucketEntries[i]);
        }
-        RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
+        // Always remove from ramCache even if we failed adding it to the block cache above.
+        RAMQueueEntry ramCacheEntry = ramCache.remove(key);
        if (ramCacheEntry != null) {
-          heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize());
+          heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
        }
      }
-      if (bucketAllocator.getUsedSize() > acceptableSize()) {
-        freeSpace();
+      long used = bucketAllocator.getUsedSize();
+      if (used > acceptableSize()) {
+        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
      }
+      return;
    }
  }
-  
+
+  /**
+   * Blocks until elements available in q then tries to grab as many as possible
+   * before returning.
+   * @param receptacle Where to stash the elements taken from queue. We clear before we use it
+   *   just in case.
+   * @param q The queue to take from.
+   * @return receptacle laden with elements taken from the queue or empty if none found.
+   */
+  @VisibleForTesting
+  static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
+      final List<RAMQueueEntry> receptacle)
+  throws InterruptedException {
+    // Clear sets all entries to null and sets size to 0.  We retain allocations. Presume it is
+    // ok even if list grew to accommodate thousands.
+    receptacle.clear();
+    receptacle.add(q.take());
+    q.drainTo(receptacle);
+    return receptacle;
+  }
 
   private void persistToFile() throws IOException {
     assert !cacheEnabled;
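getRAMQueueEntries above is a small batching idiom: a blocking take() guarantees at least one element, after which a non-blocking drainTo() sweeps up whatever else accumulated, so the writer wakes once per burst rather than once per block. The same pattern in isolation:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class TakeThenDrainSketch {
      public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> q = new LinkedBlockingQueue<Integer>();
        q.put(1); q.put(2); q.put(3);
        List<Integer> batch = new ArrayList<Integer>();
        batch.add(q.take());   // blocks until at least one element exists
        q.drainTo(batch);      // then grabs the rest without blocking
        System.out.println(batch); // [1, 2, 3]
      }
    }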
@@ -863,11 +922,9 @@ public class BucketCache implements BlockCache, HeapSize {
   private void checkIOErrorIsTolerated() {
     long now = EnvironmentEdgeManager.currentTimeMillis();
     if (this.ioErrorStartTime > 0) {
-      if (cacheEnabled
-          && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
-        LOG.error("IO errors duration time has exceeded "
-            + ioErrorsTolerationDuration
-            + "ms, disabing cache, please check your IOEngine");
+      if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
+        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
+          "ms, disabling cache, please check your IOEngine");
         disableCache();
       }
     } else {
@@ -971,7 +1028,7 @@ public class BucketCache implements BlockCache, HeapSize {
         ++numEvicted;
       }
     }
-    
+
     return numEvicted;
   }
 
@@ -1046,7 +1103,7 @@ public class BucketCache implements BlockCache, HeapSize {
         this.priority = BlockPriority.MULTI;
       }
     }
-    
+
     public BlockPriority getPriority() {
       return this.priority;
     }
@@ -1128,7 +1185,8 @@ public class BucketCache implements BlockCache, HeapSize {
   /**
    * Block Entry stored in the memory with key,data and so on
    */
-  private static class RAMQueueEntry {
+  @VisibleForTesting
+  static class RAMQueueEntry {
     private BlockCacheKey key;
     private Cacheable data;
     private long accessTime;
@@ -1163,8 +1221,7 @@ public class BucketCache implements BlockCache, HeapSize {
       // This cacheable thing can't be serialized...
       if (len == 0) return null;
       long offset = bucketAllocator.allocateBlock(len);
-      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime,
-          inMemory);
+      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
       try {
         if (data instanceof HFileBlock) {
@@ -1185,7 +1242,7 @@ public class BucketCache implements BlockCache, HeapSize {
         bucketAllocator.freeBlock(offset);
         throw ioe;
       }
-      
+
       realCacheSize.addAndGet(len);
       return bucketEntry;
     }
@@ -1270,6 +1327,11 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     @Override
+    public int hashCode() {
+      return e.getKey().hashCode();
+    }
+
+    @Override
     public boolean equals(Object obj) {
       if (obj instanceof CachedBlock) {
         CachedBlock cb = (CachedBlock)obj;
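Note the allocate-then-rollback shape of RAMQueueEntry.writeToCache touched above: space is reserved in the allocator first, and if the write throws, the offset is freed before the IOException is rethrown, so a failed write cannot leak bucket space. Schematically, with hypothetical Allocator/Engine interfaces standing in for the real BucketAllocator and IOEngine:

    import java.io.IOException;

    public class AllocateThenRollback {
      interface Allocator { long allocateBlock(int len); void freeBlock(long offset); }
      interface Engine { void write(byte[] data, long offset) throws IOException; }

      // Mirrors the shape of RAMQueueEntry.writeToCache: reserve space first,
      // and free it again if the write throws, so failures never leak buckets.
      static long write(Allocator allocator, Engine engine, byte[] data) throws IOException {
        long offset = allocator.allocateBlock(data.length);
        try {
          engine.write(data, offset);
        } catch (IOException ioe) {
          allocator.freeBlock(offset);
          throw ioe;                 // caller still sees the failure
        }
        return offset;
      }
    }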
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
new file mode 100644
index 0000000..d883661
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -0,0 +1,170 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category(SmallTests.class)
+public class TestBucketWriterThread {
+  private BucketCache bc;
+  private BucketCache.WriterThread wt;
+  private BlockingQueue<RAMQueueEntry> q;
+  private Cacheable plainCacheable;
+  private BlockCacheKey plainKey;
+
+  /**
+   * Set up variables and get BucketCache and WriterThread into state where tests can manually
+   * control the running of WriterThread and BucketCache is empty.
+   * @throws Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Arbitrary capacity.
+    final int capacity = 16;
+    // Run with one writer thread only. Means there will be one writer queue only too. We depend
+    // on this below.
+    final int writerThreadsCount = 1;
+    this.bc = new BucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount,
+      capacity, null, 100/*Tolerate ioerrors for 100ms*/);
+    assertEquals(writerThreadsCount, bc.writerThreads.length);
+    assertEquals(writerThreadsCount, bc.writerQueues.size());
+    // Get reference to our single WriterThread instance.
+    this.wt = bc.writerThreads[0];
+    this.q = bc.writerQueues.get(0);
+    // On construction bucketcache WriterThread is blocked on the writer queue so it will not
+    // notice the disabling of the writer until after it has processed an entry. Let's pass one
+    // through after setting the disable flag on the writer. We want to disable the WriterThread
+    // so we can run doDrain manually, watch it working, and assert it is doing the right thing.
+    wt.disableWriter();
+    this.plainKey = new BlockCacheKey("f", 0);
+    this.plainCacheable = Mockito.mock(Cacheable.class);
+    bc.cacheBlock(this.plainKey, plainCacheable);
+    while(!bc.ramCache.isEmpty()) Threads.sleep(1);
+    assertTrue(q.isEmpty());
+    // Now writer thread should be disabled.
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (this.bc != null) this.bc.shutdown();
+  }
+
+  /**
+   * Test non-error case just works.
+   * @throws FileNotFoundException
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=30000)
+  public void testNonErrorCase()
+  throws FileNotFoundException, IOException, InterruptedException {
+    bc.cacheBlock(this.plainKey, this.plainCacheable);
+    doDrainOfOneEntry(this.bc, this.wt, this.q);
+  }
+
+  /**
+   * Pass through a too big entry and ensure it is cleared from queues and ramCache.
+   * Manually run the WriterThread.
+   * @throws InterruptedException
+   */
+  @Test
+  public void testTooBigEntry() throws InterruptedException {
+    Cacheable tooBigCacheable = Mockito.mock(Cacheable.class);
+    Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE);
+    this.bc.cacheBlock(this.plainKey, tooBigCacheable);
+    doDrainOfOneEntry(this.bc, this.wt, this.q);
+  }
+
+  /**
+   * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw an exception, then
+   * put it back and process it.
+   * @throws IOException
+   * @throws BucketAllocatorException
+   * @throws CacheFullException
+   * @throws InterruptedException
+   */
+  @SuppressWarnings("unchecked")
+  @Test (timeout=30000)
+  public void testIOE()
+  throws CacheFullException, BucketAllocatorException, IOException, InterruptedException {
+    this.bc.cacheBlock(this.plainKey, plainCacheable);
+    RAMQueueEntry rqe = q.remove();
+    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
+    Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
+      writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
+        (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any());
+    this.q.add(spiedRqe);
+    doDrainOfOneEntry(bc, wt, q);
+    // Cache is disabled when IOEs persist without ever healing.
+    assertTrue(!bc.isCacheEnabled());
+  }
+
+  /**
+   * Do cache-full exception.
+   * @throws IOException
+   * @throws BucketAllocatorException
+   * @throws CacheFullException
+   * @throws InterruptedException
+   */
+  @Test (timeout=30000)
+  public void testCacheFullException()
+  throws CacheFullException, BucketAllocatorException, IOException, InterruptedException {
+    this.bc.cacheBlock(this.plainKey, plainCacheable);
+    RAMQueueEntry rqe = q.remove();
+    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
+    final CacheFullException cfe = new CacheFullException(0, 0);
+    BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
+    Mockito.doThrow(cfe).
+      doReturn(mockedBucketEntry).
+      when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
+        (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any());
+    this.q.add(spiedRqe);
+    doDrainOfOneEntry(bc, wt, q);
+  }
+
+  private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt,
+      final BlockingQueue<RAMQueueEntry> q)
+  throws InterruptedException {
+    List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<RAMQueueEntry>(1));
+    wt.doDrain(rqes);
+    assertTrue(q.isEmpty());
+    assertTrue(bc.ramCache.isEmpty());
+    assertEquals(0, bc.heapSize());
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 138103e..0b11cd1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -158,7 +158,7 @@ public class TestIPC {
     TestRpcServer(RpcScheduler scheduler) throws IOException {
       super(null, "testRpcServer",
         Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
-        new InetSocketAddress("0.0.0.0", 0), CONF, scheduler);
+        new InetSocketAddress("localhost", 0), CONF, scheduler);
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index 335aaa6..fe4321f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -53,7 +53,7 @@ import com.google.protobuf.ServiceException;
  */
 @Category(MediumTests.class)
 public class TestProtoBufRpc {
-  public final static String ADDRESS = "0.0.0.0";
+  public final static String ADDRESS = "localhost";
   public static int PORT = 0;
   private InetSocketAddress isa;
   private Configuration conf;
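The 0.0.0.0-to-localhost switches in the two IPC tests above bind test RPC servers to the loopback interface rather than the wildcard address; combined with port 0, the OS hands back a free ephemeral port without exposing the socket beyond the machine. A minimal standalone demonstration of the same binding:

    import java.net.InetSocketAddress;
    import java.net.ServerSocket;

    public class LoopbackBindExample {
      public static void main(String[] args) throws Exception {
        ServerSocket ss = new ServerSocket();
        // Port 0 asks the OS for any free ephemeral port; "localhost" keeps
        // the socket on loopback instead of every interface (0.0.0.0).
        ss.bind(new InetSocketAddress("localhost", 0));
        System.out.println("bound to " + ss.getLocalSocketAddress());
        ss.close();
      }
    }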
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java
index 94a991d..624ef1a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.procedure;
 
+import static org.junit.Assert.assertArrayEquals;
+
 import java.io.IOException;
 import java.util.HashMap;
 
@@ -26,9 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.junit.AfterClass;
-import static org.junit.Assert.assertArrayEquals;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;