diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java index 98ef78b..c0b1c89 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java @@ -57,7 +57,7 @@ public class SingleSizeCache implements BlockCache, HeapSize { private final int numBlocks; private final int blockSize; private final CacheStats stats; - private final SlabItemEvictionWatcher evictionWatcher; + private final SlabItemActionWatcher actionWatcher; private final AtomicLong size; private final AtomicLong timeSinceLastAccess; public final static long CACHE_FIXED_OVERHEAD = ClassSize @@ -78,12 +78,12 @@ public class SingleSizeCache implements BlockCache, HeapSize { * @param master the SlabCache this SingleSlabCache is assigned to. */ public SingleSizeCache(int blockSize, int numBlocks, - SlabItemEvictionWatcher master) { + SlabItemActionWatcher master) { this.blockSize = blockSize; this.numBlocks = numBlocks; backingStore = new Slab(blockSize, numBlocks); this.stats = new CacheStats(); - this.evictionWatcher = master; + this.actionWatcher = master; this.size = new AtomicLong(CACHE_FIXED_OVERHEAD + backingStore.heapSize()); this.timeSinceLastAccess = new AtomicLong(); @@ -121,11 +121,17 @@ public class SingleSizeCache implements BlockCache, HeapSize { storedBlock); toBeCached.serialize(storedBlock); - CacheablePair alreadyCached = backingMap.putIfAbsent(blockName, newEntry); + synchronized (this) { + CacheablePair alreadyCached = backingMap.putIfAbsent(blockName, newEntry); + - if (alreadyCached != null) { - backingStore.free(storedBlock); - throw new RuntimeException("already cached " + blockName); + if (alreadyCached != null) { + backingStore.free(storedBlock); + throw new RuntimeException("already cached " + blockName); + } + if (actionWatcher != null) { + actionWatcher.onInsertion(blockName, 
this); + } } newEntry.recentlyAccessed.set(System.nanoTime()); this.size.addAndGet(newEntry.heapSize()); @@ -198,8 +204,8 @@ public class SingleSizeCache implements BlockCache, HeapSize { // Thread A calls cacheBlock on the same block, and gets // "already cached" since the block is still in backingStore - if (evictionWatcher != null) { - evictionWatcher.onEviction(key, this); + if (actionWatcher != null) { + actionWatcher.onEviction(key, this); } } stats.evicted(); diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java index 9fb20f2..ddececd 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java @@ -50,7 +50,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * correct SingleSizeCache. * **/ -public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize { +public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize { private final ConcurrentHashMap backingStore; private final TreeMap sizer; @@ -212,34 +212,7 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize * twice */ scache.cacheBlock(blockName, cachedItem); - - /* - * If an eviction for this value hasn't taken place yet, we want to wait for - * it to take place. See HBase-4330. - */ - SingleSizeCache replace; - while ((replace = backingStore.putIfAbsent(blockName, scache)) != null) { - synchronized (replace) { - /* - * With the exception of unit tests, this should happen extremely - * rarely. 
- */ - try { - replace.wait(); - } catch (InterruptedException e) { - LOG.warn("InterruptedException on the caching thread: " + e); - } - } - } - - /* - * Let the eviction threads know that something has been cached, and let - * them try their hand at eviction - */ - synchronized (scache) { - scache.notifyAll(); - } - } + } /** * We don't care about whether its in memory or not, so we just pass the call @@ -292,59 +265,13 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize @Override public void onEviction(String key, Object notifier) { - /* - * Without the while loop below, the following can occur: - * - * Invariant: Anything in SingleSizeCache will have a representation in - * SlabCache, and vice-versa. - * - * Start: Key A is in both SingleSizeCache and SlabCache. Invariant is - * satisfied - * - * Thread A: Caches something, starting eviction of Key A in SingleSizeCache - * - * Thread B: Checks for Key A -> Returns Gets Null, as eviction has begun - * - * Thread B: Recaches Key A, gets to SingleSizeCache, does not get the - * PutIfAbsentLoop yet... - * - * Thread C: Caches another key, starting the second eviction of Key A. - * - * Thread A: does its onEviction, removing the entry of Key A from - * SlabCache. - * - * Thread C: does its onEviction, removing the (blank) entry of Key A from - * SlabCache: - * - * Thread B: goes to putifabsent, and puts its entry into SlabCache. - * - * Result: SlabCache has an entry for A, while SingleSizeCache has no - * entries for A. Invariant is violated. - * - * What the while loop does, is that, at the end, it GUARANTEES that an - * onEviction will remove an entry. See HBase-4482. - */ - - stats.evict(); - while ((backingStore.remove(key)) == null) { - /* With the exception of unit tests, this should happen extremely rarely. 
*/ - synchronized (notifier) { - try { - notifier.wait(); - } catch (InterruptedException e) { - LOG.warn("InterruptedException on the evicting thread: " + e); - } - } - } stats.evicted(); - - /* - * Now we've evicted something, lets tell the caching threads to try to - * cache something. - */ - synchronized (notifier) { - notifier.notifyAll(); - } + backingStore.remove(key); + } + + @Override + public void onInsertion(String key, Object notifier) { + backingStore.put(key, (SingleSizeCache) notifier); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java index 38bf85c..1a8e421 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java @@ -23,17 +23,21 @@ package org.apache.hadoop.hbase.io.hfile.slab; /** * Interface for objects that want to know when an eviction occurs. * */ -interface SlabItemEvictionWatcher { +interface SlabItemActionWatcher { /** - * This is called as a callback by the EvictionListener in each of the - * SingleSizeSlabCaches. + * This is called as a callback when an item is removed from a SingleSizeCache. * * @param key the key of the item being evicted * @param notifier the object notifying the SlabCache of the eviction. - * @param boolean callAssignedCache whether we should call the cache which the - * key was originally assigned to. */ void onEviction(String key, Object notifier); - + + /** + * This is called as a callback when an item is inserted into a SingleSizeCache. + * + * @param key the key of the item being added + * @param notifier the object notifying the SlabCache of the insertion.
+ */ + void onInsertion(String key, Object notifier); } diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java index 3ac5601..64329c5 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java @@ -59,7 +59,7 @@ public class TestSlabCache { cache.shutdown(); } - @Ignore @Test + @Test public void testElementPlacement() { assertEquals(cache.getHigherBlock(BLOCK_SIZE).getKey().intValue(), (BLOCK_SIZE * 11 / 10)); @@ -67,28 +67,28 @@ public class TestSlabCache { .intValue(), (BLOCK_SIZE * 21 / 10)); } - @Ignore @Test + @Test public void testCacheSimple() throws Exception { CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); } - @Ignore @Test + @Test public void testCacheMultiThreaded() throws Exception { CacheTestUtils.testCacheMultiThreaded(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES, 0.80); } - @Ignore @Test + @Test public void testCacheMultiThreadedSingleKey() throws Exception { CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES); } - @Ignore @Test + @Test public void testCacheMultiThreadedEviction() throws Exception { CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, 10, NUM_QUERIES); } - @Ignore @Test + @Test /*Just checks if ranges overlap*/ public void testStatsArithmetic(){ SlabStats test = cache.requestStats; @@ -99,7 +99,7 @@ public class TestSlabCache { } } - @Ignore @Test + @Test public void testHeapSizeChanges(){ CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); }