commit 99a529821fb42b4cedcd7245b3f97a24984a8ce9 Author: Todd Lipcon Date: Fri Sep 2 17:23:33 2011 -0700 some cache hacking diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java index 18a3332..8f8f0cd 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java @@ -19,12 +19,10 @@ */ package org.apache.hadoop.hbase.io.hfile.slab; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -94,24 +92,9 @@ public class SingleSizeCache implements BlockCache { MapEvictionListener listener = new MapEvictionListener() { @Override public void onEviction(String key, CacheablePair value) { - try { - value.evictionLock.writeLock().lock(); - timeSinceLastAccess.set(System.nanoTime() - - value.recentlyAccessed.get()); - backingStore.free(value.serializedData); - stats.evict(); - /** - * We may choose to run this cache alone, without the SlabCache on - * top, no evictionWatcher in that case - */ - if (evictionWatcher != null) { - evictionWatcher.onEviction(key, false); - } - size.addAndGet(-1 * value.heapSize()); - stats.evicted(); - } finally { - value.evictionLock.writeLock().unlock(); - } + timeSinceLastAccess.set(System.nanoTime() + - value.recentlyAccessed.get()); + doEviction(key, value); } }; @@ -121,7 +104,7 @@ public class SingleSizeCache implements BlockCache { } @Override - public synchronized void cacheBlock(String blockName, Cacheable toBeCached) { + public void cacheBlock(String blockName, Cacheable toBeCached) { ByteBuffer storedBlock; /* @@ -135,6 +118,7 @@ public class SingleSizeCache implements BlockCache { CacheablePair newEntry = new CacheablePair(toBeCached.getDeserializer(), storedBlock); + toBeCached.serialize(storedBlock); CacheablePair alreadyCached = backingMap.putIfAbsent(blockName, newEntry); @@ -142,7 +126,6 @@ public class SingleSizeCache implements BlockCache { backingStore.free(storedBlock); throw new RuntimeException("already cached " + blockName); } - toBeCached.serialize(storedBlock); newEntry.recentlyAccessed.set(System.nanoTime()); this.size.addAndGet(newEntry.heapSize()); } @@ -157,20 +140,22 @@ public class SingleSizeCache implements BlockCache { stats.hit(caching); // If lock cannot be obtained, that means we're undergoing eviction. - if (contentBlock.evictionLock.readLock().tryLock()) { - try { - contentBlock.recentlyAccessed.set(System.nanoTime()); + try { + contentBlock.recentlyAccessed.set(System.nanoTime()); + synchronized (contentBlock) { + if (contentBlock.serializedData == null) { + // concurrently evicted + LOG.warn("Concurrent eviction of " + key); + return null; + } + contentBlock.serializedData.rewind(); return contentBlock.deserializer - .deserialize(contentBlock.serializedData); - } catch (IOException e) { - e.printStackTrace(); - LOG.warn("Deserializer throwing ioexception, possibly deserializing wrong object buffer"); - return null; - } finally { - contentBlock.evictionLock.readLock().unlock(); + .deserialize(contentBlock.serializedData); } + } catch (Throwable t) { + LOG.error("Deserializer threw an exception. This may indicate a bug.", t); + return null; } - return null; } /** @@ -183,20 +168,42 @@ public class SingleSizeCache implements BlockCache { stats.evict(); CacheablePair evictedBlock = backingMap.remove(key); if (evictedBlock != null) { - try { - evictedBlock.evictionLock.writeLock().lock(); - backingStore.free(evictedBlock.serializedData); - evictionWatcher.onEviction(key, false); - stats.evicted(); - size.addAndGet(-1 * evictedBlock.heapSize()); - } finally { - evictedBlock.evictionLock.writeLock().unlock(); - } + doEviction(key, evictedBlock); } return evictedBlock != null; } + private void doEviction(String key, CacheablePair evictedBlock) { + long evictedHeap = 0; + synchronized (evictedBlock) { + if (evictedBlock.serializedData == null) { + // someone else already freed + return; + } + evictedHeap = evictedBlock.heapSize(); + ByteBuffer bb = evictedBlock.serializedData; + evictedBlock.serializedData = null; + backingStore.free(bb); + + // We have to do this callback inside the synchronization here. + // Otherwise we can have the following interleaving: + // Thread A calls getBlock(): + // SlabCache directs call to this SingleSizeCache + // It gets the CacheablePair object + // Thread B runs eviction + // doEviction() is called and sets serializedData = null, here. + // Thread A sees the null serializedData, and returns null + // Thread A calls cacheBlock on the same block, and gets + // "already cached" since the block is still in backingStore + if (evictionWatcher != null) { + evictionWatcher.onEviction(key, false); + } + } + stats.evicted(); + size.addAndGet(-1 * evictedHeap); + } + public void logStats() { long milliseconds = (long)this.timeSinceLastAccess.get() / 1000000; @@ -299,8 +306,7 @@ public class SingleSizeCache implements BlockCache { /* Just a pair class, holds a reference to the parent cacheable */ private class CacheablePair implements HeapSize { final CacheableDeserializer deserializer; - final ByteBuffer serializedData; - final ReentrantReadWriteLock evictionLock; + ByteBuffer serializedData; AtomicLong recentlyAccessed; private CacheablePair(CacheableDeserializer deserializer, @@ -308,7 +314,6 @@ public class SingleSizeCache implements BlockCache { this.recentlyAccessed = new AtomicLong(); this.deserializer = deserializer; this.serializedData = serializedData; - evictionLock = new ReentrantReadWriteLock(); } /* diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java index 1611349..4e3d337 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java @@ -232,6 +232,7 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize public Cacheable getBlock(String key, boolean caching) { SingleSizeCache cachedBlock = backingStore.get(key); if (cachedBlock == null) { + // TODO: this is a miss, isn't it? return null; } diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index ed31503..e17f0f5 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.*; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashSet; @@ -130,19 +131,26 @@ public class CacheTestUtils { public static void hammerSingleKey(final BlockCache toBeTested, int BlockSize, int numThreads, int numQueries) throws Exception { - final HFileBlockPair kv = generateHFileBlocks(BlockSize, 1)[0]; + final String key = "key"; + final byte[] buf = new byte[5*1024]; + Arrays.fill(buf, (byte)5); + + final ByteArrayCacheable bac = new ByteArrayCacheable( + buf); Configuration conf = new Configuration(); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext( conf); final AtomicInteger totalQueries = new AtomicInteger(); - toBeTested.cacheBlock(kv.blockName, kv.block); + toBeTested.cacheBlock(key, bac); for (int i = 0; i < numThreads; i++) { TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { @Override public void doAnAction() throws Exception { - assertEquals(kv.block, toBeTested.getBlock(kv.blockName, false)); + ByteArrayCacheable returned = + (ByteArrayCacheable) toBeTested.getBlock(key, false); + assertArrayEquals(buf, returned.buf); totalQueries.incrementAndGet(); } }; @@ -156,6 +164,94 @@ public class CacheTestUtils { } ctx.stop(); } + + public static void hammerEviction(final BlockCache toBeTested, + int BlockSize, int numThreads, int numQueries) throws Exception { + + Configuration conf = new Configuration(); + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext( + conf); + + final AtomicInteger totalQueries = new AtomicInteger(); + + for (int i = 0; i < numThreads; i++) { + final int finalI = i; + + final byte[] buf = new byte[5*1024]; + TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { + @Override + public void doAnAction() throws Exception { + for (int j = 0; j < 10; j++) { + String key = "key_" + finalI + "_" + j; + Arrays.fill(buf, (byte)(finalI * j)); + final ByteArrayCacheable bac = new ByteArrayCacheable(buf); + + ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested.getBlock(key, true); + if (gotBack != null) { + assertArrayEquals(gotBack.buf, bac.buf); + } else { + toBeTested.cacheBlock(key, bac); + } + } + totalQueries.incrementAndGet(); + } + }; + + ctx.addThread(t); + } + + ctx.startThreads(); + while (totalQueries.get() < numQueries && ctx.shouldRun()) { + Thread.sleep(10); + } + ctx.stop(); + + assertTrue(toBeTested.getStats().getEvictedCount() > 0); + } + + private static class ByteArrayCacheable implements Cacheable { + + final byte[] buf; + + public ByteArrayCacheable(byte[] buf) { + this.buf = buf; + } + + @Override + public long heapSize() { + return 4 + buf.length; + } + + @Override + public int getSerializedLength() { + return 4 + buf.length; + } + + + @Override + public void serialize(ByteBuffer destination) { + destination.putInt(buf.length); + Thread.yield(); + destination.put(buf); + } + + @Override + public CacheableDeserializer getDeserializer() { + return new CacheableDeserializer() { + + @Override + public Cacheable deserialize(ByteBuffer b) throws IOException { + int len = b.getInt(); + Thread.yield(); + byte buf[] = new byte[len]; + b.get(buf); + return new ByteArrayCacheable(buf); + } + }; + } + + } + private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) { diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java index d38fe7a..6881188 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java @@ -63,5 +63,10 @@ public class TestSingleSizeCache { public void testCacheMultiThreadedSingleKey() throws Exception { CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES); } + + @Test + public void testCacheMultiThreadedEviction() throws Exception { + CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES); + } } diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java index 26705d0..2c3541f 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java @@ -43,7 +43,7 @@ public class TestSlabCache { static final int CACHE_SIZE = 1000000; static final int NUM_BLOCKS = 101; static final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; - static final int NUM_THREADS = 1000; + static final int NUM_THREADS = 50; static final int NUM_QUERIES = 10000; SlabCache cache; @@ -83,6 +83,11 @@ public class TestSlabCache { } @Test + public void testCacheMultiThreadedEviction() throws Exception { + CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, 10, NUM_QUERIES); + } + + @Test /*Just checks if ranges overlap*/ public void testStatsArithmetic(){ SlabStats test = cache.requestStats;