diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 8b75751..bc3f8d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import org.codehaus.jackson.annotate.JsonIgnoreProperties; - + import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -491,12 +491,9 @@ public class LruBlockCache implements BlockCache, HeapSize { if(bytesToFree <= 0) return; // Instantiate priority buckets - BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, - singleSize()); - BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, - multiSize()); - BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, - memorySize()); + BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize()); + BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize()); + BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize()); // Scan entire map putting into appropriate buckets for(LruCachedBlock cachedBlock : map.values()) { @@ -594,7 +591,6 @@ public class LruBlockCache implements BlockCache, HeapSize { * to configuration parameters and their relatives sizes. 
*/ private class BlockBucket implements Comparable { - private LruCachedBlockQueue queue; private long totalSize = 0; private long bucketSize; @@ -640,10 +636,14 @@ public class LruBlockCache implements BlockCache, HeapSize { if (that == null || !(that instanceof BlockBucket)){ return false; } - - return compareTo(( BlockBucket)that) == 0; + return compareTo((BlockBucket)that) == 0; } + @Override + public int hashCode() { + // Nothing distinguishing about each instance unless I pass in a 'name' or something + return super.hashCode(); + } } /** @@ -702,18 +702,20 @@ public class LruBlockCache implements BlockCache, HeapSize { while (this.go) { synchronized(this) { try { - this.wait(); + this.wait(1000 * 10/*Don't wait for ever*/); } catch(InterruptedException e) {} } LruBlockCache cache = this.cache.get(); - if(cache == null) break; + if (cache == null) break; cache.evict(); } } + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", + justification="This is what we want") public void evict() { synchronized(this) { - this.notifyAll(); // FindBugs NN_NAKED_NOTIFY + this.notifyAll(); } } @@ -850,7 +852,31 @@ public class LruBlockCache implements BlockCache, HeapSize { @Override public int compareTo(CachedBlock other) { - return (int)(other.getOffset() - this.getOffset()); + int diff = this.getFilename().compareTo(other.getFilename()); + if (diff != 0) return diff; + // Compare as longs; an (int)-cast difference can overflow and flip the sign. + diff = Long.valueOf(this.getOffset()).compareTo(other.getOffset()); + if (diff != 0) return diff; + if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { + throw new IllegalStateException("" + this.getCachedTime() + ", " + + other.getCachedTime()); + } + return Long.valueOf(other.getCachedTime()).compareTo(this.getCachedTime()); + } + + @Override + public int hashCode() { + return b.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof CachedBlock) { + CachedBlock cb = (CachedBlock)obj; + return compareTo(cb) == 0; + } else { + return false; + } } }; } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java index 255813b..1cfb408 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; import org.codehaus.jackson.annotate.JsonIgnoreProperties; @@ -414,7 +415,9 @@ public final class BucketAllocator { assert blockSize > 0; BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize); if (bsi == null) { - throw new BucketAllocatorException("Allocation too big size=" + blockSize); + throw new BucketAllocatorException("Allocation too big size=" + blockSize + + "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY + + " to accomodate if size seems reasonable and you want it cached."); } long offset = bsi.allocateBlock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 7a462c8..acf43f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -12,7 +12,7 @@ * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - + * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and @@ -69,25 +69,26 @@ import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.util.StringUtils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * BucketCache uses {@link BucketAllocator} to allocate/free block, and use * {@link BucketCache#ramCache} and {@link BucketCache#backingMap} in order to - * determine whether a given element hit. It could uses memory - * {@link ByteBufferIOEngine} or file {@link FileIOEngine}to store/read the - * block data. - * - * Eviction is using similar algorithm as + * determine if a given element is in the cache. The bucket cache can use on-heap or + * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to + * store/read the block data. + * + *

Eviction is via a similar algorithm as used in * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} - * - * BucketCache could be used as mainly a block cache(see - * {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS and - * fragment by GC. - * - * Also could be used as a secondary cache(e.g. using Fusionio to store block) - * to enlarge cache space by + * + *

BucketCache can be used as mainly a block cache (see + * {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS GC and + * heap fragmentation. + * + *

It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store + * blocks) to enlarge cache space via * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache} */ @InterfaceAudience.Private @@ -113,7 +114,8 @@ public class BucketCache implements BlockCache, HeapSize { IOEngine ioEngine; // Store the block in this map before writing it to cache - private ConcurrentHashMap ramCache; + @VisibleForTesting + Map ramCache; // In this map, store the block's meta data like offset, length private ConcurrentHashMap backingMap; @@ -124,8 +126,17 @@ public class BucketCache implements BlockCache, HeapSize { */ private volatile boolean cacheEnabled; - private ArrayList> writerQueues = + /** + * A list of writer queues. We have a queue per {@link WriterThread} we have running. + * In other words, the work adding blocks to the BucketCache is divided up amongst the + * running WriterThreads. Its done by taking hash of the cache key modulo queue count. + * WriterThread when it runs takes whatever has been recently added and 'drains' the entries + * to the BucketCache. It then updates the ramCache and backingMap accordingly. + */ + @VisibleForTesting + ArrayList> writerQueues = new ArrayList>(); + @VisibleForTesting WriterThread writerThreads[]; @@ -164,6 +175,7 @@ public class BucketCache implements BlockCache, HeapSize { private final int ioErrorsTolerationDuration; // 1 min public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; + // Start time of first IO error when reading or writing IO Engine, it will be // reset after a successful read/write. private volatile long ioErrorStartTime = -1; @@ -172,7 +184,7 @@ public class BucketCache implements BlockCache, HeapSize { * A "sparse lock" implementation allowing to lock on a particular block * identified by offset. The purpose of this is to avoid freeing the block * which is being read. - * + * * TODO:We could extend the IdLock to IdReadWriteLock for better. 
*/ private IdLock offsetLock = new IdLock(); @@ -207,7 +219,7 @@ public class BucketCache implements BlockCache, HeapSize { this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION); } - + public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) throws FileNotFoundException, IOException { @@ -252,6 +264,7 @@ public class BucketCache implements BlockCache, HeapSize { for (int i = 0; i < writerThreads.length; ++i) { writerThreads[i] = new WriterThread(writerQueues.get(i), i); writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); + writerThreads[i].setDaemon(true); writerThreads[i].start(); } // Run the statistics thread periodically to print the cache statistics log @@ -264,6 +277,15 @@ public class BucketCache implements BlockCache, HeapSize { persistencePath + ", bucketAllocator=" + this.bucketAllocator); } + @VisibleForTesting + boolean isCacheEnabled() { + return this.cacheEnabled; + } + + public long getMaxSize() { + return this.cacheCapacity; + } + public String getIoEngine() { return ioEngine.toString(); } @@ -449,7 +471,7 @@ public class BucketCache implements BlockCache, HeapSize { cacheStats.evicted(); return true; } - + /* * Statistics thread. Periodically prints the cache statistics to the log. */ @@ -466,7 +488,7 @@ public class BucketCache implements BlockCache, HeapSize { bucketCache.logStats(); } } - + public void logStats() { if (!LOG.isDebugEnabled()) return; // Log size @@ -484,11 +506,11 @@ public class BucketCache implements BlockCache, HeapSize { "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " + - "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : + "hitRatio=" + (cacheStats.getHitCount() == 0 ? 
"0," : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" + cacheStats.getHitCachingCount() + ", " + - "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : + "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" + cacheStats.getEvictedCount() + ", " + @@ -523,45 +545,46 @@ public class BucketCache implements BlockCache, HeapSize { * Free the space if the used size reaches acceptableSize() or one size block * couldn't be allocated. When freeing the space, we use the LRU algorithm and * ensure there must be some blocks evicted + * @param why Why we are being called */ - private void freeSpace() { + private void freeSpace(final String why) { // Ensure only one freeSpace progress at a time if (!freeSpaceLock.tryLock()) return; try { freeInProgress = true; long bytesToFreeWithoutExtra = 0; - /* - * Calculate free byte for each bucketSizeinfo - */ - StringBuffer msgBuffer = new StringBuffer(); + // Calculate free byte for each bucketSizeinfo + StringBuffer msgBuffer = LOG.isDebugEnabled()? 
new StringBuffer(): null; BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); long[] bytesToFreeForBucket = new long[stats.length]; for (int i = 0; i < stats.length; i++) { bytesToFreeForBucket[i] = 0; - long freeGoal = (long) Math.floor(stats[i].totalCount() - * (1 - DEFAULT_MIN_FACTOR)); + long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR)); freeGoal = Math.max(freeGoal, 1); if (stats[i].freeCount() < freeGoal) { - bytesToFreeForBucket[i] = stats[i].itemSize() - * (freeGoal - stats[i].freeCount()); + bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; - msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" + if (msgBuffer != null) { + msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); + } } } - msgBuffer.append("Free for total=" - + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); + if (msgBuffer != null) { + msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); + } if (bytesToFreeWithoutExtra <= 0) { return; } long currentSize = bucketAllocator.getUsedSize(); long totalSize=bucketAllocator.getTotalSize(); - LOG.debug("Bucket cache free space started; Attempting to " + msgBuffer.toString() - + " of current used=" + StringUtils.byteDesc(currentSize) - + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get()) - + ",total=" + StringUtils.byteDesc(totalSize)); - + if (LOG.isDebugEnabled() && msgBuffer != null) { + LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + + StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize)); + } + long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra * (1 + DEFAULT_EXTRA_FREE_FACTOR)); @@ -619,8 
+642,7 @@ public class BucketCache implements BlockCache, HeapSize { stats = bucketAllocator.getIndexStatistics(); boolean needFreeForExtra = false; for (int i = 0; i < stats.length; i++) { - long freeGoal = (long) Math.floor(stats[i].totalCount() - * (1 - DEFAULT_MIN_FACTOR)); + long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR)); freeGoal = Math.max(freeGoal, 1); if (stats[i].freeCount() < freeGoal) { needFreeForExtra = true; @@ -636,8 +658,7 @@ public class BucketCache implements BlockCache, HeapSize { bucketQueue.add(bucketMulti); while ((bucketGroup = bucketQueue.poll()) != null) { - long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) - / remainingBuckets; + long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; bytesFreed += bucketGroup.free(bucketBytesToFree); remainingBuckets--; } @@ -647,12 +668,14 @@ public class BucketCache implements BlockCache, HeapSize { long single = bucketSingle.totalSize(); long multi = bucketMulti.totalSize(); long memory = bucketMemory.totalSize(); - LOG.debug("Bucket cache free space completed; " + "freed=" + if (LOG.isDebugEnabled()) { + LOG.debug("Bucket cache free space completed; " + "freed=" + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); + } } } catch (Throwable t) { @@ -665,19 +688,20 @@ public class BucketCache implements BlockCache, HeapSize { } // This handles flushing the RAM cache to IOEngine. 
- private class WriterThread extends HasThread { - BlockingQueue inputQueue; - final int threadNO; - boolean writerEnabled = true; + @VisibleForTesting + class WriterThread extends HasThread { + private final BlockingQueue inputQueue; + private final int threadNO; + private volatile boolean writerEnabled = true; WriterThread(BlockingQueue queue, int threadNO) { super(); this.inputQueue = queue; this.threadNO = threadNO; - setDaemon(true); } - + // Used for test + @VisibleForTesting void disableWriter() { this.writerEnabled = false; } @@ -689,8 +713,7 @@ public class BucketCache implements BlockCache, HeapSize { try { try { // Blocks - entries.add(inputQueue.take()); - inputQueue.drainTo(entries); + entries = getRAMQueueEntries(inputQueue, entries); synchronized (cacheWaitSignals[threadNO]) { cacheWaitSignals[threadNO].notifyAll(); } @@ -709,80 +732,120 @@ public class BucketCache implements BlockCache, HeapSize { } /** - * Flush the entries in ramCache to IOEngine and add bucket entry to - * backingMap - * @param entries + * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. + * Process all that are passed in even if failure being sure to remove from ramCache else we'll + * never undo the references and we'll OOME. + * @param entries Presumes list passed in here will be processed by this invocation only. No + * interference expected. * @throws InterruptedException */ - private void doDrain(List entries) - throws InterruptedException { - BucketEntry[] bucketEntries = new BucketEntry[entries.size()]; - RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()]; - int done = 0; - while (entries.size() > 0 && cacheEnabled) { - // Keep going in case we throw... - RAMQueueEntry ramEntry = null; + @VisibleForTesting + void doDrain(final List entries) throws InterruptedException { + if (entries.isEmpty()) return; + // This method is a little hard to follow. 
We run through the passed in entries and for each + // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must + // do cleanup making sure we've cleared ramCache of all entries regardless of whether we + // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by + // filling ramCache. We do the clean up by again running through the passed in entries + // doing extra work when we find a non-null bucketEntries corresponding entry. + final int size = entries.size(); + BucketEntry[] bucketEntries = new BucketEntry[size]; + // Index updated inside loop if success or if we can't succeed. We retry if cache is full + // when we go to add an entry by going around the loop again without upping the index. + int index = 0; + while (cacheEnabled && index < size) { + RAMQueueEntry re = null; try { - ramEntry = entries.remove(entries.size() - 1); - if (ramEntry == null) { - LOG.warn("Couldn't get the entry from RAM queue, who steals it?"); + re = entries.get(index); + if (re == null) { + LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); + index++; continue; } - BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine, - bucketAllocator, deserialiserMap, realCacheSize); - ramEntries[done] = ramEntry; - bucketEntries[done++] = bucketEntry; + BucketEntry bucketEntry = + re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize); + // Successfully added. Up index and add bucketEntry. Clear io exceptions. + bucketEntries[index] = bucketEntry; if (ioErrorStartTime > 0) { ioErrorStartTime = -1; } + index++; } catch (BucketAllocatorException fle) { - LOG.warn("Failed allocating for block " - + (ramEntry == null ? "" : ramEntry.getKey()), fle); + LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle); + // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 
+ bucketEntries[index] = null; + index++; } catch (CacheFullException cfe) { + // Cache full when we tried to add. Try freeing space and then retrying (don't up index) if (!freeInProgress) { - freeSpace(); + freeSpace("Full!"); } else { Thread.sleep(50); } } catch (IOException ioex) { + // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. LOG.error("Failed writing to bucket cache", ioex); checkIOErrorIsTolerated(); } } - // Make sure that the data pages we have written are on the media before - // we update the map. + // Make sure data pages are written are on media before we update maps. try { ioEngine.sync(); } catch (IOException ioex) { - LOG.error("Faild syncing IO engine", ioex); + LOG.error("Failed syncing IO engine", ioex); checkIOErrorIsTolerated(); // Since we failed sync, free the blocks in bucket allocator - for (int i = 0; i < done; ++i) { + for (int i = 0; i < entries.size(); ++i) { if (bucketEntries[i] != null) { bucketAllocator.freeBlock(bucketEntries[i].offset()); + bucketEntries[i] = null; } } - done = 0; } - for (int i = 0; i < done; ++i) { + // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if + // success or error. + for (int i = 0; i < size; ++i) { + BlockCacheKey key = entries.get(i).getKey(); + // Only add if non-null entry. if (bucketEntries[i] != null) { - backingMap.put(ramEntries[i].getKey(), bucketEntries[i]); + backingMap.put(key, bucketEntries[i]); } - RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey()); + // Always remove from ramCache even if we failed adding it to the block cache above. 
+ RAMQueueEntry ramCacheEntry = ramCache.remove(key); if (ramCacheEntry != null) { - heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize()); + heapSize.addAndGet(-1 * entries.get(i).getData().heapSize()); } } - if (bucketAllocator.getUsedSize() > acceptableSize()) { - freeSpace(); + long used = bucketAllocator.getUsedSize(); + if (used > acceptableSize()) { + freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); } + return; } } - + /** + * Blocks until elements available in q then tries to grab as many as possible + * before returning. + * @param receptical Where to stash the elements taken from queue. We clear before we use it + * just in case. + * @param q The queue to take from. + * @return receptical laden with at least one element taken from the queue (the take blocks). + */ + @VisibleForTesting + static List getRAMQueueEntries(final BlockingQueue q, + final List receptical) + throws InterruptedException { + // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it + // ok even if list grew to accommodate thousands. 
+ receptical.clear(); + receptical.add(q.take()); + q.drainTo(receptical); + return receptical; + } private void persistToFile() throws IOException { assert !cacheEnabled; @@ -860,11 +923,9 @@ public class BucketCache implements BlockCache, HeapSize { private void checkIOErrorIsTolerated() { long now = EnvironmentEdgeManager.currentTimeMillis(); if (this.ioErrorStartTime > 0) { - if (cacheEnabled - && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) { - LOG.error("IO errors duration time has exceeded " - + ioErrorsTolerationDuration - + "ms, disabing cache, please check your IOEngine"); + if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) { + LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration + + "ms, disabing cache, please check your IOEngine"); disableCache(); } } else { @@ -968,7 +1029,7 @@ public class BucketCache implements BlockCache, HeapSize { ++numEvicted; } } - + return numEvicted; } @@ -1043,7 +1104,7 @@ public class BucketCache implements BlockCache, HeapSize { this.priority = BlockPriority.MULTI; } } - + public BlockPriority getPriority() { return this.priority; } @@ -1125,7 +1186,8 @@ public class BucketCache implements BlockCache, HeapSize { /** * Block Entry stored in the memory with key,data and so on */ - private static class RAMQueueEntry { + @VisibleForTesting + static class RAMQueueEntry { private BlockCacheKey key; private Cacheable data; private long accessTime; @@ -1160,8 +1222,7 @@ public class BucketCache implements BlockCache, HeapSize { // This cacheable thing can't be serialized... 
if (len == 0) return null; long offset = bucketAllocator.allocateBlock(len); - BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, - inMemory); + BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory); bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); try { if (data instanceof HFileBlock) { @@ -1182,7 +1243,7 @@ public class BucketCache implements BlockCache, HeapSize { bucketAllocator.freeBlock(offset); throw ioe; } - + realCacheSize.addAndGet(len); return bucketEntry; } @@ -1255,7 +1316,31 @@ public class BucketCache implements BlockCache, HeapSize { @Override public int compareTo(CachedBlock other) { - return (int)(this.getOffset() - other.getOffset()); + int diff = this.getFilename().compareTo(other.getFilename()); + if (diff != 0) return diff; + // Compare as longs; an (int)-cast difference can overflow and flip the sign. + diff = Long.valueOf(this.getOffset()).compareTo(other.getOffset()); + if (diff != 0) return diff; + if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { + throw new IllegalStateException("" + this.getCachedTime() + ", " + + other.getCachedTime()); + } + return Long.valueOf(other.getCachedTime()).compareTo(this.getCachedTime()); + } + + @Override + public int hashCode() { + return e.getKey().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof CachedBlock) { + CachedBlock cb = (CachedBlock)obj; + return compareTo(cb) == 0; + } else { + return false; + } } }; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java new file mode 100644 index 0000000..d883661 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -0,0 +1,170 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category(SmallTests.class) +public class TestBucketWriterThread { + private BucketCache bc; + private BucketCache.WriterThread wt; + private BlockingQueue q; + private Cacheable plainCacheable; + private BlockCacheKey plainKey; + + /** + * Set up variables and get BucketCache and WriterThread into state where tests can manually + * control the running of WriterThread and BucketCache is empty. 
+ * @throws Exception + */ + @Before + public void setUp() throws Exception { + // Arbitrary capacity. + final int capacity = 16; + // Run with one writer thread only. Means there will be one writer queue only too. We depend + // on this in below. + final int writerThreadsCount = 1; + this.bc = new BucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount, + capacity, null, 100/*Tolerate ioerrors for 100ms*/); + assertEquals(writerThreadsCount, bc.writerThreads.length); + assertEquals(writerThreadsCount, bc.writerQueues.size()); + // Get reference to our single WriterThread instance. + this.wt = bc.writerThreads[0]; + this.q = bc.writerQueues.get(0); + // On construction bucketcache WriterThread is blocked on the writer queue so it will not + // notice the disabling of the writer until after it has processed an entry. Lets pass one + // through after setting disable flag on the writer. We want to disable the WriterThread so + // we can run the doDrain manually so we can watch it working and assert it doing right thing. + wt.disableWriter(); + this.plainKey = new BlockCacheKey("f", 0); + this.plainCacheable = Mockito.mock(Cacheable.class); + bc.cacheBlock(this.plainKey, plainCacheable); + while(!bc.ramCache.isEmpty()) Threads.sleep(1); + assertTrue(q.isEmpty()); + // Now writer thread should be disabled. + } + + @After + public void tearDown() throws Exception { + if (this.bc != null) this.bc.shutdown(); + } + + /** + * Test non-error case just works. + * @throws FileNotFoundException + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=30000) + public void testNonErrorCase() + throws FileNotFoundException, IOException, InterruptedException { + bc.cacheBlock(this.plainKey, this.plainCacheable); + doDrainOfOneEntry(this.bc, this.wt, this.q); + } + + /** + * Pass through a too big entry and ensure it is cleared from queues and ramCache. + * Manually run the WriterThread. 
+ * @throws InterruptedException + */ + @Test + public void testTooBigEntry() throws InterruptedException { + Cacheable tooBigCacheable = Mockito.mock(Cacheable.class); + Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE); + this.bc.cacheBlock(this.plainKey, tooBigCacheable); + doDrainOfOneEntry(this.bc, this.wt, this.q); + } + + /** + * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then + * put it back and process it. + * @throws IOException + * @throws BucketAllocatorException + * @throws CacheFullException + * @throws InterruptedException + */ + @SuppressWarnings("unchecked") + @Test (timeout=30000) + public void testIOE() + throws CacheFullException, BucketAllocatorException, IOException, InterruptedException { + this.bc.cacheBlock(this.plainKey, plainCacheable); + RAMQueueEntry rqe = q.remove(); + RAMQueueEntry spiedRqe = Mockito.spy(rqe); + Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe). + writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), + (UniqueIndexMap)Mockito.any(), (AtomicLong)Mockito.any()); + this.q.add(spiedRqe); + doDrainOfOneEntry(bc, wt, q); + // Cache disabled when ioes w/o ever healing. + assertTrue(!bc.isCacheEnabled()); + } + + /** + * Do Cache full exception + * @throws IOException + * @throws BucketAllocatorException + * @throws CacheFullException + * @throws InterruptedException + */ + @Test (timeout=30000) + public void testCacheFullException() + throws CacheFullException, BucketAllocatorException, IOException, InterruptedException { + this.bc.cacheBlock(this.plainKey, plainCacheable); + RAMQueueEntry rqe = q.remove(); + RAMQueueEntry spiedRqe = Mockito.spy(rqe); + final CacheFullException cfe = new CacheFullException(0, 0); + BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); + Mockito.doThrow(cfe). + doReturn(mockedBucketEntry). 
+ when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), + (UniqueIndexMap)Mockito.any(), (AtomicLong)Mockito.any()); + this.q.add(spiedRqe); + doDrainOfOneEntry(bc, wt, q); + } + + private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt, + final BlockingQueue q) + throws InterruptedException { + List rqes = BucketCache.getRAMQueueEntries(q, new ArrayList(1)); + wt.doDrain(rqes); + assertTrue(q.isEmpty()); + assertTrue(bc.ramCache.isEmpty()); + assertEquals(0, bc.heapSize()); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 128a91a..57f71bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -158,7 +158,7 @@ public class TestIPC { TestRpcServer(RpcScheduler scheduler) throws IOException { super(null, "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("0.0.0.0", 0), CONF, scheduler); + new InetSocketAddress("localhost", 0), CONF, scheduler); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 335aaa6..fe4321f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -53,7 +53,7 @@ import com.google.protobuf.ServiceException; */ @Category(MediumTests.class) public class TestProtoBufRpc { - public final static String ADDRESS = "0.0.0.0"; + public final static String ADDRESS = "localhost"; public static int PORT = 0; private InetSocketAddress isa; private Configuration conf; diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java index 8d9ec24..12a1011 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.procedure; +import static org.junit.Assert.assertArrayEquals; + import java.io.IOException; import java.util.HashMap;