commit 22a61230fe6b1b9aec78450eb06c73435e39ad8b
Author: Todd Lipcon
Date:   Sun Oct 9 23:50:59 2011 -0700

    Simpler slab cache

diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/slab/LRUList.java src/main/java/org/apache/hadoop/hbase/io/hfile/slab/LRUList.java
new file mode 100644
index 0000000..e8f2df9
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/slab/LRUList.java
@@ -0,0 +1,90 @@
+package org.apache.hadoop.hbase.io.hfile.slab;
+
+import com.google.common.base.Preconditions;
+
+public class LRUList<T> {
+  final Node<T> HEAD_SENTINEL, TAIL_SENTINEL;
+
+  static boolean SANITY_CHECK_ENABLED =
+    Boolean.getBoolean("lrulist.sanitycheck");
+
+  public LRUList() {
+    HEAD_SENTINEL = new Node<T>(null);
+    TAIL_SENTINEL = new Node<T>(null);
+    HEAD_SENTINEL.next = TAIL_SENTINEL;
+    TAIL_SENTINEL.prev = HEAD_SENTINEL;
+  }
+
+  public Node<T> addToHead(T data) {
+    Node<T> newNode = new Node<T>(data);
+    addToHead(newNode);
+    check();
+    return newNode;
+  }
+
+  void touch(Node<T> node) {
+    node.unlink();
+    addToHead(node);
+    check();
+  }
+
+  T removeLRU() {
+    Node<T> node = TAIL_SENTINEL.prev;
+    if (node == HEAD_SENTINEL) {
+      return null;
+    }
+    node.unlink();
+    assert TAIL_SENTINEL.prev != node;
+    check();
+    return node.getValue();
+  }
+
+  private void check() {
+    if (!SANITY_CHECK_ENABLED) {
+      return;
+    }
+
+    Node<T> cur = HEAD_SENTINEL;
+    Node<T> prev = null;
+
+    while (cur != null) {
+      Preconditions.checkState(
+          cur.prev == prev);
+      Preconditions.checkState(
+          prev == null || prev.next == cur);
+      prev = cur;
+      cur = cur.next;
+    }
+
+    Preconditions.checkState(prev == TAIL_SENTINEL);
+    Preconditions.checkState(TAIL_SENTINEL.next == null);
+    Preconditions.checkState(HEAD_SENTINEL.prev == null);
+  }
+
+  private void addToHead(Node<T> newNode) {
+    newNode.prev = HEAD_SENTINEL;
+    newNode.next = HEAD_SENTINEL.next;
+    newNode.next.prev = newNode;
+    HEAD_SENTINEL.next = newNode;
+  }
+
+  static class Node<T> {
+    Node<T> prev;
+    Node<T> next;
+    final T val;
+
+    public Node(T buf) {
+      this.val = buf;
+    }
+
+    public void unlink() {
+      this.next.prev = this.prev;
+      this.prev.next = this.next;
+
+      next = prev = null;
+    }
+
+    public T getValue() {
+      return val;
+    }
+  }
+}
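LRUList is a minimal, unsynchronized doubly-linked list with head and tail sentinels: addToHead, touch, and removeLRU are all O(1) pointer updates, and callers are expected to provide their own locking (SlabCache below does this under its own monitor). As a rough usage sketch (hypothetical driver class, not part of this patch; it must live in the same package because Node, touch, and removeLRU are package-private):

    package org.apache.hadoop.hbase.io.hfile.slab;

    import org.apache.hadoop.hbase.io.hfile.slab.LRUList.Node;

    public class LRUListExample {
      public static void main(String[] args) {
        LRUList<String> lru = new LRUList<String>();
        Node<String> a = lru.addToHead("a");
        lru.addToHead("b");       // order is now: b (most recent), a (least recent)
        lru.touch(a);             // a becomes most recently used: a, b
        System.out.println(lru.removeLRU()); // "b" (least recently used goes first)
        System.out.println(lru.removeLRU()); // "a"
        System.out.println(lru.removeLRU()); // "null" (list is empty)
      }
    }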
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
deleted file mode 100644
index d7ab108..0000000
--- src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Copyright 2011 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io.hfile.slab;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
-import org.apache.hadoop.hbase.io.hfile.CacheStats;
-import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.collect.MapEvictionListener;
-import com.google.common.collect.MapMaker;
-
-/**
- * SingleSizeCache is a slab allocated cache that caches elements up to a single
- * size. It uses a slab allocator (Slab.java) to divide a direct bytebuffer,
- * into evenly sized blocks. Any cached data will take up exactly 1 block. An
- * exception will be thrown if the cached data cannot fit into the blockSize of
- * this SingleSizeCache.
- *
- * Eviction and LRUness is taken care of by Guava's MapMaker, which creates a
- * ConcurrentLinkedHashMap.
- *
- **/
-public class SingleSizeCache implements BlockCache, HeapSize {
-  private final Slab backingStore;
-  private final ConcurrentMap<String, CacheablePair> backingMap;
-  private final int numBlocks;
-  private final int blockSize;
-  private final CacheStats stats;
-  private final SlabItemEvictionWatcher evictionWatcher;
-  private final AtomicLong size;
-  private final AtomicLong timeSinceLastAccess;
-  public final static long CACHE_FIXED_OVERHEAD = ClassSize
-      .align((2 * Bytes.SIZEOF_INT) + (5 * ClassSize.REFERENCE)
-          + +ClassSize.OBJECT);
-
-  static final Log LOG = LogFactory.getLog(SingleSizeCache.class);
-
-  /**
-   * Default constructor. Specify the size of the blocks, number of blocks, and
-   * the SlabCache this cache will be assigned to.
-   *
-   *
-   * @param blockSize the size of each block, in bytes
-   *
-   * @param numBlocks the number of blocks of blockSize this cache will hold.
-   *
-   * @param master the SlabCache this SingleSlabCache is assigned to.
-   */
-  public SingleSizeCache(int blockSize, int numBlocks,
-      SlabItemEvictionWatcher master) {
-    this.blockSize = blockSize;
-    this.numBlocks = numBlocks;
-    backingStore = new Slab(blockSize, numBlocks);
-    this.stats = new CacheStats();
-    this.evictionWatcher = master;
-    this.size = new AtomicLong(CACHE_FIXED_OVERHEAD + backingStore.heapSize());
-    this.timeSinceLastAccess = new AtomicLong();
-
-    // This evictionListener is called whenever the cache automatically
-    // evicts
-    // something.
-    MapEvictionListener<String, CacheablePair> listener =
-        new MapEvictionListener<String, CacheablePair>() {
-      @Override
-      public void onEviction(String key, CacheablePair value) {
-        timeSinceLastAccess.set(System.nanoTime()
-            - value.recentlyAccessed.get());
-        stats.evict();
-        doEviction(key, value);
-      }
-    };
-
-    backingMap = new MapMaker().maximumSize(numBlocks - 1)
-        .evictionListener(listener).makeMap();
-
-  }
-
-  @Override
-  public void cacheBlock(String blockName, Cacheable toBeCached) {
-    ByteBuffer storedBlock;
-
-    try {
-      storedBlock = backingStore.alloc(toBeCached.getSerializedLength());
-    } catch (InterruptedException e) {
-      LOG.warn("SlabAllocator was interrupted while waiting for block to become available");
-      LOG.warn(e);
-      return;
-    }
-
-    CacheablePair newEntry = new CacheablePair(toBeCached.getDeserializer(),
-        storedBlock);
-    toBeCached.serialize(storedBlock);
-
-    CacheablePair alreadyCached = backingMap.putIfAbsent(blockName, newEntry);
-
-    if (alreadyCached != null) {
-      backingStore.free(storedBlock);
-      throw new RuntimeException("already cached " + blockName);
-    }
-    newEntry.recentlyAccessed.set(System.nanoTime());
-    this.size.addAndGet(newEntry.heapSize());
-  }
-
-  @Override
-  public Cacheable getBlock(String key, boolean caching) {
-    CacheablePair contentBlock = backingMap.get(key);
-    if (contentBlock == null) {
-      stats.miss(caching);
-      return null;
-    }
-
-    stats.hit(caching);
-    // If lock cannot be obtained, that means we're undergoing eviction.
-    try {
-      contentBlock.recentlyAccessed.set(System.nanoTime());
-      synchronized (contentBlock) {
-        if (contentBlock.serializedData == null) {
-          // concurrently evicted
-          LOG.warn("Concurrent eviction of " + key);
-          return null;
-        }
-        return contentBlock.deserializer
-            .deserialize(contentBlock.serializedData.asReadOnlyBuffer());
-      }
-    } catch (Throwable t) {
-      LOG.error("Deserializer threw an exception. This may indicate a bug.", t);
-      return null;
-    }
-  }
-
-  /**
-   * Evicts the block
-   *
-   * @param key the key of the entry we are going to evict
-   * @return the evicted ByteBuffer
-   */
-  public boolean evictBlock(String key) {
-    stats.evict();
-    CacheablePair evictedBlock = backingMap.remove(key);
-
-    if (evictedBlock != null) {
-      doEviction(key, evictedBlock);
-    }
-    return evictedBlock != null;
-
-  }
-
-  private void doEviction(String key, CacheablePair evictedBlock) {
-    long evictedHeap = 0;
-    synchronized (evictedBlock) {
-      if (evictedBlock.serializedData == null) {
-        // someone else already freed
-        return;
-      }
-      evictedHeap = evictedBlock.heapSize();
-      ByteBuffer bb = evictedBlock.serializedData;
-      evictedBlock.serializedData = null;
-      backingStore.free(bb);
-
-      // We have to do this callback inside the synchronization here.
-      // Otherwise we can have the following interleaving:
-      // Thread A calls getBlock():
-      // SlabCache directs call to this SingleSizeCache
-      // It gets the CacheablePair object
-      // Thread B runs eviction
-      // doEviction() is called and sets serializedData = null, here.
-      // Thread A sees the null serializedData, and returns null
-      // Thread A calls cacheBlock on the same block, and gets
-      // "already cached" since the block is still in backingStore
-
-      if (evictionWatcher != null) {
-        evictionWatcher.onEviction(key, this);
-      }
-    }
-    stats.evicted();
-    size.addAndGet(-1 * evictedHeap);
-  }
-
-  public void logStats() {
-
-    long milliseconds = this.timeSinceLastAccess.get() / 1000000;
-
-    LOG.info("For Slab of size " + this.blockSize + ": "
-        + this.getOccupiedSize() / this.blockSize
-        + " occupied, out of a capacity of " + this.numBlocks
-        + " blocks. HeapSize is "
-        + StringUtils.humanReadableInt(this.heapSize()) + " bytes." + ", "
-        + "churnTime=" + StringUtils.formatTime(milliseconds));
-
-    LOG.info("Slab Stats: " + "accesses="
-        + stats.getRequestCount()
-        + ", "
-        + "hits="
-        + stats.getHitCount()
-        + ", "
-        + "hitRatio="
-        + (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(
-            stats.getHitRatio(), 2) + "%, "))
-        + "cachingAccesses="
-        + stats.getRequestCachingCount()
-        + ", "
-        + "cachingHits="
-        + stats.getHitCachingCount()
-        + ", "
-        + "cachingHitsRatio="
-        + (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(
-            stats.getHitCachingRatio(), 2) + "%, ")) + "evictions="
-        + stats.getEvictionCount() + ", " + "evicted="
-        + stats.getEvictedCount() + ", " + "evictedPerRun="
-        + stats.evictedPerEviction());
-
-  }
-
-  public void shutdown() {
-    backingStore.shutdown();
-  }
-
-  public long heapSize() {
-    return this.size.get() + backingStore.heapSize();
-  }
-
-  public long size() {
-    return (long) this.blockSize * (long) this.numBlocks;
-  }
-
-  public long getFreeSize() {
-    return (long) backingStore.getBlocksRemaining() * (long) blockSize;
-  }
-
-  public long getOccupiedSize() {
-    return (long) (numBlocks - backingStore.getBlocksRemaining()) * (long) blockSize;
-  }
-
-  public long getEvictedCount() {
-    return stats.getEvictedCount();
-  }
-
-  public CacheStats getStats() {
-    return this.stats;
-  }
-
-  /* Since its offheap, it doesn't matter if its in memory or not */
-  @Override
-  public void cacheBlock(String blockName, Cacheable buf, boolean inMemory) {
-    this.cacheBlock(blockName, buf);
-  }
-
-  /*
-   * This is never called, as evictions are handled in the SlabCache layer,
-   * implemented in the event we want to use this as a standalone cache.
-   */
-  @Override
-  public int evictBlocksByPrefix(String prefix) {
-    int evictedCount = 0;
-    for (String e : backingMap.keySet()) {
-      if (e.startsWith(prefix)) {
-        this.evictBlock(e);
-      }
-    }
-    return evictedCount;
-  }
-
-  @Override
-  public long getCurrentSize() {
-    return 0;
-  }
-
-  /*
-   * Not implemented. Extremely costly to do this from the off heap cache, you'd
-   * need to copy every object on heap once
-   */
-  @Override
-  public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
-      Configuration conf) {
-    throw new UnsupportedOperationException();
-  }
-
-  /* Just a pair class, holds a reference to the parent cacheable */
-  private class CacheablePair implements HeapSize {
-    final CacheableDeserializer<Cacheable> deserializer;
-    ByteBuffer serializedData;
-    AtomicLong recentlyAccessed;
-
-    private CacheablePair(CacheableDeserializer<Cacheable> deserializer,
-        ByteBuffer serializedData) {
-      this.recentlyAccessed = new AtomicLong();
-      this.deserializer = deserializer;
-      this.serializedData = serializedData;
-    }
-
-    /*
-     * Heapsize overhead of this is the default object overhead, the heapsize of
-     * the serialized object, and the cost of a reference to the bytebuffer,
-     * which is already accounted for in SingleSizeCache
-     */
-    @Override
-    public long heapSize() {
-      return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE * 3
-          + ClassSize.ATOMIC_LONG);
-    }
-  }
-}
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
index ed32980..969e5ca 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
@@ -39,7 +39,7 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
   static final Log LOG = LogFactory.getLog(Slab.class);
 
   /** This is where our items, or blocks of the slab, are stored. */
-  private LinkedBlockingQueue<ByteBuffer> buffers;
+  private ConcurrentLinkedQueue<ByteBuffer> buffers;
 
   /** This is where our Slabs are stored */
   private ConcurrentLinkedQueue<ByteBuffer> slabs;
@@ -49,7 +49,7 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
   private long heapSize;
 
   Slab(int blockSize, int numBlocks) {
-    buffers = new LinkedBlockingQueue<ByteBuffer>();
+    buffers = new ConcurrentLinkedQueue<ByteBuffer>();
     slabs = new ConcurrentLinkedQueue<ByteBuffer>();
 
     this.blockSize = blockSize;
@@ -111,14 +111,16 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
 
   /*
    * Throws an exception if you try to allocate a
-   * bigger size than the allocator can handle. Alloc will block until a buffer is available.
+   * bigger size than the allocator can handle.
+   * Returns null if no buffer is available.
    */
-  ByteBuffer alloc(int bufferSize) throws InterruptedException {
+  ByteBuffer alloc(int bufferSize) {
     int newCapacity = Preconditions.checkPositionIndex(bufferSize, blockSize);
 
-    ByteBuffer returnedBuffer = buffers.take();
-
-    returnedBuffer.clear().limit(newCapacity);
+    ByteBuffer returnedBuffer = buffers.poll();
+    if (returnedBuffer != null) {
+      returnedBuffer.clear().limit(newCapacity);
+    }
     return returnedBuffer;
   }
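Since alloc is now backed by ConcurrentLinkedQueue.poll() rather than LinkedBlockingQueue.take(), it no longer blocks and no longer throws InterruptedException; a caller must treat null as "slab exhausted" and free a block itself. The pattern the rewritten SlabCache adopts below is evict-then-retry-once, roughly:

    // Sketch of the caller-side contract (mirrors SlabCache.cacheBlock below;
    // slab, lru, len, and evictOne are the names used there):
    ByteBuffer buf = slab.alloc(len);   // returns null when no block is free
    if (buf == null) {
      evictOne(lru);                    // push out the least-recently-used item
      buf = slab.alloc(len);            // retry once
      if (buf == null) {
        return;                         // still full; skip caching this block
      }
    }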
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
index b3e6538..8afc35e 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
@@ -21,10 +21,11 @@ package org.apache.hadoop.hbase.io.hfile.slab;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.util.StringUtils;
 
@@ -50,10 +52,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * correct SingleSizeCache.
  *
  **/
-public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize {
+public class SlabCache implements BlockCache, HeapSize {
+
+  private final HashMap<String, CachedItem> cacheMap;
+  private final TreeMap<Integer, Slab> slabs;
+  private final TreeMap<Integer, LRUList<CachedItem>> lrus;
 
-  private final ConcurrentHashMap<String, SingleSizeCache> backingStore;
-  private final TreeMap<Integer, SingleSizeCache> sizer;
   static final Log LOG = LogFactory.getLog(SlabCache.class);
   static final int STAT_THREAD_PERIOD_SECS = 60 * 5;
 
@@ -84,8 +88,10 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
     this.requestStats = new SlabStats();
     this.successfullyCachedStats = new SlabStats();
 
-    backingStore = new ConcurrentHashMap<String, SingleSizeCache>();
-    sizer = new TreeMap<Integer, SingleSizeCache>();
+    this.cacheMap = new HashMap<String, CachedItem>();
+    this.slabs = new TreeMap<Integer, Slab>();
+    this.lrus = new TreeMap<Integer, LRUList<CachedItem>>();
+
     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
         STAT_THREAD_PERIOD_SECS, STAT_THREAD_PERIOD_SECS, TimeUnit.SECONDS);
 
@@ -164,8 +170,8 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
    * @return the Slab that the above bytebuffer would be allocated towards. If
    * object is too large, returns null.
    */
-  Entry<Integer, SingleSizeCache> getHigherBlock(int size) {
-    return sizer.higherEntry(size - 1);
+  Entry<Integer, Slab> getSlab(int size) {
+    return slabs.higherEntry(size - 1);
   }
 
   private BigDecimal[] stringArrayToBigDecimalArray(String[] parsee) {
@@ -179,7 +185,8 @@
   private void addSlab(int blockSize, int numBlocks) {
     LOG.info("Creating a slab of blockSize " + blockSize + " with " + numBlocks
        + " blocks.");
-    sizer.put(blockSize, new SingleSizeCache(blockSize, numBlocks, this));
+    slabs.put(blockSize, new Slab(blockSize, numBlocks));
+    lrus.put(blockSize, new LRUList<CachedItem>());
   }
 
   /**
@@ -192,52 +199,77 @@
    * a race condition, and will throw a runtime exception.
    *
    * @param blockName block name
-   * @param cachedItem block buffer
+   * @param item block buffer
    */
-  public void cacheBlock(String blockName, Cacheable cachedItem) {
-    Entry<Integer, SingleSizeCache> scacheEntry = getHigherBlock(cachedItem
-        .getSerializedLength());
-
-    this.requestStats.addin(cachedItem.getSerializedLength());
+  public void cacheBlock(String blockName, Cacheable item) {
+    synchronized (this) {
+      // cacheMap is a plain HashMap now, so even the membership check must
+      // happen under the cache lock.
+      if (cacheMap.containsKey(blockName)) {
+        throw new RuntimeException("already cached");
+      }
+    }
+
+    int len = item.getSerializedLength();
+    Entry<Integer, Slab> slabEntry = getSlab(len);
+    this.requestStats.addin(len);
 
-    if (scacheEntry == null) {
+    if (slabEntry == null) {
       return; // we can't cache, something too big.
     }
-
-    this.successfullyCachedStats.addin(cachedItem.getSerializedLength());
-    SingleSizeCache scache = scacheEntry.getValue();
-
-    /*
-     * This will throw a runtime exception if we try to cache the same value
-     * twice
-     */
-    scache.cacheBlock(blockName, cachedItem);
-
-    /*
-     * If an eviction for this value hasn't taken place yet, we want to wait for
-     * it to take place. See HBase-4330.
-     */
-    SingleSizeCache replace;
-    while ((replace = backingStore.putIfAbsent(blockName, scache)) != null) {
-      synchronized (replace) {
-        /*
-         * With the exception of unit tests, this should happen extremely
-         * rarely.
-         */
-        try {
-          replace.wait();
-        } catch (InterruptedException e) {
-          LOG.warn("InterruptedException on the caching thread: " + e);
-        }
+
+    Slab slab = slabEntry.getValue();
+    LRUList<CachedItem> lru = lrus.get(slabEntry.getKey());
+
+    ByteBuffer buf = slab.alloc(len);
+    if (buf == null) {
+      evictOne(lru);
+
+      buf = slab.alloc(len);
+      if (buf == null) {
+        LOG.warn("Unable to allocate even after evicting!");
+        return;
       }
     }
+    assert buf != null;
 
-    /*
-     * Let the eviction threads know that something has been cached, and let
-     * them try their hand at eviction
-     */
-    synchronized (scache) {
-      scache.notifyAll();
+    boolean success = false;
+    try {
+      item.serialize(buf);
+      success = true;
+    } finally {
+      if (!success) {
+        slab.free(buf);
+      }
+    }
+
+    synchronized (this) {
+      CachedItem ci = new CachedItem(blockName, buf, item.getDeserializer());
+      cacheMap.put(blockName, ci);
+      ci.lruNode = lru.addToHead(ci);
+    }
+
+    this.successfullyCachedStats.addin(len);
+  }
+
+  private void evictOne(LRUList<CachedItem> lru) {
+    CachedItem evicted;
+    synchronized (this) {
+      evicted = lru.removeLRU();
+      if (evicted == null) {
+        LOG.warn("Unable to evict from slab!");
+        return;
+      }
+      CachedItem removedFromMap = cacheMap.remove(evicted.key);
+      if (removedFromMap != evicted) {
+        throw new AssertionError("Invalid state! Evicted "
+            + evicted.key + " via LRU, but the map pointed to a different "
+            + "cached item: " + removedFromMap);
+      }
+    }
+    stats.evicted();
+
+    synchronized (evicted) {
+      Slab slab = getSlab(evicted.data.capacity()).getValue();
+      slab.free(evicted.data);
+      evicted.data = null;
+    }
   }
 
@@ -260,100 +292,70 @@
    * @return buffer of specified block name, or null if not in cache
    */
   public Cacheable getBlock(String key, boolean caching) {
-    SingleSizeCache cachedBlock = backingStore.get(key);
-    if (cachedBlock == null) {
-      stats.miss(caching);
-      return null;
+    CachedItem ci;
+    synchronized (this) {
+      ci = cacheMap.get(key);
+      if (ci == null) {
+        stats.miss(caching);
+        return null;
+      }
+
+      // Sliced buffers have capacity exactly equal to their slab's block
+      // size, so an exact lookup finds this item's LRU list. (higherEntry
+      // on the capacity would select the next-larger slab's list.)
+      LRUList<CachedItem> lru = lrus.get(ci.data.capacity());
+      lru.touch(ci.lruNode);
     }
 
-    Cacheable contentBlock = cachedBlock.getBlock(key, caching);
+    Cacheable cacheable;
+    synchronized (ci) {
+      if (ci.data == null) {
+        // concurrently evicted
+        stats.miss(caching);
+        return null;
+      }
+
+      try {
+        cacheable = ci.deserializer.deserialize(
+            ci.data.asReadOnlyBuffer());
+      } catch (Throwable t) {
+        LOG.error("Deserializer threw an exception deserializing for key '"
+            + ci.key + "'. This may indicate a bug.", t);
+        return null;
+      }
+    }
 
-    if (contentBlock != null) {
+    if (cacheable != null) {
       stats.hit(caching);
     } else {
       stats.miss(caching);
     }
-    return contentBlock;
+
+    return cacheable;
   }
 
   /**
    * Evicts a block from the cache. This is public, and thus contributes to
   * the evict counter.
    */
-  public boolean evictBlock(String key) {
-    SingleSizeCache cacheEntry = backingStore.get(key);
-    if (cacheEntry == null) {
+  public synchronized boolean evictBlock(String key) {
+    CachedItem ci = cacheMap.remove(key);
+    if (ci == null) {
       return false;
     } else {
-      cacheEntry.evictBlock(key);
+      ci.lruNode.unlink();
+      Slab slab = getSlab(ci.data.capacity()).getValue();
+      // Free under the item's monitor and null out data, mirroring
+      // evictOne(), so a concurrent getBlock() that already holds a
+      // reference cannot deserialize a freed buffer.
+      synchronized (ci) {
+        slab.free(ci.data);
+        ci.data = null;
+      }
      return true;
    }
  }
 
-  @Override
-  public void onEviction(String key, Object notifier) {
-    /*
-     * Without the while loop below, the following can occur:
-     *
-     * Invariant: Anything in SingleSizeCache will have a representation in
-     * SlabCache, and vice-versa.
-     *
-     * Start: Key A is in both SingleSizeCache and SlabCache. Invariant is
-     * satisfied
-     *
-     * Thread A: Caches something, starting eviction of Key A in SingleSizeCache
-     *
-     * Thread B: Checks for Key A -> Returns Gets Null, as eviction has begun
-     *
-     * Thread B: Recaches Key A, gets to SingleSizeCache, does not get the
-     * PutIfAbsentLoop yet...
-     *
-     * Thread C: Caches another key, starting the second eviction of Key A.
-     *
-     * Thread A: does its onEviction, removing the entry of Key A from
-     * SlabCache.
-     *
-     * Thread C: does its onEviction, removing the (blank) entry of Key A from
-     * SlabCache:
-     *
-     * Thread B: goes to putifabsent, and puts its entry into SlabCache.
-     *
-     * Result: SlabCache has an entry for A, while SingleSizeCache has no
-     * entries for A. Invariant is violated.
-     *
-     * What the while loop does, is that, at the end, it GUARANTEES that an
-     * onEviction will remove an entry. See HBase-4482.
-     */
-
-    stats.evict();
-    while ((backingStore.remove(key)) == null) {
-      /* With the exception of unit tests, this should happen extremely rarely. */
-      synchronized (notifier) {
-        try {
-          notifier.wait();
-        } catch (InterruptedException e) {
-          LOG.warn("InterruptedException on the evicting thread: " + e);
-        }
-      }
-    }
-    stats.evicted();
-
-    /*
-     * Now we've evicted something, lets tell the caching threads to try to
-     * cache something.
-     */
-    synchronized (notifier) {
-      notifier.notifyAll();
-    }
-  }
-
+
   /**
    * Sends a shutdown to all SingleSizeCache's contained by this cache.
    *
    * Also terminates the scheduleThreadPool.
    */
   public void shutdown() {
-    for (SingleSizeCache s : sizer.values()) {
+    for (Slab s : slabs.values()) {
       s.shutdown();
     }
     this.scheduleThreadPool.shutdown();
@@ -361,7 +363,7 @@
   public long heapSize() {
     long childCacheSize = 0;
-    for (SingleSizeCache s : sizer.values()) {
+    for (Slab s : slabs.values()) {
       childCacheSize += s.heapSize();
     }
     return SlabCache.CACHE_FIXED_OVERHEAD + childCacheSize;
@@ -382,6 +384,20 @@
   public long getEvictedCount() {
     return stats.getEvictedCount();
   }
+
+  static class CachedItem {
+    final String key;
+    ByteBuffer data;
+    final CacheableDeserializer<Cacheable> deserializer;
+    LRUList.Node<CachedItem> lruNode;
+
+    public CachedItem(String key, ByteBuffer data,
+        CacheableDeserializer<Cacheable> deserializer) {
+      this.key = key;
+      this.data = data;
+      this.deserializer = deserializer;
+    }
+  }
 
   /*
    * Statistics thread. Periodically prints the cache statistics to the log.
@@ -397,10 +413,6 @@
     @Override
     public void run() {
-      for (SingleSizeCache s : ourcache.sizer.values()) {
-        s.logStats();
-      }
-
       SlabCache.LOG.info("Current heap size is: "
           + StringUtils.humanReadableInt(ourcache.heapSize()));
 
@@ -464,9 +476,10 @@
     }
   }
 
-  public int evictBlocksByPrefix(String prefix) {
+  @Override
+  public synchronized int evictBlocksByPrefix(String prefix) {
     int numEvicted = 0;
-    for (String key : backingStore.keySet()) {
+    // Iterate over a snapshot of the keys: evictBlock() removes from
+    // cacheMap, and removing from a HashMap while iterating its live
+    // keySet would throw ConcurrentModificationException.
+    for (String key : new ArrayList<String>(cacheMap.keySet())) {
       if (key.startsWith(prefix)) {
         if (evictBlock(key))
           ++numEvicted;
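The rewritten SlabCache uses two levels of locking: the SlabCache monitor guards cacheMap and the LRU lists (so map membership and LRU position always agree), while each CachedItem's own monitor guards its buffer, with data == null marking an entry that lost a race with eviction. A sketch of the reader-side discipline (assuming ci was already looked up under the cache lock, as getBlock above does; exception handling elided):

    synchronized (ci) {
      if (ci.data == null) {
        return null;  // concurrently evicted: evictOne() freed the buffer already
      }
      // While we hold ci's monitor the buffer cannot be freed out from
      // under us, so a read-only view is safe to hand to the deserializer.
      return ci.deserializer.deserialize(ci.data.asReadOnlyBuffer());
    }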
diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index f7211b5..1698ff6 100644
--- src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -234,9 +234,9 @@ public class CacheTestUtils {
     assertTrue(toBeTested.getStats().getEvictedCount() > 0);
   }
 
-  private static class ByteArrayCacheable implements Cacheable {
+  public static class ByteArrayCacheable implements Cacheable {
 
-    final byte[] buf;
+    public final byte[] buf;
 
     public ByteArrayCacheable(byte[] buf) {
       this.buf = buf;
diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestLRUList.java src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestLRUList.java
new file mode 100644
index 0000000..65aa0a9
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestLRUList.java
@@ -0,0 +1,68 @@
+package org.apache.hadoop.hbase.io.hfile.slab;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.hbase.io.hfile.slab.LRUList.Node;
+import org.junit.Test;
+
+public class TestLRUList {
+  LRUList<String> l = new LRUList<String>();
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testAddRemove() {
+    assertNull(l.removeLRU());
+
+    Node<String> node = l.addToHead("hello");
+    assertLruList(node);
+    assertEquals("hello", l.removeLRU());
+    assertNull(l.removeLRU());
+
+    // removal should be FIFO order
+    Node<String> hello = l.addToHead("hello");
+    Node<String> goodbye = l.addToHead("goodbye");
+    assertLruList(goodbye, hello);
+    assertEquals("hello", l.removeLRU());
+    assertEquals("goodbye", l.removeLRU());
+    assertNull(l.removeLRU());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testTouchMovesToHead() {
+    Node<String> nodeA = l.addToHead("a");
+    Node<String> nodeB = l.addToHead("b");
+    Node<String> nodeC = l.addToHead("c");
+    assertLruList(nodeC, nodeB, nodeA);
+
+    // move A to head
+    l.touch(nodeA);
+    assertLruList(nodeA, nodeC, nodeB);
+
+    // then B
+    l.touch(nodeB);
+    assertLruList(nodeB, nodeA, nodeC);
+
+    // B again to make sure it doesn't mess up anything
+    l.touch(nodeB);
+    assertLruList(nodeB, nodeA, nodeC);
+
+    assertEquals("c", l.removeLRU());
+    assertEquals("a", l.removeLRU());
+    assertEquals("b", l.removeLRU());
+  }
+
+  private void assertLruList(Node<String>... nodes) {
+    Node<String> cur = l.HEAD_SENTINEL.next;
+    Node<String> prev = l.HEAD_SENTINEL;
+    for (Node<String> node : nodes) {
+      assertSame(node, cur);
+      assertSame(prev, node.prev);
+      prev = cur;
+      cur = cur.next;
+    }
+    assertSame(l.TAIL_SENTINEL, cur);
+    assertSame(l.TAIL_SENTINEL.prev, prev);
+  }
+}
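The invariant walk in LRUList.check() is off by default. Tests flip the flag directly (as TestSlabCache does below), and since the flag is initialized from Boolean.getBoolean("lrulist.sanitycheck"), it can also be enabled for a whole JVM at startup. For example:

    // In a test or debugging harness, before exercising the cache:
    LRUList.SANITY_CHECK_ENABLED = true;
    // Or at JVM startup, so the flag is set when LRUList is first loaded:
    //   java -Dlrulist.sanitycheck=true ...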
diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java
deleted file mode 100644
index ef2d147..0000000
--- src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Copyright 2011 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io.hfile.slab;
-
-import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests SingleSlabCache.
- *
- *
- * Tests will ensure that evictions operate when they're supposed to and do what
- * they should, and that cached blocks are accessible when expected to be.
- */
-public class TestSingleSizeCache {
-  SingleSizeCache cache;
-  final int CACHE_SIZE = 1000000;
-  final int NUM_BLOCKS = 100;
-  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
-  final int NUM_THREADS = 100;
-  final int NUM_QUERIES = 10000;
-
-  @Before
-  public void setup() {
-    cache = new SingleSizeCache(BLOCK_SIZE, NUM_BLOCKS, null);
-  }
-
-  @After
-  public void tearDown() {
-    cache.shutdown();
-  }
-
-  @Test
-  public void testCacheSimple() throws Exception {
-    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
-  }
-
-  @Test
-  public void testCacheMultiThreaded() throws Exception {
-    CacheTestUtils.testCacheMultiThreaded(cache, BLOCK_SIZE,
-        NUM_THREADS, NUM_QUERIES, 0.80);
-  }
-
-  @Test
-  public void testCacheMultiThreadedSingleKey() throws Exception {
-    CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
-  }
-
-  @Test
-  public void testCacheMultiThreadedEviction() throws Exception {
-    CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
-  }
-
-  @Test
-  public void testHeapSizeChanges(){
-    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
-  }
-
-}
diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java
index 37b7302..eeaa240 100644
--- src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.ByteArrayCacheable;
 import org.apache.hadoop.hbase.io.hfile.slab.SlabCache.SlabStats;
 import org.junit.After;
 import org.junit.Before;
@@ -51,6 +52,7 @@ public class TestSlabCache {
   public void setup() {
     cache = new SlabCache(CACHE_SIZE + BLOCK_SIZE * 2, BLOCK_SIZE);
     cache.addSlabByConf(new Configuration());
+    LRUList.SANITY_CHECK_ENABLED = true;
   }
 
   @After
@@ -60,9 +62,9 @@ public class TestSlabCache {
 
   @Test
   public void testElementPlacement() {
-    assertEquals(cache.getHigherBlock(BLOCK_SIZE).getKey().intValue(),
+    assertEquals(cache.getSlab(BLOCK_SIZE).getKey().intValue(),
         (BLOCK_SIZE * 11 / 10));
-    assertEquals(cache.getHigherBlock((BLOCK_SIZE * 2)).getKey()
+    assertEquals(cache.getSlab((BLOCK_SIZE * 2)).getKey()
         .intValue(), (BLOCK_SIZE * 21 / 10));
   }
 
@@ -70,6 +72,27 @@ public class TestSlabCache {
   public void testCacheSimple() throws Exception {
     CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
   }
+
+  @Test
+  public void testSimpleEviction() throws Exception {
+    for (int i = 0; i < 100; i++) {
+      ByteArrayCacheable bac = new ByteArrayCacheable(
+          new byte[] { (byte)i });
+      cache.cacheBlock("key_" + i, bac);
+    }
+
+    int gotBack = 0;
+    for (int i = 0; i < 100; i++) {
+      ByteArrayCacheable bac = (ByteArrayCacheable)
+          cache.getBlock("key_" + i, true);
+      if (bac != null) {
+        gotBack++;
+
+        assertEquals((byte)i, bac.buf[0]);
+      }
+    }
+    assertEquals(100 - cache.getEvictedCount(), gotBack);
+  }
 
   @Test
   public void testCacheMultiThreaded() throws Exception {