diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 13585b9..ea3c663 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,6 +30,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.DirectMemoryUtils;
+import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -84,6 +87,16 @@ public class CacheConfig {
   private final BlockCache blockCache;
 
   /**
+   * IdLock is a "sparse lock" implementation that allows locking on a
+   * particular block, identified by its offset. The purpose is to avoid two
+   * clients loading the same block: all but one client wait, then get the
+   * block from the cache. We keep one IdLock per open hfile so that all of
+   * its readers (half store file readers, etc.) share the same lock instance.
+   */
+  private Map<String, IdLock> blockIdLocks = new HashMap<String, IdLock>();
+  private Map<String, Integer> blockIdLocksRefCount = new HashMap<String, Integer>();
+
+  /**
    * Whether blocks should be cached on read (default is on if there is a
    * cache but this can be turned off on a per-family or per-request basis)
    */
@@ -164,7 +177,7 @@
    * @param evictOnClose whether blocks should be evicted when HFile is closed
    * @param cacheCompressed whether to store blocks as compressed in the cache
    */
-  CacheConfig(final BlockCache blockCache,
+  public CacheConfig(final BlockCache blockCache,
      final boolean cacheDataOnRead, final boolean inMemory,
      final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
      final boolean cacheBloomsOnWrite, final boolean evictOnClose,
@@ -310,6 +323,7 @@
    * Static reference to the block cache, or null if no caching should be used
    * at all.
    */
+  // TODO: Using static variables like this is WRONG. It makes testing harder.
   private static BlockCache globalBlockCache;
 
   /** Boolean whether we have disabled the block cache entirely. */
@@ -321,7 +335,7 @@
    * @param conf The current configuration.
    * @return The block cache or null.
    */
-  private static synchronized BlockCache instantiateBlockCache(
+  public static synchronized BlockCache instantiateBlockCache(
      Configuration conf) {
     if (globalBlockCache != null) return globalBlockCache;
     if (blockCacheDisabled) return null;
@@ -356,4 +370,46 @@
     }
     return globalBlockCache;
   }
+
+  /**
+   * Creates or returns the IdLock associated with the given hfile, used for
+   * locking on the offsets of its blocks. After the hfile is closed for
+   * reading, {@link #releaseBlockIdLock(String, IdLock)} must be called to
+   * release the reference taken here.
+   * @param filename the full path for the hfile
+   * @return an IdLock.
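+   * <p>
+   * A minimal usage sketch ({@code cacheConf}, {@code path} and {@code offset}
+   * stand for the caller's own references; the acquire/release calls are the
+   * ones added in this patch):
+   * <pre>
+   *   IdLock offsetLock = cacheConf.getBlockIdLock(path.toString());
+   *   IdLock.Entry lockEntry = offsetLock.getLockEntry(offset);
+   *   try {
+   *     // check the block cache; on a miss, read and cache the block
+   *   } finally {
+   *     offsetLock.releaseLockEntry(lockEntry);
+   *   }
+   *   // once the reader is closed:
+   *   cacheConf.releaseBlockIdLock(path.toString(), offsetLock);
+   * </pre>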
+   */
+  public IdLock getBlockIdLock(String filename) {
+    synchronized (blockIdLocks) {
+      IdLock lock = blockIdLocks.get(filename);
+      int refCount = 0;
+      if (lock == null) {
+        lock = new IdLock();
+        blockIdLocks.put(filename, lock);
+      } else {
+        refCount = blockIdLocksRefCount.get(filename);
+      }
+      blockIdLocksRefCount.put(filename, refCount + 1);
+      return lock;
+    }
+  }
+
+  /**
+   * Decrements the reference count on this file's IdLock and discards the
+   * lock once the count reaches zero.
+   * @param filename the full path for the hfile
+   * @param idLock the lock obtained from {@link #getBlockIdLock(String)}
+   */
+  public void releaseBlockIdLock(String filename, IdLock idLock) {
+    synchronized (blockIdLocks) {
+      int refCount = blockIdLocksRefCount.get(filename);
+      refCount--;
+      if (refCount == 0) {
+        blockIdLocks.remove(filename);
+        blockIdLocksRefCount.remove(filename);
+      } else {
+        blockIdLocksRefCount.put(filename, refCount);
+      }
+    }
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
index 2f40db9..925cbe7 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 
@@ -51,6 +52,8 @@ public class HFileReaderV1 extends AbstractHFileReader {
 
   private volatile boolean fileInfoLoaded = false;
 
+  private IdLock offsetLock;
+
   /**
    * Opens a HFile. You must load the index before you can
    * use it by calling {@link #loadFileInfo()}.
@@ -68,6 +71,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
     trailer.expectMajorVersion(1);
     fsBlockReader = new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize);
+    offsetLock = cacheConf.getBlockIdLock(path.toString());
   }
 
   private byte[] readAllIndex(final FSDataInputStream in,
@@ -223,8 +227,11 @@ public class HFileReaderV1 extends AbstractHFileReader {
       effectiveCategory = BlockCategory.BLOOM;
     }
 
+    IdLock.Entry lockEntry = null;
+
     // Per meta key from any given file, synchronize reads for said block
-    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
+    try {
+      lockEntry = offsetLock.getLockEntry(offset);
       // Check cache for block. If found return.
       if (cacheConf.isBlockCacheEnabled()) {
         HFileBlock cachedBlock =
@@ -256,6 +263,10 @@ public class HFileReaderV1 extends AbstractHFileReader {
       }
 
       return hfileBlock.getBufferWithoutHeader();
+    } finally {
+      if (lockEntry != null) {
+        offsetLock.releaseLockEntry(lockEntry);
+      }
     }
   }
 
@@ -281,12 +292,14 @@ public class HFileReaderV1 extends AbstractHFileReader {
     long offset = dataBlockIndexReader.getRootBlockOffset(block);
     BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
 
+    IdLock.Entry lockEntry = null;
     // For any given block from any given file, synchronize reads for said
     // block.
     // Without a cache, this synchronizing is needless overhead, but really
     // the other choice is to duplicate work (which the cache would prevent you
     // from doing).
-    synchronized (dataBlockIndexReader.getRootBlockKey(block)) {
+    try {
+      lockEntry = offsetLock.getLockEntry(offset);
       // Check cache for block. If found return.
      if (cacheConf.isBlockCacheEnabled()) {
        HFileBlock cachedBlock =
@@ -331,6 +344,10 @@
             cacheConf.isInMemory());
       }
       return hfileBlock.getBufferWithoutHeader();
+    } finally {
+      if (lockEntry != null) {
+        offsetLock.releaseLockEntry(lockEntry);
+      }
     }
   }
 
@@ -386,6 +403,8 @@
       this.istream = null;
     }
 
+    cacheConf.releaseBlockIdLock(this.path.toString(), offsetLock);
+
     getSchemaMetrics().flushMetrics();
   }
 
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index f5f1f9b..2640b57 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -60,13 +60,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
     return includesMemstoreTS;
   }
 
-  /**
-   * A "sparse lock" implementation allowing to lock on a particular block
-   * identified by offset. The purpose of this is to avoid two clients loading
-   * the same block, and have all but one client wait to get the block from the
-   * cache.
-   */
-  private IdLock offsetLock = new IdLock();
+  private IdLock offsetLock;
 
   /**
    * Blocks read from the load-on-open section, excluding data root index, meta
@@ -101,7 +95,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
       final boolean closeIStream, final CacheConfig cacheConf,
       DataBlockEncoding preferredEncodingInCache, final HFileSystem hfs)
       throws IOException {
-    super(path, trailer, fsdis, fsdisNoFsChecksum, size, 
+    super(path, trailer, fsdis, fsdisNoFsChecksum, size,
        closeIStream, cacheConf, hfs);
     trailer.expectMajorVersion(2);
     validateMinorVersion(path, trailer.getMinorVersion());
@@ -110,6 +104,8 @@
         compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path);
     this.fsBlockReader = fsBlockReaderV2; // upcast
 
+    offsetLock = cacheConf.getBlockIdLock(path.toString());
+
     // Comparator class name is stored in the trailer in version 2.
     comparator = trailer.createComparator();
     dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
@@ -213,16 +209,20 @@ public class HFileReaderV2 extends AbstractHFileReader {
     // Per meta key from any given file, synchronize reads for said block. This
     // is OK to do for meta blocks because the meta block index is always
     // single-level.
-    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
+    IdLock.Entry lockEntry = null;
+    long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
+
+    try {
+      lockEntry = offsetLock.getLockEntry(metaBlockOffset);
       // Check cache for block. If found return.
-      long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
+
       BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
-          DataBlockEncoding.NONE, BlockType.META); 
+          DataBlockEncoding.NONE, BlockType.META);
       cacheBlock &= cacheConf.shouldCacheDataOnRead();
       if (cacheConf.isBlockCacheEnabled()) {
         HFileBlock cachedBlock =
-          (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false); 
+          (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false);
         if (cachedBlock != null) {
           // Return a distinct 'shallow copy' of the block,
           // so pos does not get messed by the scanner
@@ -233,7 +233,7 @@
       }
 
       HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
-          blockSize, -1, true); 
+          blockSize, -1, true);
       passSchemaMetricsTo(metaBlock);
 
       final long delta = System.nanoTime() - startTimeNs;
@@ -243,10 +243,14 @@
       // Cache the block
       if (cacheBlock) {
         cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
-            cacheConf.isInMemory()); 
+            cacheConf.isInMemory());
       }
 
-      return metaBlock.getBufferWithoutHeader();
+      return metaBlock.getBufferWithoutHeader();
+    } finally {
+      if (lockEntry != null) {
+        offsetLock.releaseLockEntry(lockEntry);
+      }
     }
   }
 
@@ -442,6 +446,8 @@
       }
     }
 
+    cacheConf.releaseBlockIdLock(this.path.toString(), offsetLock);
+
     getSchemaMetrics().flushMetrics();
   }
 
@@ -1129,7 +1135,7 @@
   private void validateMinorVersion(Path path, int minorVersion) {
     if (minorVersion < MIN_MINOR_VERSION ||
         minorVersion > MAX_MINOR_VERSION) {
-      String msg = "Minor version for path " + path + 
+      String msg = "Minor version for path " + path +
          " is expected to be between " + MIN_MINOR_VERSION +
          " and " + MAX_MINOR_VERSION +
          " but is found to be " + minorVersion;
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 3812e9c..eaa5ca4 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -193,11 +193,11 @@ public class LruBlockCache implements BlockCache, HeapSize {
   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
       Configuration conf) {
     this(maxSize, blockSize, evictionThread, (int)Math.ceil(1.2*maxSize/blockSize),
-        DEFAULT_LOAD_FACTOR, 
+        DEFAULT_LOAD_FACTOR,
         DEFAULT_CONCURRENCY_LEVEL,
-        conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR), 
-        conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR), 
-        DEFAULT_SINGLE_FACTOR, 
+        conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
+        conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
+        DEFAULT_SINGLE_FACTOR,
         DEFAULT_MULTI_FACTOR,
         DEFAULT_MEMORY_FACTOR);
   }
@@ -276,7 +276,8 @@
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
     CachedBlock cb = map.get(cacheKey);
     if(cb != null) {
-      throw new RuntimeException("Cached an already cached block");
+      LOG.warn("Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey());
+      assert false : "Cached an already cached block: " + cacheKey;
     }
     cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
     long newSize = updateSizeMetrics(cb, false);
diff --git src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
index 7d8eaa7..fc86c57 100644
--- src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
+++ src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
@@ -27,15 +27,25 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -218,6 +228,89 @@
         curr.getBuffer(), curr.getQualifierOffset(), curr.getQualifierLength());
   }
 
+  @Test
+  public void testConcurrentBlockCachingInHalfStoreFileReaders() throws Exception {
+    HBaseTestingUtility test_util = new HBaseTestingUtility();
+    String root_dir = test_util.getDataTestDir(
+        "testConcurrentBlockCachingInHalfStoreFileReaders").toString();
+    final Path p = new Path(root_dir, "test");
+    final Configuration conf = test_util.getConfiguration();
+    final FileSystem fs = FileSystem.get(conf);
+    CacheConfig cacheConf = new CacheConfig(CacheConfig.instantiateBlockCache(conf),
+        true, true, false, false, false, false, false);
+
+    HFile.Writer w = HFile.getWriterFactory(conf, cacheConf)
+        .withPath(fs, p)
+        .withBlockSize(1024)
+        .withComparator(KeyValue.KEY_COMPARATOR)
+        .create();
+
+    // write the test keys
+    List<KeyValue> items = genSomeKeys();
+    for (KeyValue kv : items) {
+      w.append(kv);
+    }
+    w.close();
+
+    HFile.Reader r = HFile.createReader(fs, p, cacheConf);
+    r.loadFileInfo();
+
+    // print debug info about the file
+    HFilePrettyPrinter printer = new HFilePrettyPrinter();
+    printer.run(new String[] {"-m", "-b", "-f", p.toString()});
+    // done with this reader; close it so it releases its IdLock reference
+    r.close();
+
+    ExecutorService exec = Executors.newFixedThreadPool(2);
+
+    for (int i = 1; i < SIZE - 1; i++) {
+      // instantiate an empty block cache for this iteration
+      LruBlockCache blockCache = new LruBlockCache(1024 * 1024,
+          StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
+      final CacheConfig cache = new CacheConfig(blockCache,
+          true, true, false, false, false, false, false);
+      assertEquals(0, cache.getBlockCache().getBlockCount());
+
+      byte[] midrow = getKV(i).getRow();
+      Reference bottom = new Reference(midrow, Reference.Range.bottom);
+      Reference top = new Reference(midrow, Reference.Range.top);
+
+      Callable<Boolean> callable1 = getSeekAndVerifyTask(p, fs, bottom,
+          getKV(i - 1).getKey(), cache, i - 1);
+      Callable<Boolean> callable2 = getSeekAndVerifyTask(p, fs, top,
+          getKV(i + 1).getKey(), cache, i + 1);
+
+      Future<Boolean> future1 = exec.submit(callable1);
+      Future<Boolean> future2 = exec.submit(callable2);
+
+      assertTrue(future1.get());
+      assertTrue(future2.get());
+      blockCache.shutdown();
+    }
+
+    exec.shutdown();
+    exec.awaitTermination(5, TimeUnit.SECONDS);
+  }
+
+  private static Callable<Boolean> getSeekAndVerifyTask(final Path p,
+      final FileSystem fs, final Reference r, final byte[] seekPoint,
+      final CacheConfig cache, final int expected) {
+    return new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        KeyValue kv = doSeek(p, fs, r, seekPoint, cache);
+        assertEquals(String.format("row_%04d", expected),
+            Bytes.toString(kv.getRow()));
+        return true;
+      }
+    };
+  }
+
+  private static KeyValue doSeek(Path p, FileSystem fs, Reference r,
+      byte[] seekPoint, CacheConfig cacheConfig) throws IOException {
+    final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p,
+        cacheConfig, r, DataBlockEncoding.NONE);
+    halfreader.loadFileInfo();
+    final HFileScanner scanner = halfreader.getScanner(true, false, false);
+    scanner.seekTo(seekPoint);
+    KeyValue kv = scanner.getKeyValue();
+    // close the reader so its reference on the file's shared IdLock is released
+    halfreader.close(false);
+    return kv;
+  }
 
   static final int SIZE = 1000;
 
   static byte[] _b(String s) {
@@ -227,19 +320,20 @@
   List<KeyValue> genSomeKeys() {
     List<KeyValue> ret = new ArrayList<KeyValue>(SIZE);
     for (int i = 0; i < SIZE; i++) {
-      KeyValue kv =
-          new KeyValue(
-              _b(String.format("row_%04d", i)),
-              _b("family"),
-              _b("qualifier"),
-              1000, // timestamp
-              _b("value"));
+      KeyValue kv = getKV(i);
       ret.add(kv);
     }
     return ret;
   }
-
+
+  KeyValue getKV(int i) {
+    return new KeyValue(
+        _b(String.format("row_%04d", i)),
+        _b("family"),
+        _b("qualifier"),
+        1000, // timestamp
+        _b("value"));
+  }
 
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
diff --git src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java
index 478bfbd..4038974 100644
--- src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java
+++ src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java
@@ -20,6 +20,8 @@
 
 package org.apache.hadoop.hbase.util;
 
+import static org.junit.Assert.assertTrue;
+
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -28,12 +30,10 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import static org.junit.Assert.*;
-
 import org.apache.hadoop.hbase.MediumTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -106,6 +106,7 @@ public class TestIdLock {
       idLock.assertMapEmpty();
     } finally {
       exec.shutdown();
+      exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
     }
   }