diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index 36fad56..cf075ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
@@ -415,7 +416,9 @@ public final class BucketAllocator {
assert blockSize > 0;
BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
if (bsi == null) {
- throw new BucketAllocatorException("Allocation too big size=" + blockSize);
+ throw new BucketAllocatorException("Allocation too big size=" + blockSize +
+ "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
+ " to accomodate if size seems reasonable and you want it cached.");
}
long offset = bsi.allocateBlock();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 9a8cf5a..7ad35bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -12,7 +12,7 @@
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
-
+
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -77,14 +78,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* determine if a given element is in the cache. The bucket cache can use on-heap or
* off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to
* store/read the block data.
- *
+ *
* <p>Eviction is via a similar algorithm as used in
* {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
- *
+ *
* <p>BucketCache can be used as mainly a block cache (see
* {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS GC and
* heap fragmentation.
- *
+ *
* <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
* blocks) to enlarge cache space via
* {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
@@ -112,7 +113,8 @@ public class BucketCache implements BlockCache, HeapSize {
IOEngine ioEngine;
// Store the block in this map before writing it to cache
- private Map ramCache;
+ @VisibleForTesting
+ Map ramCache;
// In this map, store the block's meta data like offset, length
private Map backingMap;
@@ -123,8 +125,17 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private volatile boolean cacheEnabled;
- private ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
+ /**
+ * A list of writer queues. We have a queue per {@link WriterThread} we have running.
+ * In other words, the work adding blocks to the BucketCache is divided up amongst the
+ * running WriterThreads. It's done by taking hash of the cache key modulo queue count.
+ * WriterThread when it runs takes whatever has been recently added and 'drains' the entries
+ * to the BucketCache. It then updates the ramCache and backingMap accordingly.
+ */
+ @VisibleForTesting
+ ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
new ArrayList<BlockingQueue<RAMQueueEntry>>();
+ @VisibleForTesting
WriterThread writerThreads[];
/** Volatile boolean to track if free space is in process or not */
@@ -161,6 +172,7 @@ public class BucketCache implements BlockCache, HeapSize {
private final int ioErrorsTolerationDuration;
// 1 min
public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
+
// Start time of first IO error when reading or writing IO Engine, it will be
// reset after a successful read/write.
private volatile long ioErrorStartTime = -1;
@@ -169,7 +181,7 @@ public class BucketCache implements BlockCache, HeapSize {
* A "sparse lock" implementation allowing to lock on a particular block
* identified by offset. The purpose of this is to avoid freeing the block
* which is being read.
- *
+ *
* TODO:We could extend the IdLock to IdReadWriteLock for better.
*/
private IdLock offsetLock = new IdLock();
@@ -200,7 +212,7 @@ public class BucketCache implements BlockCache, HeapSize {
this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
}
-
+
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
throws FileNotFoundException, IOException {
@@ -245,6 +257,7 @@ public class BucketCache implements BlockCache, HeapSize {
for (int i = 0; i < writerThreads.length; ++i) {
writerThreads[i] = new WriterThread(writerQueues.get(i), i);
writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
+ writerThreads[i].setDaemon(true);
writerThreads[i].start();
}
// Run the statistics thread periodically to print the cache statistics log
@@ -259,6 +272,11 @@ public class BucketCache implements BlockCache, HeapSize {
persistencePath + ", bucketAllocator=" + this.bucketAllocator);
}
+ @VisibleForTesting
+ boolean isCacheEnabled() {
+ return this.cacheEnabled;
+ }
+
public long getMaxSize() {
return this.cacheCapacity;
}
@@ -483,11 +501,11 @@ public class BucketCache implements BlockCache, HeapSize {
"hits=" + cacheStats.getHitCount() + ", " +
"IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
"IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
- "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
+ "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
(StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
"cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
"cachingHits=" + cacheStats.getHitCachingCount() + ", " +
- "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
+ "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
(StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
"evictions=" + cacheStats.getEvictionCount() + ", " +
"evicted=" + cacheStats.getEvictedCount() + ", " +
@@ -526,45 +544,46 @@ public class BucketCache implements BlockCache, HeapSize {
* Free the space if the used size reaches acceptableSize() or one size block
* couldn't be allocated. When freeing the space, we use the LRU algorithm and
* ensure there must be some blocks evicted
+ * @param why Why we are being called
*/
- private void freeSpace() {
+ private void freeSpace(final String why) {
// Ensure only one freeSpace progress at a time
if (!freeSpaceLock.tryLock()) return;
try {
freeInProgress = true;
long bytesToFreeWithoutExtra = 0;
- /*
- * Calculate free byte for each bucketSizeinfo
- */
- StringBuffer msgBuffer = new StringBuffer();
+ // Calculate free byte for each bucketSizeinfo
+ StringBuffer msgBuffer = LOG.isDebugEnabled()? new StringBuffer(): null;
BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
long[] bytesToFreeForBucket = new long[stats.length];
for (int i = 0; i < stats.length; i++) {
bytesToFreeForBucket[i] = 0;
- long freeGoal = (long) Math.floor(stats[i].totalCount()
- * (1 - DEFAULT_MIN_FACTOR));
+ long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
freeGoal = Math.max(freeGoal, 1);
if (stats[i].freeCount() < freeGoal) {
- bytesToFreeForBucket[i] = stats[i].itemSize()
- * (freeGoal - stats[i].freeCount());
+ bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
- msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
+ if (msgBuffer != null) {
+ msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
+ StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
+ }
}
}
- msgBuffer.append("Free for total="
- + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
+ if (msgBuffer != null) {
+ msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
+ }
if (bytesToFreeWithoutExtra <= 0) {
return;
}
long currentSize = bucketAllocator.getUsedSize();
long totalSize=bucketAllocator.getTotalSize();
- LOG.debug("Bucket cache free space started; Attempting to " + msgBuffer.toString()
- + " of current used=" + StringUtils.byteDesc(currentSize)
- + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get())
- + ",total=" + StringUtils.byteDesc(totalSize));
-
+ if (LOG.isDebugEnabled() && msgBuffer != null) {
+ LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
+ " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
+ StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
+ }
+
long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
* (1 + DEFAULT_EXTRA_FREE_FACTOR));
@@ -622,8 +641,7 @@ public class BucketCache implements BlockCache, HeapSize {
stats = bucketAllocator.getIndexStatistics();
boolean needFreeForExtra = false;
for (int i = 0; i < stats.length; i++) {
- long freeGoal = (long) Math.floor(stats[i].totalCount()
- * (1 - DEFAULT_MIN_FACTOR));
+ long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
freeGoal = Math.max(freeGoal, 1);
if (stats[i].freeCount() < freeGoal) {
needFreeForExtra = true;
@@ -639,8 +657,7 @@ public class BucketCache implements BlockCache, HeapSize {
bucketQueue.add(bucketMulti);
while ((bucketGroup = bucketQueue.poll()) != null) {
- long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed)
- / remainingBuckets;
+ long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
bytesFreed += bucketGroup.free(bucketBytesToFree);
remainingBuckets--;
}
@@ -650,12 +667,14 @@ public class BucketCache implements BlockCache, HeapSize {
long single = bucketSingle.totalSize();
long multi = bucketMulti.totalSize();
long memory = bucketMemory.totalSize();
- LOG.debug("Bucket cache free space completed; " + "freed="
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Bucket cache free space completed; " + "freed="
+ StringUtils.byteDesc(bytesFreed) + ", " + "total="
+ StringUtils.byteDesc(totalSize) + ", " + "single="
+ StringUtils.byteDesc(single) + ", " + "multi="
+ StringUtils.byteDesc(multi) + ", " + "memory="
+ StringUtils.byteDesc(memory));
+ }
}
} finally {
@@ -666,19 +685,20 @@ public class BucketCache implements BlockCache, HeapSize {
}
// This handles flushing the RAM cache to IOEngine.
- private class WriterThread extends HasThread {
- BlockingQueue inputQueue;
- final int threadNO;
- boolean writerEnabled = true;
+ @VisibleForTesting
+ class WriterThread extends HasThread {
+ private final BlockingQueue inputQueue;
+ private final int threadNO;
+ private volatile boolean writerEnabled = true;
WriterThread(BlockingQueue queue, int threadNO) {
super();
this.inputQueue = queue;
this.threadNO = threadNO;
- setDaemon(true);
}
-
+
// Used for test
+ @VisibleForTesting
void disableWriter() {
this.writerEnabled = false;
}
@@ -690,8 +710,7 @@ public class BucketCache implements BlockCache, HeapSize {
try {
try {
// Blocks
- entries.add(inputQueue.take());
- inputQueue.drainTo(entries);
+ entries = getRAMQueueEntries(inputQueue, entries);
synchronized (cacheWaitSignals[threadNO]) {
cacheWaitSignals[threadNO].notifyAll();
}
@@ -710,80 +729,120 @@ public class BucketCache implements BlockCache, HeapSize {
}
/**
- * Flush the entries in ramCache to IOEngine and add bucket entry to
- * backingMap
- * @param entries
+ * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
+ * Process all that are passed in even if failure being sure to remove from ramCache else we'll
+ * never undo the references and we'll OOME.
+ * @param entries Presumes list passed in here will be processed by this invocation only. No
+ * interference expected.
* @throws InterruptedException
*/
- private void doDrain(List entries)
- throws InterruptedException {
- BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
- RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
- int done = 0;
- while (entries.size() > 0 && cacheEnabled) {
- // Keep going in case we throw...
- RAMQueueEntry ramEntry = null;
+ @VisibleForTesting
+ void doDrain(final List entries) throws InterruptedException {
+ if (entries.isEmpty()) return;
+ // This method is a little hard to follow. We run through the passed in entries and for each
+ // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
+ // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
+ // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
+ // filling ramCache. We do the clean up by again running through the passed in entries
+ // doing extra work when we find a non-null bucketEntries corresponding entry.
+ final int size = entries.size();
+ BucketEntry[] bucketEntries = new BucketEntry[size];
+ // Index updated inside loop if success or if we can't succeed. We retry if cache is full
+ // when we go to add an entry by going around the loop again without upping the index.
+ int index = 0;
+ while (cacheEnabled && index < size) {
+ RAMQueueEntry re = null;
try {
- ramEntry = entries.remove(entries.size() - 1);
- if (ramEntry == null) {
- LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
+ re = entries.get(index);
+ if (re == null) {
+ LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
+ index++;
continue;
}
- BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
- bucketAllocator, deserialiserMap, realCacheSize);
- ramEntries[done] = ramEntry;
- bucketEntries[done++] = bucketEntry;
+ BucketEntry bucketEntry =
+ re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
+ // Successfully added. Up index and add bucketEntry. Clear io exceptions.
+ bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
ioErrorStartTime = -1;
}
+ index++;
} catch (BucketAllocatorException fle) {
- LOG.warn("Failed allocating for block "
- + (ramEntry == null ? "" : ramEntry.getKey()), fle);
+ LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
+ // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
+ bucketEntries[index] = null;
+ index++;
} catch (CacheFullException cfe) {
+ // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
if (!freeInProgress) {
- freeSpace();
+ freeSpace("Full!");
} else {
Thread.sleep(50);
}
} catch (IOException ioex) {
+ // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
LOG.error("Failed writing to bucket cache", ioex);
checkIOErrorIsTolerated();
}
}
- // Make sure that the data pages we have written are on the media before
- // we update the map.
+ // Make sure data pages are written on media before we update maps.
try {
ioEngine.sync();
} catch (IOException ioex) {
- LOG.error("Faild syncing IO engine", ioex);
+ LOG.error("Failed syncing IO engine", ioex);
checkIOErrorIsTolerated();
// Since we failed sync, free the blocks in bucket allocator
- for (int i = 0; i < done; ++i) {
+ for (int i = 0; i < entries.size(); ++i) {
if (bucketEntries[i] != null) {
bucketAllocator.freeBlock(bucketEntries[i].offset());
+ bucketEntries[i] = null;
}
}
- done = 0;
}
- for (int i = 0; i < done; ++i) {
+ // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
+ // success or error.
+ for (int i = 0; i < size; ++i) {
+ BlockCacheKey key = entries.get(i).getKey();
+ // Only add if non-null entry.
if (bucketEntries[i] != null) {
- backingMap.put(ramEntries[i].getKey(), bucketEntries[i]);
+ backingMap.put(key, bucketEntries[i]);
}
- RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
+ // Always remove from ramCache even if we failed adding it to the block cache above.
+ RAMQueueEntry ramCacheEntry = ramCache.remove(key);
if (ramCacheEntry != null) {
- heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize());
+ heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
}
}
- if (bucketAllocator.getUsedSize() > acceptableSize()) {
- freeSpace();
+ long used = bucketAllocator.getUsedSize();
+ if (used > acceptableSize()) {
+ freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
}
+ return;
}
}
-
+ /**
+ * Blocks until elements available in q then tries to grab as many as possible
+ * before returning.
+ * @param q The queue to take from.
+ * @param receptical Where to stash the elements taken from queue; cleared before use.
+ * @return receptical, never empty: we block until at least one element is available.
+ * @throws InterruptedException if interrupted while blocked waiting on q
+ */
+ @VisibleForTesting
+ static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
+ final List<RAMQueueEntry> receptical)
+ throws InterruptedException {
+ // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
+ // ok even if list grew to accommodate thousands.
+ receptical.clear();
+ receptical.add(q.take());
+ q.drainTo(receptical);
+ return receptical;
+ }
private void persistToFile() throws IOException {
assert !cacheEnabled;
@@ -861,11 +920,9 @@ public class BucketCache implements BlockCache, HeapSize {
private void checkIOErrorIsTolerated() {
long now = EnvironmentEdgeManager.currentTimeMillis();
if (this.ioErrorStartTime > 0) {
- if (cacheEnabled
- && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
- LOG.error("IO errors duration time has exceeded "
- + ioErrorsTolerationDuration
- + "ms, disabing cache, please check your IOEngine");
+ if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
+ LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
+ "ms, disabing cache, please check your IOEngine");
disableCache();
}
} else {
@@ -969,7 +1026,7 @@ public class BucketCache implements BlockCache, HeapSize {
++numEvicted;
}
}
-
+
return numEvicted;
}
@@ -1044,7 +1101,7 @@ public class BucketCache implements BlockCache, HeapSize {
this.priority = BlockPriority.MULTI;
}
}
-
+
public BlockPriority getPriority() {
return this.priority;
}
@@ -1126,7 +1183,8 @@ public class BucketCache implements BlockCache, HeapSize {
/**
* Block Entry stored in the memory with key,data and so on
*/
- private static class RAMQueueEntry {
+ @VisibleForTesting
+ static class RAMQueueEntry {
private BlockCacheKey key;
private Cacheable data;
private long accessTime;
@@ -1161,8 +1219,7 @@ public class BucketCache implements BlockCache, HeapSize {
// This cacheable thing can't be serialized...
if (len == 0) return null;
long offset = bucketAllocator.allocateBlock(len);
- BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime,
- inMemory);
+ BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
try {
if (data instanceof HFileBlock) {
@@ -1183,7 +1240,7 @@ public class BucketCache implements BlockCache, HeapSize {
bucketAllocator.freeBlock(offset);
throw ioe;
}
-
+
realCacheSize.addAndGet(len);
return bucketEntry;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
new file mode 100644
index 0000000..d883661
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -0,0 +1,170 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category(SmallTests.class)
+public class TestBucketWriterThread {
+ private BucketCache bc;
+ private BucketCache.WriterThread wt;
+ private BlockingQueue<RAMQueueEntry> q;
+ private Cacheable plainCacheable;
+ private BlockCacheKey plainKey;
+
+ /**
+ * Set up variables and get BucketCache and WriterThread into state where tests can manually
+ * control the running of WriterThread and BucketCache is empty.
+ * @throws Exception if constructing the BucketCache fails
+ */
+ @Before
+ public void setUp() throws Exception {
+ // Arbitrary capacity.
+ final int capacity = 16;
+ // Run with one writer thread only. Means there will be one writer queue only too. We depend
+ // on this in below.
+ final int writerThreadsCount = 1;
+ this.bc = new BucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount,
+ capacity, null, 100/*Tolerate ioerrors for 100ms*/);
+ assertEquals(writerThreadsCount, bc.writerThreads.length);
+ assertEquals(writerThreadsCount, bc.writerQueues.size());
+ // Get reference to our single WriterThread instance.
+ this.wt = bc.writerThreads[0];
+ this.q = bc.writerQueues.get(0);
+ // On construction bucketcache WriterThread is blocked on the writer queue so it will not
+ // notice the disabling of the writer until after it has processed an entry. Lets pass one
+ // through after setting disable flag on the writer. We want to disable the WriterThread so
+ // we can run the doDrain manually so we can watch it working and assert it doing right thing.
+ wt.disableWriter();
+ this.plainKey = new BlockCacheKey("f", 0);
+ this.plainCacheable = Mockito.mock(Cacheable.class);
+ bc.cacheBlock(this.plainKey, plainCacheable);
+ while(!bc.ramCache.isEmpty()) Threads.sleep(1);
+ assertTrue(q.isEmpty());
+ // Now writer thread should be disabled.
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (this.bc != null) this.bc.shutdown();
+ }
+
+ /**
+ * Test non-error case just works: cache a plain block, drain it by hand, all must be cleared.
+ * @throws FileNotFoundException
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=30000)
+ public void testNonErrorCase()
+ throws FileNotFoundException, IOException, InterruptedException {
+ bc.cacheBlock(this.plainKey, this.plainCacheable);
+ doDrainOfOneEntry(this.bc, this.wt, this.q);
+ }
+
+ /**
+ * Pass through a too big entry and ensure it is cleared from queues and ramCache.
+ * Manually run the WriterThread; the timeout guards against a drain that never returns.
+ * @throws InterruptedException
+ */
+ @Test (timeout=30000)
+ public void testTooBigEntry() throws InterruptedException {
+ Cacheable tooBigCacheable = Mockito.mock(Cacheable.class);
+ Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE);
+ this.bc.cacheBlock(this.plainKey, tooBigCacheable);
+ doDrainOfOneEntry(this.bc, this.wt, this.q);
+ }
+
+ /**
+ * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then
+ * put it back and process it.
+ * @throws IOException
+ * @throws BucketAllocatorException
+ * @throws CacheFullException
+ * @throws InterruptedException
+ */
+ @SuppressWarnings("unchecked")
+ @Test (timeout=30000)
+ public void testIOE()
+ throws CacheFullException, BucketAllocatorException, IOException, InterruptedException {
+ this.bc.cacheBlock(this.plainKey, plainCacheable);
+ RAMQueueEntry rqe = q.remove();
+ RAMQueueEntry spiedRqe = Mockito.spy(rqe);
+ Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
+ writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
+ (UniqueIndexMap)Mockito.any(), (AtomicLong)Mockito.any());
+ this.q.add(spiedRqe);
+ doDrainOfOneEntry(bc, wt, q);
+ // Cache disabled when ioes w/o ever healing.
+ assertTrue(!bc.isCacheEnabled());
+ }
+
+ /**
+ * Do Cache full exception: first writeToCache throws it, the next call returns a mocked entry.
+ * @throws IOException
+ * @throws BucketAllocatorException
+ * @throws CacheFullException
+ * @throws InterruptedException
+ */
+ @Test (timeout=30000)
+ public void testCacheFullException()
+ throws CacheFullException, BucketAllocatorException, IOException, InterruptedException {
+ this.bc.cacheBlock(this.plainKey, plainCacheable);
+ RAMQueueEntry rqe = q.remove();
+ RAMQueueEntry spiedRqe = Mockito.spy(rqe);
+ final CacheFullException cfe = new CacheFullException(0, 0);
+ BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
+ Mockito.doThrow(cfe).
+ doReturn(mockedBucketEntry).
+ when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
+ (UniqueIndexMap)Mockito.any(), (AtomicLong)Mockito.any());
+ this.q.add(spiedRqe);
+ doDrainOfOneEntry(bc, wt, q);
+ }
+ /** Manually drain the queue, then assert queue, ramCache and heap size are all empty. */
+ private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt,
+ final BlockingQueue<RAMQueueEntry> q)
+ throws InterruptedException {
+ List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<RAMQueueEntry>(1));
+ wt.doDrain(rqes);
+ assertTrue(q.isEmpty());
+ assertTrue(bc.ramCache.isEmpty());
+ assertEquals(0, bc.heapSize());
+ }
+}
\ No newline at end of file