diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index ce747b2..600a0bb 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -526,13 +526,17 @@ public class CacheConfig implements ConfigurationObserver { } public boolean isL2CacheEnabled() { - return l2Cache != null && !l2Cache.isShutdown(); + return l2Cache != null && l2Cache.isEnabled(); } public L2Cache getL2Cache() { return l2Cache; } + public L2CacheAgent getL2CacheAgent() { + return new L2CacheAgent(this, l2Cache); + } + @Override public void notifyOnChange(Configuration conf) { new CacheConfigBuilder(conf).update(this); diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 63cf541..e9f02e4 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1413,7 +1413,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { public static class FSReaderV2 extends AbstractFSReader { /** L2 cache instance or null if l2 cache is disabled */ - private final L2Cache l2Cache; + private final L2CacheAgent cacheAgent; /** * Name of the current hfile. Used to compose the key in for the @@ -1437,9 +1437,9 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { }; public FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo, - long fileSize, L2Cache l2Cache, String hfileNameForL2Cache) { + long fileSize, L2CacheAgent cacheAgent, String hfileNameForL2Cache) { super(istream, compressAlgo, fileSize); - this.l2Cache = l2Cache; + this.cacheAgent = cacheAgent; this.hfileNameForL2Cache = hfileNameForL2Cache; } @@ -1532,9 +1532,6 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { headerAndData.array(), headerAndData.arrayOffset() + preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, options); - if (addToL2Cache) { - cacheBlockInL2Cache(offset, headerAndData.array()); - } b = new HFileBlock(headerAndData); b.assumeUncompressed(); b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader); @@ -1542,6 +1539,10 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { if (b.nextBlockOnDiskSizeWithHeader > 0) setNextBlockHeader(offset, b); + + if (addToL2Cache) { + cacheRawBlockBytes(b.getBlockType(), offset, headerAndData.array()); + } } else { // Allocate enough space to fit the next block's header too. byte[] onDiskBlock = new byte[onDiskSizeWithHeader + HEADER_SIZE]; @@ -1565,14 +1566,14 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { } b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader); b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; - if (l2Cache != null && addToL2Cache) { + if (addToL2Cache && cacheAgent.isL2CacheEnabled()) { if (preReadHeaderSize > 0) { // If we plan to add block to L2 cache, we need to copy the // header information into the byte array so that it can be // cached in the L2 cache. System.arraycopy(header, 0, onDiskBlock, 0, preReadHeaderSize); } - cacheBlockInL2Cache(offset, onDiskBlock); + cacheRawBlockBytes(b.getBlockType(), offset, onDiskBlock); } DataInputStream dis = new DataInputStream(new ByteArrayInputStream( onDiskBlock, HEADER_SIZE, onDiskSizeWithoutHeader)); @@ -1629,7 +1630,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { b.uncompressedSizeWithoutHeader, true, offset + HEADER_SIZE, options); if (addToL2Cache) { - cacheBlockInL2Cache(offset, b.buf.array()); + cacheRawBlockBytes(b.getBlockType(), offset, b.buf.array()); } if (b.nextBlockOnDiskSizeWithHeader > 0) { setNextBlockHeader(offset, b); @@ -1641,13 +1642,13 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { b.nextBlockOnDiskSizeWithHeader = readAtOffset(compressedBytes, HEADER_SIZE, b.onDiskSizeWithoutHeader, true, offset + HEADER_SIZE, options); - if (l2Cache != null && addToL2Cache) { + if (addToL2Cache && cacheAgent.isL2CacheEnabled()) { // If l2 cache is enabled, we need to copy the header bytes to // the compressed bytes array, so that they can be cached in the // L2 cache. System.arraycopy(headerBuf.array(), 0, compressedBytes, 0, HEADER_SIZE); - cacheBlockInL2Cache(offset, compressedBytes); + cacheRawBlockBytes(b.getBlockType(), offset, compressedBytes); } DataInputStream dis = new DataInputStream(new ByteArrayInputStream( compressedBytes, HEADER_SIZE, b.onDiskSizeWithoutHeader)); @@ -1700,9 +1701,12 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * @param blockBytes The block's bytes as they appear on disk (i.e., * correctly encoded and compressed) */ - void cacheBlockInL2Cache(long offset, byte[] blockBytes) { - if (l2Cache != null) { - l2Cache.cacheRawBlock(hfileNameForL2Cache, offset, blockBytes); + private void cacheRawBlockBytes(BlockType type, long offset, + byte[] blockBytes) { + if (cacheAgent != null) { + BlockCacheKey cacheKey = new BlockCacheKey(hfileNameForL2Cache, offset); + RawHFileBlock rawBlock = new RawHFileBlock(type, blockBytes); + cacheAgent.cacheRawBlock(cacheKey, rawBlock); } } } diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 85f6da8..7da09d4 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -772,7 +772,7 @@ public class HFileBlockIndex { private BlockCache blockCache; /** L2Cache, or null if cache-on-write is disabled */ - private L2Cache l2Cache; + private L2CacheAgent cacheAgent; /** Name to use for computing cache keys */ private String nameForCaching; @@ -789,14 +789,15 @@ public class HFileBlockIndex { * @param blockWriter the block writer to use to write index blocks */ public BlockIndexWriter(HFileBlock.Writer blockWriter, - BlockCache blockCache, L2Cache l2Cache, String nameForCaching) { - if (nameForCaching == null && (blockCache != null || l2Cache != null)) { + BlockCache blockCache, L2CacheAgent cacheAgent, String nameForCaching) { + if ((blockCache != null || cacheAgent != null) && + nameForCaching == null) { throw new IllegalArgumentException("If BlockCache OR L2Cache are " + " not null, then nameForCaching must NOT be null"); } this.blockWriter = blockWriter; this.blockCache = blockCache; - this.l2Cache = l2Cache; + this.cacheAgent = cacheAgent; this.nameForCaching = nameForCaching; this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE; } @@ -945,16 +946,18 @@ public class HFileBlockIndex { byte[] curFirstKey = curChunk.getBlockKey(0); blockWriter.writeHeaderAndData(out); + HFileBlock blockForCaching = blockWriter.getBlockForCaching(); + // The block type and SchemaConfigured data matter here, as this will be + // used by the cache to update block type specific metrics. + BlockCacheKey cacheKey = new BlockCacheKey(nameForCaching, beginOffset); if (blockCache != null) { - HFileBlock blockForCaching = blockWriter.getBlockForCaching(); passSchemaMetricsTo(blockForCaching); - blockCache.cacheBlock(new BlockCacheKey(nameForCaching, beginOffset), - blockForCaching); + blockCache.cacheBlock(cacheKey, blockForCaching); } - - if (l2Cache != null) { - l2Cache.cacheRawBlock(nameForCaching, beginOffset, - blockWriter.getHeaderAndData()); + if (cacheAgent != null) { + RawHFileBlock rawBlock = new RawHFileBlock( + blockForCaching.getBlockType(), blockWriter.getHeaderAndData()); + cacheAgent.cacheRawBlock(cacheKey, rawBlock); } // Add intermediate index block size diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java index e8c17a1..5a0e020 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java @@ -24,6 +24,7 @@ import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -234,8 +235,10 @@ public class HFileReaderV1 extends AbstractHFileReader { (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheConf.shouldCacheBlockOnRead(effectiveCategory)); if (cachedBlock != null) { - getSchemaMetrics().updateOnCacheHit(effectiveCategory, - SchemaMetrics.NO_COMPACTION); + getSchemaMetrics().updateOnBlockRead(effectiveCategory, + SchemaMetrics.NO_COMPACTION, + TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startTimeNs), true, false, false); return cachedBlock.getBufferWithoutHeader(); } // Cache Miss, please load. @@ -247,11 +250,12 @@ public class HFileReaderV1 extends AbstractHFileReader { passSchemaMetricsTo(hfileBlock); hfileBlock.expectType(BlockType.META); - long delta = System.nanoTime() - startTimeNs; - HFile.preadTimeNano.addAndGet(delta); + long deltaNs = System.nanoTime() - startTimeNs; + HFile.preadTimeNano.addAndGet(deltaNs); HFile.preadOps.incrementAndGet(); - HFile.preadHistogram.addValue(delta); - getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false, delta); + HFile.preadHistogram.addValue(deltaNs); + getSchemaMetrics().updateOnBlockRead(effectiveCategory, false, + TimeUnit.NANOSECONDS.toMillis(deltaNs), false, false, false); // Cache the block if (cacheBlock && cacheConf.shouldCacheBlockOnRead(effectiveCategory)) { @@ -266,8 +270,7 @@ public class HFileReaderV1 extends AbstractHFileReader { /** * Read in a file block. * @param block Index of block to read. - * @param pread Use positional read instead of seek+read (positional is - * better doing random reads whereas seek+read is better scanning). + * @param cacheBlock cache block if read from disk * @param isCompaction is this block being read as part of a compaction * @return Block wrapped in a ByteBuffer. * @throws IOException @@ -283,6 +286,8 @@ public class HFileReaderV1 extends AbstractHFileReader { ", max: " + dataBlockIndexReader.getRootBlockCount()); } + long startTimeNs = System.nanoTime(); + long offset = dataBlockIndexReader.getRootBlockOffset(block); BlockCacheKey cacheKey = new BlockCacheKey(name, offset); @@ -302,15 +307,16 @@ public class HFileReaderV1 extends AbstractHFileReader { if (kvContext != null) { kvContext.setObtainedFromCache(true); } - getSchemaMetrics().updateOnCacheHit( - cachedBlock.getBlockType().getCategory(), isCompaction); + getSchemaMetrics().updateOnBlockRead( + cachedBlock.getBlockType().getCategory(), isCompaction, + TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startTimeNs), true, false, false); return cachedBlock.getBufferWithoutHeader(); } // Carry on, please load. } // Load block from filesystem. - long startTimeNs = System.nanoTime(); long nextOffset; if (block == dataBlockIndexReader.getRootBlockCount() - 1) { @@ -329,18 +335,19 @@ public class HFileReaderV1 extends AbstractHFileReader { passSchemaMetricsTo(hfileBlock); hfileBlock.expectType(BlockType.DATA); - long delta = System.nanoTime() - startTimeNs; + long deltaNs = System.nanoTime() - startTimeNs; if (isCompaction) { - HFile.preadCompactionTimeNano.addAndGet(delta); - HFile.preadCompactionHistogram.addValue(delta); + HFile.preadCompactionTimeNano.addAndGet(deltaNs); + HFile.preadCompactionHistogram.addValue(deltaNs); HFile.preadCompactionOps.incrementAndGet(); } else { - HFile.preadTimeNano.addAndGet(delta); - HFile.preadHistogram.addValue(delta); + HFile.preadTimeNano.addAndGet(deltaNs); + HFile.preadHistogram.addValue(deltaNs); HFile.preadOps.incrementAndGet(); } - getSchemaMetrics().updateOnCacheMiss(BlockCategory.DATA, isCompaction, - delta); + getSchemaMetrics().updateOnBlockRead( + hfileBlock.getBlockType().getCategory(), isCompaction, + TimeUnit.NANOSECONDS.toMillis(deltaNs), false, false, false); if (kvContext != null) { kvContext.setObtainedFromCache(false); } @@ -390,9 +397,9 @@ public class HFileReaderV1 extends AbstractHFileReader { if (evictOnClose && cacheConf.isBlockCacheEnabled()) { int numEvicted = 0; for (int i = 0; i < dataBlockIndexReader.getRootBlockCount(); i++) { - if (cacheConf.getBlockCache().evictBlock( - new BlockCacheKey(name, - dataBlockIndexReader.getRootBlockOffset(i)))) { + BlockCacheKey key = new BlockCacheKey(name, + dataBlockIndexReader.getRootBlockOffset(i)); + if (cacheConf.getBlockCache().evictBlock(key)) { numEvicted++; } } diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 0361864..45f6b17 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -88,6 +88,8 @@ public class HFileReaderV2 extends AbstractHFileReader { */ private List loadOnOpenBlocks = new ArrayList(); + private L2CacheAgent l2Cache; + /** * Opens a HFile. You must load the index before you can use it by calling * {@link #loadFileInfo()}. @@ -110,9 +112,13 @@ public class HFileReaderV2 extends AbstractHFileReader { throws IOException { super(path, trailer, fsdis, size, closeIStream, cacheConf, conf); trailer.expectVersion(2); + // Get a cache agent and set schema context. + l2Cache = cacheConf.getL2CacheAgent(); + passSchemaMetricsTo(l2Cache); + HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis, compressAlgo, fileSize, - cacheConf.isL2CacheEnabled() ? cacheConf.getL2Cache() : null, + cacheConf.isL2CacheEnabled() ? cacheConf.getL2CacheAgent() : null, cacheConf.isL2CacheEnabled() ? name : null); this.fsBlockReader = fsBlockReaderV2; // upcast @@ -230,20 +236,22 @@ public class HFileReaderV2 extends AbstractHFileReader { if (cachedBlock != null) { // Return a distinct 'shallow copy' of the block, // so pos does not get messed by the scanner - getSchemaMetrics().updateOnCacheHit(BlockCategory.META, false); + getSchemaMetrics().updateOnBlockRead(BlockCategory.META, false, + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs), + true, false, false); return cachedBlock.getBufferWithoutHeader(); } // Cache Miss, please load. HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, - blockSize, -1, cacheInL2); + blockSize, -1, cacheInL2); passSchemaMetricsTo(metaBlock); long deltaNs = System.nanoTime() - startTimeNs; HFile.preadTimeNano.addAndGet(deltaNs); HFile.preadHistogram.addValue(deltaNs); HFile.preadOps.incrementAndGet(); - getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false, - TimeUnit.NANOSECONDS.toMillis(deltaNs)); + getSchemaMetrics().updateOnBlockRead(BlockCategory.META, false, + TimeUnit.NANOSECONDS.toMillis(deltaNs), false, false, false); // Cache the block if (cacheBlock) { @@ -322,7 +330,7 @@ public class HFileReaderV2 extends AbstractHFileReader { // update schema metrics getSchemaMetrics().updateOnBlockRead( cachedBlock.getBlockType().getCategory(), isCompaction, - System.currentTimeMillis() - startTime, cacheOnPreload, true); + System.currentTimeMillis() - startTime, true, false, cacheOnPreload); // update profiling data Call call = HRegionServer.callContext.get(); ProfilingData pData = call == null ? null : call.getProfilingData(); @@ -344,14 +352,14 @@ public class HFileReaderV2 extends AbstractHFileReader { } getSchemaMetrics().updateOnBlockRead( cachedBlock.getBlockType().getCategory(), isCompaction, - System.currentTimeMillis() - startTime, cacheOnPreload, true); + System.currentTimeMillis() - startTime, true, false, cacheOnPreload); return cachedBlock; } // First, check if the block exists in L2 cache cachedBlock = null; try { - cachedBlock = getBlockFromL2Cache(name, dataBlockOffset, - expectedBlockType, isCompaction); + cachedBlock = getBlockFromL2Cache(cacheKey, expectedBlockType, + isCompaction); } catch (Throwable t) { // If exception is encountered when attempting to read from the L2 // cache, we should go on to try to read from disk and log the @@ -376,7 +384,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } getSchemaMetrics().updateOnBlockRead( cachedBlock.getBlockType().getCategory(), isCompaction, - System.currentTimeMillis() - startTime, cacheOnPreload, true); + System.currentTimeMillis() - startTime, false, true, cacheOnPreload); // Return early if a block exists in the L2 cache return cachedBlock; } @@ -384,7 +392,7 @@ public class HFileReaderV2 extends AbstractHFileReader { long startTimeNs = System.nanoTime(); HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, cacheBlock && !isCompaction, - getReadOptions(isCompaction)); + getReadOptions(isCompaction)); hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock, isCompaction); validateBlockType(hfileBlock, expectedBlockType); @@ -407,7 +415,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } getSchemaMetrics().updateOnBlockRead( blockCategory, isCompaction, - System.currentTimeMillis() - startTime, cacheOnPreload, false); + System.currentTimeMillis() - startTime, false, false, cacheOnPreload); // Cache the block if necessary if (cacheOnPreload @@ -503,10 +511,7 @@ public class HFileReaderV2 extends AbstractHFileReader { * block (i.e., compressed and encoded byte array) from the L2 cache, * de-compress, decode, and then construct an in-memory representation of the * block. - * @param hfileName Name of the HFile that contains the block (used as part - * of the cache key) - * @param offset Offset in the HFile containing the block (used as another - * part of the cache key) + * @param cacheKey the key of the block to be fetched from cache * @param expectedBlockType Expected type of the block * @param isCompaction Indicates if this is a compaction related read. This * value is passed along to @@ -518,20 +523,19 @@ public class HFileReaderV2 extends AbstractHFileReader { * offset in the L2 cache. * @throws IOException If we are unable to decompress and decode the block. */ - public HFileBlock getBlockFromL2Cache(String hfileName, long offset, + public HFileBlock getBlockFromL2Cache(BlockCacheKey cacheKey, BlockType expectedBlockType, boolean isCompaction) throws IOException { - if (cacheConf.isL2CacheEnabled()) { - byte[] bytes = cacheConf.getL2Cache().getRawBlock(hfileName, offset); - if (bytes != null) { - HFileBlock hfileBlock = HFileBlock.fromBytes(bytes, compressAlgo, - includesMemstoreTS, offset); - hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock, isCompaction); - validateBlockType(hfileBlock, expectedBlockType); - passSchemaMetricsTo(hfileBlock); - return hfileBlock; - } + HFileBlock cachedBlock = null; + byte[] bytes = l2Cache.getRawBlockBytes(cacheKey); + if (bytes != null) { + cachedBlock = HFileBlock.fromBytes(bytes, compressAlgo, + includesMemstoreTS, cacheKey.getOffset()); + cachedBlock = dataBlockEncoder.diskToCacheFormat(cachedBlock, + isCompaction); + validateBlockType(cachedBlock, expectedBlockType); + passSchemaMetricsTo(cachedBlock); } - return null; + return cachedBlock; } /** @@ -601,13 +605,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } } - if (cacheConf.isL2CacheEnabled() && evictL2OnClose) { - int numEvicted = cacheConf.getL2Cache().evictBlocksByHfileName(name); - if (LOG.isTraceEnabled()) { - LOG.trace("On close, file=" + name + " evicted=" + numEvicted - + " block(s) from L2 cache"); - } - } + l2Cache.evictBlocksByHfileName(name, true); if (closeIStream && istream != null) { istream.close(); istream = null; diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index 4989b82..841a053 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -79,6 +79,8 @@ public class HFileWriterV2 extends AbstractHFileWriter { private final boolean includeMemstoreTS = true; private long maxMemstoreTS = 0; + private L2CacheAgent l2Cache; + static class WriterFactoryV2 extends HFile.WriterFactory { WriterFactoryV2(Configuration conf, CacheConfig cacheConf) { super(conf, cacheConf); @@ -103,6 +105,10 @@ public class HFileWriterV2 extends AbstractHFileWriter { if (fsBlockWriter != null) throw new IllegalStateException("finishInit called twice"); + // Get a schema aware L2 cache agent. + l2Cache = cacheConf.getL2CacheAgent(); + passSchemaMetricsTo(l2Cache); + // HFile filesystem-level (non-caching) block writer fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder, includeMemstoreTS); @@ -111,11 +117,11 @@ public class HFileWriterV2 extends AbstractHFileWriter { boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); boolean cacheL2IndexesOnWrite = cacheConf.shouldL2CacheDataOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, - cacheIndexesOnWrite ? cacheConf.getBlockCache(): null, - cacheL2IndexesOnWrite ? cacheConf.getL2Cache() : null, + cacheIndexesOnWrite ? cacheConf.getBlockCache(): null, l2Cache, (cacheIndexesOnWrite || cacheL2IndexesOnWrite) ? name : null); dataBlockIndexWriter.setMaxChunkSize( HFileBlockIndex.getMaxChunkSize(conf)); + passSchemaMetricsTo(dataBlockIndexWriter); inlineBlockWriters.add(dataBlockIndexWriter); // Meta data block index writer @@ -163,13 +169,13 @@ public class HFileWriterV2 extends AbstractHFileWriter { HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs); HFile.writeOps.incrementAndGet(); - // If a write is succesfull, cached the written block in the L1 and L2 + // If a write is successful, cached the written block in the L1 and L2 // caches boolean cacheOnCompaction = cacheCurrentBlockForCompaction(); if (cacheConf.shouldCacheDataOnFlush() || cacheOnCompaction) { doCacheOnWrite(lastDataBlockOffset); } - if (cacheConf.isL2CacheEnabled() && cacheConf.shouldL2CacheDataOnWrite()) { + if (cacheConf.shouldL2CacheDataOnWrite()) { doCacheInL2Cache(lastDataBlockOffset); } } @@ -213,8 +219,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { if (cacheThisBlock) { doCacheOnWrite(offset); } - if (cacheConf.isL2CacheEnabled() && - cacheConf.shouldL2CacheDataOnWrite()) { + if (cacheConf.shouldL2CacheDataOnWrite()) { doCacheInL2Cache(offset); } } @@ -230,6 +235,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { final boolean isCompaction = false; HFileBlock cacheFormatBlock = blockEncoder.diskToCacheFormat( fsBlockWriter.getBlockForCaching(), isCompaction); + BlockCacheKey key = new BlockCacheKey(name, offset); passSchemaMetricsTo(cacheFormatBlock); cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(name, offset), cacheFormatBlock); @@ -241,8 +247,13 @@ public class HFileWriterV2 extends AbstractHFileWriter { * @param offset Offset at which the block has been written */ private void doCacheInL2Cache(long offset) throws IOException { - cacheConf.getL2Cache().cacheRawBlock(name, offset, - fsBlockWriter.getHeaderAndData()); + if (cacheConf.isL2CacheEnabled()) { + BlockCacheKey key = new BlockCacheKey(name, offset); + RawHFileBlock rawBlock = new RawHFileBlock( + fsBlockWriter.getBlockForCaching().getBlockType(), + fsBlockWriter.getHeaderAndData()); + l2Cache.cacheRawBlock(key, rawBlock); + } } /** @@ -304,7 +315,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { * Comparator passed on construction. * * @param kv KeyValue to add. Cannot be empty nor null. - * @param appendForCompaction Whether the KV was read from cache or not + * @param cv Whether the KV was read from cache or not * @throws IOException */ @Override @@ -339,7 +350,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { * @param value * @param voffset * @param vlength - * @param KeyValueContext + * @param cv * @throws IOException */ private void append(final long memstoreTS, final byte[] key, diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java index 66c94d6..599b9ea 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java @@ -38,30 +38,40 @@ public class L2BucketCache implements L2Cache { } @Override - public byte[] getRawBlock(String hfileName, long dataBlockOffset) { + public byte[] getRawBlockBytes(BlockCacheKey key) { long startTimeNs = System.nanoTime(); - BlockCacheKey cacheKey = new BlockCacheKey(hfileName, dataBlockOffset); - byte[] fromCache = bucketCache.getBlock(cacheKey, true); + byte[] fromCache = bucketCache.getBlock(key, true); if (LOG.isTraceEnabled()) { // Log elapsed time to retrieve a block from the cache long elapsedNs = System.nanoTime() - startTimeNs; - LOG.trace("getRawBlock() " + (fromCache == null ?"MISS" : "HIT") + - " on hfileName=" + hfileName + ", offset=" + dataBlockOffset + - " in " + elapsedNs + " ns."); + LOG.trace("getRawBlock() " + (fromCache == null ? "MISS" : "HIT") + + " on hfileName=" + key.getHfileName() + + ", offset=" + key.getOffset() + " in " + elapsedNs + " ns."); } return fromCache; } @Override - public void cacheRawBlock(String hfileName, long dataBlockOffset, byte[] block) { + public boolean cacheRawBlock(BlockCacheKey key, RawHFileBlock block) { long startTimeNs = System.nanoTime(); - BlockCacheKey cacheKey = new BlockCacheKey(hfileName, dataBlockOffset); - bucketCache.cacheBlock(cacheKey, block); + boolean cached = bucketCache.cacheBlock(key, block); if (LOG.isTraceEnabled()) { long elapsedNs = System.nanoTime() - startTimeNs; - LOG.trace("cacheRawBlock() on hfileName=" + hfileName + ", offset=" + - dataBlockOffset + " in " + elapsedNs + " ns."); + LOG.trace("cacheRawBlock() on hfileName=" + key.getHfileName() + + ", offset=" + key.getOffset() + " in " + elapsedNs + " ns."); } + return cached; + } + + @Override + public boolean evictRawBlock(BlockCacheKey cacheKey) { + long startTimeNs = System.nanoTime(); + boolean evicted = bucketCache.evictBlock(cacheKey); + if (LOG.isTraceEnabled()) { + long elapsedNs = System.nanoTime() - startTimeNs; + LOG.trace("evictBlock() of " + cacheKey + " in " + elapsedNs + " ns."); + } + return evicted; } @Override @@ -77,11 +87,10 @@ public class L2BucketCache implements L2Cache { } @Override - public boolean isShutdown() { - return !bucketCache.isEnabled(); + public boolean isEnabled() { + return bucketCache.isEnabled(); } - @Override public void shutdown() { bucketCache.shutdown(); } diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java index 67dcbb6..726532b 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java @@ -64,7 +64,7 @@ public class L2BucketCacheFactory implements L2CacheFactory { } /** Cached instance of the L2Cache or null if not initialized */ - private L2Cache l2Cache; + private L2BucketCache l2Cache; // Allows to short circuit getL2Cache() calls if the cache is disabled private boolean l2CacheDisabled; diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java index 3b2e171..12e67c6 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java @@ -28,37 +28,44 @@ public interface L2Cache { /** * Retrieve a block from the L2Cache. The block is retrieved as a byte * array, in the same exact format as it is stored on disk. - * @param hfileName Filename associated with the block - * @param dataBlockOffset Offset in the file - * @return + * @param cacheKey Key associated with the block + * @return raw byte string representing the block if present in cache, null + * otherwise */ - public byte[] getRawBlock(String hfileName, long dataBlockOffset); + public byte[] getRawBlockBytes(BlockCacheKey cacheKey); /** * Add a block to the L2Cache. The block must be represented by a * byte array identical to what would be written to disk. - * @param hfileName Filename associated with the block - * @param dataBlockOffset Offset in the file - * @param rawBlock The exact byte representation of the block + * @param cacheKey key associated with the block + * @param block the raw HFileBlock to be cached + * @return true if the block was cached, false otherwise */ - public void cacheRawBlock(String hfileName, long dataBlockOffset, - byte[] rawBlock); + public boolean cacheRawBlock(BlockCacheKey cacheKey, RawHFileBlock block); + + /** + * Evict a block from the L2Cache. + * @param cacheKey Key associated with the block to be evicted + * @return true if the block was evicted, false otherwise + */ + public boolean evictRawBlock(BlockCacheKey cacheKey); /** * Evict all blocks matching a given filename. This operation should be * efficient and can be called on each close of a store file. * @param hfileName Filename whose blocks to evict + * @return the number of evicted blocks */ public int evictBlocksByHfileName(String hfileName); /** - * @return true if the cache has been shutdown + * @return true if the cache is enabled, false otherwise */ - public boolean isShutdown(); + public boolean isEnabled(); /** - * Shutdown the cache + * Shutdown the L2 cache. */ public void shutdown(); } diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheAgent.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheAgent.java new file mode 100644 index 0000000..3119c4f --- /dev/null +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheAgent.java @@ -0,0 +1,49 @@ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; + +/** + * {@link L2CacheAgent} wraps around a {@link CacheConfig} and {@link L2Cache} + * instance and is {@link SchemaConfigured}. As such it will make sure schema + * data is added to blocks going into the cache, allowing the appropriate + * metrics to be updated throughout the blocks lifecycle. Further more + * {@link L2CacheAgent} would allow for making schema aware cache decisions, + * removing the need for individual block writers to deal with cache policies. + */ +public class L2CacheAgent extends SchemaConfigured { + private CacheConfig cacheConfig; + private L2Cache l2Cache; + + public L2CacheAgent(final CacheConfig cacheConfig, + final L2Cache l2Cache) { + this.cacheConfig = cacheConfig; + this.l2Cache = l2Cache; + } + + public byte[] getRawBlockBytes(BlockCacheKey cacheKey) { + return isL2CacheEnabled() ? l2Cache.getRawBlockBytes(cacheKey) : null; + } + + public void cacheRawBlock(BlockCacheKey cacheKey, RawHFileBlock rawBlock) { + if (isL2CacheEnabled()) { + passSchemaMetricsTo(rawBlock); + l2Cache.cacheRawBlock(cacheKey, rawBlock); + } + } + + public boolean evictRawBlock(BlockCacheKey cacheKey) { + return isL2CacheEnabled() && l2Cache.evictRawBlock(cacheKey); + } + + public int evictBlocksByHfileName(String hfileName, boolean isClose) { + // If the HFile was closed but the policy is not to evict on close, skip + // the eviction. + if (isClose && !cacheConfig.shouldL2EvictOnClose()) return 0; + return isL2CacheEnabled() ? + l2Cache.evictBlocksByHfileName(hfileName) : 0; + } + + public boolean isL2CacheEnabled() { + return l2Cache != null && l2Cache.isEnabled(); + } +} diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 56edae6..52568f5 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.regionserver.LruHashMap; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/RawHFileBlock.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/RawHFileBlock.java new file mode 100644 index 0000000..413a960 --- /dev/null +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/RawHFileBlock.java @@ -0,0 +1,43 @@ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; +import org.apache.hadoop.hbase.util.ClassSize; + +/** + * A representation of a raw {@link HFileBlock}, used to pass a small amount + * of metadata around with the raw byte array. This is primarily useful in the + * context of making policy decisions and updating metrics when caching raw + * block data in L2 and L3 caches. + */ +public class RawHFileBlock extends SchemaConfigured implements Cacheable { + private final BlockType type; + private final byte[] data; + + public static final long RAW_HFILE_BLOCK_OVERHEAD = + SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE + + // type and data references. + 2 * ClassSize.REFERENCE; + + public RawHFileBlock(BlockType type, byte[] data) { + this.type = type; + this.data = data; + } + + public RawHFileBlock(final HFileBlock block) { + this(block.getBlockType(), block.getBufferWithHeader().array()); + } + + @Override + public BlockType getBlockType() { + return type; + } + + public byte[] getData() { + return data; + } + + @Override + public long heapSize() { + return ClassSize.align(RAW_HFILE_BLOCK_OVERHEAD); + } +} diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 624a4c0..11ef8eb 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -26,24 +26,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.CachedBlock; -import org.apache.hadoop.hbase.io.hfile.LruBlockCache; +import org.apache.hadoop.hbase.io.hfile.*; import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.HasThread; -import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; +import org.apache.hadoop.hbase.util.*; import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Set; +import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -252,43 +243,47 @@ public class BucketCache implements HeapSize { /** * Cache the block with the specified name and buffer. * @param cacheKey block's cache key - * @param buf block buffer + * @param block Raw HFile block + * @return true if the block was cached, false otherwise */ - public void cacheBlock(BlockCacheKey cacheKey, byte[] buf) { - cacheBlock(cacheKey, buf, false); + public boolean cacheBlock(BlockCacheKey cacheKey, RawHFileBlock block) { + return cacheBlock(cacheKey, block, false); } /** * Cache the block with the specified name and buffer. * @param cacheKey block's cache key - * @param cachedItem block buffer + * @param block Raw HFile block * @param inMemory if block is in-memory + * @return true if the block was cached, false otherwise */ - public void cacheBlock(BlockCacheKey cacheKey, byte[] cachedItem, boolean inMemory) { - cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); + public boolean cacheBlock(BlockCacheKey cacheKey, RawHFileBlock block, + boolean inMemory) { + return cacheBlockWithWait(cacheKey, block, inMemory, wait_when_cache); } /** * Cache the block to ramCache * @param cacheKey block's cache key - * @param cachedItem block buffer + * @param block Raw HFile block * @param inMemory if block is in-memory * @param wait if true, blocking wait when queue is full + * @return true if the block was cached, false otherwise */ - public void cacheBlockWithWait(BlockCacheKey cacheKey, byte[] cachedItem, + public boolean cacheBlockWithWait(BlockCacheKey cacheKey, RawHFileBlock block, boolean inMemory, boolean wait) { if (!cacheEnabled) - return; + return false; if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) - return; + return true; /* * Stuff the entry into the RAM cache so it can get drained to the * persistent store */ - RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, - accessCount.incrementAndGet(), inMemory); + RAMQueueEntry re = new RAMQueueEntry(cacheKey, block, + accessCount.incrementAndGet(), inMemory); ramCache.put(cacheKey, re); int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); BlockingQueue bq = writerQueues.get(queueNum); @@ -308,12 +303,12 @@ public class BucketCache implements HeapSize { failedBlockAdditions.incrementAndGet(); } else { this.blockNumber.incrementAndGet(); - this.heapSize.addAndGet(cachedItem.length); + this.heapSize.addAndGet(re.getRawHFileBlock().getData().length); blocksByHFile.put(cacheKey.getHfileName(), cacheKey); } + return successfulAddition; } - /** * Get the buffer of the block with the specified key. * @param key block's cache key @@ -337,7 +332,7 @@ public class BucketCache implements HeapSize { if (re != null) { cacheStats.hit(caching); re.access(accessCount.incrementAndGet()); - return re.getData(); + return re.getRawHFileBlock().getData(); } BucketEntry bucketEntry = backingMap.get(key); if(bucketEntry!=null) { @@ -382,7 +377,8 @@ public class BucketCache implements HeapSize { RAMQueueEntry removedBlock = ramCache.remove(cacheKey); if (removedBlock != null) { this.blockNumber.decrementAndGet(); - this.heapSize.addAndGet(-1 * removedBlock.getData().length); + this.heapSize.addAndGet( + -1 * removedBlock.getRawHFileBlock().getData().length); } BucketEntry bucketEntry = backingMap.get(cacheKey); if (bucketEntry == null) { return false; } @@ -391,7 +387,7 @@ public class BucketCache implements HeapSize { lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); if (bucketEntry.equals(backingMap.remove(cacheKey))) { bucketAllocator.freeBlock(bucketEntry.offset()); - realCacheSize.addAndGet(-1 * bucketEntry.getLength()); + updateSizeMetrics(bucketEntry, true); blocksByHFile.remove(cacheKey.getHfileName(), cacheKey); if (removedBlock == null) { this.blockNumber.decrementAndGet(); @@ -411,6 +407,17 @@ public class BucketCache implements HeapSize { return true; } + protected long updateSizeMetrics(BucketEntry entry, boolean eviction) { + long delta = (eviction ? -1 : 1) * entry.getLength(); + SchemaMetrics metrics = entry.getSchemaMetrics(); + BlockType type = entry.getBlockType(); + //LOG.info("updateSizeMetrics: " + type + ", delta: " + delta); + if (metrics != null && type != null) { + metrics.updateOnL2CachePutOrEvict(type.getCategory(), delta); + } + return realCacheSize.addAndGet(delta); + } + /* * Statistics thread. Periodically prints the cache statistics to the log. */ @@ -666,6 +673,25 @@ public class BucketCache implements HeapSize { LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); } + private BucketEntry writeToCache(final RAMQueueEntry ramEntry) + throws CacheFullException, IOException, BucketAllocatorException { + int len = ramEntry.getRawHFileBlock().getData().length; + if (len == 0) { + return null; + } + long offset = bucketAllocator.allocateBlock(len); + BucketEntry bucketEntry = new BucketEntry(offset, ramEntry); + try { + ioEngine.write(ramEntry.getRawHFileBlock().getData(), offset); + } catch (IOException ioe) { + // free it in bucket allocator + bucketAllocator.freeBlock(offset); + throw ioe; + } + updateSizeMetrics(bucketEntry, false); + return bucketEntry; + } + /** * Flush the entries in ramCache to IOEngine and add bucket entry to * backingMap @@ -686,8 +712,7 @@ public class BucketCache implements HeapSize { LOG.warn("Couldn't get the entry from RAM queue, who steals it?"); continue; } - BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine, - bucketAllocator, realCacheSize); + BucketEntry bucketEntry = writeToCache(ramEntry); ramEntries[done] = ramEntry; bucketEntries[done++] = bucketEntry; if (ioErrorStartTime > 0) { @@ -730,7 +755,8 @@ public class BucketCache implements HeapSize { } RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey()); if (ramCacheEntry != null) { - heapSize.addAndGet(-1 * ramEntries[i].getData().length); + heapSize.addAndGet( + -1 * ramEntries[i].getRawHFileBlock().getData().length); } } @@ -864,15 +890,30 @@ public class BucketCache implements HeapSize { * up the long. Doubt we'll see devices this big for ages. Offsets are divided * by 256. So 5 bytes gives us 256TB or so. */ - static class BucketEntry implements Serializable, Comparable { + static class BucketEntry implements Serializable, Comparable, + Cacheable { private static final long serialVersionUID = -6741504807982257534L; private int offsetBase; private int length; private byte offset1; private volatile long accessTime; private CachedBlock.BlockPriority priority; - - BucketEntry(long offset, int length, long accessTime, boolean inMemory) { + private BlockType type; + private SchemaMetrics metrics; + + public final static long BUCKET_ENTRY_OVERHEAD = + ClassSize.OBJECT + + // this.offsetBase, this.length + 2 * Bytes.SIZEOF_INT + + // this.offset1 + Bytes.SIZEOF_BYTE + + // this.accessTime + Bytes.SIZEOF_LONG + + // this.priority + ClassSize.REFERENCE; + + BucketEntry(long offset, int length, long accessTime, boolean inMemory, + BlockType type, SchemaMetrics metrics) { setOffset(offset); this.length = length; this.accessTime = accessTime; @@ -881,6 +922,15 @@ public class BucketCache implements HeapSize { } else { this.priority = CachedBlock.BlockPriority.SINGLE; } + this.type = type; + this.metrics = metrics; + } + + BucketEntry(long offset, RAMQueueEntry ramEntry) { + this(offset, ramEntry.getRawHFileBlock().getData().length, + ramEntry.getAccessTime(), ramEntry.isInMemory(), + ramEntry.getRawHFileBlock().getBlockType(), + ramEntry.getRawHFileBlock().getSchemaMetrics()); } long offset() { // Java has no unsigned numbers @@ -924,6 +974,21 @@ public class BucketCache implements HeapSize { public boolean equals(Object that) { return this == that; } + + @Override + public long heapSize() { + return ClassSize.align(BUCKET_ENTRY_OVERHEAD); + } + + @Override + public BlockType getBlockType() { + return type; + } + + @Override + public SchemaMetrics getSchemaMetrics() { + return metrics; + } } /** diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java index fd1dd6b..809dcf5 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java @@ -20,60 +20,60 @@ package org.apache.hadoop.hbase.io.hfile.bucket; +import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hbase.io.hfile.RawHFileBlock; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; /** * Block Entry stored in the memory with key,data and so on */ -class RAMQueueEntry { +class RAMQueueEntry implements HeapSize { private BlockCacheKey key; - private byte[] data; + private RawHFileBlock block; private long accessTime; private boolean inMemory; - public RAMQueueEntry(BlockCacheKey bck, byte[] data, long accessTime, - boolean inMemory) { + public final static long RAM_QUEUE_ENTRY_OVERHEAD = + ClassSize.OBJECT + + // key, block + 2 * ClassSize.REFERENCE + + // accessTime + Bytes.SIZEOF_LONG + + // inMemory + Bytes.SIZEOF_BOOLEAN; + + public RAMQueueEntry(BlockCacheKey bck, RawHFileBlock block, long accessTime, + boolean inMemory) { this.key = bck; - this.data = data; + this.block = block; this.accessTime = accessTime; this.inMemory = inMemory; } - public byte[] getData() { - return data; - } - public BlockCacheKey getKey() { return key; } + public RawHFileBlock getRawHFileBlock() { + return block; + } + + public long getAccessTime() { + return accessTime; + } + + public boolean isInMemory() { + return inMemory; + } + public void access(long accessTime) { this.accessTime = accessTime; } - public BucketCache.BucketEntry writeToCache(final IOEngine ioEngine, - final BucketAllocator bucketAllocator, - final AtomicLong realCacheSize) throws CacheFullException, IOException, - BucketAllocatorException { - int len = data.length; - if (len == 0) { - return null; - } - long offset = bucketAllocator.allocateBlock(len); - BucketCache.BucketEntry bucketEntry = new BucketCache.BucketEntry(offset, len, accessTime, - inMemory); - try { - ioEngine.write(data, offset); - } catch (IOException ioe) { - // free it in bucket allocator - bucketAllocator.freeBlock(offset); - throw ioe; - } - - realCacheSize.addAndGet(len); - return bucketEntry; + @Override + public long heapSize() { + return ClassSize.align(RAM_QUEUE_ENTRY_OVERHEAD); } } diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java index 2d7437a..578a195 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java @@ -151,8 +151,8 @@ public class SchemaConfigured implements HeapSize, SchemaAware { * current table and column family name, and the associated collection of * metrics. */ - public void passSchemaMetricsTo(SchemaConfigured block) { - SchemaConfigured upcast = block; // need this to assign private fields + public void passSchemaMetricsTo(SchemaConfigured that) { + SchemaConfigured upcast = that; // need this to assign private fields upcast.tableName = tableName; upcast.cfName = cfName; upcast.schemaMetrics = schemaMetrics; diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java index 6d5189f..329ad47 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java @@ -114,20 +114,30 @@ public class SchemaMetrics { public static enum BlockMetricType { READ_TIME("Read", COMPACTION_AWARE_METRIC_FLAG | TIME_VARYING_METRIC_FLAG), + READ_COUNT("BlockReadCnt", COMPACTION_AWARE_METRIC_FLAG), CACHE_HIT("BlockReadCacheHitCnt", COMPACTION_AWARE_METRIC_FLAG), CACHE_MISS("BlockReadCacheMissCnt", COMPACTION_AWARE_METRIC_FLAG), - PRELOAD_CACHE_HIT("PreloadCacheHitCnt",COMPACTION_AWARE_METRIC_FLAG), - PRELOAD_CACHE_MISS("PreloadCacheMissCnt",COMPACTION_AWARE_METRIC_FLAG), - PRELOAD_READ_TIME("PreloadReadTime", - COMPACTION_AWARE_METRIC_FLAG | TIME_VARYING_METRIC_FLAG), - CACHE_SIZE("blockCacheSize", PERSISTENT_METRIC_FLAG), UNENCODED_CACHE_SIZE("blockCacheUnencodedSize", PERSISTENT_METRIC_FLAG), CACHE_NUM_BLOCKS("cacheNumBlocks", PERSISTENT_METRIC_FLAG), CACHED("blockCacheNumCached"), - EVICTED("blockCacheNumEvicted"); + EVICTED("blockCacheNumEvicted"), + + L2_READ_COUNT("L2ReadCnt", COMPACTION_AWARE_METRIC_FLAG), + L2_CACHE_HIT("L2CacheHitCnt", COMPACTION_AWARE_METRIC_FLAG), + L2_CACHE_MISS("L2CacheMissCnt", COMPACTION_AWARE_METRIC_FLAG), + + L2_CACHE_NUM("L2CacheNumBlocks", PERSISTENT_METRIC_FLAG), + L2_CACHE_SIZE("L2CacheSize", PERSISTENT_METRIC_FLAG), + L2_CACHED("L2CacheNumCached"), + L2_EVICTED("L2CacheNumEvicted"), + + PRELOAD_CACHE_HIT("PreloadCacheHitCnt", COMPACTION_AWARE_METRIC_FLAG), + PRELOAD_CACHE_MISS("PreloadCacheMissCnt", COMPACTION_AWARE_METRIC_FLAG), + PRELOAD_READ_TIME("PreloadReadTime", + COMPACTION_AWARE_METRIC_FLAG | TIME_VARYING_METRIC_FLAG); private final String metricStr; private final int flags; @@ -250,7 +260,7 @@ public class SchemaMetrics { StoreMetricType.values().length; /** Conf key controlling whether we include table name in metric names */ - private static final String SHOW_TABLE_NAME_CONF_KEY = + public static final String SHOW_TABLE_NAME_CONF_KEY = "hbase.metrics.showTableName"; private static final String WORD_BOUNDARY_RE_STR = "\\b"; @@ -522,70 +532,76 @@ public class SchemaMetrics { * @param blockCategory category of the block read * @param isCompaction whether this is compaction read or not * @param timeMs time taken to read the block + * @param l1Cached whether this block was read from cache or not + * @param l2Cached whether this block was read from L2 cache or not * @param preload whether this a preloaded block or not - * @param obtainedFromCache whether the block is found in cache or not */ public void updateOnBlockRead(BlockCategory blockCategory, - boolean isCompaction, long timeMs, boolean preload, - boolean obtainedFromCache) { - if (obtainedFromCache) { - if (!preload) { - updateOnCacheHit(blockCategory, isCompaction, timeMs); - } else { - updateOnPreloadCacheHit(blockCategory, isCompaction, timeMs); - } + boolean isCompaction, long timeMs, boolean l1Cached, boolean l2Cached, + boolean preload) { + addToReadTime(blockCategory, isCompaction, timeMs); + if (l1Cached || l2Cached) { + if (l1Cached) updateOnCacheHit(blockCategory, isCompaction); + if (l2Cached) updateOnL2CacheHit(blockCategory, isCompaction); + if (preload) updateOnPreloadCacheHit(blockCategory, isCompaction, timeMs); } else { - if (!preload) { - updateOnCacheMiss(blockCategory, isCompaction, timeMs); - } else { - updateOnPreloadCacheMiss(blockCategory, isCompaction, timeMs); - } + updateOnCacheMiss(blockCategory, isCompaction); + updateOnL2CacheMiss(blockCategory, isCompaction); + if (preload) updateOnPreloadCacheMiss(blockCategory, isCompaction, timeMs); } - } - /** - * Updates the number of hits and the total number of block reads on a block cache hit. - */ - public void updateOnCacheHit(BlockCategory blockCategory, - boolean isCompaction) { - blockCategory.expectSpecific(); - incrNumericMetric(blockCategory, isCompaction, BlockMetricType.CACHE_HIT); - incrNumericMetric(blockCategory, isCompaction, BlockMetricType.READ_COUNT); if (this != ALL_SCHEMA_METRICS) { - ALL_SCHEMA_METRICS.updateOnCacheHit(blockCategory, isCompaction); + ALL_SCHEMA_METRICS.updateOnBlockRead(blockCategory, isCompaction, timeMs, + l1Cached, l2Cached, preload); } } + /** * Updates the number of hits and the total number of block reads on a block * cache hit. */ public void updateOnCacheHit(BlockCategory blockCategory, - boolean isCompaction, long deltaMs) { + boolean isCompaction) { blockCategory.expectSpecific(); incrNumericMetric(blockCategory, isCompaction, BlockMetricType.CACHE_HIT); incrNumericMetric(blockCategory, isCompaction, BlockMetricType.READ_COUNT); - addToReadTime(blockCategory, isCompaction, deltaMs); - if (this != ALL_SCHEMA_METRICS) { - ALL_SCHEMA_METRICS.updateOnCacheHit(blockCategory, isCompaction, deltaMs); - } } /** - * Updates read time, the number of misses, and the total number of block - * reads on a block cache miss. + * Updates read time and the number of misses on a block cache miss. */ - public void updateOnCacheMiss(BlockCategory blockCategory, - boolean isCompaction, long timeMs) { + private void updateOnCacheMiss(BlockCategory blockCategory, + boolean isCompaction) { blockCategory.expectSpecific(); - addToReadTime(blockCategory, isCompaction, timeMs); incrNumericMetric(blockCategory, isCompaction, BlockMetricType.CACHE_MISS); incrNumericMetric(blockCategory, isCompaction, BlockMetricType.READ_COUNT); - if (this != ALL_SCHEMA_METRICS) { - ALL_SCHEMA_METRICS.updateOnCacheMiss(blockCategory, isCompaction, - timeMs); - } } - + + /** + * Updates the number of hits and the total number of block reads on a L2 + * cache hit. + */ + private void updateOnL2CacheHit(BlockCategory blockCategory, + boolean isCompaction) { + blockCategory.expectSpecific(); + incrNumericMetric(blockCategory, isCompaction, + BlockMetricType.L2_READ_COUNT); + incrNumericMetric(blockCategory, isCompaction, + BlockMetricType.L2_CACHE_HIT); + } + + /** + * Updates read time and the number of misses on a block cache miss. + */ + private void updateOnL2CacheMiss(BlockCategory blockCategory, + boolean isCompaction) { + blockCategory.expectSpecific(); + incrNumericMetric(blockCategory, isCompaction, + BlockMetricType.L2_CACHE_MISS); + incrNumericMetric(blockCategory, isCompaction, + BlockMetricType.L2_READ_COUNT); + } + private void addToPreloadReadTime(BlockCategory blockCategory, boolean isCompaction, long timeMs) { HRegion.incrTimeVaryingMetric(getBlockMetricName(blockCategory, @@ -600,29 +616,22 @@ public class SchemaMetrics { /** * Updates read time, the number of misses, and the total number of block for preloader */ - public void updateOnPreloadCacheMiss(BlockCategory blockCategory, boolean isCompaction, + private void updateOnPreloadCacheMiss(BlockCategory blockCategory, boolean isCompaction, long timeMs) { blockCategory.expectSpecific(); addToPreloadReadTime(blockCategory, isCompaction, timeMs); incrNumericMetric(blockCategory, isCompaction, BlockMetricType.PRELOAD_CACHE_MISS); - if (this != ALL_SCHEMA_METRICS) { - ALL_SCHEMA_METRICS.updateOnPreloadCacheMiss(blockCategory, isCompaction, timeMs); - } } /** * Updates read time, the number of hits, and the total number of block for preloader */ - public void updateOnPreloadCacheHit(BlockCategory blockCategory, + private void updateOnPreloadCacheHit(BlockCategory blockCategory, boolean isCompaction, long timeMs) { blockCategory.expectSpecific(); addToPreloadReadTime(blockCategory, isCompaction, timeMs); incrNumericMetric(blockCategory, isCompaction, - BlockMetricType.PRELOAD_CACHE_HIT); - if (this != ALL_SCHEMA_METRICS) { - ALL_SCHEMA_METRICS.updateOnPreloadCacheMiss(blockCategory, isCompaction, - timeMs); - } + BlockMetricType.PRELOAD_CACHE_HIT); } /** @@ -649,6 +658,21 @@ public class SchemaMetrics { } } + private void updateL2CacheSize(BlockCategory category, long delta) { + if (category == null) { + category = BlockCategory.ALL_CATEGORIES; + } + HRegion.incrNumericPersistentMetric(getBlockMetricName(category, + DEFAULT_COMPACTION_FLAG, BlockMetricType.L2_CACHE_NUM), + delta > 0 ? 1 : -1); + HRegion.incrNumericPersistentMetric(getBlockMetricName(category, + DEFAULT_COMPACTION_FLAG, BlockMetricType.L2_CACHE_SIZE), delta); + + if (category != BlockCategory.ALL_CATEGORIES) { + updateL2CacheSize(BlockCategory.ALL_CATEGORIES, delta); + } + } + /** * Updates the number and the total size of blocks in cache for both the configured table/CF * and all table/CFs (by calling the same method on {@link #ALL_SCHEMA_METRICS}), both the given @@ -670,6 +694,16 @@ public class SchemaMetrics { } } + public void updateOnL2CachePutOrEvict(BlockCategory blockCategory, + long delta) { + updateL2CacheSize(blockCategory, delta); + incrNumericMetric(blockCategory, DEFAULT_COMPACTION_FLAG, + delta > 0 ? BlockMetricType.L2_CACHED : BlockMetricType.L2_EVICTED); + if (this != ALL_SCHEMA_METRICS) { + ALL_SCHEMA_METRICS.updateOnL2CachePutOrEvict(blockCategory, delta); + } + } + /** * Increments both the per-CF and the aggregate counter of bloom * positives/negatives as specified by the argument. diff --git a/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java b/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java index d48d40c..92a9879 100644 --- a/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java +++ b/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java @@ -21,17 +21,18 @@ package org.apache.hadoop.hbase.io.hfile; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.ClientConfigurationUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; +import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.junit.After; import org.junit.Before; @@ -40,9 +41,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Random; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.*; @@ -72,6 +71,8 @@ public class TestL2BucketCache { private Configuration conf; private CacheConfig cacheConf; + private HColumnDescriptor family; + private HRegion region; private FileSystem fs; private Path storeFilePath; @@ -115,7 +116,7 @@ public class TestL2BucketCache { } @After - public void tearDown() { + public void tearDown() throws IOException { underlyingCache.shutdown(); } @@ -145,8 +146,8 @@ public class TestL2BucketCache { new BlockCacheKey(reader.getName(), offset), true) != null; if (isInL1Lcache) { cachedCount++; - byte[] blockFromCacheRaw = - mockedL2Cache.getRawBlock(reader.getName(), offset); + BlockCacheKey key = new BlockCacheKey(reader.getName(), offset); + byte[] blockFromCacheRaw = mockedL2Cache.getRawBlockBytes(key); assertNotNull("All blocks in l1 cache, should also be in l2 cache: " + blockFromDisk.toString(), blockFromCacheRaw); HFileBlock blockFromL2Cache = HFileBlock.fromBytes(blockFromCacheRaw, @@ -176,7 +177,8 @@ public class TestL2BucketCache { while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { HFileBlock blockFromDisk = reader.readBlock(offset, -1, true, false, false, null, encodingInCache, null); - assertNotNull(mockedL2Cache.getRawBlock(reader.getName(), offset)); + BlockCacheKey key = new BlockCacheKey(reader.getName(), offset); + assertNotNull(mockedL2Cache.getRawBlockBytes(key)); cacheConf.getBlockCache().evictBlock(new BlockCacheKey(reader.getName(), offset)); HFileBlock blockFromL2Cache = reader.readBlock(offset, -1, true, false, @@ -227,12 +229,78 @@ public class TestL2BucketCache { assertFalse(cacheConf.isL2CacheEnabled()); } + @Test + public void shouldUpdatePerBlockCategoryMetrics() throws Exception { + Map snapshot = SchemaMetrics.getMetricsSnapshot(); + writeStoreFile(); + + // Get an unknown table and CF. + SchemaConfigured unknownSchema = new SchemaConfigured(conf, null); + // Get the test table and CF. + SchemaConfigured testSchema = new SchemaConfigured(conf, storeFilePath); + + assertEquals( + "Unknown schema DATA category size in L2 cache should be zero", + 0, getL2CacheSize(unknownSchema, BlockType.BlockCategory.DATA)); + assertTrue( + "Test schema DATA category size in L2 cache should not be zero", + getL2CacheSize(testSchema, BlockType.BlockCategory.DATA) > 0); + // Validate that the per-cf-category metrics add up with the all-cf-category + // metrics. + SchemaMetrics.validateMetricChanges(snapshot); + + readAndEvictBlocksFromL2Cache(); + + assertTrue("Test schema should have L2 cache hits", + getL2CacheHitCount(testSchema, BlockType.BlockCategory.DATA) > 0); + assertEquals("L2 cache should be empty", 0, + underlyingCache.getBlockCount()); + assertEquals("DATA category size in L2 cache should be zero", 0, + getL2CacheSize(testSchema, BlockType.BlockCategory.DATA)); + SchemaMetrics.validateMetricChanges(snapshot); + } + + private void readAndEvictBlocksFromL2Cache() throws IOException { + long offset = 0; + HFileReaderV2 reader = (HFileReaderV2) HFile.createReaderWithEncoding(fs, + storeFilePath, cacheConf, ENCODER.getEncodingInCache()); + // Clear the BlockCache to make sure reads are satisfied from L2 cache or + // disk. + cacheConf.getBlockCache().clearCache(); + while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + HFileBlock block = reader.readBlock(offset, -1, true, false, + false, null, reader.getEffectiveEncodingInCache(false), null); + mockedL2Cache.evictRawBlock(new BlockCacheKey(reader.getName(), offset)); + offset += block.getOnDiskSizeWithHeader(); + } + } + + private long getL2CacheSize(SchemaConfigured schema, + BlockType.BlockCategory category) { + return HRegion.getNumericPersistentMetric( + schema.getSchemaMetrics().getBlockMetricName(category, false, + SchemaMetrics.BlockMetricType.L2_CACHE_SIZE)); + } + + private long getL2CacheHitCount(SchemaConfigured schema, + BlockType.BlockCategory category) { + return HRegion.getNumericMetric( + schema.getSchemaMetrics().getBlockMetricName(category, false, + SchemaMetrics.BlockMetricType.L2_CACHE_HIT)); + } + private void writeStoreFile() throws IOException { - Path storeFileParentDir = new Path(TEST_UTIL.getTestDir(), - "test_cache_on_write"); + // Generate a random family name in order to keep different tests apart. + // This is important for the metrics test. + family = new HColumnDescriptor( + Integer.toHexString(new Random().nextInt(1023))); + region = TEST_UTIL.createTestRegion(TestL2BucketCache.class.getSimpleName(), + family); + Path storeHomeDir = Store.getStoreHomedir(region.getRegionDir(), + region.getRegionInfo().getEncodedName(), family.getName()); StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs, DATA_BLOCK_SIZE) - .withOutputDir(storeFileParentDir) + .withOutputDir(storeHomeDir) .withCompression(Compression.Algorithm.GZ) .withDataBlockEncoder(ENCODER) .withComparator(KeyValue.COMPARATOR) @@ -271,26 +339,35 @@ public class TestL2BucketCache { } @Override - public byte[] getRawBlock(String hfileName, long dataBlockOffset) { + public byte[] getRawBlockBytes(BlockCacheKey key) { byte[] ret = null; if (enableReads.get()) { - ret = underlying.getRawBlock(hfileName, dataBlockOffset); + ret = underlying.getRawBlockBytes(key); if (LOG.isTraceEnabled()) { LOG.trace("Cache " + (ret == null ?"miss":"hit") + - " for hfileName=" + hfileName + ", offset=" + dataBlockOffset); + " for hfileName=" + key.getHfileName() + + ", offset=" + key.getOffset()); } } return ret; } @Override - public void cacheRawBlock(String hfileName, long dataBlockOffset, - byte[] rawBlock) { + public boolean cacheRawBlock(BlockCacheKey cacheKey, + RawHFileBlock rawBlock) { + if (LOG.isTraceEnabled()) { + LOG.trace("Caching " + rawBlock.getData().length + " bytes, hfileName=" + + cacheKey.getHfileName() + ", offset=" + cacheKey.getOffset()); + } + return underlying.cacheRawBlock(cacheKey, rawBlock); + } + + @Override + public boolean evictRawBlock(BlockCacheKey cacheKey) { if (LOG.isTraceEnabled()) { - LOG.trace("Caching " + rawBlock.length + " bytes, hfileName=" + - hfileName + ", offset=" + dataBlockOffset); + LOG.trace("Evicting " + cacheKey); } - underlying.cacheRawBlock(hfileName, dataBlockOffset, rawBlock); + return underlying.evictRawBlock(cacheKey); } @Override @@ -299,8 +376,8 @@ public class TestL2BucketCache { } @Override - public boolean isShutdown() { - return underlying.isShutdown(); + public boolean isEnabled() { + return underlying.isEnabled(); } @Override diff --git a/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 81f3583..70a8c42 100644 --- a/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -25,7 +25,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.RawHFileBlock; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -135,7 +137,8 @@ public class TestBucketCache { // Add blocks for (BlockOnDisk block : blocks) { - cache.cacheBlock(block.blockName, block.block); + cache.cacheBlock(block.blockName, + new RawHFileBlock(BlockType.DATA, block.block)); } // Check if all blocks are properly cached and contain the right @@ -155,7 +158,8 @@ public class TestBucketCache { for (BlockOnDisk block : blocks) { try { if (cache.getBlock(block.blockName, true) != null) { - cache.cacheBlock(block.blockName, block.block); + cache.cacheBlock(block.blockName, + new RawHFileBlock(BlockType.DATA, block.block)); } } catch (RuntimeException re) { // expected @@ -174,7 +178,7 @@ public class TestBucketCache { conf); final AtomicInteger totalQueries = new AtomicInteger(); - cache.cacheBlock(key, buf); + cache.cacheBlock(key, new RawHFileBlock(BlockType.DATA, buf)); for (int i = 0; i < NUM_THREADS; i++) { MultithreadedTestUtil.TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { @@ -202,7 +206,8 @@ public class TestBucketCache { cache.stopWriterThreads(); BlockOnDisk[] blocks = generateDiskBlocks(BLOCK_SIZE, 1); long heapSize = cache.heapSize(); - cache.cacheBlock(blocks[0].blockName, blocks[0].block); + cache.cacheBlock(blocks[0].blockName, + new RawHFileBlock(BlockType.DATA, blocks[0].block)); /*When we cache something HeapSize should always increase */ assertTrue(heapSize < cache.heapSize()); @@ -254,20 +259,20 @@ public class TestBucketCache { } @Override - public void cacheBlock(BlockCacheKey cacheKey, byte[] buf, + public boolean cacheBlock(BlockCacheKey cacheKey, RawHFileBlock block, boolean inMemory) { if (super.getBlock(cacheKey, true, false) != null) { throw new RuntimeException("Cached an already cached block"); } - super.cacheBlock(cacheKey, buf, inMemory); + return super.cacheBlock(cacheKey, block, inMemory); } @Override - public void cacheBlock(BlockCacheKey cacheKey, byte[] buf) { + public boolean cacheBlock(BlockCacheKey cacheKey, RawHFileBlock block) { if (super.getBlock(cacheKey, true, false) != null) { throw new RuntimeException("Cached an already cached block"); } - super.cacheBlock(cacheKey, buf); + return super.cacheBlock(cacheKey, block); } } } diff --git a/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java b/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java index ff62ca3..c3c2ea8 100644 --- a/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java +++ b/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java @@ -252,9 +252,10 @@ public class TestSchemaMetrics { } for (boolean isCompaction : BOOL_VALUES) { - sm.updateOnCacheHit(blockCat, isCompaction); + sm.updateOnBlockRead(blockCat, isCompaction, 0, true, false, false); checkMetrics(); - sm.updateOnCacheMiss(blockCat, isCompaction, rand.nextInt(READ_TIME_MS_RANGE)); + sm.updateOnBlockRead(blockCat, isCompaction, + rand.nextInt(READ_TIME_MS_RANGE), false, false, false); checkMetrics(); }