diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 85e784d..a012e1e 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -91,18 +91,17 @@ public class HFileBlock implements Cacheable {
   private static final CacheableDeserializer blockDeserializer =
     new CacheableDeserializer() {
       public HFileBlock deserialize(ByteBuffer buf) throws IOException{
-        ByteBuffer tempCopy = buf.duplicate();
-        ByteBuffer newByteBuffer = ByteBuffer.allocate(tempCopy.limit()
+        ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
             - HFileBlock.EXTRA_SERIALIZATION_SPACE);
-        tempCopy.limit(tempCopy.limit()
+        buf.limit(buf.limit()
             - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
-        newByteBuffer.put(tempCopy);
+        newByteBuffer.put(buf);
         HFileBlock ourBuffer = new HFileBlock(newByteBuffer);
 
-        tempCopy.position(tempCopy.limit());
-        tempCopy.limit(tempCopy.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
-        ourBuffer.offset = tempCopy.getLong();
-        ourBuffer.nextBlockOnDiskSizeWithHeader = tempCopy.getInt();
+        buf.position(buf.limit());
+        buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
+        ourBuffer.offset = buf.getLong();
+        ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
         return ourBuffer;
       }
     };
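Note: the hunk above drops the defensive duplicate(), so deserialize() now repositions the caller's ByteBuffer directly. That is only safe because SingleSizeCache (next file) hands every reader a private view via asReadOnlyBuffer(). Since the limit/position arithmetic is easy to misread, here is a minimal, self-contained sketch of the same trailing-metadata layout, assuming EXTRA_SERIALIZATION_SPACE is 12 bytes (one long plus one int, matching the getLong/getInt reads):

    import java.nio.ByteBuffer;

    public class TrailingMetadataSketch {
      // Assumption: EXTRA_SERIALIZATION_SPACE = 8 (long offset) + 4 (int size).
      static final int EXTRA = 12;

      public static void main(String[] args) {
        // Serialized form: [block body][long offset][int nextBlockOnDiskSize]
        ByteBuffer buf = ByteBuffer.allocate(16 + EXTRA);
        buf.position(16);
        buf.putLong(42L).putInt(7).rewind();

        // Copy only the body by shrinking the limit past the metadata.
        ByteBuffer body = ByteBuffer.allocate(buf.limit() - EXTRA);
        buf.limit(buf.limit() - EXTRA);
        body.put(buf);

        // Then widen the limit again and read the trailing fields.
        buf.position(buf.limit());
        buf.limit(buf.limit() + EXTRA);
        System.out.println(buf.getLong()); // 42
        System.out.println(buf.getInt());  // 7
      }
    }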
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
index 18a3332..8ac54e6 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
@@ -19,12 +19,10 @@
  */
 package org.apache.hadoop.hbase.io.hfile.slab;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -94,24 +92,9 @@ public class SingleSizeCache implements BlockCache {
     MapEvictionListener listener = new MapEvictionListener() {
 
       @Override
       public void onEviction(String key, CacheablePair value) {
-        try {
-          value.evictionLock.writeLock().lock();
-          timeSinceLastAccess.set(System.nanoTime()
-              - value.recentlyAccessed.get());
-          backingStore.free(value.serializedData);
-          stats.evict();
-          /**
-           * We may choose to run this cache alone, without the SlabCache on
-           * top, no evictionWatcher in that case
-           */
-          if (evictionWatcher != null) {
-            evictionWatcher.onEviction(key, false);
-          }
-          size.addAndGet(-1 * value.heapSize());
-          stats.evicted();
-        } finally {
-          value.evictionLock.writeLock().unlock();
-        }
+        timeSinceLastAccess.set(System.nanoTime()
+            - value.recentlyAccessed.get());
+        doEviction(key, value);
       }
     };
@@ -121,7 +104,7 @@ public class SingleSizeCache implements BlockCache {
   }
 
   @Override
-  public synchronized void cacheBlock(String blockName, Cacheable toBeCached) {
+  public void cacheBlock(String blockName, Cacheable toBeCached) {
     ByteBuffer storedBlock;
 
     /*
@@ -129,12 +112,18 @@ public class SingleSizeCache implements BlockCache {
      * items than the memory we have allocated, but the Slab Allocator may still
      * be empty if we have not yet completed eviction
      */
-    do {
+
+    try {
       storedBlock = backingStore.alloc(toBeCached.getSerializedLength());
-    } while (storedBlock == null);
+    } catch (InterruptedException e) {
+      LOG.warn("SlabAllocator was interrupted while waiting for a block to become available");
+      LOG.warn(e);
+      return;
+    }
 
     CacheablePair newEntry = new CacheablePair(toBeCached.getDeserializer(),
         storedBlock);
+    toBeCached.serialize(storedBlock);
 
     CacheablePair alreadyCached = backingMap.putIfAbsent(blockName, newEntry);
@@ -142,7 +131,6 @@ public class SingleSizeCache implements BlockCache {
       backingStore.free(storedBlock);
       throw new RuntimeException("already cached " + blockName);
     }
-    toBeCached.serialize(storedBlock);
     newEntry.recentlyAccessed.set(System.nanoTime());
     this.size.addAndGet(newEntry.heapSize());
   }
@@ -157,20 +145,21 @@ public class SingleSizeCache implements BlockCache {
     stats.hit(caching);
 
-    // If lock cannot be obtained, that means we're undergoing eviction.
-    if (contentBlock.evictionLock.readLock().tryLock()) {
-      try {
-        contentBlock.recentlyAccessed.set(System.nanoTime());
+    try {
+      contentBlock.recentlyAccessed.set(System.nanoTime());
+      synchronized (contentBlock) {
+        if (contentBlock.serializedData == null) {
+          // concurrently evicted
+          LOG.warn("Concurrent eviction of " + key);
+          return null;
+        }
         return contentBlock.deserializer
-            .deserialize(contentBlock.serializedData);
-      } catch (IOException e) {
-        e.printStackTrace();
-        LOG.warn("Deserializer throwing ioexception, possibly deserializing wrong object buffer");
-        return null;
-      } finally {
-        contentBlock.evictionLock.readLock().unlock();
+            .deserialize(contentBlock.serializedData.asReadOnlyBuffer());
       }
+    } catch (Throwable t) {
+      LOG.error("Deserializer threw an exception. This may indicate a bug.", t);
+      return null;
     }
-    return null;
   }
@@ -183,23 +172,45 @@ public class SingleSizeCache implements BlockCache {
     stats.evict();
     CacheablePair evictedBlock = backingMap.remove(key);
     if (evictedBlock != null) {
-      try {
-        evictedBlock.evictionLock.writeLock().lock();
-        backingStore.free(evictedBlock.serializedData);
-        evictionWatcher.onEviction(key, false);
-        stats.evicted();
-        size.addAndGet(-1 * evictedBlock.heapSize());
-      } finally {
-        evictedBlock.evictionLock.writeLock().unlock();
-      }
+      doEviction(key, evictedBlock);
     }
     return evictedBlock != null;
   }
 
+  private void doEviction(String key, CacheablePair evictedBlock) {
+    long evictedHeap = 0;
+    synchronized (evictedBlock) {
+      if (evictedBlock.serializedData == null) {
+        // someone else already freed
+        return;
+      }
+      evictedHeap = evictedBlock.heapSize();
+      ByteBuffer bb = evictedBlock.serializedData;
+      evictedBlock.serializedData = null;
+      backingStore.free(bb);
+
+      // We have to do this callback inside the synchronization here.
+      // Otherwise we can have the following interleaving:
+      // Thread A calls getBlock():
+      //   SlabCache directs call to this SingleSizeCache
+      //   It gets the CacheablePair object
+      // Thread B runs eviction
+      //   doEviction() is called and sets serializedData = null, here.
+      // Thread A sees the null serializedData, and returns null
+      // Thread A calls cacheBlock on the same block, and gets
+      //   "already cached" since the block is still in backingStore
+      if (evictionWatcher != null) {
+        evictionWatcher.onEviction(key, false);
+      }
+    }
+    stats.evicted();
+    size.addAndGet(-1 * evictedHeap);
+  }
+
   public void logStats() {
-    long milliseconds = (long)this.timeSinceLastAccess.get() / 1000000;
+    long milliseconds = (long) this.timeSinceLastAccess.get() / 1000000;
     LOG.info("For Slab of size " + this.blockSize + ": "
         + this.getOccupiedSize() / this.blockSize
@@ -299,8 +310,7 @@ public class SingleSizeCache implements BlockCache {
   /* Just a pair class, holds a reference to the parent cacheable */
   private class CacheablePair implements HeapSize {
     final CacheableDeserializer deserializer;
-    final ByteBuffer serializedData;
-    final ReentrantReadWriteLock evictionLock;
+    ByteBuffer serializedData;
     AtomicLong recentlyAccessed;
 
     private CacheablePair(CacheableDeserializer deserializer,
@@ -308,7 +318,6 @@ public class SingleSizeCache implements BlockCache {
         ByteBuffer serializedData) {
       this.recentlyAccessed = new AtomicLong();
       this.deserializer = deserializer;
       this.serializedData = serializedData;
-      evictionLock = new ReentrantReadWriteLock();
     }
 
     /*
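Note: the central change in SingleSizeCache is the locking discipline. The per-entry ReentrantReadWriteLock is gone; instead the serializedData reference itself doubles as the liveness flag, with synchronized (contentBlock) guarding every read-then-use and doEviction() nulling the field before the buffer is recycled. A minimal sketch of that pattern in isolation (Entry, read, and free are hypothetical names):

    import java.nio.ByteBuffer;

    // Minimal sketch of the null-as-freed pattern used above.
    class Entry {
      ByteBuffer data = ByteBuffer.allocate(64); // guarded by synchronized(this)

      byte[] read() {
        synchronized (this) {
          if (data == null) {
            return null; // concurrently evicted; caller treats this as a miss
          }
          ByteBuffer view = data.asReadOnlyBuffer(); // private position/limit,
          byte[] out = new byte[view.remaining()];   // so readers cannot race
          view.get(out);                             // on cursor state
          return out;
        }
      }

      void free() {
        synchronized (this) {
          if (data == null) {
            return; // someone else already freed
          }
          data = null; // publish "freed" before the buffer is recycled
        }
      }
    }

The read-only copy handed to deserialize() matters for the same reason: position and limit live in the view, so concurrent readers of one cached block cannot corrupt each other's cursor state.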
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
index 9811c6b..ed32980 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.io.hfile.slab;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -37,7 +39,7 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
   static final Log LOG = LogFactory.getLog(Slab.class);
 
   /** This is where our items, or blocks of the slab, are stored. */
-  private ConcurrentLinkedQueue buffers;
+  private LinkedBlockingQueue buffers;
 
   /** This is where our Slabs are stored */
   private ConcurrentLinkedQueue slabs;
@@ -47,7 +49,7 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
   private long heapSize;
 
   Slab(int blockSize, int numBlocks) {
-    buffers = new ConcurrentLinkedQueue();
+    buffers = new LinkedBlockingQueue();
     slabs = new ConcurrentLinkedQueue();
 
     this.blockSize = blockSize;
@@ -108,16 +110,13 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
   }
 
   /*
-   * This returns null if empty. Throws an exception if you try to allocate a
-   * bigger size than the allocator can handle.
+   * Throws an exception if you try to allocate a bigger size than the
+   * allocator can handle. Alloc will block until a buffer is available.
    */
-  ByteBuffer alloc(int bufferSize) {
+  ByteBuffer alloc(int bufferSize) throws InterruptedException {
     int newCapacity = Preconditions.checkPositionIndex(bufferSize, blockSize);
 
-    ByteBuffer returnedBuffer = buffers.poll();
-    if (returnedBuffer == null) {
-      return null;
-    }
+    ByteBuffer returnedBuffer = buffers.take();
 
     returnedBuffer.clear().limit(newCapacity);
     return returnedBuffer;
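Note: the Slab change swaps a poll()-plus-spin for a blocking take(), which is what lets SingleSizeCache.cacheBlock() drop its do/while loop. A small, self-contained illustration of the difference (queue names are illustrative):

    import java.nio.ByteBuffer;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class PollVsTake {
      public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<ByteBuffer> nonBlocking = new ConcurrentLinkedQueue<ByteBuffer>();
        final LinkedBlockingQueue<ByteBuffer> blocking = new LinkedBlockingQueue<ByteBuffer>();

        // poll() on an empty queue returns null immediately, which is what
        // forced the old busy-wait: do { alloc } while (storedBlock == null).
        System.out.println(nonBlocking.poll()); // null

        // take() parks the caller until another thread returns a buffer,
        // e.g. an eviction freeing a block back to the slab.
        new Thread(new Runnable() {
          public void run() {
            blocking.offer(ByteBuffer.allocate(64));
          }
        }).start();
        System.out.println(blocking.take().capacity()); // 64, after waiting
      }
    }

The trade-off is that callers can now block indefinitely and must handle InterruptedException, which is why cacheBlock() treats interruption as a skipped cache insert rather than retrying.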
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
index 1611349..28a3725 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
@@ -232,6 +232,7 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
   public Cacheable getBlock(String key, boolean caching) {
     SingleSizeCache cachedBlock = backingStore.get(key);
     if (cachedBlock == null) {
+      // TODO: this is a miss, isn't it?
       return null;
     }
@@ -272,12 +273,15 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
   }
 
   /**
-   * Sends a shutdown to all SingleSizeCache's contained by this cache.F
+   * Sends a shutdown to all SingleSizeCache's contained by this cache.
+   *
+   * Also terminates the scheduleThreadPool.
    */
   public void shutdown() {
     for (SingleSizeCache s : sizer.values()) {
       s.shutdown();
     }
+    this.scheduleThreadPool.shutdown();
   }
 
   public long heapSize() {
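Note: on the TODO in SlabCache.getBlock(), a key absent from the backingStore map should presumably be counted before returning null. A hedged sketch of what that could look like, assuming SlabCache keeps a CacheStats-style stats field with a miss(boolean) counter like the one SingleSizeCache already calls (the surrounding method body here is reconstructed for illustration, not quoted from the source):

    public Cacheable getBlock(String key, boolean caching) {
      SingleSizeCache cachedBlock = backingStore.get(key);
      if (cachedBlock == null) {
        stats.miss(caching); // hypothetical: record the miss instead of silently returning
        return null;
      }
      return cachedBlock.getBlock(key, caching); // delegate to the size-matched cache
    }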
Hits: " + hits.get() + " Misses: " + + miss.get()); } } @@ -130,19 +132,68 @@ public class CacheTestUtils { public static void hammerSingleKey(final BlockCache toBeTested, int BlockSize, int numThreads, int numQueries) throws Exception { - final HFileBlockPair kv = generateHFileBlocks(BlockSize, 1)[0]; + final String key = "key"; + final byte[] buf = new byte[5 * 1024]; + Arrays.fill(buf, (byte) 5); + + final ByteArrayCacheable bac = new ByteArrayCacheable(buf); + Configuration conf = new Configuration(); + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext( + conf); + + final AtomicInteger totalQueries = new AtomicInteger(); + toBeTested.cacheBlock(key, bac); + + for (int i = 0; i < numThreads; i++) { + TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { + @Override + public void doAnAction() throws Exception { + ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested + .getBlock(key, false); + assertArrayEquals(buf, returned.buf); + totalQueries.incrementAndGet(); + } + }; + + ctx.addThread(t); + } + + ctx.startThreads(); + while (totalQueries.get() < numQueries && ctx.shouldRun()) { + Thread.sleep(10); + } + ctx.stop(); + } + + public static void hammerEviction(final BlockCache toBeTested, int BlockSize, + int numThreads, int numQueries) throws Exception { + Configuration conf = new Configuration(); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext( conf); final AtomicInteger totalQueries = new AtomicInteger(); - toBeTested.cacheBlock(kv.blockName, kv.block); for (int i = 0; i < numThreads; i++) { + final int finalI = i; + + final byte[] buf = new byte[5 * 1024]; TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { @Override public void doAnAction() throws Exception { - assertEquals(kv.block, toBeTested.getBlock(kv.blockName, false)); + for (int j = 0; j < 10; j++) { + String key = "key_" + finalI + "_" + j; + Arrays.fill(buf, (byte) (finalI * j)); + final ByteArrayCacheable bac = new ByteArrayCacheable(buf); + + ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested + .getBlock(key, true); + if (gotBack != null) { + assertArrayEquals(gotBack.buf, bac.buf); + } else { + toBeTested.cacheBlock(key, bac); + } + } totalQueries.incrementAndGet(); } }; @@ -155,6 +206,51 @@ public class CacheTestUtils { Thread.sleep(10); } ctx.stop(); + + assertTrue(toBeTested.getStats().getEvictedCount() > 0); + } + + private static class ByteArrayCacheable implements Cacheable { + + final byte[] buf; + + public ByteArrayCacheable(byte[] buf) { + this.buf = buf; + } + + @Override + public long heapSize() { + return 4 + buf.length; + } + + @Override + public int getSerializedLength() { + return 4 + buf.length; + } + + @Override + public void serialize(ByteBuffer destination) { + destination.putInt(buf.length); + Thread.yield(); + destination.put(buf); + destination.rewind(); + } + + @Override + public CacheableDeserializer getDeserializer() { + return new CacheableDeserializer() { + + @Override + public Cacheable deserialize(ByteBuffer b) throws IOException { + int len = b.getInt(); + Thread.yield(); + byte buf[] = new byte[len]; + b.get(buf); + return new ByteArrayCacheable(buf); + } + }; + } + } private static HFileBlockPair[] generateHFileBlocks(int blockSize, diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java index d38fe7a..6881188 100644 --- 
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java
index d38fe7a..6881188 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java
@@ -63,5 +63,10 @@ public class TestSingleSizeCache {
   public void testCacheMultiThreadedSingleKey() throws Exception {
     CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
   }
+
+  @Test
+  public void testCacheMultiThreadedEviction() throws Exception {
+    CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
+  }
 }
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlab.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlab.java
index ad05cc6..8d09493 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlab.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlab.java
@@ -42,7 +42,7 @@ public class TestSlab {
   }
 
   @Test
-  public void testBasicFunctionality() {
+  public void testBasicFunctionality() throws InterruptedException {
     for (int i = 0; i < NUMBLOCKS; i++) {
       buffers[i] = testSlab.alloc(BLOCKSIZE);
       assertEquals(BLOCKSIZE, buffers[i].limit());
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java
index 26705d0..2c3541f 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java
@@ -43,7 +43,7 @@ public class TestSlabCache {
   static final int CACHE_SIZE = 1000000;
   static final int NUM_BLOCKS = 101;
   static final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
-  static final int NUM_THREADS = 1000;
+  static final int NUM_THREADS = 50;
   static final int NUM_QUERIES = 10000;
 
   SlabCache cache;
@@ -83,6 +83,11 @@ public class TestSlabCache {
   }
 
   @Test
+  public void testCacheMultiThreadedEviction() throws Exception {
+    CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, 10, NUM_QUERIES);
+  }
+
+  @Test
   /*Just checks if ranges overlap*/
   public void testStatsArithmetic(){
     SlabStats test = cache.requestStats;