diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index fb6b1a6..1f99e4e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.DirectMemoryUtils;
+import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -110,6 +113,16 @@ public class CacheConfig {
   private final BlockCache blockCache;
 
   /**
+   * IdLock is a "sparse lock" implementation that allows locking on a particular
+   * block identified by its offset. The purpose is to avoid two clients loading
+   * the same block at once by having all but one client wait to get the block
+   * from the cache. We keep one IdLock per open hfile so that all readers of
+   * that hfile (half store file readers, etc.) share the same lock instance.
+   */
+  private Map<String, IdLock> blockIdLocks = new HashMap<String, IdLock>();
+  private Map<String, Integer> blockIdLocksRefCount = new HashMap<String, Integer>();
+
+  /**
    * Whether blocks should be cached on read (default is on if there is a
    * cache but this can be turned off on a per-family or per-request basis)
    */
@@ -190,7 +203,7 @@ public class CacheConfig {
    * @param evictOnClose whether blocks should be evicted when HFile is closed
    * @param cacheCompressed whether to store blocks as compressed in the cache
    */
-  CacheConfig(final BlockCache blockCache,
+  public CacheConfig(final BlockCache blockCache,
       final boolean cacheDataOnRead, final boolean inMemory,
       final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
       final boolean cacheBloomsOnWrite, final boolean evictOnClose,
@@ -336,6 +349,7 @@ public class CacheConfig {
    * Static reference to the block cache, or null if no caching should be used
    * at all.
    */
+  // TODO: Using static variables like this is WRONG. It makes testing harder.
  private static BlockCache globalBlockCache;
 
   /** Boolean whether we have disabled the block cache entirely. */
@@ -419,4 +433,46 @@ public class CacheConfig {
     }
     return globalBlockCache;
   }
+
+  /**
+   * Creates or returns the IdLock associated with this hfile, used for locking
+   * on the offsets of its blocks. After the hfile is closed for reading,
+   * {@link #releaseBlockIdLock(String, IdLock)} must be called to release the reference.
+   * @param filename the full path for the hfile
+   * @return an IdLock shared by all readers of this hfile
+   */
+  public IdLock getBlockIdLock(String filename) {
+    synchronized (blockIdLocks) {
+      IdLock lock = blockIdLocks.get(filename);
+      int refCount = 0;
+      if (lock == null) {
+        lock = new IdLock();
+        blockIdLocks.put(filename, lock);
+      } else {
+        refCount = blockIdLocksRefCount.get(filename);
+      }
+      blockIdLocksRefCount.put(filename, refCount + 1);
+      return lock;
+    }
+  }
+
+  /**
+   * Decrements the reference count for this file's IdLock and discards the
+   * lock once the count reaches zero.
+   * @param filename the full path for the hfile
+   * @param idLock the lock obtained from {@link #getBlockIdLock(String)}
+   */
+  public void releaseBlockIdLock(String filename, IdLock idLock) {
+    synchronized (blockIdLocks) {
+      int refCount = blockIdLocksRefCount.get(filename);
+      refCount--;
+      if (refCount == 0) {
+        blockIdLocks.remove(filename);
+        blockIdLocksRefCount.remove(filename);
+      } else {
+        blockIdLocksRefCount.put(filename, refCount);
+      }
+    }
+  }
+
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 618c024..3f2d86e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -27,15 +27,14 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.io.WritableUtils;
@@ -64,13 +63,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
   /** Filesystem-level block reader. */
   private HFileBlock.FSReader fsBlockReader;
 
-  /**
-   * A "sparse lock" implementation allowing to lock on a particular block
-   * identified by offset. The purpose of this is to avoid two clients loading
-   * the same block, and have all but one client wait to get the block from the
-   * cache.
-   */
-  private IdLock offsetLock = new IdLock();
+  private IdLock offsetLock;
 
   /**
    * Blocks read from the load-on-open section, excluding data root index, meta
@@ -113,6 +106,8 @@ public class HFileReaderV2 extends AbstractHFileReader {
         compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path);
     this.fsBlockReader = fsBlockReaderV2; // upcast
 
+    offsetLock = cacheConf.getBlockIdLock(path.toString());
+
     // Comparator class name is stored in the trailer in version 2.
     comparator = trailer.createComparator();
     dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
@@ -216,16 +211,20 @@ public class HFileReaderV2 extends AbstractHFileReader {
     // Per meta key from any given file, synchronize reads for said block. This
     // is OK to do for meta blocks because the meta block index is always
     // single-level.
-    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
+    IdLock.Entry lockEntry = null;
+    long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
+
+    try {
+      lockEntry = offsetLock.getLockEntry(metaBlockOffset);
       // Check cache for block. If found return.
-      long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
+
       BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
-          DataBlockEncoding.NONE, BlockType.META);
+        DataBlockEncoding.NONE, BlockType.META);
 
       cacheBlock &= cacheConf.shouldCacheDataOnRead();
       if (cacheConf.isBlockCacheEnabled()) {
         HFileBlock cachedBlock =
-          (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false);
+            (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false);
         if (cachedBlock != null) {
           // Return a distinct 'shallow copy' of the block,
           // so pos does not get messed by the scanner
@@ -243,10 +242,14 @@ public class HFileReaderV2 extends AbstractHFileReader {
 
       // Cache the block
       if (cacheBlock) {
         cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
-            cacheConf.isInMemory());
+          cacheConf.isInMemory());
       }
-
       return metaBlock.getBufferWithoutHeader();
+
+    } finally {
+      if (lockEntry != null) {
+        offsetLock.releaseLockEntry(lockEntry);
+      }
     }
   }
@@ -410,6 +413,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
     close(cacheConf.shouldEvictOnClose());
   }
 
+  @Override
   public void close(boolean evictOnClose) throws IOException {
     if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
       int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
@@ -419,6 +423,8 @@ public class HFileReaderV2 extends AbstractHFileReader {
       }
     }
     fsBlockReader.closeStreams();
+
+    cacheConf.releaseBlockIdLock(this.path.toString(), offsetLock);
   }
 
   /** For testing */
@@ -1118,7 +1124,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
   private void validateMinorVersion(Path path, int minorVersion) {
     if (minorVersion < MIN_MINOR_VERSION ||
         minorVersion > MAX_MINOR_VERSION) {
-      String msg = "Minor version for path " + path + 
+      String msg = "Minor version for path " + path +
           " is expected to be between " +
           MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
           " but is found to be " + minorVersion;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index bd14524..3aeb847 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -289,10 +289,12 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * @param buf block buffer
    * @param inMemory if block is in-memory
    */
+  @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
     CachedBlock cb = map.get(cacheKey);
     if(cb != null) {
-      throw new RuntimeException("Cached an already cached block");
+      LOG.warn("Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey());
+      assert cb != null;
     }
     cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
     long newSize = updateSizeMetrics(cb, false);
@@ -313,6 +315,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * @param cacheKey block's cache key
    * @param buf block buffer
    */
+  @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
     cacheBlock(cacheKey, buf, false);
   }
@@ -560,6 +563,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
       return totalSize;
     }
 
+    @Override
     public int compareTo(BlockBucket that) {
       if(this.overflow() == that.overflow()) return 0;
       return this.overflow() > that.overflow() ? 1 : -1;
@@ -588,6 +592,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * Get the current size of this cache.
    * @return current size in bytes
    */
+  @Override
   public long getCurrentSize() {
     return this.size.get();
   }
@@ -596,6 +601,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * Get the current size of this cache.
    * @return current size in bytes
    */
+  @Override
   public long getFreeSize() {
     return getMaxSize() - getCurrentSize();
   }
@@ -604,6 +610,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * Get the size of this cache (number of cached blocks)
    * @return number of cached blocks
    */
+  @Override
   public long size() {
     return this.elements.get();
   }
@@ -624,6 +631,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * Get the number of blocks that have been evicted during the lifetime
    * of this cache.
    */
+  @Override
   public long getEvictedCount() {
     return this.stats.getEvictedCount();
   }
@@ -730,6 +738,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
    * of the eviction processes.
    */
+  @Override
   public CacheStats getStats() {
     return this.stats;
   }
@@ -740,6 +749,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
       + ClassSize.OBJECT);
 
   // HeapSize implementation
+  @Override
   public long heapSize() {
     return getCurrentSize();
   }
@@ -803,6 +813,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
   }
 
+  @Override
   public void shutdown() {
     if (victimHandler != null)
       victimHandler.shutdown();
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
index d5b985e..d116239 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
@@ -26,15 +26,25 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -228,6 +238,89 @@ public class TestHalfStoreFileReader {
         curr.getBuffer(), curr.getQualifierOffset(), curr.getQualifierLength());
   }
 
+  @Test
+  public void testConcurrentBlockCachingInHalfStoreFileReaders() throws Exception {
+    HBaseTestingUtility test_util = new HBaseTestingUtility();
+    String root_dir = test_util.getDataTestDir("testConcurrentBlockCachingInHalfStoreFileReaders").toString();
+    final Path p = new Path(root_dir, "test");
+    final Configuration conf = test_util.getConfiguration();
+    final FileSystem fs = FileSystem.get(conf);
+    CacheConfig cacheConf = new CacheConfig(CacheConfig.instantiateBlockCache(conf),
+        true, true, false, false, false, false, false);
+
+    HFile.Writer w = HFile.getWriterFactory(conf, cacheConf)
+        .withPath(fs, p)
+        .withBlockSize(1024)
+        .withComparator(KeyValue.KEY_COMPARATOR)
+        .create();
+
+    // write some things.
+    List<KeyValue> items = genSomeKeys();
+    for (KeyValue kv : items) {
+      w.append(kv);
+    }
+    w.close();
+
+
+    HFile.Reader r = HFile.createReader(fs, p, cacheConf);
+    r.loadFileInfo();
+
+    // print debug info about the file
+    HFilePrettyPrinter printer = new HFilePrettyPrinter();
+    printer.run(new String[] {"-m", "-b", "-f", p.toString()});
+
+    ExecutorService exec = Executors.newFixedThreadPool(2);
+
+    for (int i = 1; i < SIZE - 1; i++) {
+      // instantiate empty block cache
+      LruBlockCache blockCache = new LruBlockCache(1024 * 1024, StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
+      final CacheConfig cache = new CacheConfig(blockCache,
+          true, true, false, false, false, false, false);
+      assertEquals(0, cache.getBlockCache().getBlockCount());
+
+      byte[] midrow = getKV(i).getRow();
+      Reference bottom = new Reference(midrow, Reference.Range.bottom);
+      Reference top = new Reference(midrow, Reference.Range.top);
+
+      Callable<Boolean> callable1 = getSeekAndVerifyTask(p, fs, bottom,
+          getKV(i - 1).getKey(), cache, i - 1);
+      Callable<Boolean> callable2 = getSeekAndVerifyTask(p, fs, top,
+          getKV(i + 1).getKey(), cache, i + 1);
+
+      Future<Boolean> future1 = exec.submit(callable1);
+      Future<Boolean> future2 = exec.submit(callable2);
+
+      assertTrue(future1.get());
+      assertTrue(future2.get());
+      blockCache.shutdown();
+    }
+
+    exec.shutdown();
+    exec.awaitTermination(5, TimeUnit.SECONDS);
+  }
+
+  private static Callable<Boolean> getSeekAndVerifyTask(final Path p, final FileSystem fs,
+      final Reference r, final byte[] seekPoint, final CacheConfig cache, final int expected) {
+    return new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        KeyValue kv = doSeek(p, fs, r, seekPoint, cache);
+        assertEquals(String.format("row_%04d", expected), Bytes.toString(kv.getRow()));
+        return true;
+      }
+    };
+  }
+
+  private static KeyValue doSeek(Path p, FileSystem fs, Reference r, byte[] seekPoint,
+      CacheConfig cacheConfig) throws IOException {
+    final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p,
+        cacheConfig, r, DataBlockEncoding.NONE);
+    halfreader.loadFileInfo();
+    final HFileScanner scanner = halfreader.getScanner(true, false, false);
+    scanner.seekTo(seekPoint);
+    return scanner.getKeyValue();
+  }
+
   static final int SIZE = 1000;
 
   static byte[] _b(String s) {
@@ -237,19 +330,20 @@ public class TestHalfStoreFileReader {
   List<KeyValue> genSomeKeys() {
     List<KeyValue> ret = new ArrayList<KeyValue>(SIZE);
     for (int i = 0; i < SIZE; i++) {
-      KeyValue kv =
-          new KeyValue(
-              _b(String.format("row_%04d", i)),
-              _b("family"),
-              _b("qualifier"),
-              1000, // timestamp
-              _b("value"));
+      KeyValue kv = getKV(i);
       ret.add(kv);
     }
     return ret;
   }
-
+  KeyValue getKV(int i) {
+    return new KeyValue(
+        _b(String.format("row_%04d", i)),
+        _b("family"),
+        _b("qualifier"),
+        1000, // timestamp
+        _b("value"));
+  }
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java
index bbf4bba..9951cd0 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.util;
 
+import static org.junit.Assert.assertTrue;
+
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -27,12 +29,10 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import static org.junit.Assert.*;
-
 import org.apache.hadoop.hbase.MediumTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -105,6 +105,7 @@ public class TestIdLock {
       idLock.assertMapEmpty();
     } finally {
       exec.shutdown();
+      exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
     }
   }
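
Usage note (not part of the patch): the sketch below is a minimal illustration of the reference-counted lock sharing that the CacheConfig changes above introduce, i.e. why two readers of the same hfile (for example the top and bottom half store file readers created after a split) end up with one IdLock, and why each reader must call releaseBlockIdLock from close(). It assumes the patch is applied; the class name, the hfile path, and the use of the CacheConfig(Configuration) constructor are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.IdLock;

public class BlockIdLockSharingExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    CacheConfig cacheConf = new CacheConfig(conf);
    Path hfilePath = new Path("/hbase/table/region/family/hfile"); // hypothetical path

    // Two readers of the same physical hfile obtain the lock by file name;
    // CacheConfig hands back the same instance and bumps a per-file count.
    IdLock forTopHalf = cacheConf.getBlockIdLock(hfilePath.toString());    // refCount 0 -> 1
    IdLock forBottomHalf = cacheConf.getBlockIdLock(hfilePath.toString()); // refCount 1 -> 2

    // Same sparse lock, so concurrent loads of one block offset are
    // serialized across readers instead of racing into the block cache.
    assert forTopHalf == forBottomHalf; // holds when run with -ea

    // Each reader releases exactly once from close(); the map entry is
    // dropped only when the count returns to zero.
    cacheConf.releaseBlockIdLock(hfilePath.toString(), forTopHalf);    // refCount 2 -> 1
    cacheConf.releaseBlockIdLock(hfilePath.toString(), forBottomHalf); // refCount 1 -> 0, entry removed
  }
}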