diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 0e7e4d3..ae0d42c 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -79,6 +79,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
   public static final String CACHE_INDEX_ON_WRITE = "CACHE_INDEX_ON_WRITE";
   public static final String CACHE_BLOOMS_ON_WRITE = "CACHE_BLOOMS_ON_WRITE";
   public static final String EVICT_BLOCKS_ON_CLOSE = "EVICT_BLOCKS_ON_CLOSE";
+  public static final String PREFETCH_BLOCKS_ON_OPEN = "PREFETCH_BLOCKS_ON_OPEN";

   /**
    * Size of storefile/hfile 'blocks'. Default is {@link #DEFAULT_BLOCKSIZE}.
@@ -187,6 +188,11 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
    */
   public static final boolean DEFAULT_EVICT_BLOCKS_ON_CLOSE = false;

+  /**
+   * Default setting for whether to prefetch blocks into the blockcache on open.
+   */
+  public static final boolean DEFAULT_PREFETCH_BLOCKS_ON_OPEN = false;
+
   private final static Map<String, String> DEFAULT_VALUES
       = new HashMap<String, String>();
   private final static Set<ImmutableBytesWritable> RESERVED_KEYWORDS
@@ -208,6 +214,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
     DEFAULT_VALUES.put(CACHE_INDEX_ON_WRITE, String.valueOf(DEFAULT_CACHE_INDEX_ON_WRITE));
     DEFAULT_VALUES.put(CACHE_BLOOMS_ON_WRITE, String.valueOf(DEFAULT_CACHE_BLOOMS_ON_WRITE));
     DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE));
+    DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN));
     for (String s : DEFAULT_VALUES.keySet()) {
       RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s)));
     }
@@ -898,6 +905,25 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
   }

   /**
+   * @return true if we should prefetch blocks into the blockcache on open
+   */
+  public boolean shouldPrefetchBlocksOnOpen() {
+    String value = getValue(PREFETCH_BLOCKS_ON_OPEN);
+    if (value != null) {
+      return Boolean.valueOf(value).booleanValue();
+    }
+    return DEFAULT_PREFETCH_BLOCKS_ON_OPEN;
+  }
+
+  /**
+   * @param value true if we should prefetch blocks into the blockcache on open
+   * @return this (for chained invocation)
+   */
+  public HColumnDescriptor setPrefetchBlocksOnOpen(boolean value) {
+    return setValue(PREFETCH_BLOCKS_ON_OPEN, Boolean.toString(value));
+  }
+
+  /**
    * @see java.lang.Object#toString()
    */
   @Override
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index df3b696..4d6ffb2 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -51,10 +51,12 @@ public interface BlockCache {
    * @param caching Whether this request has caching enabled (used for stats)
    * @param repeat Whether this is a repeat lookup for the same block
    *          (used to avoid double counting cache misses when doing double-check locking)
+   * @param updateCacheMetrics Whether to update cache metrics or not
    * @return Block or null if block is not in the cache.
-   * @see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, BlockType)
+   * @see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, boolean, BlockType)
    */
-  Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat);
+  Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
+      boolean updateCacheMetrics);

   /**
    * Evict block from cache.
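[Editor's note: usage sketch, not part of the patch. The new PREFETCH_BLOCKS_ON_OPEN attribute rides on the existing key/value descriptor mechanism, so it is set like any other family option. The admin handle and the table/family names below are illustrative.]

    // Enable prefetch-on-open for one column family at table-creation time.
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("t1"));
    HColumnDescriptor cf = new HColumnDescriptor("cf");
    cf.setPrefetchBlocksOnOpen(true);        // stored as PREFETCH_BLOCKS_ON_OPEN => "true"
    desc.addFamily(cf);
    admin.createTable(desc);                 // given an HBaseAdmin named "admin"
    assert cf.shouldPrefetchBlocksOnOpen();  // reads the stored value back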
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index fb6b1a6..50f2a7e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -96,6 +96,13 @@ public class CacheConfig {
   public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64;
   public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f;

+  /**
+   * Configuration key to prefetch all blocks of a given file into the block cache
+   * when the file is opened.
+   */
+  public static final String PREFETCH_BLOCKS_ON_OPEN_KEY =
+      "hbase.rs.prefetchblocksonopen";
+
   // Defaults

   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -105,6 +112,7 @@ public class CacheConfig {
   public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false;
   public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
   public static final boolean DEFAULT_COMPRESSED_CACHE = false;
+  public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;

   /** Local reference to the block cache, null if completely disabled */
   private final BlockCache blockCache;
@@ -133,6 +141,9 @@ public class CacheConfig {
   /** Whether data blocks should be stored in compressed form in the cache */
   private final boolean cacheCompressed;

+  /** Whether data blocks should be prefetched into the cache */
+  private final boolean prefetchOnOpen;
+
   /**
    * Create a cache configuration using the specified configuration object and
    * family descriptor.
@@ -153,7 +164,9 @@ public class CacheConfig {
            DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(),
        conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY,
            DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(),
-       conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE)
+       conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE),
+       conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY,
+           DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen()
     );
   }

@@ -174,7 +187,8 @@ public class CacheConfig {
            DEFAULT_CACHE_BLOOMS_ON_WRITE),
        conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE),
        conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY,
-           DEFAULT_COMPRESSED_CACHE)
+           DEFAULT_COMPRESSED_CACHE),
+       conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN)
     );
   }

@@ -189,12 +203,13 @@ public class CacheConfig {
   * @param cacheBloomsOnWrite whether blooms should be cached on write
   * @param evictOnClose whether blocks should be evicted when HFile is closed
   * @param cacheCompressed whether to store blocks as compressed in the cache
+  * @param prefetchOnOpen whether to prefetch blocks upon open
   */
  CacheConfig(final BlockCache blockCache,
      final boolean cacheDataOnRead, final boolean inMemory,
      final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
      final boolean cacheBloomsOnWrite, final boolean evictOnClose,
-     final boolean cacheCompressed) {
+     final boolean cacheCompressed, final boolean prefetchOnOpen) {
    this.blockCache = blockCache;
    this.cacheDataOnRead = cacheDataOnRead;
    this.inMemory = inMemory;
@@ -203,6 +218,7 @@ public class CacheConfig {
    this.cacheBloomsOnWrite = cacheBloomsOnWrite;
    this.evictOnClose = evictOnClose;
    this.cacheCompressed = cacheCompressed;
+   this.prefetchOnOpen = prefetchOnOpen;
  }

  /**
@@ -213,7 +229,7 @@ public class CacheConfig {
    this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory,
        cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
        cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
-       cacheConf.cacheCompressed);
+       cacheConf.cacheCompressed, cacheConf.prefetchOnOpen);
  }

  /**
@@ -248,7 +264,10 @@ public class CacheConfig {
    boolean shouldCache = isBlockCacheEnabled()
        && (cacheDataOnRead ||
            category == BlockCategory.INDEX ||
-           category == BlockCategory.BLOOM);
+           category == BlockCategory.BLOOM ||
+           (prefetchOnOpen &&
+               (category != BlockCategory.META &&
+                category != BlockCategory.UNKNOWN)));
    return shouldCache;
  }

@@ -316,6 +335,13 @@ public class CacheConfig {
    return isBlockCacheEnabled() && this.cacheCompressed;
  }

+ /**
+  * @return true if blocks should be prefetched into the cache on open, false if not
+  */
+ public boolean shouldPrefetchOnOpen() {
+   return isBlockCacheEnabled() && this.prefetchOnOpen;
+ }
+
  @Override
  public String toString() {
    if (!isBlockCacheEnabled()) {
@@ -327,7 +353,8 @@ public class CacheConfig {
      "[cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + "] " +
      "[cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + "] " +
      "[cacheEvictOnClose=" + shouldEvictOnClose() + "] " +
-     "[cacheCompressed=" + shouldCacheCompressed() + "]";
+     "[cacheCompressed=" + shouldCacheCompressed() + "]" +
+     "[prefetchOnOpen=" + shouldPrefetchOnOpen() + "]";
  }

  // Static block cache reference and methods
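[Editor's note: configuration sketch, not part of the patch. The family-aware constructor above ORs the site-wide key with the per-family flag, so either source alone enables prefetching for the files this CacheConfig governs.]

    // Cluster-wide, for every family (equivalent to setting
    // hbase.rs.prefetchblocksonopen=true in hbase-site.xml):
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);

    // Or per family, via the descriptor API added earlier:
    HColumnDescriptor family = new HColumnDescriptor("cf");
    family.setPrefetchBlocksOnOpen(true);

    CacheConfig cacheConf = new CacheConfig(conf, family);
    // True as long as a block cache is actually enabled:
    boolean prefetch = cacheConf.shouldPrefetchOnOpen();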
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 063a03b..aa490d5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -74,11 +74,11 @@ public class CombinedBlockCache implements BlockCache, HeapSize {

   @Override
   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
-      boolean repeat) {
+      boolean repeat, boolean updateCacheMetrics) {
     if (lruCache.containsBlock(cacheKey)) {
-      return lruCache.getBlock(cacheKey, caching, repeat);
+      return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
     }
-    return bucketCache.getBlock(cacheKey, caching, repeat);
+    return bucketCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
   }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java
index f89c364..ef68dab 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java
@@ -91,22 +91,25 @@ public class DoubleBlockCache implements BlockCache, HeapSize {
   }

   @Override
-  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
+  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
+      boolean updateCacheMetrics) {
     Cacheable cachedBlock;

-    if ((cachedBlock = onHeapCache.getBlock(cacheKey, caching, repeat)) != null) {
-      stats.hit(caching);
+    if ((cachedBlock = onHeapCache.getBlock(cacheKey, caching, repeat,
+        updateCacheMetrics)) != null) {
+      if (updateCacheMetrics) stats.hit(caching);
       return cachedBlock;
-    } else if ((cachedBlock = offHeapCache.getBlock(cacheKey, caching, repeat)) != null) {
+    } else if ((cachedBlock = offHeapCache.getBlock(cacheKey, caching, repeat,
+        updateCacheMetrics)) != null) {
       if (caching) {
         onHeapCache.cacheBlock(cacheKey, cachedBlock);
       }
-      stats.hit(caching);
+      if (updateCacheMetrics) stats.hit(caching);
       return cachedBlock;
     }

-    if (!repeat) stats.miss(caching);
+    if (!repeat && updateCacheMetrics) stats.miss(caching);
     return null;
   }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index ddaffcc..9940fcc 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -430,7 +430,7 @@ public class HFile {
   public interface CachingBlockReader {
     HFileBlock readBlock(long offset, long onDiskBlockSize,
         boolean cacheBlock, final boolean pread, final boolean isCompaction,
-        BlockType expectedBlockType)
+        final boolean updateCacheMetrics, BlockType expectedBlockType)
         throws IOException;
   }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 74bbb2e..ca345b7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -252,7 +252,7 @@ public class HFileBlockIndex {
           expectedBlockType = BlockType.DATA;
         }
         block = cachingBlockReader.readBlock(currentOffset,
-            currentOnDiskSize, shouldCache, pread, isCompaction,
+            currentOnDiskSize, shouldCache, pread, isCompaction, true,
             expectedBlockType);
       }

@@ -329,7 +329,7 @@ public class HFileBlockIndex {

       // Caching, using pread, assuming this is not a compaction.
       HFileBlock midLeafBlock = cachingBlockReader.readBlock(
-          midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false,
+          midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
           BlockType.LEAF_INDEX);

       ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 30d2bb0..676b353 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -248,7 +248,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
       cacheBlock &= cacheConf.shouldCacheDataOnRead();
       if (cacheConf.isBlockCacheEnabled()) {
         HFileBlock cachedBlock =
-            (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false);
+            (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false, true);
         if (cachedBlock != null) {
           // Return a distinct 'shallow copy' of the block,
           // so pos does not get messed by the scanner
@@ -289,10 +289,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
    * @throws IOException
    */
   @Override
-  public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
-      final boolean cacheBlock, boolean pread, final boolean isCompaction,
-      BlockType expectedBlockType)
-      throws IOException {
+  public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, boolean cacheBlock,
+      boolean pread, boolean isCompaction, boolean updateCacheMetrics,
+      BlockType expectedBlockType) throws IOException {
     if (dataBlockIndexReader == null) {
       throw new IOException("Block index not loaded");
     }
@@ -327,9 +326,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
         // Try and get the block from the block cache. If the useLock variable is true then this
         // is the second time through the loop and it should not be counted as a block cache miss.
         HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
-            cacheBlock, useLock);
+            cacheBlock, useLock, updateCacheMetrics);
         if (cachedBlock != null) {
-          if (cachedBlock.getBlockType() == BlockType.DATA) {
+          if (updateCacheMetrics && cachedBlock.getBlockType() == BlockType.DATA) {
             HFile.dataBlockReadCnt.incrementAndGet();
           }

@@ -370,7 +369,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
           cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
         }

-        if (hfileBlock.getBlockType() == BlockType.DATA) {
+        if (updateCacheMetrics && hfileBlock.getBlockType() == BlockType.DATA) {
           HFile.dataBlockReadCnt.incrementAndGet();
         }

@@ -578,7 +577,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
           // figure out the size.
           seekToBlock = reader.readBlock(previousBlockOffset,
               seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
-              pread, isCompaction, BlockType.DATA);
+              pread, isCompaction, true, BlockType.DATA);
           // TODO shortcut: seek forward in this block to the last key of the
           // block.
         }
@@ -615,7 +614,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
         curBlock = reader.readBlock(curBlock.getOffset()
             + curBlock.getOnDiskSizeWithHeader(),
             curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
-            isCompaction, null);
+            isCompaction, true, null);
       } while (!(curBlock.getBlockType().equals(BlockType.DATA) ||
           curBlock.getBlockType().equals(BlockType.ENCODED_DATA)));

@@ -774,7 +773,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
       }

       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
-          isCompaction, BlockType.DATA);
+          isCompaction, true, BlockType.DATA);
       if (block.getOffset() < 0) {
         throw new IOException("Invalid block offset: " + block.getOffset());
       }
@@ -1055,7 +1054,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
       }

       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
-          isCompaction, BlockType.DATA);
+          isCompaction, true, BlockType.DATA);
       if (block.getOffset() < 0) {
         throw new IOException("Invalid block offset: " + block.getOffset());
       }
@@ -1194,4 +1193,5 @@ public class HFileReaderV2 extends AbstractHFileReader {
   public int getMajorVersion() {
     return 2;
   }
+
 }
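[Editor's note: the point of threading updateCacheMetrics through readBlock is that the open-time prefetch (HFileReaderV3, next) can warm the cache without skewing hit/miss ratios, while every pre-existing caller passes true and keeps today's accounting. A sketch of the two call shapes, with "reader" and "offset" as illustrative stand-ins:]

    // Normal read path: metrics recorded as before.
    HFileBlock served = reader.readBlock(offset, -1, true, false, false,
        true, BlockType.DATA);
    // Prefetch path: populate the cache, leave the stats alone.
    HFileBlock warmed = reader.readBlock(offset, -1, true, false, false,
        false, null);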
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
index ac9612c..a2d8d58 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.io.hfile;

 import java.io.IOException;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
@@ -30,6 +32,8 @@ import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;

+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * {@link HFile} reader for version 3.
  * This Reader is aware of Tags.
@@ -37,7 +41,10 @@ import org.apache.hadoop.io.WritableUtils;
 @InterfaceAudience.Private
 public class HFileReaderV3 extends HFileReaderV2 {

+  private static final Log LOG = LogFactory.getLog(HFileReaderV3.class);
+
   public static final int MAX_MINOR_VERSION = 0;
+
   /**
    * Opens a HFile. You must load the index before you can use it by calling
    * {@link #loadFileInfo()}.
@@ -56,15 +63,50 @@ public class HFileReaderV3 extends HFileReaderV2 {
    *          is already encoded on disk, we will still use its on-disk encoding
    *          in cache.
    */
-  public HFileReaderV3(Path path, FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis,
-      final long size, final CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
-      final HFileSystem hfs) throws IOException {
+  public HFileReaderV3(final Path path, FixedFileTrailer trailer,
+      final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
+      final DataBlockEncoding preferredEncodingInCache, final HFileSystem hfs) throws IOException {
     super(path, trailer, fsdis, size, cacheConf, preferredEncodingInCache, hfs);
     byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
     // max tag length is not present in the HFile means tags were not at all written to file.
     if (tmp != null) {
       hfileContext.setIncludesTags(true);
     }
+
+    // Prefetch file blocks upon open if requested
+    if (cacheConf.shouldPrefetchOnOpen()) {
+      PrefetchExecutor.request(path, new Runnable() {
+        public void run() {
+          try {
+            long offset = 0;
+            long end = fileSize - getTrailer().getTrailerSize();
+            HFileBlock prevBlock = null;
+            while (offset < end) {
+              if (Thread.interrupted()) {
+                break;
+              }
+              long onDiskSize = -1;
+              if (prevBlock != null) {
+                onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
+              }
+              HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, null);
+              prevBlock = block;
+              offset += block.getOnDiskSizeWithHeader();
+            }
+          } catch (IOException e) {
+            // IOExceptions are probably due to region closes (relocation, etc.)
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Exception encountered while prefetching " + path + ":", e);
+            }
+          } catch (Exception e) {
+            // Other exceptions are interesting
+            LOG.warn("Exception encountered while prefetching " + path + ":", e);
+          } finally {
+            PrefetchExecutor.complete(path);
+          }
+        }
+      });
+    }
   }

   @Override
@@ -101,6 +143,13 @@ public class HFileReaderV3 extends HFileReaderV2 {
     return new ScannerV3(this, cacheBlocks, pread, isCompaction);
   }

+  @Override
+  public void close(boolean evictOnClose) throws IOException {
+    PrefetchExecutor.cancel(path);
+    super.close(evictOnClose);
+  }
+
   /**
    * Implementation of {@link HFileScanner} interface.
    */
@@ -278,4 +327,14 @@ public class HFileReaderV3 extends HFileReaderV2 {
   protected HFileBlock diskToCacheFormat(HFileBlock hfileBlock, final boolean isCompaction) {
     return dataBlockEncoder.diskToCacheFormat(hfileBlock, isCompaction);
   }
+
+  /**
+   * Returns false if block prefetching was requested for this file and has
+   * not completed, true otherwise
+   */
+  @VisibleForTesting
+  boolean prefetchComplete() {
+    return PrefetchExecutor.isCompleted(path);
+  }
+
 }
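[Editor's note: test sketch, not part of the patch. Because prefetch runs asynchronously on the shared pool, a test that asserts on cache contents should first poll the @VisibleForTesting hook added above; "reader" is assumed to be an open HFile.Reader on a prefetch-enabled file.]

    HFileReaderV3 v3 = (HFileReaderV3) reader;
    while (!v3.prefetchComplete()) {
      Thread.sleep(50);  // caller declares throws InterruptedException
    }
    // Safe now: residency checks below won't race the prefetch thread.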
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 51ef024..753f369 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -358,19 +358,22 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * @param caching true if the caller caches blocks on cache misses
    * @param repeat Whether this is a repeat lookup for the same block
    *          (used to avoid double counting cache misses when doing double-check locking)
+   * @param updateCacheMetrics Whether to update cache metrics or not
    * @return buffer of specified cache key, or null if not in cache
-   * @see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, BlockType)
+   * @see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, boolean, BlockType)
    */
   @Override
-  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
+  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
+      boolean updateCacheMetrics) {
     CachedBlock cb = map.get(cacheKey);
     if(cb == null) {
-      if (!repeat) stats.miss(caching);
-      if (victimHandler != null)
-        return victimHandler.getBlock(cacheKey, caching, repeat);
+      if (!repeat && updateCacheMetrics) stats.miss(caching);
+      if (victimHandler != null) {
+        return victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+      }
       return null;
     }
-    stats.hit(caching);
+    if (updateCacheMetrics) stats.hit(caching);
     cb.access(count.incrementAndGet());
     return cb.getBuffer();
   }
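[Editor's note: with the fourth argument a caller can probe the LRU cache without disturbing the counters it may later assert on; on a miss the flag is also forwarded to the victim handler (e.g. a bucket cache). Sketch, assuming an LruBlockCache "cache" and a key for a block cached earlier:]

    CacheStats stats = cache.getStats();
    long hitsBefore = stats.getHitCount();
    Cacheable peek = cache.getBlock(key, true, false, false);  // metrics-neutral lookup
    assert peek != null;
    assert stats.getHitCount() == hitsBefore;                  // no hit was recorded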
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
new file mode 100644
index 0000000..f033443
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -0,0 +1,122 @@
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+
+public class PrefetchExecutor {
+
+  private static final Log LOG = LogFactory.getLog(PrefetchExecutor.class);
+
+  /** Futures for tracking block prefetch activity */
+  private static final Map<Path, Future<?>> prefetchFutures =
+      new ConcurrentSkipListMap<Path, Future<?>>();
+  /** Executor pool shared among all HFiles for block prefetch */
+  private static final ScheduledExecutorService prefetchExecutorPool;
+  /** Delay before beginning prefetch */
+  private static final int prefetchDelayMillis;
+  /** Variation in prefetch delay times, to mitigate stampedes */
+  private static final float prefetchDelayVariation;
+  static {
+    // Consider doing this on demand with a configuration passed in rather
+    // than in a static initializer.
+    Configuration conf = HBaseConfiguration.create();
+    // 1s here for tests, consider 30s in hbase-default.xml
+    // Set to 0 for no delay
+    prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
+    prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
+    int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
+    prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads,
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = new Thread(r);
+            t.setName("hfile-prefetch-" + System.currentTimeMillis());
+            t.setDaemon(true);
+            return t;
+          }
+        });
+  }
+
+  private static final Random RNG = new Random();
+
+  // TODO: We want HFile, which is where the blockcache lives, to handle
+  // prefetching of file blocks but the Store level is where path convention
+  // knowledge should be contained
+  private static final Pattern prefetchPathExclude =
+      Pattern.compile(
+        "(" +
+          Path.SEPARATOR_CHAR +
+          HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") +
+          Path.SEPARATOR_CHAR +
+        ")|(" +
+          Path.SEPARATOR_CHAR +
+          HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") +
+          Path.SEPARATOR_CHAR +
+        ")");
+
+  public static void request(Path path, Runnable runnable) {
+    if (!prefetchPathExclude.matcher(path.toString()).find()) {
+      long delay;
+      if (prefetchDelayMillis > 0) {
+        delay = (long)((prefetchDelayMillis * (1.0f - (prefetchDelayVariation/2))) +
+          (prefetchDelayMillis * (prefetchDelayVariation/2) * RNG.nextFloat()));
+      } else {
+        delay = 0;
+      }
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms");
+        }
+        prefetchFutures.put(path, prefetchExecutorPool.schedule(runnable, delay,
+          TimeUnit.MILLISECONDS));
+      } catch (RejectedExecutionException e) {
+        prefetchFutures.remove(path);
+        LOG.warn("Prefetch request rejected for " + path);
+      }
+    }
+  }
+
+  public static void complete(Path path) {
+    prefetchFutures.remove(path);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Prefetch completed for " + path);
+    }
+  }
+
+  public static void cancel(Path path) {
+    Future<?> future = prefetchFutures.get(path);
+    if (future != null) {
+      // ok to race with other cancellation attempts
+      future.cancel(true);
+      prefetchFutures.remove(path);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Prefetch cancelled for " + path);
+      }
+    }
+  }
+
+  public static boolean isCompleted(Path path) {
+    Future<?> future = prefetchFutures.get(path);
+    if (future != null) {
+      return future.isDone();
+    }
+    return true;
+  }
+
+}
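[Editor's note: tuning sketch, not part of the patch. The executor reads its settings once, in the static initializer above, from a fresh HBaseConfiguration, so these keys must be on the classpath configuration (hbase-site.xml) before the class loads; the snippet below just names the knobs. With the defaults (delay=1000, variation=0.2f) the jittered start time is uniform in roughly [900, 1000) ms, since the formula is delay * (1 - variation/2) + delay * (variation/2) * random[0,1).]

    Configuration conf = HBaseConfiguration.create();
    conf.setInt("hbase.hfile.prefetch.delay", 30000);             // ms before prefetch begins
    conf.setFloat("hbase.hfile.prefetch.delay.variation", 0.2f);  // jitter, avoids stampedes
    conf.setInt("hbase.hfile.thread.prefetch", 4);                // shared pool size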
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java
index 6253312..57ffeb8 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java
@@ -69,7 +69,8 @@ public class SimpleBlockCache implements BlockCache {
     return cache.size();
   }

-  public synchronized Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
+  public synchronized Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
+      boolean updateCacheMetrics) {
     processQueue(); // clear out some crap.
     Ref ref = cache.get(cacheKey);
     if (ref == null)
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index e29c94b..b7b785a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -330,15 +330,17 @@ public class BucketCache implements BlockCache, HeapSize {
    * @param key block's cache key
    * @param caching true if the caller caches blocks on cache misses
    * @param repeat Whether this is a repeat lookup for the same block
+   * @param updateCacheMetrics Whether we should update cache metrics or not
    * @return buffer of specified cache key, or null if not in cache
    */
   @Override
-  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
+  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
+      boolean updateCacheMetrics) {
     if (!cacheEnabled) return null;
     RAMQueueEntry re = ramCache.get(key);
     if (re != null) {
-      cacheStats.hit(caching);
+      if (updateCacheMetrics) cacheStats.hit(caching);
       re.access(accessCount.incrementAndGet());
       return re.getData();
     }
@@ -355,8 +357,10 @@ public class BucketCache implements BlockCache, HeapSize {
           Cacheable cachedBlock = bucketEntry.deserializerReference(
             deserialiserMap).deserialize(bb, true);
           long timeTaken = System.nanoTime() - start;
-          cacheStats.hit(caching);
-          cacheStats.ioHit(timeTaken);
+          if (updateCacheMetrics) {
+            cacheStats.hit(caching);
+            cacheStats.ioHit(timeTaken);
+          }
           bucketEntry.access(accessCount.incrementAndGet());
           if (this.ioErrorStartTime > 0) {
             ioErrorStartTime = -1;
@@ -372,7 +376,7 @@ public class BucketCache implements BlockCache, HeapSize {
         }
       }
     }
-    if(!repeat)cacheStats.miss(caching);
+    if (!repeat && updateCacheMetrics) cacheStats.miss(caching);
     return null;
   }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
index 3b2d85e..b8dfebf 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
@@ -153,14 +153,15 @@ public class SingleSizeCache implements BlockCache, HeapSize {
   }

   @Override
-  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
+  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
+      boolean updateCacheMetrics) {
     CacheablePair contentBlock = backingMap.get(key);
     if (contentBlock == null) {
-      if (!repeat) stats.miss(caching);
+      if (!repeat && updateCacheMetrics) stats.miss(caching);
       return null;
     }

-    stats.hit(caching);
+    if (updateCacheMetrics) stats.hit(caching);
     // If lock cannot be obtained, that means we're undergoing eviction.
     try {
       contentBlock.recentlyAccessed.set(System.nanoTime());
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
index 0711fa6..f1ea4d9 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
@@ -231,22 +231,25 @@ public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize {
    * Get the buffer of the block with the specified name.
    * @param caching
    * @param key
+   * @param repeat
+   * @param updateCacheMetrics
    *
    * @return buffer of specified block name, or null if not in cache
    */
-  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
+  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
+      boolean updateCacheMetrics) {
     SingleSizeCache cachedBlock = backingStore.get(key);
     if (cachedBlock == null) {
-      if (!repeat) stats.miss(caching);
+      if (!repeat && updateCacheMetrics) stats.miss(caching);
       return null;
     }

-    Cacheable contentBlock = cachedBlock.getBlock(key, caching, false);
+    Cacheable contentBlock = cachedBlock.getBlock(key, caching, false, updateCacheMetrics);

     if (contentBlock != null) {
-      stats.hit(caching);
+      if (updateCacheMetrics) stats.hit(caching);
     } else if (!repeat) {
-      stats.miss(caching);
+      if (updateCacheMetrics) stats.miss(caching);
     }

     return contentBlock;
   }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java
index 911738a..780c57e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java
@@ -97,7 +97,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase
       try {
         // We cache the block and use a positional read.
         bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
-            index.getRootBlockDataSize(block), true, true, false,
+            index.getRootBlockDataSize(block), true, true, false, true,
             BlockType.BLOOM_CHUNK);
       } catch (IOException ex) {
         // The Bloom filter is broken, turn it off.
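[Editor's note: test sketch, not part of the patch. A prefetch-aware test can combine the prefetchComplete() hook with a metrics-neutral lookup to check residency without touching the very counters it asserts on; the names here mirror TestCacheOnWrite below and are illustrative.]

    BlockCacheKey key = new BlockCacheKey(reader.getName(), offset,
        encodingInCache, block.getBlockType());
    boolean resident = cache.getBlock(key, true, false, false) != null;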
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index dfe19f7..bc62d97 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -93,12 +93,12 @@ public class CacheTestUtils {
           }
           toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
           Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
-              false, false);
+              false, false, true);
           if (retrievedBlock != null) {
             assertEquals(ourBlock.block, retrievedBlock);
             toBeTested.evictBlock(ourBlock.blockName);
             hits.incrementAndGet();
-            assertNull(toBeTested.getBlock(ourBlock.blockName, false, false));
+            assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
           } else {
             miss.incrementAndGet();
           }
@@ -126,7 +126,7 @@ public class CacheTestUtils {
     HFileBlockPair[] blocks = generateHFileBlocks(numBlocks, blockSize);
     // Confirm empty
     for (HFileBlockPair block : blocks) {
-      assertNull(toBeTested.getBlock(block.blockName, true, false));
+      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
     }

     // Add blocks
@@ -139,7 +139,7 @@ public class CacheTestUtils {
     // MapMaker makes no guarantees when it will evict, so neither can we.

     for (HFileBlockPair block : blocks) {
-      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false);
+      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
       if (buf != null) {
         assertEquals(block.block, buf);
       }
@@ -150,7 +150,7 @@ public class CacheTestUtils {

     for (HFileBlockPair block : blocks) {
       try {
-        if (toBeTested.getBlock(block.blockName, true, false) != null) {
+        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
           toBeTested.cacheBlock(block.blockName, block.block);
           if (!(toBeTested instanceof BucketCache)) {
             // BucketCache won't throw exception when caching already cached
@@ -184,7 +184,7 @@ public class CacheTestUtils {
         @Override
         public void doAnAction() throws Exception {
           ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
-              .getBlock(key, false, false);
+              .getBlock(key, false, false, true);
           assertArrayEquals(buf, returned.buf);
           totalQueries.incrementAndGet();
         }
@@ -223,7 +223,7 @@ public class CacheTestUtils {
             final ByteArrayCacheable bac = new ByteArrayCacheable(buf);

             ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested
-                .getBlock(key, true, false);
+                .getBlock(key, true, false, true);
             if (gotBack != null) {
               assertArrayEquals(gotBack.buf, bac.buf);
             } else {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java
index 5bb53a6..61c6b84 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java
@@ -67,7 +67,7 @@ public class RandomSeek {
     long start = System.currentTimeMillis();
     SimpleBlockCache cache = new SimpleBlockCache();
     CacheConfig cacheConf = new CacheConfig(cache, true, false, false, false,
-        false, false, false);
+        false, false, false, false);
     Reader reader = HFile.createReader(lfs, path, cacheConf);
     reader.loadFileInfo();
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index a08c5de..6424dba 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -248,10 +248,10 @@ public class TestCacheOnWrite {
       // Flags: don't cache the block, use pread, this is not a compaction.
       // Also, pass null for expected block type to avoid checking it.
       HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
-          false, null);
+          false, true, null);
       BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
           offset, encodingInCache, block.getBlockType());
-      boolean isCached = blockCache.getBlock(blockCacheKey, true, false) != null;
+      boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
       boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
       if (shouldBeCached != isCached) {
         throw new AssertionError(
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index a7187f6..d21fbed 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -164,7 +164,7 @@ public class TestHFileBlockIndex {
     @Override
     public HFileBlock readBlock(long offset, long onDiskSize,
         boolean cacheBlock, boolean pread, boolean isCompaction,
-        BlockType expectedBlockType)
+        boolean updateCacheMetrics, BlockType expectedBlockType)
         throws IOException {
       if (offset == prevOffset && onDiskSize == prevOnDiskSize &&
           pread == prevPread) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
index e706f2c..3716216 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
@@ -92,7 +92,7 @@ public class TestHFileDataBlockEncoder {
     BlockCacheKey cacheKey = new BlockCacheKey("test", 0);
     blockCache.cacheBlock(cacheKey, cacheBlock);

-    HeapSize heapSize = blockCache.getBlock(cacheKey, false, false);
+    HeapSize heapSize = blockCache.getBlock(cacheKey, false, false, true);
     assertTrue(heapSize instanceof HFileBlock);

     HFileBlock returnedBlock = (HFileBlock) heapSize;
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
index 7e2dce8..07d05b3 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
@@ -106,7 +106,7 @@ public class TestLruBlockCache {

     // Confirm empty
     for (CachedItem block : blocks) {
-      assertTrue(cache.getBlock(block.cacheKey, true, false) == null);
+      assertTrue(cache.getBlock(block.cacheKey, true, false, true) == null);
     }

     // Add blocks
@@ -120,7 +120,7 @@ public class TestLruBlockCache {

     // Check if all blocks are properly cached and retrieved
     for (CachedItem block : blocks) {
-      HeapSize buf = cache.getBlock(block.cacheKey, true, false);
+      HeapSize buf = cache.getBlock(block.cacheKey, true, false, true);
       assertTrue(buf != null);
       assertEquals(buf.heapSize(), block.heapSize());
     }
@@ -130,7 +130,7 @@ public class TestLruBlockCache {

     // Check if all blocks are properly cached and retrieved
     for (CachedItem block : blocks) {
-      HeapSize buf = cache.getBlock(block.cacheKey, true, false);
+      HeapSize buf = cache.getBlock(block.cacheKey, true, false, true);
       assertTrue(buf != null);
       assertEquals(buf.heapSize(), block.heapSize());
     }
@@ -175,9 +175,9 @@ public class TestLruBlockCache {
         (maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));

     // All blocks except block 0 should be in the cache
-    assertTrue(cache.getBlock(blocks[0].cacheKey, true, false) == null);
+    assertTrue(cache.getBlock(blocks[0].cacheKey, true, false, true) == null);
     for(int i=1;i