Index: src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java (revision 1531202) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java (working copy) @@ -67,8 +67,8 @@ Path path = new Path("/Users/ryan/rfile.big.txt"); long start = System.currentTimeMillis(); SimpleBlockCache cache = new SimpleBlockCache(); - CacheConfig cacheConf = new CacheConfig(cache, true, false, false, false, - false, false, false); + CacheConfig cacheConf = new CacheConfig.CacheConfigBuilder().withBlockCache(cache) + .withCacheDataOnRead(true).build(); Reader reader = HFile.createReader(lfs, path, cacheConf); reader.loadFileInfo(); Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (revision 1531202) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (working copy) @@ -152,8 +152,7 @@ } missCount += 1; - prevBlock = realReader.readBlockData(offset, onDiskSize, - -1, pread); + prevBlock = realReader.readBlockData(offset, onDiskSize, -1, pread, false); prevOffset = offset; prevOnDiskSize = onDiskSize; prevPread = pread; @@ -167,8 +166,8 @@ LOG.info("Size of " + path + ": " + fileSize); FSDataInputStream istream = fs.open(path); - HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(istream, - compr, fs.getFileStatus(path).getLen()); + HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(istream, compr, fs.getFileStatus( + path).getLen()); BlockReaderWrapper brw = new BlockReaderWrapper(blockReader); HFileBlockIndex.BlockIndexReader indexReader = @@ -221,8 +220,8 @@ HFile.DEFAULT_CHECKSUM_TYPE, HFile.DEFAULT_BYTES_PER_CHECKSUM); FSDataOutputStream outputStream = fs.create(path); - HFileBlockIndex.BlockIndexWriter biw = - new HFileBlockIndex.BlockIndexWriter(hbw, null, null); + HFileBlockIndex.BlockIndexWriter biw = new HFileBlockIndex.BlockIndexWriter(hbw, null, null, + null); for (int i = 0; i < NUM_DATA_BLOCKS; ++i) { hbw.startWriting(BlockType.DATA).write( Index: src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java (working copy) @@ -0,0 +1,276 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.io.hfile.bucket; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MultithreadedTestUtil; + +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import static junit.framework.Assert.assertNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category(MediumTests.class) +@RunWith(Parameterized.class) +public class TestBucketCache { + + private static final Log LOG = LogFactory.getLog(TestBucketCache.class); + + private static final long CAPACITY_SIZE = 32 * 1024 * 1024; + + private static final int CACHE_SIZE = 1000000; + private static final int NUM_BLOCKS = 100; + private static final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; + private static final int NUM_THREADS = 1000; + private static final int NUM_QUERIES = 10000; + + private final String ioEngineName; + + private BucketCache cache; + + + + public TestBucketCache(String ioEngineName) { + this.ioEngineName = ioEngineName; + LOG.info("Running with ioEngineName = " + ioEngineName); + } + + @Parameterized.Parameters + public static Collection getConfigurations() { + Object[][] data = new Object[][] { {"heap"}, {"offheap"} }; + return Arrays.asList(data); + } + + @Before + public void setup() throws IOException { + cache = new MockedBucketCache(ioEngineName, CAPACITY_SIZE, + BucketCache.DEFAULT_WRITER_THREADS, + BucketCache.DEFAULT_WRITER_QUEUE_ITEMS); + } + + @After + public void tearDown() { + cache.shutdown(); + } + + @Test + public void testBucketAllocator() throws BucketAllocatorException { + BucketAllocator mAllocator = cache.getAllocator(); + /* + * Test the allocator first + */ + int[] blockSizes = new int[2]; + blockSizes[0] = 4 * 1024; + blockSizes[1] = 8 * 1024; + boolean full = false; + int i = 0; + ArrayList allocations = new ArrayList(); + // Fill the allocated extents + while (!full) { + try { + allocations.add(mAllocator.allocateBlock(blockSizes[i + % blockSizes.length])); + ++i; + } catch (CacheFullException cfe) { + full = true; + } + } + + for (i = 0; i < blockSizes.length; i++) { + BucketAllocator.BucketSizeInfo bucketSizeInfo = mAllocator + .roundUpToBucketSizeInfo(blockSizes[0]); + BucketAllocator.IndexStatistics indexStatistics = bucketSizeInfo.statistics(); + assertTrue(indexStatistics.freeCount() == 0); + } + + for (long offset : allocations) { + assertTrue(mAllocator.sizeOfAllocation(offset) == mAllocator + .freeBlock(offset)); + } + assertTrue(mAllocator.getUsedSize() == 0); + } + + @Test + public void testCacheSimple() throws Exception { + BlockOnDisk[] blocks = generateDiskBlocks(NUM_QUERIES, + BLOCK_SIZE); + // Confirm empty + for (BlockOnDisk block : blocks) { + assertNull(cache.getBlock(block.blockName, true)); + } + + // Add blocks + for (BlockOnDisk block : blocks) { + cache.cacheBlock(block.blockName, block.block); + } + + // Check if all 
blocks are properly cached and contain the right + // information, or the blocks are null. + // MapMaker makes no guarantees when it will evict, so neither can we. + + for (BlockOnDisk block : blocks) { + byte[] buf = cache.getBlock(block.blockName, true); + if (buf != null) { + assertArrayEquals(block.block, buf); + } + + } + + // Re-add some duplicate blocks. Hope nothing breaks. + + for (BlockOnDisk block : blocks) { + try { + if (cache.getBlock(block.blockName, true) != null) { + cache.cacheBlock(block.blockName, block.block); + } + } catch (RuntimeException re) { + // expected + } + } + } + + @Test + public void testCacheMultiThreadedSingleKey() throws Exception { + final BlockCacheKey key = new BlockCacheKey("key", 0); + final byte[] buf = new byte[5 * 1024]; + Arrays.fill(buf, (byte) 5); + + Configuration conf = new Configuration(); + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext( + conf); + + final AtomicInteger totalQueries = new AtomicInteger(); + cache.cacheBlock(key, buf); + + for (int i = 0; i < NUM_THREADS; i++) { + MultithreadedTestUtil.TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { + @Override + public void doAnAction() throws Exception { + byte[] returned = cache.getBlock(key, false); + assertArrayEquals(buf, returned); + totalQueries.incrementAndGet(); + } + }; + + t.setDaemon(true); + ctx.addThread(t); + } + + ctx.startThreads(); + while (totalQueries.get() < NUM_QUERIES && ctx.shouldRun()) { + Thread.sleep(10); + } + ctx.stop(); + } + + @Test + public void testHeapSizeChanges() throws Exception { + cache.stopWriterThreads(); + BlockOnDisk[] blocks = generateDiskBlocks(BLOCK_SIZE, 1); + long heapSize = cache.heapSize(); + cache.cacheBlock(blocks[0].blockName, blocks[0].block); + + /*When we cache something HeapSize should always increase */ + assertTrue(heapSize < cache.heapSize()); + + cache.evictBlock(blocks[0].blockName); + + /*Post eviction, heapsize should be the same */ + assertEquals(heapSize, cache.heapSize()); + } + + private static class BlockOnDisk { + BlockCacheKey blockName; + byte[] block; + } + + private static BlockOnDisk[] generateDiskBlocks(int blockSize, + int numBlocks) { + BlockOnDisk[] retVal = new BlockOnDisk[numBlocks]; + Random rand = new Random(); + HashSet usedStrings = new HashSet(); + for (int i = 0; i < numBlocks; i++) { + byte[] generated = new byte[blockSize]; + rand.nextBytes(generated); + String strKey; + strKey = Long.toString(rand.nextLong()); + while (!usedStrings.add(strKey)) { + strKey = Long.toString(rand.nextLong()); + } + retVal[i] = new BlockOnDisk(); + retVal[i].blockName = new BlockCacheKey(strKey, 0); + retVal[i].block = generated; + } + return retVal; + } + + private static class MockedBucketCache extends BucketCache { + + public MockedBucketCache(String ioEngineName, long capacity, + int writerThreads, + int writerQLen) throws IOException { + super(ioEngineName, + capacity, + writerThreads, + writerQLen, + DEFAULT_ERROR_TOLERATION_DURATION, + CacheConfig.DEFAULT_L2_BUCKET_CACHE_BUCKET_SIZES, + null); + super.wait_when_cache = true; + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, byte[] buf, + boolean inMemory) { + if (super.getBlock(cacheKey, true, false) != null) { + throw new RuntimeException("Cached an already cached block"); + } + super.cacheBlock(cacheKey, buf, inMemory); + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, byte[] buf) { + if (super.getBlock(cacheKey, true, false) != null) { + throw new RuntimeException("Cached an 
already cached block"); + } + super.cacheBlock(cacheKey, buf); + } + } +} Index: src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java (working copy) @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.MediumTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + + +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertTrue; + +/** + * Test for {@link ByteBufferIOEngine} + */ +@Category(MediumTests.class) +@RunWith(Parameterized.class) +public class TestByteBufferIOEngine { + + private static final Log LOG = LogFactory.getLog(TestByteBufferIOEngine.class); + + private final boolean isDirect; + + public TestByteBufferIOEngine(boolean isDirect) { + this.isDirect = isDirect; + LOG.info("Running with direct allocation " + + (isDirect? 
"enabled" : "disabled")); + } + + @Parameterized.Parameters + public static Collection getConfigurations() { + Object[][] data = new Object[][] { {true}, {false} }; + return Arrays.asList(data); + } + + @Test + public void testByteBufferIOEngine() throws Exception { + int capacity = 32 * 1024 * 1024; // 32 MB + int testNum = 100; + int maxBlockSize = 64 * 1024; + ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity, + 2 * 1024 * 1024, isDirect); + int testOffsetAtStartNum = testNum / 10; + int testOffsetAtEndNum = testNum / 10; + for (int i = 0; i < testNum; i++) { + byte val = (byte) (Math.random() * 255); + int blockSize = (int) (Math.random() * maxBlockSize); + byte[] byteArray = new byte[blockSize]; + for (int j = 0; j < byteArray.length; ++j) { + byteArray[j] = val; + } + int offset = 0; + if (testOffsetAtStartNum > 0) { + testOffsetAtStartNum--; + offset = 0; + } else if (testOffsetAtEndNum > 0) { + testOffsetAtEndNum--; + offset = capacity - blockSize; + } else { + offset = (int) (Math.random() * (capacity - maxBlockSize)); + } + ioEngine.write(byteArray, offset); + byte[] dst = new byte[blockSize]; + ioEngine.read(dst, offset); + for (int j = 0; j < byteArray.length; ++j) { + assertTrue(byteArray[j] == dst[j]); + } + } + assert testOffsetAtStartNum == 0; + assert testOffsetAtEndNum == 0; + } +} Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (revision 1531202) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (working copy) @@ -284,7 +284,7 @@ int numBlocksRead = 0; long pos = 0; while (pos < totalSize) { - b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread); + b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread, false); b.sanityCheck(); pos += block.length; numBlocksRead++; @@ -322,7 +322,7 @@ FSDataInputStream is = fs.open(path); HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo, totalSize); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false); is.close(); assertEquals(0, HFile.getChecksumFailuresCount()); @@ -335,13 +335,15 @@ if (algo == GZ) { is = fs.open(path); hbr = new HFileBlock.FSReaderV2(is, algo, totalSize); - b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS + - b.totalChecksumBytes(), -1, pread); + b = hbr + .readBlockData(0, + 2173 + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS + b.totalChecksumBytes(), -1, pread, + false); assertEquals(blockStr, b.toString()); int wrongCompressedSize = 2172; try { - b = hbr.readBlockData(0, wrongCompressedSize - + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS, -1, pread); + b = hbr.readBlockData(0, wrongCompressedSize + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS, + -1, pread, false); fail("Exception expected"); } catch (IOException ex) { String expectedPrefix = "On-disk size without header provided is " @@ -399,7 +401,7 @@ HFileBlock b; int pos = 0; for (int blockId = 0; blockId < numBlocks; ++blockId) { - b = hbr.readBlockData(pos, -1, -1, pread); + b = hbr.readBlockData(pos, -1, -1, pread, false); assertEquals(0, HFile.getChecksumFailuresCount()); b.sanityCheck(); pos += b.getOnDiskSizeWithHeader(); @@ -524,7 +526,7 @@ if (detailedLogging) { LOG.info("Reading block #" + i + " at offset " + curOffset); } - HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread); + HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread, false); if 
(detailedLogging) { LOG.info("Block #" + i + ": " + b); } @@ -538,8 +540,8 @@ // Now re-load this block knowing the on-disk size. This tests a // different branch in the loader. - HFileBlock b2 = hbr.readBlockData(curOffset, - b.getOnDiskSizeWithHeader(), -1, pread); + HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), -1, pread, + false); b2.sanityCheck(); assertEquals(b.getBlockType(), b2.getBlockType()); @@ -647,7 +649,7 @@ HFileBlock b; try { long onDiskSizeArg = withOnDiskSize ? expectedSize : -1; - b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread); + b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread, false); } catch (IOException ex) { LOG.error("Error in client " + clientId + " trying to read block at " + offset + ", pread=" + pread + ", withOnDiskSize=" + Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java (revision 1531202) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java (working copy) @@ -187,7 +187,7 @@ int numBlocksRead = 0; long pos = 0; while (pos < totalSize) { - b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread); + b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread, false); b.sanityCheck(); pos += block.length; numBlocksRead++; @@ -221,8 +221,8 @@ FSDataInputStream is = fs.open(path); HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, is, algo, - totalSize, MINOR_VERSION, fs, path); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + totalSize, MINOR_VERSION, fs, path, null, null); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false); is.close(); b.sanityCheck(); @@ -233,15 +233,15 @@ if (algo == GZ) { is = fs.open(path); - hbr = new HFileBlock.FSReaderV2(is, is, algo, totalSize, MINOR_VERSION, - fs, path); - b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE_NO_CHECKSUM + - b.totalChecksumBytes(), -1, pread); + hbr = new HFileBlock.FSReaderV2(is, is, algo, totalSize, MINOR_VERSION, fs, path, null, + null); + b = hbr.readBlockData(0, + 2173 + HFileBlock.HEADER_SIZE_NO_CHECKSUM + b.totalChecksumBytes(), -1, pread, false); assertEquals(blockStr, b.toString()); int wrongCompressedSize = 2172; try { - b = hbr.readBlockData(0, wrongCompressedSize - + HFileBlock.HEADER_SIZE_NO_CHECKSUM, -1, pread); + b = hbr.readBlockData(0, wrongCompressedSize + HFileBlock.HEADER_SIZE_NO_CHECKSUM, -1, + pread, false); fail("Exception expected"); } catch (IOException ex) { String expectedPrefix = "On-disk size without header provided is " @@ -291,15 +291,15 @@ os.close(); FSDataInputStream is = fs.open(path); - HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, is, algo, - totalSize, MINOR_VERSION, fs, path); + HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, is, algo, totalSize, + MINOR_VERSION, fs, path, null, null); hbr.setDataBlockEncoder(dataBlockEncoder); hbr.setIncludesMemstoreTS(includesMemstoreTS); HFileBlock b; int pos = 0; for (int blockId = 0; blockId < numBlocks; ++blockId) { - b = hbr.readBlockData(pos, -1, -1, pread); + b = hbr.readBlockData(pos, -1, -1, pread, false); b.sanityCheck(); pos += b.getOnDiskSizeWithHeader(); Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (revision 1531202) +++ 
src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (working copy) @@ -158,10 +158,9 @@ assertEquals(useChecksums?1:0, trailer.getMinorVersion()); assertEquals(entryCount, trailer.getEntryCount()); - HFileBlock.FSReader blockReader = - new HFileBlock.FSReaderV2(fsdis,fsdis, compressAlgo, fileSize, - this.useChecksums?HFileReaderV2.MAX_MINOR_VERSION:HFileReaderV2.MIN_MINOR_VERSION, - null, null); + HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(fsdis, fsdis, compressAlgo, + fileSize, this.useChecksums ? HFileReaderV2.MAX_MINOR_VERSION + : HFileReaderV2.MIN_MINOR_VERSION, null, null, null, null); // Comparator class name is stored in the trailer in version 2. RawComparator comparator = trailer.createComparator(); HFileBlockIndex.BlockIndexReader dataBlockIndexReader = @@ -206,7 +205,7 @@ fsdis.seek(0); long curBlockPos = 0; while (curBlockPos <= trailer.getLastDataBlockOffset()) { - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false, false); assertEquals(BlockType.DATA, block.getBlockType()); ByteBuffer buf = block.getBufferWithoutHeader(); while (buf.hasRemaining()) { @@ -249,7 +248,7 @@ while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false, false); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); block.readInto(t); Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java (working copy) @@ -0,0 +1,270 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests L2 bucket cache for correctness + */ +@Category(MediumTests.class) +@RunWith(Parameterized.class) +public class TestL2BucketCache { + + private static final Log LOG = LogFactory.getLog(TestL2BucketCache.class); + + private static final int DATA_BLOCK_SIZE = 2048; + private static final int NUM_KV = 25000; + private static final int INDEX_BLOCK_SIZE = 512; + private static final int BLOOM_BLOCK_SIZE = 4096; + private static final StoreFile.BloomType BLOOM_TYPE = + StoreFile.BloomType.ROWCOL; + + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + private static final HFileDataBlockEncoderImpl ENCODER = + new HFileDataBlockEncoderImpl(DataBlockEncoding.PREFIX); + + private BucketCache underlyingCache; + private MockedL2Cache mockedL2Cache; + + private Configuration conf; + private CacheConfig cacheConf; + private FileSystem fs; + private Path storeFilePath; + + private final Random rand = new Random(12983177L); + private final String ioEngineName; + + public TestL2BucketCache(String ioEngineName) { + this.ioEngineName = ioEngineName; + } + + @Parameterized.Parameters + public static Collection getConfiguration() { + Object[][] data = new Object[][] { {"heap"}, {"offheap"}}; + return Arrays.asList(data); + } + + @Before + public void setUp() throws IOException { + conf = TEST_UTIL.getConfiguration(); + conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); + conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE); + conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, + BLOOM_BLOCK_SIZE); + conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true); + conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, true); + conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, true); + conf.setBoolean(CacheConfig.L2_CACHE_BLOCKS_ON_FLUSH_KEY, true); + underlyingCache = new BucketCache(ioEngineName, + 32 * DATA_BLOCK_SIZE * 1024, + BucketCache.DEFAULT_WRITER_QUEUE_ITEMS, + BucketCache.DEFAULT_WRITER_QUEUE_ITEMS, + BucketCache.DEFAULT_ERROR_TOLERATION_DURATION, + CacheConfig.DEFAULT_L2_BUCKET_CACHE_BUCKET_SIZES, + conf); + mockedL2Cache = new MockedL2Cache(underlyingCache); + + + fs = FileSystem.get(conf); + cacheConf = new CacheConfig.CacheConfigBuilder(conf) + .withL2Cache(mockedL2Cache) + .build(); + } + + @After + public 
void tearDown() { + underlyingCache.shutdown(); + } + + // Tests cache on write: when writing to an HFile, the data being written + // should also be placed in the L2 cache. + @Test + public void testCacheOnWrite() throws Exception { + writeStoreFile(); + DataBlockEncoding encodingInCache = ENCODER.getEncodingInCache(); + HFileReaderV2 reader = (HFileReaderV2) HFile.createReaderWithEncoding(fs, + storeFilePath, cacheConf, encodingInCache); + HFileScanner scanner = reader.getScanner(false, false); + assertTrue(scanner.seekTo()); + long offset = 0; + long cachedCount = 0; + while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + mockedL2Cache.enableReads.set(false); + HFileBlock blockFromDisk; + try { + blockFromDisk = reader.readBlock(offset, -1, false, false, false, null); + } finally { + mockedL2Cache.enableReads.set(true); + } + boolean isInL1Lcache = cacheConf.getBlockCache().getBlock( + new BlockCacheKey(reader.getName(), offset, encodingInCache, + blockFromDisk.getBlockType()), true, true) != null; + if (isInL1Lcache) { + cachedCount++; + byte[] blockFromCacheRaw = + mockedL2Cache.getRawBlock(reader.getName(), offset); + assertNotNull("All blocks in l1 cache, should also be in l2 cache: " + + blockFromDisk.toString(), blockFromCacheRaw); + HFileBlock blockFromL2Cache = HFileBlock.fromBytes(blockFromCacheRaw, + Compression.Algorithm.GZ, true, offset, reader.getTrailer().getMinorVersion()); + assertEquals("Data in block from disk (" + blockFromDisk + + ") should match data in block from cache (" + blockFromL2Cache + + ").", blockFromL2Cache.getBufferWithHeader(), + blockFromDisk.getBufferWithHeader()); + assertEquals(blockFromDisk, blockFromL2Cache); + } + offset += blockFromDisk.getOnDiskSizeWithHeader(); + } + assertTrue("> 0 blocks must be cached in L2Cache", cachedCount > 0); + } + + // Tests cache on read: when blocks are read from an HFile they should + // be cached in the L2 cache. 
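+ // Each block is evicted from the L1 block cache after its first read, so the
+ // second readBlock() call below is expected to be served by the L2 copy; its
+ // buffer must match the block originally read from disk.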
+ @Test + public void testCacheOnRead() throws Exception { + writeStoreFile(); + DataBlockEncoding encodingInCache = ENCODER.getEncodingInCache(); + HFileReaderV2 reader = (HFileReaderV2) HFile.createReaderWithEncoding(fs, + storeFilePath, cacheConf, encodingInCache); + long offset = 0; + cacheConf.getBlockCache().clearCache(); + underlyingCache.clearCache(); + while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + HFileBlock blockFromDisk = reader.readBlock(offset, -1, true, false, false, null); + assertNotNull(mockedL2Cache.getRawBlock(reader.getName(), offset)); + cacheConf.getBlockCache().evictBlock(new BlockCacheKey(reader.getName(), + offset, encodingInCache, blockFromDisk.getBlockType())); + HFileBlock blockFromL2Cache = reader.readBlock(offset, -1, true, false, false, null); + assertEquals("Data in block from disk (" + blockFromDisk + + ") should match data in block from cache (" + blockFromL2Cache + + ").", blockFromL2Cache.getBufferWithHeader(), + blockFromDisk.getBufferWithHeader()); + assertEquals(blockFromDisk, blockFromL2Cache); + offset += blockFromDisk.getOnDiskSizeWithHeader(); + } + assertTrue("This test must have read > 0 blocks", offset > 0); + } + + private void writeStoreFile() throws IOException { + Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "test_cache_on_write"); + StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs, + DATA_BLOCK_SIZE) + .withOutputDir(storeFileParentDir) + .withCompression(Compression.Algorithm.GZ) + .withDataBlockEncoder(ENCODER) + .withComparator(KeyValue.COMPARATOR) + .withBloomType(BLOOM_TYPE) + .withMaxKeyCount(NUM_KV) + .build(); + + final int rowLen = 32; + for (int i = 0; i < NUM_KV; ++i) { + byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i); + byte[] v = TestHFileWriterV2.randomValue(rand); + int cfLen = rand.nextInt(k.length - rowLen + 1); + KeyValue kv = new KeyValue( + k, 0, rowLen, + k, rowLen, cfLen, + k, rowLen + cfLen, k.length - rowLen - cfLen, + rand.nextLong(), + CreateRandomStoreFile.generateKeyType(rand), + v, 0, v.length); + sfw.append(kv); + } + sfw.close(); + storeFilePath = sfw.getPath(); + } + + // Mocked implementation which allows reads to be enabled and disabled + // at run time during the tests. Adds additional trace logging that can + // enabled during unit tests for further debugging. 
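+ // Turning reads off (enableReads = false) while a block is fetched guarantees
+ // that the block comes from disk rather than from the L2 cache, which the
+ // cache-on-write test above relies on.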
+ private static class MockedL2Cache implements L2Cache { + + final L2BucketCache underlying; + final AtomicBoolean enableReads = new AtomicBoolean(true); + + MockedL2Cache(BucketCache underlying) throws IOException { + this.underlying = new L2BucketCache(underlying); + } + + @Override + public byte[] getRawBlock(String hfileName, long dataBlockOffset) { + byte[] ret = null; + if (enableReads.get()) { + ret = underlying.getRawBlock(hfileName, dataBlockOffset); + if (LOG.isTraceEnabled()) { + LOG.trace("Cache " + (ret == null ?"miss":"hit") + + " for hfileName=" + hfileName + ", offset=" + dataBlockOffset); + } + } + return ret; + } + + @Override + public void cacheRawBlock(String hfileName, long dataBlockOffset, + byte[] rawBlock) { + if (LOG.isTraceEnabled()) { + LOG.trace("Caching " + rawBlock.length + " bytes, hfileName=" + + hfileName + ", offset=" + dataBlockOffset); + } + underlying.cacheRawBlock(hfileName, dataBlockOffset, rawBlock); + } + + @Override + public int evictBlocksByHfileName(String hfileName) { + return underlying.evictBlocksByHfileName(hfileName); + } + + @Override + public void shutdown() { + underlying.shutdown(); + } + } +} Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java (revision 1531202) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java (working copy) @@ -102,7 +102,7 @@ FSDataInputStream is = fs.open(path); HFileBlock.FSReader hbr = new FSReaderV2Test(is, algo, totalSize, HFile.MAX_FORMAT_VERSION, fs, path); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false); b.sanityCheck(); assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 2173 : 4936, @@ -123,17 +123,17 @@ // requests. Verify that this is correct. for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) { - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread, false); assertEquals(0, HFile.getChecksumFailuresCount()); } // The next read should have hbase checksum verification reanabled, // we verify this by assertng that there was a hbase-checksum failure. - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread, false); assertEquals(1, HFile.getChecksumFailuresCount()); // Since the above encountered a checksum failure, we switch // back to not checking hbase checksums. 
- b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread, false); assertEquals(0, HFile.getChecksumFailuresCount()); is.close(); @@ -145,7 +145,7 @@ is = newfs.open(path); hbr = new FSReaderV2Test(is, algo, totalSize, HFile.MAX_FORMAT_VERSION, newfs, path); - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread, false); is.close(); b.sanityCheck(); assertEquals(4936, b.getUncompressedSizeWithoutHeader()); @@ -208,8 +208,8 @@ FSDataInputStream is = fs.open(path); FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path); HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, nochecksum, - algo, totalSize, HFile.MAX_FORMAT_VERSION, hfs, path); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + algo, totalSize, HFile.MAX_FORMAT_VERSION, hfs, path, null, null); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false); is.close(); b.sanityCheck(); assertEquals(dataSize, b.getUncompressedSizeWithoutHeader()); @@ -262,7 +262,7 @@ long fileSize, int minorVersion, FileSystem fs, Path path) throws IOException { super(istream, istream, algo, fileSize, minorVersion, - (HFileSystem)fs, path); + (HFileSystem)fs, path, null, null); } @Override Index: src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java (revision 1531202) +++ src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java (working copy) @@ -74,7 +74,7 @@ allKeyValues.add(current); } - storeFile.closeReader(cacheConf.shouldEvictOnClose()); + storeFile.closeReader(cacheConf.shouldEvictOnClose(), cacheConf.shouldL2EvictOnClose()); // pick seeks by random List seeks = new ArrayList(); @@ -135,7 +135,7 @@ double seeksPerSec = (seeks.size() * NANOSEC_IN_SEC) / (finishSeeksTime - startSeeksTime); - storeFile.closeReader(cacheConf.shouldEvictOnClose()); + storeFile.closeReader(cacheConf.shouldEvictOnClose(), cacheConf.shouldL2EvictOnClose()); clearBlockCache(); System.out.println(blockEncoder); Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (working copy) @@ -773,12 +773,15 @@ /** Block cache, or null if cache-on-write is disabled */ private BlockCache blockCache; + /** L2Cache, or null if cache-on-write is disabled */ + private L2Cache l2Cache; + /** Name to use for computing cache keys */ private String nameForCaching; /** Creates a single-level block index writer */ public BlockIndexWriter() { - this(null, null, null); + this(null, null, null, null); singleLevelOnly = true; } @@ -790,14 +793,14 @@ * on write into this block cache. 
*/ public BlockIndexWriter(HFileBlock.Writer blockWriter, - BlockCache blockCache, String nameForCaching) { - if ((blockCache == null) != (nameForCaching == null)) { - throw new IllegalArgumentException("Block cache and file name for " + - "caching must be both specified or both null"); + BlockCache blockCache, L2Cache l2Cache, String nameForCaching) { + if (nameForCaching == null && (blockCache != null || l2Cache != null)) { + throw new IllegalArgumentException("If BlockCache OR L2Cache are " + + " not null, then nameForCaching must NOT be null"); } - this.blockWriter = blockWriter; this.blockCache = blockCache; + this.l2Cache = l2Cache; this.nameForCaching = nameForCaching; this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE; } @@ -958,6 +961,9 @@ beginOffset, DataBlockEncoding.NONE, blockForCaching.getBlockType()), blockForCaching); } + if (l2Cache != null) { + l2Cache.cacheRawBlock(nameForCaching, beginOffset, blockWriter.getHeaderAndData()); + } // Add intermediate index block size totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader(); Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java (working copy) @@ -0,0 +1,450 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.ConditionUtil; + +/** + * This class is used to allocate a block with specified size and free the block + * when evicting. It manages an array of buckets, each bucket is associated with + * a size and caches elements up to this size. For completely empty bucket, this + * size could be re-specified dynamically. + * + * This class is not thread safe. 
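+ *
+ * A requested block size is rounded up to the smallest configured bucket size
+ * that fits (see roundUpToBucketSizeInfo), and the returned offset locates the
+ * block within the backing IOEngine.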
+ */ +public final class BucketAllocator { + static final Log LOG = LogFactory.getLog(BucketAllocator.class); + + final private static class Bucket { + private final int[] bucketSizes; + private final long bucketCapacity; + private long baseOffset; + private int itemAllocationSize, sizeIndex; + private int itemCount; + private int freeList[]; + private int freeCount, usedCount; + + public Bucket(int[] bucketSizes, long bucketCapacity, long offset) { + this.bucketSizes = bucketSizes; + this.bucketCapacity = bucketCapacity; + baseOffset = offset; + sizeIndex = -1; + } + + void reconfigure(int sizeIndex) { + this.sizeIndex = Preconditions.checkPositionIndex(sizeIndex, + bucketSizes.length); + itemAllocationSize = bucketSizes[sizeIndex]; + itemCount = (int) ((bucketCapacity) / (long) itemAllocationSize); + freeCount = itemCount; + usedCount = 0; + freeList = new int[itemCount]; + for (int i = 0; i < freeCount; ++i) + freeList[i] = i; + } + + public boolean isUninstantiated() { + return sizeIndex == -1; + } + + public int sizeIndex() { + return sizeIndex; + } + + public int itemAllocationSize() { + return itemAllocationSize; + } + + public boolean hasFreeSpace() { + return freeCount > 0; + } + + public boolean isCompletelyFree() { + return usedCount == 0; + } + + public int freeCount() { + return freeCount; + } + + public int usedCount() { + return usedCount; + } + + public int freeBytes() { + return freeCount * itemAllocationSize; + } + + public int usedBytes() { + return usedCount * itemAllocationSize; + } + + public long baseOffset() { + return baseOffset; + } + + /** + * Allocate a block in this bucket, return the offset representing the + * position in physical space + * @return the offset in the IOEngine + */ + public long allocate() { + Preconditions.checkState(freeCount > 0, "No space to allocate!"); + Preconditions.checkState(sizeIndex != -1); + ++usedCount; + return ConditionUtil + .checkPositiveOffset(baseOffset + (freeList[--freeCount] * itemAllocationSize)); + } + + private void free(long offset) { + Preconditions.checkState(usedCount > 0); + Preconditions.checkState(freeCount < itemCount, + "duplicate free, offset: " + offset); + offset = ConditionUtil.checkOffset(offset - baseOffset, + itemCount * itemAllocationSize); + Preconditions.checkState(offset % itemAllocationSize == 0); + int item = (int) (offset / (long) itemAllocationSize); + Preconditions.checkState(!freeListContains(item), "Item at " + offset + + " already on freelist!"); + + --usedCount; + freeList[freeCount++] = item; + } + + private boolean freeListContains(int blockNo) { + for (int i = 0; i < freeCount; ++i) { + if (freeList[i] == blockNo) return true; + } + return false; + } + } + + public final class BucketSizeInfo { + // Free bucket means it has space to allocate a block; + // Completely free bucket means it has no block. 
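+ // A bucket stays in freeBuckets while it has unallocated slots and in
+ // completelyFreeBuckets only while usedCount == 0; completely free buckets
+ // may be handed over to a different size class when that class runs out.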
+ private List bucketList, freeBuckets, completelyFreeBuckets; + private int sizeIndex; + + BucketSizeInfo(int sizeIndex) { + bucketList = new ArrayList(); + freeBuckets = new ArrayList(); + completelyFreeBuckets = new ArrayList(); + this.sizeIndex = sizeIndex; + } + + public void instantiateBucket(Bucket b) { + Preconditions.checkArgument(b.isUninstantiated() || b.isCompletelyFree()); + b.reconfigure(sizeIndex); + bucketList.add(b); + freeBuckets.add(b); + completelyFreeBuckets.add(b); + } + + public int sizeIndex() { + return sizeIndex; + } + + /** + * Find a bucket to allocate a block + * @return the offset in the IOEngine + */ + public long allocateBlock() { + Bucket b = null; + if (freeBuckets.size() > 0) // Use up an existing one first... + b = freeBuckets.get(freeBuckets.size() - 1); + if (b == null) { + b = grabGlobalCompletelyFreeBucket(); + if (b != null) instantiateBucket(b); + } + if (b == null) return -1; + long result = b.allocate(); + blockAllocated(b); + return result; + } + + void blockAllocated(Bucket b) { + if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b); + if (!b.hasFreeSpace()) freeBuckets.remove(b); + } + + public Bucket findAndRemoveCompletelyFreeBucket() { + Bucket b = null; + Preconditions.checkState(bucketList.size() > 0); + if (bucketList.size() == 1) { + // So we never get complete starvation of a bucket for a size + return null; + } + + if (completelyFreeBuckets.size() > 0) { + b = completelyFreeBuckets.get(0); + removeBucket(b); + } + return b; + } + + private void removeBucket(Bucket b) { + Preconditions.checkArgument(b.isCompletelyFree()); + bucketList.remove(b); + freeBuckets.remove(b); + completelyFreeBuckets.remove(b); + } + + public void freeBlock(Bucket b, long offset) { + Preconditions.checkArgument(bucketList.contains(b)); + // else we shouldn't have anything to free... 
+ Preconditions.checkArgument(!completelyFreeBuckets.contains(b), + "nothing to free!"); + b.free(offset); + if (!freeBuckets.contains(b)) freeBuckets.add(b); + if (b.isCompletelyFree()) completelyFreeBuckets.add(b); + } + + public IndexStatistics statistics() { + long free = 0, used = 0; + for (Bucket b : bucketList) { + free += b.freeCount(); + used += b.usedCount(); + } + return new IndexStatistics(free, used, bucketSizes[sizeIndex]); + } + } + + private final int bucketSizes[]; + + /** + * Round up the given block size to bucket size, and get the corresponding + * BucketSizeInfo + * @param blockSize + * @return BucketSizeInfo + */ + public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) { + for (int i = 0; i < bucketSizes.length; ++i) + if (blockSize <= bucketSizes[i]) + return bucketSizeInfos[i]; + return null; + } + + + static public final int FEWEST_ITEMS_IN_BUCKET = 4; + // The capacity size for each bucket + + private final long bucketCapacity; + + private final Bucket[] buckets; + private final BucketSizeInfo[] bucketSizeInfos; + private final long totalSize; + + private long usedSize = 0; + + BucketAllocator(int[] bucketSizes, long availableSpace) throws BucketAllocatorException { + this.bucketSizes = bucketSizes; + int bigItemSize = bucketSizes[bucketSizes.length - 1]; + bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize; + buckets = new Bucket[(int) (availableSpace / bucketCapacity)]; + if (buckets.length < bucketSizes.length) + throw new BucketAllocatorException( + "Bucket allocator size too small - must have room for at least " + + bucketSizes.length + " buckets"); + bucketSizeInfos = new BucketSizeInfo[bucketSizes.length]; + for (int i = 0; i < bucketSizes.length; ++i) { + bucketSizeInfos[i] = new BucketSizeInfo(i); + } + for (int i = 0; i < buckets.length; ++i) { + buckets[i] = new Bucket(bucketSizes, bucketCapacity, bucketCapacity * i); + bucketSizeInfos[i < bucketSizes.length ? i : bucketSizes.length - 1] + .instantiateBucket(buckets[i]); + } + this.totalSize = ((long) buckets.length) * bucketCapacity; + } + + public String getInfo() { + StringBuilder sb = new StringBuilder(1024); + for (int i = 0; i < buckets.length; ++i) { + Bucket b = buckets[i]; + sb.append(" Bucket ").append(i).append(": ").append(b.itemAllocationSize()); + sb.append(" freeCount=").append(b.freeCount()).append(" used=") + .append(b.usedCount()); + sb.append('\n'); + } + return sb.toString(); + } + + public long getUsedSize() { + return this.usedSize; + } + + public long getFreeSize() { + return this.totalSize - getUsedSize(); + } + + public long getTotalSize() { + return this.totalSize; + } + + /** + * Allocate a block with specified size. Return the offset + * @param blockSize size of block + * @throws BucketAllocatorException,CacheFullException + * @return the offset in the IOEngine + */ + public synchronized long allocateBlock(int blockSize) throws CacheFullException, + BucketAllocatorException { + Preconditions.checkArgument(blockSize > 0); + BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize); + if (bsi == null) { + throw new BucketAllocatorException("Allocation too big size=" + blockSize); + } + long offset = bsi.allocateBlock(); + + // Ask caller to free up space and try again! 
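+ // (BucketSizeInfo.allocateBlock() returns -1 when no bucket of this size
+ // class currently has room.)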
+ if (offset < 0) + throw new CacheFullException(blockSize, bsi.sizeIndex()); + usedSize += bucketSizes[bsi.sizeIndex()]; + return offset; + } + + private Bucket grabGlobalCompletelyFreeBucket() { + for (BucketSizeInfo bsi : bucketSizeInfos) { + Bucket b = bsi.findAndRemoveCompletelyFreeBucket(); + if (b != null) return b; + } + return null; + } + + /** + * Free a block with the offset + * @param offset block's offset + * @return size freed + */ + public synchronized int freeBlock(long offset) { + int bucketNo = (int) (offset / bucketCapacity); + Preconditions.checkPositionIndex(bucketNo, buckets.length); + Bucket targetBucket = buckets[bucketNo]; + bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset); + usedSize -= targetBucket.itemAllocationSize(); + return targetBucket.itemAllocationSize(); + } + + public int sizeOfAllocation(long offset) { + int bucketNo = (int) (offset / bucketCapacity); + Preconditions.checkPositionIndex(bucketNo, buckets.length); + Bucket targetBucket = buckets[bucketNo]; + return targetBucket.itemAllocationSize(); + } + + public static class IndexStatistics { + private long freeCount, usedCount, itemSize, totalCount; + + public long freeCount() { + return freeCount; + } + + public long usedCount() { + return usedCount; + } + + public long totalCount() { + return totalCount; + } + + public long freeBytes() { + return freeCount * itemSize; + } + + public long usedBytes() { + return usedCount * itemSize; + } + + public long totalBytes() { + return totalCount * itemSize; + } + + public long itemSize() { + return itemSize; + } + + public IndexStatistics(long free, long used, long itemSize) { + setTo(free, used, itemSize); + } + + public IndexStatistics() { + setTo(-1, -1, 0); + } + + public void setTo(long free, long used, long itemSize) { + this.itemSize = itemSize; + this.freeCount = free; + this.usedCount = used; + this.totalCount = free + used; + } + } + + public void dumpToLog() { + logStatistics(); + StringBuilder sb = new StringBuilder(); + for (Bucket b : buckets) { + sb.append("Bucket:").append(b.baseOffset).append('\n'); + sb.append(" Size index: " + b.sizeIndex() + "; Free:" + b.freeCount + + "; used:" + b.usedCount + "; freelist\n"); + for (int i = 0; i < b.freeCount(); ++i) + sb.append(b.freeList[i]).append(','); + sb.append('\n'); + } + LOG.info(sb); + } + + public void logStatistics() { + IndexStatistics total = new IndexStatistics(); + IndexStatistics[] stats = getIndexStatistics(total); + LOG.info("Bucket allocator statistics follow:\n"); + LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes=" + + total.usedBytes() + "; total bytes=" + total.totalBytes()); + for (IndexStatistics s : stats) { + LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount() + + "; free=" + s.freeCount() + "; total=" + s.totalCount()); + } + } + + public IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) { + IndexStatistics[] stats = getIndexStatistics(); + long totalfree = 0, totalused = 0; + for (IndexStatistics stat : stats) { + totalfree += stat.freeBytes(); + totalused += stat.usedBytes(); + } + grandTotal.setTo(totalfree, totalused, 1); + return stats; + } + + public IndexStatistics[] getIndexStatistics() { + IndexStatistics[] stats = new IndexStatistics[bucketSizes.length]; + for (int i = 0; i < stats.length; ++i) + stats[i] = bucketSizeInfos[i].statistics(); + return stats; + } +} + Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java 
=================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java (working copy) @@ -0,0 +1,61 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.io.hfile.CacheStats; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Class that implements cache metrics for bucket cache. + */ +public class BucketCacheStats extends CacheStats { + private static final int NS_PER_SECOND = 1000000; + + private final AtomicLong ioHitCount = new AtomicLong(0); + private final AtomicLong ioHitTime = new AtomicLong(0); + + private volatile long lastLogTime = EnvironmentEdgeManager.currentTimeMillis(); + + public void ioHit(long time) { + ioHitCount.incrementAndGet(); + ioHitTime.addAndGet(time); + } + + public long getIOHitsPerSecond() { + long now = EnvironmentEdgeManager.currentTimeMillis(); + long took = (now - lastLogTime) / 1000; + lastLogTime = now; + return took == 0 ? 0 : ioHitCount.get() / took; + } + + public double getIOTimePerHit() { + long time = ioHitTime.get() / NS_PER_SECOND; + long count = ioHitCount.get(); + return count == 0 ? 0.0 : ((double) time / (double) count); + } + + public void reset() { + ioHitCount.set(0); + ioHitTime.set(0); + } +} + Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java (working copy) @@ -0,0 +1,59 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A class implementing IOEngine interface could support data services for + * {@link BucketCache}. + */ +public interface IOEngine { + + /** + * Transfers data from IOEngine to the given byte buffer + * @param dst the given byte array into which bytes are to be written + * @param offset The offset in the IO engine where the first byte to be read + * @throws IOException + */ + void read(byte[] dst, long offset) throws IOException; + + /** + * Transfers data from the given byte buffer to IOEngine + * @param src the given byte array from which bytes are to be read + * @param offset The offset in the IO engine where the first byte to be + * written + * @throws IOException + */ + void write(byte[] src, long offset) throws IOException; + + /** + * Sync the data to IOEngine after writing + * @throws IOException + */ + void sync() throws IOException; + + /** + * Shutdown the IOEngine + */ + void shutdown(); +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java (working copy) @@ -0,0 +1,34 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; + + +/** + * Thrown by {@link BucketAllocator} + */ +public class BucketAllocatorException extends IOException { + private static final long serialVersionUID = 2479119906660788096L; + + BucketAllocatorException(String reason) { + super(reason); + } +} + Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java (working copy) @@ -0,0 +1,120 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + + +import java.util.Comparator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; + +import com.google.common.collect.MinMaxPriorityQueue; + +/** + * A memory-bound queue that will grow until an element brings total size larger + * than maxSize. From then on, only entries that are sorted larger than the + * smallest current entry will be inserted/replaced. + * + *

+ * Use this when you want to find the largest elements (according to their + * ordering, not their heap size) that consume as close to the specified maxSize + * as possible. Default behavior is to grow just above rather than just below + * specified max. + */ +public class CachedEntryQueue { + + private MinMaxPriorityQueue> queue; + + private long cacheSize; + private long maxSize; + + /** + * @param maxSize the target size of elements in the queue + * @param blockSize expected average size of blocks + */ + public CachedEntryQueue(long maxSize, long blockSize) { + int initialSize = (int) (maxSize / blockSize); + if (initialSize == 0) + initialSize++; + queue = MinMaxPriorityQueue + .orderedBy(new Comparator>() { + public int compare(Entry entry1, + Entry entry2) { + return entry1.getValue().compareTo(entry2.getValue()); + } + + }).expectedSize(initialSize).create(); + cacheSize = 0; + this.maxSize = maxSize; + } + + /** + * Attempt to add the specified entry to this queue. + * + *

+ * If the queue is smaller than the max size, or if the specified element is + * ordered after the smallest element in the queue, the element will be added + * to the queue. Otherwise, there is no side effect of this call. + * @param entry a bucket entry with key to try to add to the queue + */ + public void add(Map.Entry entry) { + if (cacheSize < maxSize) { + queue.add(entry); + cacheSize += entry.getValue().getLength(); + } else { + BucketEntry head = queue.peek().getValue(); + if (entry.getValue().compareTo(head) > 0) { + cacheSize += entry.getValue().getLength(); + cacheSize -= head.getLength(); + if (cacheSize > maxSize) { + queue.poll(); + } else { + cacheSize += head.getLength(); + } + queue.add(entry); + } + } + } + + /** + * @return The next element in this queue, or {@code null} if the queue is + * empty. + */ + public Map.Entry poll() { + return queue.poll(); + } + + /** + * @return The last element in this queue, or {@code null} if the queue is + * empty. + */ + public Map.Entry pollLast() { + return queue.pollLast(); + } + + /** + * Total size of all elements in this queue. + * @return size of all elements currently in queue, in bytes + */ + public long cacheSize() { + return cacheSize; + } +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (working copy) @@ -0,0 +1,993 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
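The CachedEntryQueue above leans on Guava's MinMaxPriorityQueue so that both ends stay cheap to reach: peek() exposes the smallest element for the admission check in add(), and pollLast() hands out the largest element for eviction. A self-contained sketch of that admission idea, with plain Long values and a fixed per-element size standing in for Map.Entry<BlockCacheKey, BucketEntry>; the simplified rule below skips the size bookkeeping the real add() performs:

    // Hedged, self-contained sketch of the admission rule used by CachedEntryQueue.add(),
    // with Long "ages" (larger = better eviction candidate) and a fixed per-element size.
    import com.google.common.collect.MinMaxPriorityQueue;

    public class CachedEntryQueueSketch {
      public static void main(String[] args) {
        final long elementSize = 100;                 // illustrative size of every element, in bytes
        final long maxSize = 300;                     // illustrative byte budget (room for 3 elements)
        MinMaxPriorityQueue<Long> queue = MinMaxPriorityQueue.create();  // natural ordering
        long cacheSize = 0;

        for (long age : new long[] { 5, 1, 9, 3, 7, 2 }) {
          if (cacheSize < maxSize) {                  // under budget: always admit
            queue.add(age);
            cacheSize += elementSize;
          } else if (age > queue.peek()) {            // over budget: admit only if larger than the
            queue.poll();                             // current smallest, which gets dropped
            queue.add(age);                           // (size stays constant with fixed-size elements)
          }
        }

        // pollLast() yields the largest remaining element first, mirroring how
        // BucketEntryGroup.free() walks eviction candidates.
        while (!queue.isEmpty()) {
          System.out.print(queue.pollLast() + " ");   // prints 9 7 5
        }
      }
    }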
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.CacheStats; +import org.apache.hadoop.hbase.io.hfile.CachedBlock; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A modified version of BucketCache (by TaoBao/FusionIO) imported from + * HBASE-7404 patch. Simplified to only handle byte arrays (for use as a + * L2 cache). + */ +public class BucketCache implements HeapSize { + + private static final Log LOG = LogFactory.getLog(BucketCache.class); + + // TODO: these must be configurable + /** Priority buckets */ + private static final float DEFAULT_SINGLE_FACTOR = 0.25f; + private static final float DEFAULT_MULTI_FACTOR = 0.50f; + private static final float DEFAULT_MEMORY_FACTOR = 0.25f; + private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; + + private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; + private static final float DEFAULT_MIN_FACTOR = 0.85f; + + /** Statistics thread */ + private static final int statThreadPeriod = 3 * 60; + + public final static int DEFAULT_WRITER_THREADS = 3; + public final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; + + // Store/read block data + private final IOEngine ioEngine; + + // Store the block in this map before writing it to cache + private final ConcurrentHashMap ramCache; + // In this map, store the block's meta data like offset, length + private final ConcurrentHashMap backingMap; + + /** + * Flag if the cache is enabled or not... We shut it off if there are IO + * errors for some time, so that Bucket IO exceptions/errors don't bring down + * the HBase server. 
+ */ + private volatile boolean cacheEnabled; + + private final ArrayList> writerQueues = + new ArrayList>(); + + private final WriterThread writerThreads[]; + + /** Volatile boolean to track if free space is in process or not */ + private volatile boolean freeInProgress = false; + private final Lock freeSpaceLock = new ReentrantLock(); + + private final AtomicLong realCacheSize = new AtomicLong(0); + private final AtomicLong heapSize = new AtomicLong(0); + /** Current number of cached elements */ + private final AtomicLong blockNumber = new AtomicLong(0); + private final AtomicLong failedBlockAdditions = new AtomicLong(0); + + /** Cache access count (sequential ID) */ + private final AtomicLong accessCount = new AtomicLong(0); + + private final Object[] cacheWaitSignals; + private static final int DEFAULT_CACHE_WAIT_TIME = 50; + + + // Used in test now. If the flag is false and the cache speed is very fast, + // bucket cache will skip some blocks when caching. If the flag is true, we + // will wait blocks flushed to IOEngine for some time when caching + boolean wait_when_cache = false; + + private BucketCacheStats cacheStats = new BucketCacheStats(); + + /** Approximate block size */ + private final long blockSize; + + /** Duration of IO errors tolerated before we disable cache, 1 min as default */ + private final int ioErrorsTolerationDuration; + // 1 min + public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; + // Start time of first IO error when reading or writing IO Engine, it will be + // reset after a successful read/write. + private volatile long ioErrorStartTime = -1; + + /** Minimum buffer size for ByteBufferIOEngine */ + public static final int MIN_BUFFER_SIZE = 4 * 1024 * 1024; + + /** + * A "sparse lock" implementation allowing to lock on a particular block + * identified by offset. The purpose of this is to avoid freeing the block + * which is being read. + * + * TODO:We could extend the IdLock to IdReadWriteLock for better. + */ + private IdLock offsetLock = new IdLock(); + + private final ConcurrentIndex blocksByHFile = + new ConcurrentIndex(new Comparator() { + @Override + public int compare(BlockCacheKey a, BlockCacheKey b) { + if (a.getOffset() == b.getOffset()) { + return 0; + } else if (a.getOffset() < b.getOffset()) { + return -1; + } + return 1; + } + }); + + /** Statistics thread schedule pool (for heavy debugging, could remove) */ + private final ScheduledExecutorService scheduleThreadPool = + Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder() + .setNameFormat("BucketCache Statistics #%d") + .setDaemon(true) + .build()); + + private final int[] bucketSizes; + // Allocate or free space for the block + private final BucketAllocator bucketAllocator; + + // TODO (avf): perhaps use a Builder or a separate config object? + public BucketCache(String ioEngineName, long capacity, int writerThreadNum, + int writerQLen, int ioErrorsTolerationDuration, int[] bucketSizes, + Configuration conf) + throws IOException { + this.bucketSizes = bucketSizes; + this.ioEngine = getIOEngineFromName(ioEngineName, capacity, conf); + this.writerThreads = new WriterThread[writerThreadNum]; + this.cacheWaitSignals = new Object[writerThreadNum]; + long blockNumCapacity = capacity / 16384; + if (blockNumCapacity >= Integer.MAX_VALUE) { + // Enough for about 32TB of cache! 
+ throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); + } + + this.blockSize = StoreFile.DEFAULT_BLOCKSIZE_SMALL; + this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; + + bucketAllocator = new BucketAllocator(bucketSizes, capacity); + for (int i = 0; i < writerThreads.length; ++i) { + writerQueues.add(new ArrayBlockingQueue(writerQLen)); + this.cacheWaitSignals[i] = new Object(); + } + + this.ramCache = new ConcurrentHashMap(); + + this.backingMap = new ConcurrentHashMap((int) blockNumCapacity); + + final String threadName = Thread.currentThread().getName(); + this.cacheEnabled = true; + for (int i = 0; i < writerThreads.length; ++i) { + writerThreads[i] = new WriterThread(writerQueues.get(i), i); + writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); + writerThreads[i].start(); + } + // Run the statistics thread periodically to print the cache statistics log + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), + statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); + LOG.info("Started bucket cache"); + } + + /** + * Get the IOEngine from the IO engine name + * @param ioEngineName Name of the io engine + * @param capacity Maximum capacity of the io engine + * @param conf Optional configuration object for additional parameters + * @return Instance of the correct IOEngine + * @throws IllegalArgumentException If the name of the io engine is invalid + * @throws IOException If there is an error instantiating the io engine + */ + private IOEngine getIOEngineFromName(String ioEngineName, long capacity, + Configuration conf) + throws IOException { + int requestedBufferSize = conf == null ? + CacheConfig.DEFAULT_L2_BUCKET_CACHE_BUFFER_SIZE : + conf.getInt(CacheConfig.L2_BUCKET_CACHE_BUFFER_SIZE_KEY, + CacheConfig.DEFAULT_L2_BUCKET_CACHE_BUFFER_SIZE); + int bufferSize = Math.max(requestedBufferSize, bucketSizes[0]); + boolean isDirect; + if (ioEngineName.startsWith("offheap")) + isDirect = true; + else if (ioEngineName.startsWith("heap")) + isDirect = false; + else + throw new IllegalArgumentException( + "Don't understand io engine name " + ioEngineName + + " for cache. Must be heap or offheap"); + LOG.info("Initiating ByteBufferIOEngine with " + ioEngineName + + " allocation..."); + if (bufferSize != requestedBufferSize) { + LOG.warn("Requested per-buffer size " + requestedBufferSize + + ", but actual per-buffer size will be: " + bufferSize); + } else { + LOG.info("Size per-buffer: " + StringUtils.humanReadableInt(bufferSize)); + } + return new ByteBufferIOEngine(capacity, bufferSize, isDirect); + } + + /** + * Cache the block with the specified name and buffer. + * @param cacheKey block's cache key + * @param buf block buffer + */ + public void cacheBlock(BlockCacheKey cacheKey, byte[] buf) { + cacheBlock(cacheKey, buf, false); + } + + /** + * Cache the block with the specified name and buffer. 
+ * @param cacheKey block's cache key + * @param cachedItem block buffer + * @param inMemory if block is in-memory + */ + public void cacheBlock(BlockCacheKey cacheKey, byte[] cachedItem, boolean inMemory) { + cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); + } + + /** + * Cache the block to ramCache + * @param cacheKey block's cache key + * @param cachedItem block buffer + * @param inMemory if block is in-memory + * @param wait if true, blocking wait when queue is full + */ + public void cacheBlockWithWait(BlockCacheKey cacheKey, byte[] cachedItem, + boolean inMemory, boolean wait) { + if (!cacheEnabled) + return; + + if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) + return; + + /* + * Stuff the entry into the RAM cache so it can get drained to the + * persistent store + */ + RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, + accessCount.incrementAndGet(), inMemory); + ramCache.put(cacheKey, re); + int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); + BlockingQueue bq = writerQueues.get(queueNum); + boolean successfulAddition = bq.offer(re); + if (!successfulAddition && wait) { + synchronized (cacheWaitSignals[queueNum]) { + try { + cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + successfulAddition = bq.offer(re); + } + if (!successfulAddition) { + ramCache.remove(cacheKey); + failedBlockAdditions.incrementAndGet(); + } else { + this.blockNumber.incrementAndGet(); + this.heapSize.addAndGet(cachedItem.length); + blocksByHFile.put(cacheKey.getHfileName(), cacheKey); + } + } + + + /** + * Get the buffer of the block with the specified key. + * @param key block's cache key + * @param caching true if the caller caches blocks on cache misses + */ + public byte[] getBlock(BlockCacheKey key, boolean caching) { + return getBlock(key, caching, false); + } + + /** + * Get the buffer of the block with the specified key. 
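Taken together with the constructor further up, the cacheBlock()/getBlock() pair can be exercised roughly as follows. This is a hedged sketch: the capacity, bucket sizes and sample block are illustrative values, the class name is made up, and BucketAllocator is assumed to accept the given size classes.

    // Hedged sketch: builds an off-heap BucketCache with the constructor shown above
    // and pushes one raw block through cacheBlock()/getBlock(). Values are illustrative.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
    import org.apache.hadoop.hbase.io.hfile.BlockType;
    import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

    public class BucketCacheUsageSketch {
      public static void main(String[] args) throws Exception {
        int[] bucketSizes = new int[] { 8 * 1024, 16 * 1024, 64 * 1024 }; // illustrative size classes
        BucketCache cache = new BucketCache(
            "offheap",                                   // or "heap"; see getIOEngineFromName above
            64L * 1024 * 1024,                           // 64 MB capacity
            BucketCache.DEFAULT_WRITER_THREADS,
            BucketCache.DEFAULT_WRITER_QUEUE_ITEMS,
            BucketCache.DEFAULT_ERROR_TOLERATION_DURATION,
            bucketSizes,
            new Configuration());

        // Key shape matches the BlockCacheKey constructor used by the HFileReaderV2 hunks later on.
        BlockCacheKey key = new BlockCacheKey("sample-hfile", 0L,
            DataBlockEncoding.NONE, BlockType.DATA);
        byte[] rawBlock = new byte[4 * 1024];            // stands in for an on-disk block

        cache.cacheBlock(key, rawBlock);                 // staged in ramCache, drained asynchronously
        byte[] back = cache.getBlock(key, true);         // served from the RAM stage or the IOEngine
        System.out.println(back != null && back.length == rawBlock.length);

        cache.shutdown();
      }
    }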
+ * @param key block's cache key + * @param caching true if the caller caches blocks on cache misses + * @param repeat Whether this is a repeat lookup for the same block + * @return buffer of specified cache key, or null if not in cache + */ + public byte[] getBlock(BlockCacheKey key, boolean caching, boolean repeat) { + if (!cacheEnabled) + return null; + RAMQueueEntry re = ramCache.get(key); + if (re != null) { + cacheStats.hit(caching); + re.access(accessCount.incrementAndGet()); + return re.getData(); + } + BucketEntry bucketEntry = backingMap.get(key); + if(bucketEntry!=null) { + long start = System.nanoTime(); + IdLock.Entry lockEntry = null; + try { + lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); + if (bucketEntry.equals(backingMap.get(key))) { + int len = bucketEntry.getLength(); + byte[] bytes = new byte[len]; + ioEngine.read(bytes, bucketEntry.offset()); + long timeTaken = System.nanoTime() - start; + cacheStats.hit(caching); + cacheStats.ioHit(timeTaken); + bucketEntry.access(accessCount.incrementAndGet()); + if (this.ioErrorStartTime > 0) { + ioErrorStartTime = -1; + } + return bytes; + } + } catch (IOException ioex) { + LOG.error("Failed reading block " + key + " from bucket cache", ioex); + checkIOErrorIsTolerated(); + } finally { + if (lockEntry != null) { + offsetLock.releaseLockEntry(lockEntry); + } + } + } + if(!repeat)cacheStats.miss(caching); + return null; + } + + public void clearCache() { + for (BlockCacheKey key: this.backingMap.keySet()) { + evictBlock(key); + } + } + + public boolean evictBlock(BlockCacheKey cacheKey) { + if (!cacheEnabled) return false; + RAMQueueEntry removedBlock = ramCache.remove(cacheKey); + if (removedBlock != null) { + this.blockNumber.decrementAndGet(); + this.heapSize.addAndGet(-1 * removedBlock.getData().length); + } + BucketEntry bucketEntry = backingMap.get(cacheKey); + if (bucketEntry == null) { return false; } + IdLock.Entry lockEntry = null; + try { + lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); + if (bucketEntry.equals(backingMap.remove(cacheKey))) { + bucketAllocator.freeBlock(bucketEntry.offset()); + realCacheSize.addAndGet(-1 * bucketEntry.getLength()); + blocksByHFile.remove(cacheKey.getHfileName(), cacheKey); + if (removedBlock == null) { + this.blockNumber.decrementAndGet(); + } + } else { + return false; + } + } catch (IOException ie) { + LOG.warn("Failed evicting block " + cacheKey); + return false; + } finally { + if (lockEntry != null) { + offsetLock.releaseLockEntry(lockEntry); + } + } + cacheStats.evicted(bucketEntry.getPriority()); + return true; + } + + /* + * Statistics thread. Periodically prints the cache statistics to the log. 
+ */ + private static class StatisticsThread extends Thread { + BucketCache bucketCache; + + public StatisticsThread(BucketCache bucketCache) { + super("BucketCache.StatisticsThread"); + setDaemon(true); + this.bucketCache = bucketCache; + } + @Override + public void run() { + bucketCache.logStats(); + } + } + + public void logStats() { + if (!LOG.isDebugEnabled()) return; + // Log size + long totalSize = bucketAllocator.getTotalSize(); + long usedSize = bucketAllocator.getUsedSize(); + long freeSize = totalSize - usedSize; + long cacheSize = this.realCacheSize.get(); + LOG.debug("BucketCache Stats: " + + "failedBlockAdditions=" + this.failedBlockAdditions.get() + ", " + + "total=" + StringUtils.byteDesc(totalSize) + ", " + + "free=" + StringUtils.byteDesc(freeSize) + ", " + + "usedSize=" + StringUtils.byteDesc(usedSize) +", " + + "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " + + "accesses=" + cacheStats.getRequestCount() + ", " + + "hits=" + cacheStats.getHitCount() + ", " + + "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " + + "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " + + "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : + (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + + "cachingHits=" + cacheStats.getHitCachingCount() + ", " + + "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : + (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + + "evictions=" + cacheStats.getEvictionCount() + ", " + + "evicted=" + cacheStats.getEvictedCount() + ", " + + "evictedPerRun=" + cacheStats.evictedPerEviction()); + cacheStats.reset(); + } + + private long acceptableSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR); + } + + private long minSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MIN_FACTOR); + } + + private long singleSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() + * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR); + } + + private long multiSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR + * DEFAULT_MIN_FACTOR); + } + + private long memorySize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR + * DEFAULT_MIN_FACTOR); + } + + /** + * Free the space if the used size reaches acceptableSize() or one size block + * couldn't be allocated. 
When freeing the space, we use the LRU algorithm and + * ensure there must be some blocks evicted + */ + private void freeSpace() { + // Ensure only one freeSpace progress at a time + if (!freeSpaceLock.tryLock()) return; + try { + freeInProgress = true; + long bytesToFreeWithoutExtra = 0; + /* + * Calculate free byte for each bucketSizeinfo + */ + StringBuffer msgBuffer = new StringBuffer(); + BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); + long[] bytesToFreeForBucket = new long[stats.length]; + for (int i = 0; i < stats.length; i++) { + bytesToFreeForBucket[i] = 0; + long freeGoal = (long) Math.floor(stats[i].totalCount() + * (1 - DEFAULT_MIN_FACTOR)); + freeGoal = Math.max(freeGoal, 1); + if (stats[i].freeCount() < freeGoal) { + bytesToFreeForBucket[i] = stats[i].itemSize() + * (freeGoal - stats[i].freeCount()); + bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; + msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" + + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); + } + } + msgBuffer.append("Free for total=" + + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); + + if (bytesToFreeWithoutExtra <= 0) { + return; + } + long currentSize = bucketAllocator.getUsedSize(); + long totalSize=bucketAllocator.getTotalSize(); + LOG.debug("Bucket cache free space started; Attempting to " + msgBuffer.toString() + + " of current used=" + StringUtils.byteDesc(currentSize) + + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get()) + + ",total=" + StringUtils.byteDesc(totalSize)); + + long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra + * (1 + DEFAULT_EXTRA_FREE_FACTOR)); + + // Instantiate priority buckets + BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, + blockSize, singleSize()); + BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, + blockSize, multiSize()); + BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, + blockSize, memorySize()); + + // Scan entire map putting bucket entry into appropriate bucket entry + // group + for (Map.Entry bucketEntryWithKey : backingMap.entrySet()) { + switch (bucketEntryWithKey.getValue().getPriority()) { + case SINGLE: { + bucketSingle.add(bucketEntryWithKey); + break; + } + case MULTI: { + bucketMulti.add(bucketEntryWithKey); + break; + } + case MEMORY: { + bucketMemory.add(bucketEntryWithKey); + break; + } + } + } + + PriorityQueue bucketQueue = new PriorityQueue(3); + + bucketQueue.add(bucketSingle); + bucketQueue.add(bucketMulti); + bucketQueue.add(bucketMemory); + + int remainingBuckets = 3; + long bytesFreed = 0; + + BucketEntryGroup bucketGroup; + while ((bucketGroup = bucketQueue.poll()) != null) { + long overflow = bucketGroup.overflow(); + if (overflow > 0) { + long bucketBytesToFree = Math.min(overflow, + (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); + bytesFreed += bucketGroup.free(bucketBytesToFree); + } + remainingBuckets--; + } + + /** + * Check whether need extra free because some bucketSizeinfo still needs + * free space + */ + stats = bucketAllocator.getIndexStatistics(); + boolean needFreeForExtra = false; + for (int i = 0; i < stats.length; i++) { + long freeGoal = (long) Math.floor(stats[i].totalCount() + * (1 - DEFAULT_MIN_FACTOR)); + freeGoal = Math.max(freeGoal, 1); + if (stats[i].freeCount() < freeGoal) { + needFreeForExtra = true; + break; + } + } + + if (needFreeForExtra) { + bucketQueue.clear(); + remainingBuckets = 2; + + bucketQueue.add(bucketSingle); + 
bucketQueue.add(bucketMulti); + + while ((bucketGroup = bucketQueue.poll()) != null) { + long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) + / remainingBuckets; + bytesFreed += bucketGroup.free(bucketBytesToFree); + remainingBuckets--; + } + } + + if (LOG.isDebugEnabled()) { + long single = bucketSingle.totalSize(); + long multi = bucketMulti.totalSize(); + long memory = bucketMemory.totalSize(); + LOG.debug("Bucket cache free space completed; " + "freed=" + + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + + StringUtils.byteDesc(totalSize) + ", " + "single=" + + StringUtils.byteDesc(single) + ", " + "multi=" + + StringUtils.byteDesc(multi) + ", " + "memory=" + + StringUtils.byteDesc(memory)); + } + + } finally { + cacheStats.evict(); + freeInProgress = false; + freeSpaceLock.unlock(); + } + } + + // This handles flushing the RAM cache to IOEngine. + private class WriterThread extends HasThread { + BlockingQueue inputQueue; + final int threadIdx; + boolean writerEnabled = true; + + WriterThread(BlockingQueue queue, int threadNO) { + super(); + this.inputQueue = queue; + this.threadIdx = threadNO; + setDaemon(true); + } + + // Used for test + void disableWriter() { + this.writerEnabled = false; + } + + public void run() { + List entries = new ArrayList(); + try { + while (cacheEnabled && writerEnabled) { + try { + // Perform a block take first in order to avoid a drainTo() + // on an empty queue looping around and causing starvation. + entries.add(inputQueue.take()); + inputQueue.drainTo(entries); + synchronized (cacheWaitSignals[threadIdx]) { + cacheWaitSignals[threadIdx].notifyAll(); + } + } catch (InterruptedException ie) { + if (!cacheEnabled) break; + } + doDrain(entries); + } + } catch (Throwable t) { + LOG.warn("Failed doing drain", t); + } + LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); + } + + /** + * Flush the entries in ramCache to IOEngine and add bucket entry to + * backingMap + * @param entries + * @throws InterruptedException + */ + private void doDrain(List entries) + throws InterruptedException { + BucketEntry[] bucketEntries = new BucketEntry[entries.size()]; + RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()]; + int done = 0; + while (entries.size() > 0 && cacheEnabled) { + // Keep going in case we throw... + RAMQueueEntry ramEntry = null; + try { + ramEntry = entries.remove(entries.size() - 1); + if (ramEntry == null) { + LOG.warn("Couldn't get the entry from RAM queue, who steals it?"); + continue; + } + BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine, + bucketAllocator, realCacheSize); + ramEntries[done] = ramEntry; + bucketEntries[done++] = bucketEntry; + if (ioErrorStartTime > 0) { + ioErrorStartTime = -1; + } + } catch (BucketAllocatorException fle) { + LOG.warn("Failed allocating for block " + + (ramEntry == null ? "" : ramEntry.getKey()), fle); + } catch (CacheFullException cfe) { + if (!freeInProgress) { + freeSpace(); + } else { + Thread.sleep(50); + } + } catch (IOException ioex) { + LOG.error("Failed writing to bucket cache", ioex); + checkIOErrorIsTolerated(); + } + } + + // Make sure that the data pages we have written are on the media before + // we update the map. 
+ try { + ioEngine.sync(); + } catch (IOException ioex) { + LOG.error("Faild syncing IO engine", ioex); + checkIOErrorIsTolerated(); + // Since we failed sync, free the blocks in bucket allocator + for (int i = 0; i < done; ++i) { + if (bucketEntries[i] != null) { + bucketAllocator.freeBlock(bucketEntries[i].offset()); + } + } + done = 0; + } + + for (int i = 0; i < done; ++i) { + if (bucketEntries[i] != null) { + backingMap.put(ramEntries[i].getKey(), bucketEntries[i]); + } + RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey()); + if (ramCacheEntry != null) { + heapSize.addAndGet(-1 * ramEntries[i].getData().length); + } + } + + if (bucketAllocator.getUsedSize() > acceptableSize()) { + freeSpace(); + } + } + } + + /** + * Check whether we tolerate IO error this time. If the duration of IOEngine + * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the + * cache + */ + private void checkIOErrorIsTolerated() { + long now = EnvironmentEdgeManager.currentTimeMillis(); + if (this.ioErrorStartTime > 0) { + if (cacheEnabled + && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) { + LOG.error("IO errors duration time has exceeded " + + ioErrorsTolerationDuration + + "ms, disabing cache, please check your IOEngine"); + disableCache(); + } + } else { + this.ioErrorStartTime = now; + } + } + + /** + * Used to shut down the cache -or- turn it off in the case of something + * broken. + */ + private void disableCache() { + if (!cacheEnabled) + return; + cacheEnabled = false; + ioEngine.shutdown(); + this.scheduleThreadPool.shutdown(); + for (int i = 0; i < writerThreads.length; ++i) + writerThreads[i].interrupt(); + this.ramCache.clear(); + this.backingMap.clear(); + } + + private void join() throws InterruptedException { + for (int i = 0; i < writerThreads.length; ++i) + writerThreads[i].join(); + } + + public void shutdown() { + disableCache(); + LOG.info("Shutting down bucket cache"); + } + + public CacheStats getStats() { + return cacheStats; + } + + BucketAllocator getAllocator() { + return this.bucketAllocator; + } + + public long heapSize() { + return this.heapSize.get(); + } + + /** + * Returns the total size of the block cache, in bytes. + * @return size of cache, in bytes + */ + public long size() { + return this.realCacheSize.get(); + } + + public long getFreeSize() { + return this.bucketAllocator.getTotalSize() - this.bucketAllocator.getUsedSize(); + } + + public long getBlockCount() { + return this.blockNumber.get(); + } + + public long getEvictedCount() { + return cacheStats.getEvictedCount(); + } + + /** + * Evicts all blocks for a specific HFile. This is an expensive operation + * implemented as a linear-time search through all blocks in the cache. + * Ideally this should be a search in a log-access-time map. + * + *

+ * This is used for evict-on-close to remove all blocks of a specific HFile. + * + * @return the number of blocks evicted + */ + public int evictBlocksByHfileName(String hfileName) { + // Copy the list to avoid ConcurrentModificationException + // as evictBlockKey removes the key from the index + Set keySet = blocksByHFile.values(hfileName); + if (keySet == null) { + return 0; + } + int numEvicted = 0; + List keysForHFile = + ImmutableList.copyOf(keySet); + for (BlockCacheKey key : keysForHFile) { + if (evictBlock(key)) { + ++numEvicted; + } + } + return numEvicted; + } + + + /** + * Item in cache. We expect this to be where most memory goes. Java uses 8 + * bytes just for object headers; after this, we want to use as little as + * possible - so we only use 8 bytes, but in order to do so we end up messing + * around with all this Java casting stuff. Offset stored as 5 bytes that make + * up the long. Doubt we'll see devices this big for ages. Offsets are divided + * by 256. So 5 bytes gives us 256TB or so. + */ + static class BucketEntry implements Serializable, Comparable { + private static final long serialVersionUID = -6741504807982257534L; + private int offsetBase; + private int length; + private byte offset1; + private volatile long accessTime; + private CachedBlock.BlockPriority priority; + + BucketEntry(long offset, int length, long accessTime, boolean inMemory) { + setOffset(offset); + this.length = length; + this.accessTime = accessTime; + if (inMemory) { + this.priority = CachedBlock.BlockPriority.MEMORY; + } else { + this.priority = CachedBlock.BlockPriority.SINGLE; + } + } + + long offset() { // Java has no unsigned numbers + long o = ((long) offsetBase) & 0xFFFFFFFF; + o += (((long) (offset1)) & 0xFF) << 32; + return o << 8; + } + + private void setOffset(long value) { + Preconditions.checkArgument((value & 0xFF) == 0); + value >>= 8; + offsetBase = (int) value; + offset1 = (byte) (value >> 32); + } + + public int getLength() { + return length; + } + + /** + * Block has been accessed. Update its local access time. + */ + public void access(long accessTime) { + this.accessTime = accessTime; + if (this.priority == CachedBlock.BlockPriority.SINGLE) { + this.priority = CachedBlock.BlockPriority.MULTI; + } + } + + public CachedBlock.BlockPriority getPriority() { + return this.priority; + } + + @Override + public int compareTo(BucketEntry that) { + if(this.accessTime == that.accessTime) return 0; + return this.accessTime < that.accessTime ? 1 : -1; + } + + @Override + public boolean equals(Object that) { + return this == that; + } + } + + /** + * Used to group bucket entries into priority buckets. There will be a + * BucketEntryGroup for each priority (single, multi, memory). Once bucketed, + * the eviction algorithm takes the appropriate number of elements out of each + * according to configuration parameters and their relative sizes. + */ + private class BucketEntryGroup implements Comparable { + + private CachedEntryQueue queue; + private long totalSize = 0; + private long bucketSize; + + public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { + this.bucketSize = bucketSize; + queue = new CachedEntryQueue(bytesToFree, blockSize); + totalSize = 0; + } + + public void add(Map.Entry block) { + totalSize += block.getValue().getLength(); + queue.add(block); + } + + /** + * Free specified number of bytes by freeing the least recently used + * buckets entries. 
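On the 5-byte offset packing in BucketEntry above: offsets must be 256-byte aligned, the low 8 bits are dropped, and the remaining bits are split across an int and a byte. A standalone round-trip of that encoding is sketched below; note that the mask in the sketch is the long literal 0xFFFFFFFFL, since an int-typed 0xFFFFFFFF would be sign-extended and stop masking once offsetBase has its sign bit set.

    // Hedged, standalone round-trip of the BucketEntry offset encoding above.
    public class OffsetPackingSketch {
      public static void main(String[] args) {
        long offset = 768L * 1024 * 1024 * 1024;      // illustrative 768 GB offset, 256-byte aligned

        // Pack: drop the low 8 bits, keep 32 bits in an int and the next bits in a byte.
        long shifted = offset >> 8;
        int offsetBase = (int) shifted;               // goes negative here, which is the interesting case
        byte offset1 = (byte) (shifted >> 32);

        // Unpack: the long literal 0xFFFFFFFFL zero-extends offsetBase; an int-typed
        // 0xFFFFFFFF would be sign-extended to all ones and leave the value negative.
        long restored = (((long) offsetBase) & 0xFFFFFFFFL)
            + ((((long) offset1) & 0xFF) << 32);
        restored <<= 8;

        System.out.println(offset == restored);       // true
      }
    }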
+ * @param toFree Number of bytes we want to free + * @return Number of bytes freed + */ + public long free(long toFree) { + Map.Entry entry; + long freedBytes = 0; + while ((entry = queue.pollLast()) != null) { + evictBlock(entry.getKey()); + freedBytes += entry.getValue().getLength(); + if (freedBytes >= toFree) { + return freedBytes; + } + } + return freedBytes; + } + + public long overflow() { + return totalSize - bucketSize; + } + + public long totalSize() { + return totalSize; + } + + @Override + public int compareTo(BucketEntryGroup that) { + if (this.overflow() == that.overflow()) + return 0; + return this.overflow() > that.overflow() ? 1 : -1; + } + + @Override + public boolean equals(Object that) { + return this == that; + } + + } + + /** + * Only used in test + * @throws InterruptedException + */ + void stopWriterThreads() throws InterruptedException { + for (WriterThread writerThread : writerThreads) { + writerThread.disableWriter(); + writerThread.interrupt(); + writerThread.join(); + } + } +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java (working copy) @@ -0,0 +1,54 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; + + +/** + * Thrown by {@link BucketAllocator#allocateBlock(int)} when cache is full for + * the requested size + */ +public class CacheFullException extends IOException { + private static final long serialVersionUID = 3265127301824638920L; + private int requestedSize, bucketIndex; + + CacheFullException(int requestedSize, int bucketIndex) { + super(); + this.requestedSize = requestedSize; + this.bucketIndex = bucketIndex; + } + + public int bucketIndex() { + return bucketIndex; + } + + public int requestedSize() { + return requestedSize; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(1024); + sb.append("Allocator requested size ").append(requestedSize); + sb.append(" for bucket ").append(bucketIndex); + return sb.toString(); + } +} + Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java (working copy) @@ -0,0 +1,79 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.io.hfile.bucket; + +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Block Entry stored in the memory with key,data and so on + */ +class RAMQueueEntry { + private BlockCacheKey key; + private byte[] data; + private long accessTime; + private boolean inMemory; + + public RAMQueueEntry(BlockCacheKey bck, byte[] data, long accessTime, + boolean inMemory) { + this.key = bck; + this.data = data; + this.accessTime = accessTime; + this.inMemory = inMemory; + } + + public byte[] getData() { + return data; + } + + public BlockCacheKey getKey() { + return key; + } + + public void access(long accessTime) { + this.accessTime = accessTime; + } + + public BucketCache.BucketEntry writeToCache(final IOEngine ioEngine, + final BucketAllocator bucketAllocator, + final AtomicLong realCacheSize) throws CacheFullException, IOException, + BucketAllocatorException { + int len = data.length; + if (len == 0) { + return null; + } + long offset = bucketAllocator.allocateBlock(len); + BucketCache.BucketEntry bucketEntry = new BucketCache.BucketEntry(offset, len, accessTime, + inMemory); + try { + ioEngine.write(data, offset); + } catch (IOException ioe) { + // free it in bucket allocator + bucketAllocator.freeBlock(offset); + throw ioe; + } + + realCacheSize.addAndGet(len); + return bucketEntry; + } +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ConcurrentIndex.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ConcurrentIndex.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ConcurrentIndex.java (working copy) @@ -0,0 +1,171 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + + +import com.google.common.base.Supplier; +import com.google.common.collect.Multiset; + +import java.util.Comparator; +import java.util.ConcurrentModificationException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + * A simple concurrent map of sets. This is similar in concept to + * {@link Multiset}, with the following exceptions: + *

+ * <ul>
+ *   <li>The set is thread-safe and concurrent: no external locking or
+ * synchronization is required. This is important for the use case where
+ * this class is used to index cached blocks by filename for their
+ * efficient eviction from cache when the file is closed or compacted.</li>
+ *   <li>The expectation is that all entries may only be removed for a key
+ * once no more additions of values are being made under that key.</li>
+ * </ul>
+ * @see BucketCache#evictBlocksByHfileName(String) for example use of this + * class. + * @param Key type + * @param Value type + */ +public class ConcurrentIndex { + + /** Container for the sets, indexed by key */ + private final ConcurrentMap> container; + + /** + * A factory that constructs new instances of the sets if no set is + * associated with a given key. + */ + private final Supplier> valueSetFactory; + + /** + * Creates an instance with a specified factory object for sets to be + * associated with a given key. + * @param valueSetFactory The factory instance + */ + public ConcurrentIndex(Supplier> valueSetFactory) { + this.valueSetFactory = valueSetFactory; + this.container = new ConcurrentHashMap>(); + } + + /** + * Creates an instance using the {@link DefaultValueSetFactory} for sets, + * which in turn creates instances of {@link ConcurrentSkipListSet} + * @param valueComparator A {@link Comparator} for value types + */ + public ConcurrentIndex(Comparator valueComparator) { + this(new DefaultValueSetFactory(valueComparator)); + } + + /** + * Associate a new unique value with a specified key. Under the covers, the + * method employs optimistic concurrency: if no set is associated with a + * given key, we create a new set; if another thread comes in, creates, + * and associates a set with the same key in the mean-time, we simply add + * the value to the already created set. + * @param key The key + * @param value An additional unique value we want to associate with a key + */ + public void put(K key, V value) { + Set set = container.get(key); + if (set != null) { + set.add(value); + } else { + set = valueSetFactory.get(); + set.add(value); + Set existing = container.putIfAbsent(key, set); + if (existing != null) { + // If a set is already associated with a key, that means another + // writer has already come in and created the set for the given key. + // Pursuant to an optimistic concurrency policy, in this case we will + // simply add the value to the existing set associated with the key. + existing.add(value); + } + } + } + + /** + * Get all values associated with a specified key or null if no values are + * associated. Note: if the caller wishes to add or removes values + * to under the specified as they're iterating through the returned value, + * they should make a defensive copy; otherwise, a + * {@link ConcurrentModificationException} may be thrown. + * @see BucketCache#evictBlocksByHfileName(String) for example usage + * @param key The key + * @return All values associated with the specified key or null if no values + * are associated with the key. + */ + public Set values(K key) { + return container.get(key); + } + + /** + * Removes the association between a specified key and value. If as a + * result of removing a value a set becomes empty, we remove the given + * set from the mapping as well. + * @param key The specified key + * @param value The value to disassociate with the key + */ + public boolean remove(K key, V value) { + Set set = container.get(key); + boolean success = false; + if (set != null) { + success = set.remove(value); + if (set.isEmpty()) { + container.remove(key); + } + } + return success; + } + + /** + * Default factory class for the sets associated with given keys. Creates + * a {@link ConcurrentSkipListSet} using the comparator passed into the + * constructor. + * @see ConcurrentSkipListSet + * @see Supplier + * @param The value type. Should match value type of the + * ConcurrentIndex instances of this object are passed to. 
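The put()/values()/remove() trio above is what BucketCache uses for its blocksByHFile index. The type parameters are stripped in the hunk as shown, but the @param tags indicate the class is generic in a key and a value type; the sketch below assumes that, and the String/Long pairing and comparator are illustrative:

    // Hedged sketch of the ConcurrentIndex API above, assuming the class is generic
    // in <K, V> as its @param tags indicate; the String/Long pairing is illustrative.
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.hbase.io.hfile.bucket.ConcurrentIndex;

    public class ConcurrentIndexSketch {
      public static void main(String[] args) {
        ConcurrentIndex<String, Long> offsetsByFile =
            new ConcurrentIndex<String, Long>(new Comparator<Long>() {
              @Override
              public int compare(Long a, Long b) {
                return a.compareTo(b);
              }
            });

        offsetsByFile.put("hfile-1", 0L);             // index two block offsets under one file name
        offsetsByFile.put("hfile-1", 65536L);

        Set<Long> offsets = offsetsByFile.values("hfile-1");
        // Defensive copy, as the values() javadoc advises, before removing while iterating.
        List<Long> snapshot = new ArrayList<Long>(offsets);
        for (Long offset : snapshot) {
          offsetsByFile.remove("hfile-1", offset);
        }
        System.out.println(offsetsByFile.values("hfile-1"));  // null once the set drains empty
      }
    }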
+ */ + static class DefaultValueSetFactory implements Supplier> { + private final Comparator comparator; + + /** + * Creates an instance that passes a specified comparator to the + * {@link ConcurrentSkipListSet} + * @param comparator The specified comparator + */ + public DefaultValueSetFactory(Comparator comparator) { + this.comparator = comparator; + } + + /** + * Creates a new {@link ConcurrentSkipListSet} instance using the + * comparator specified when the class instance was constructor. + * @return The instantiated {@link ConcurrentSkipListSet} object + */ + @Override + public Set get() { + return new ConcurrentSkipListSet(comparator); + } + } +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java (working copy) @@ -0,0 +1,83 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; + +import org.apache.hadoop.hbase.util.ByteBufferArray; + +/** + * IO engine that stores data on the memory using an array of ByteBuffers + * {@link ByteBufferArray} + */ +public class ByteBufferIOEngine implements IOEngine { + + private ByteBufferArray bufferArray; + + /** + * Construct the ByteBufferIOEngine with the given capacity + * @param capacity + * @param direct true if allocate direct buffer + * @throws IOException + */ + public ByteBufferIOEngine(long capacity, int bufferSize, boolean direct) + throws IOException { + bufferArray = new ByteBufferArray(capacity, bufferSize, direct); + } + + /** + * Transfers data from the buffer array to the given byte buffer + * @param dst the given byte array into which bytes are to be written + * @param offset The offset in the ByteBufferArray of the first byte to be + * read + * @throws IOException + */ + @Override + public void read(byte[] dst, long offset) throws IOException { + bufferArray.getMultiple(offset, dst.length, dst, 0); + } + + /** + * Transfers data from the given byte buffer to the buffer array + * @param src the given byte array from which bytes are to be read + * @param offset The offset in the ByteBufferArray of the first byte to be + * written + * @throws IOException + */ + @Override + public void write(byte[] src, long offset) throws IOException { + bufferArray.putMultiple(offset, src.length, src, 0); + } + + /** + * No operation for the sync in the memory IO engine + */ + @Override + public void sync() { + + } + + /** + * No operation for the shutdown in the memory IO engine + */ + @Override + public void shutdown() { + + } +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java (working copy) @@ -0,0 +1,59 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +/** + * Interface for a secondary level block cache that deals with byte arrays + * identical to what is written to disk (i.e., usually encoded and compressed), + * as opposed to HFile objects. + */ +public interface L2Cache { + /** + * Retrieve a block from the L2Cache. The block is retrieved as a byte + * array, in the same exact format as it is stored on disk. + * @param hfileName Filename associated with the block + * @param dataBlockOffset Offset in the file + * @return + */ + public byte[] getRawBlock(String hfileName, long dataBlockOffset); + + + /** + * Add a block to the L2Cache. 
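A quick round-trip through the ByteBufferIOEngine above, as a hedged sketch; the capacity, buffer size, offsets and class name are illustrative, and the engine is allocated on-heap so the sketch runs without direct-memory settings:

    // Hedged round-trip through ByteBufferIOEngine; sizes, offsets and the class name
    // are illustrative, and the buffers are heap-allocated (direct = false).
    import java.util.Arrays;

    import org.apache.hadoop.hbase.io.hfile.bucket.ByteBufferIOEngine;

    public class ByteBufferIOEngineSketch {
      public static void main(String[] args) throws Exception {
        // 8 MB capacity carved into 4 MB buffers (MIN_BUFFER_SIZE in BucketCache).
        ByteBufferIOEngine engine =
            new ByteBufferIOEngine(8 * 1024 * 1024, 4 * 1024 * 1024, false);

        byte[] block = new byte[16 * 1024];
        Arrays.fill(block, (byte) 0x5A);
        engine.write(block, 1 << 20);                 // store the block at a 1 MB offset

        byte[] copy = new byte[block.length];
        engine.read(copy, 1 << 20);                   // read it back from the same offset
        System.out.println(Arrays.equals(block, copy));  // true

        engine.sync();                                // both are no-ops for the in-memory engine
        engine.shutdown();
      }
    }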
The block must be represented by a + * byte array identical to what would be written to disk. + * @param hfileName Filename associated with the block + * @param dataBlockOffset Offset in the file + * @param rawBlock The exact byte representation of the block + */ + public void cacheRawBlock(String hfileName, long dataBlockOffset, + byte[] rawBlock); + + /** + * Evict all blocks matching a given filename. This operation should be + * efficient and can be called on each close of a store file. + * @param hfileName Filename whose blocks to evict + */ + public int evictBlocksByHfileName(String hfileName); + + /** + * Shutdown the cache + */ + public void shutdown(); +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (working copy) @@ -105,9 +105,10 @@ closeIStream, cacheConf, hfs); trailer.expectMajorVersion(2); validateMinorVersion(path, trailer.getMinorVersion()); - HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis, - fsdisNoFsChecksum, - compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path); + HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis, fsdisNoFsChecksum, + compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path, + cacheConf.isL2CacheEnabled() ? cacheConf.getL2Cache() : null, + cacheConf.isL2CacheEnabled() ? name : null); this.fsBlockReader = fsBlockReaderV2; // upcast // Comparator class name is stored in the trailer in version 2. @@ -219,6 +220,7 @@ BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset, DataBlockEncoding.NONE, BlockType.META); + boolean cacheInL2 = cacheBlock && cacheConf.isL2CacheEnabled(); cacheBlock &= cacheConf.shouldCacheDataOnRead(); if (cacheConf.isBlockCacheEnabled()) { HFileBlock cachedBlock = @@ -233,7 +235,7 @@ } HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, - blockSize, -1, true); + blockSize, -1, true, cacheInL2); passSchemaMetricsTo(metaBlock); final long delta = System.nanoTime() - startTimeNs; @@ -292,6 +294,7 @@ boolean useLock = false; IdLock.Entry lockEntry = null; + HFileBlock cachedBlock = null; try { while (true) { @@ -304,7 +307,7 @@ if (cacheConf.isBlockCacheEnabled()) { // Try and get the block from the block cache. If the useLock variable is true then this // is the second time through the loop and it should not be counted as a block cache miss. - HFileBlock cachedBlock = (HFileBlock) + cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, useLock); if (cachedBlock != null) { BlockCategory blockCategory = @@ -337,10 +340,39 @@ continue; } - // Load block from filesystem. + // First, check if the block exists in L2 cache + cachedBlock = null; + try { + cachedBlock = getBlockFromL2Cache(name, dataBlockOffset, + expectedBlockType, isCompaction); + } catch (Throwable t) { + // If exception is encountered when attempting to read from the L2 + // cache, we should go on to try to read from disk and log the + // exception. + LOG.warn("Error occured attempting to retrieve from the L2 cache! 
" + + "[ hfileName = " + name + ", offset = " + dataBlockOffset + + ", expectedBlockType =" + expectedBlockType + ", isCompaction = " + + isCompaction + " ]", t); + } + if (cachedBlock != null) { + if (cacheBlock && cacheConf.shouldCacheBlockOnRead( + cachedBlock.getBlockType().getCategory())) { + // If L1 BlockCache is configured to cache blocks on read, then + // cache the block in the L1 cache. Updates to the L1 cache need to + // happen under a lock, which is why this logic is located here. + // TODO (avf): implement "evict on promotion" to avoid double caching + cacheConf.getBlockCache().cacheBlock(cacheKey, cachedBlock, + cacheConf.isInMemory()); + } + getSchemaMetrics().updateOnCacheHit(cachedBlock.getBlockType().getCategory(), + isCompaction); + // Return early if a block exists in the L2 cache + return cachedBlock; + } + // In case of an L2 cache miss, load block from filesystem. long startTimeNs = System.nanoTime(); HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, - onDiskBlockSize, -1, pread); + onDiskBlockSize, -1, pread, cacheBlock && !isCompaction); hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock, isCompaction); validateBlockType(hfileBlock, expectedBlockType); @@ -372,6 +404,42 @@ } /** + * If the L2 cache is enabled, retrieve the on-disk representation of a + * block (i.e., compressed and encoded byte array) from the L2 cache, + * de-compress, decode, and then construct an in-memory representation of the + * block. + * @param hfileName Name of the HFile that contains the block (used as part + * of the cache key) + * @param offset Offset in the HFile containing the block (used as another + * part of the cache key) + * @param expectedBlockType Expected type of the block + * @param isCompaction Indicates if this is a compaction related read. This + * value is passed along to + * {@link HFileDataBlockEncoder#diskToCacheFormat( + * HFileBlock, boolean)} + * + * @return The constructed and initiated block or null if the L2 cache is + * disabled or if no block is associated with the given filename and + * offset in the L2 cache. + * @throws IOException If we are unable to decompress and decode the block. + */ + public HFileBlock getBlockFromL2Cache(String hfileName, long offset, + BlockType expectedBlockType, boolean isCompaction) throws IOException { + if (cacheConf.isL2CacheEnabled()) { + byte[] bytes = cacheConf.getL2Cache().getRawBlock(hfileName, offset); + if (bytes != null) { + HFileBlock hfileBlock = HFileBlock.fromBytes(bytes, compressAlgo, includesMemstoreTS, + offset, super.getTrailer().getMinorVersion()); + hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock, isCompaction); + validateBlockType(hfileBlock, expectedBlockType); + passSchemaMetricsTo(hfileBlock); + return hfileBlock; + } + } + return null; + } + + /** * Compares the actual type of a block retrieved from cache or disk with its * expected type and throws an exception in case of a mismatch. 
Expected * block type of {@link BlockType#DATA} is considered to match the actual @@ -424,13 +492,26 @@ } public void close(boolean evictOnClose) throws IOException { - if (evictOnClose && cacheConf.isBlockCacheEnabled()) { + close(evictOnClose, cacheConf.shouldL2EvictOnClose()); + } + + @Override + public void close(boolean evictL1OnClose, boolean evictL2OnClose) + throws IOException { + if (evictL1OnClose && cacheConf.isBlockCacheEnabled()) { int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name); if (LOG.isTraceEnabled()) { LOG.trace("On close, file=" + name + " evicted=" + numEvicted - + " block(s)"); + + " block(s) from L1 cache"); } } + if (cacheConf.isL2CacheEnabled() && evictL2OnClose) { + int numEvicted = cacheConf.getL2Cache().evictBlocksByHfileName(name); + if (LOG.isTraceEnabled()) { + LOG.trace("On close, file=" + name + " evicted=" + numEvicted + + " block(s) from L2 cache"); + } + } if (closeIStream) { if (istream != istreamNoFsChecksum && istreamNoFsChecksum != null) { istreamNoFsChecksum.close(); Index: src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (working copy) @@ -51,6 +51,12 @@ private final AtomicLong evictionCount = new AtomicLong(0); /** The total number of blocks that have been evicted */ private final AtomicLong evictedBlockCount = new AtomicLong(0); + /** The total number of single-access blocks that have been evicted */ + private final AtomicLong evictedSingleCount = new AtomicLong(0); + /** The total number of multi-access blocks that have been evicted */ + private final AtomicLong evictedMultiCount = new AtomicLong(0); + /** The total number of in-memory blocks that have been evicted */ + private final AtomicLong evictedMemoryCount = new AtomicLong(0); /** The number of metrics periods to include in window */ private final int numPeriodsInWindow; @@ -103,6 +109,25 @@ evictedBlockCount.incrementAndGet(); } + public void evicted(CachedBlock.BlockPriority priority) { + evictedBlockCount.incrementAndGet(); + switch (priority) { + case SINGLE: + evictedSingleCount.incrementAndGet(); + break; + case MULTI: + evictedMultiCount.incrementAndGet(); + break; + case MEMORY: + evictedMemoryCount.incrementAndGet(); + break; + } + } + + public void evicted(CachedBlock block) { + evicted(block.getPriority()); + } + public long getRequestCount() { return getHitCount() + getMissCount(); } @@ -135,24 +160,39 @@ return evictedBlockCount.get(); } + public long getEvictedSingleCount() { + return evictedSingleCount.get(); + } + + public long getEvictedMultiCount() { + return evictedMultiCount.get(); + } + + public long getEvictedMemoryCount() { + return evictedMemoryCount.get(); + } + public double getHitRatio() { - return ((float)getHitCount()/(float)getRequestCount()); + return getRequestCount() == 0 ? 0.0 : ((double) getHitCount() / (double) getRequestCount()); } public double getHitCachingRatio() { - return ((float)getHitCachingCount()/(float)getRequestCachingCount()); + return getRequestCachingCount() == 0 ? 0.0 + : ((double) getHitCachingCount() / (double) getRequestCachingCount()); } public double getMissRatio() { - return ((float)getMissCount()/(float)getRequestCount()); + return getRequestCount() == 0 ? 
0.0 : ((double) getMissCount() / (double) getRequestCount()); } public double getMissCachingRatio() { - return ((float)getMissCachingCount()/(float)getRequestCachingCount()); + return getRequestCachingCount() == 0 ? 0.0 + : ((double) getMissCachingCount() / (double) getRequestCachingCount()); } public double evictedPerEviction() { - return ((float)getEvictedCount()/(float)getEvictionCount()); + return getEvictionCount() == 0 ? 0.0 + : ((double) getEvictedCount() / (double) getEvictionCount()); } public void rollMetricsPeriod() { Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (working copy) @@ -521,6 +521,9 @@ /** Close method with optional evictOnClose */ void close(boolean evictOnClose) throws IOException; + /** Close method with optional evictOnClose for L1 and L2 caches */ + void close(boolean evictL1OnClose, boolean evictL2OnClose) throws IOException; + DataBlockEncoding getEncodingOnDisk(); } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheFactory.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheFactory.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheFactory.java (working copy) @@ -0,0 +1,37 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.conf.Configuration; + +/** + * Abstract factory for {@link L2Cache} + */ +public interface L2CacheFactory { + + /** + * Given a configuration, create, configure, and return an L2 cache + * instance. The implementation may cache an already created L2 cache + * instance and return the cached instance instead. + * @param conf The HBase configuration needed to create the instance + * @return The created and configured instance + */ + public L2Cache getL2Cache(Configuration conf); +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (working copy) @@ -137,9 +137,11 @@ // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); + boolean cacheL2IndexesOnWrite = cacheConf.shouldL2CacheDataOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, cacheIndexesOnWrite ? cacheConf.getBlockCache(): null, - cacheIndexesOnWrite ? 
name : null); + cacheL2IndexesOnWrite ? cacheConf.getL2Cache() : null, + (cacheIndexesOnWrite || cacheL2IndexesOnWrite) ? name : null); dataBlockIndexWriter.setMaxChunkSize( HFileBlockIndex.getMaxChunkSize(conf)); inlineBlockWriters.add(dataBlockIndexWriter); @@ -196,10 +198,14 @@ totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); HFile.offerWriteLatency(System.nanoTime() - startTimeNs); - + // If a write is succesfull, cached the written block in the L1 and L2 + // caches if (cacheConf.shouldCacheDataOnWrite()) { doCacheOnWrite(lastDataBlockOffset); } + if (cacheConf.isL2CacheEnabled() && cacheConf.shouldL2CacheDataOnWrite()) { + doCacheInL2Cache(lastDataBlockOffset); + } } /** Gives inline block writers an opportunity to contribute blocks. */ @@ -214,10 +220,13 @@ ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(), fsBlockWriter.getUncompressedSizeWithoutHeader()); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); - + // If a write is successful, cache the block in the L1 and L2 caches if (cacheThisBlock) { doCacheOnWrite(offset); } + if (cacheConf.isL2CacheEnabled() && cacheConf.shouldL2CacheDataOnWrite()) { + doCacheInL2Cache(offset); + } } } } @@ -240,6 +249,15 @@ } /** + * Associates the compressed and encoded block at a specific offset in the current HFile in the + * L2 Cache. + * @param offset Offset at which the block has been written + */ + private void doCacheInL2Cache(long offset) throws IOException { + cacheConf.getL2Cache().cacheRawBlock(name, offset, fsBlockWriter.getHeaderAndData()); + } + + /** * Ready a new block for writing. * * @throws IOException Index: src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (working copy) @@ -788,6 +788,7 @@ } /** Clears the cache. Used in tests. */ + @Override public void clearCache() { map.clear(); } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (working copy) @@ -272,6 +272,48 @@ buf.rewind(); } + /** + * A factory method to create an HFileBlock from a raw (compressed and + * encoded) byte array. + *

+ * The reason that this is static member of HFileBlock is that it needs + * to be called by classes that read from {@link L2Cache}, but at the mean + * time needs to have access to private members of HFileBlock (such as the + * internal buffers). + * + * TODO (avf): re-factor HFileBlock such that this method could be + * a non-static member of another class + * @param rawBytes The compressed and encoded byte array (essentially this + * is the block as it would appear on disk or in + * {@link L2Cache} + * @param compressAlgo Compression algorithm used to encode the block + * @param includeMemStoreTs Should memstore timestamp be included? + * @param offset Offset within the HFile at which the block is located + * @return An instantiated, uncompressed, decoded in-memory representation + * of the HFileBlock that can be scanned through or cached in the + * L1 block cache + * @throws IOException If there is an error de-compressed, de-coding, or + * otherwise parsing the raw byte array encoding the block. + */ + public static HFileBlock fromBytes(byte[] rawBytes, Algorithm compressAlgo, + boolean includeMemStoreTs, long offset, int minorVersion) throws IOException { + HFileBlock b; + if (compressAlgo == Algorithm.NONE) { + b = new HFileBlock(ByteBuffer.wrap(rawBytes), minorVersion); + b.assumeUncompressed(); + } else { + b = new HFileBlock(ByteBuffer.wrap(rawBytes, 0, headerSize(minorVersion)), minorVersion); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(rawBytes, + headerSize(minorVersion), rawBytes.length - headerSize(minorVersion))); + b.allocateBuffer(true); + AbstractFSReader.decompress(compressAlgo, b.buf.array(), b.buf.arrayOffset() + + headerSize(minorVersion), dis, b.uncompressedSizeWithoutHeader); + } + b.includesMemstoreTS = includeMemStoreTs; + b.offset = offset; + return b; + } + public BlockType getBlockType() { return blockType; } @@ -1102,6 +1144,20 @@ } /** + * Returns the header or the compressed data (or uncompressed data when not + * using compression) as a byte array. Can be called in the "writing" state + * or in the "block ready" state. If called in the "writing" state, + * transitions the writer to the "block ready" state. + * + * @return header and data as they would be stored on disk in a byte array + * @throws IOException + */ + public byte[] getHeaderAndData() throws IOException { + ensureBlockReady(); + return onDiskBytesWithHeader; + } + + /** * Releases the compressor this writer uses to compress blocks into the * compressor pool. Needs to be called before the writer is discarded. */ @@ -1266,12 +1322,13 @@ * @param offset * @param onDiskSize the on-disk size of the entire block, including all * applicable headers, or -1 if unknown + * @param addToL2Cache if true add the compressed block to L2 cache * @param uncompressedSize the uncompressed size of the compressed part of * the block, or -1 if unknown * @return the newly read block */ HFileBlock readBlockData(long offset, long onDiskSize, - int uncompressedSize, boolean pread) throws IOException; + int uncompressedSize, boolean pread, boolean addToL2Cache) throws IOException; /** * Creates a block iterator over the given portion of the {@link HFile}. 
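To make the interplay of the pieces above concrete, here is a minimal read-path sketch. It is not part of this patch; the class, method name, and argument list are hypothetical, and it assumes the same package so that L2Cache, HFileBlock.FSReader, and Compression.Algorithm resolve without extra imports. The idea: consult the L2 cache for the raw on-disk bytes, rehydrate a hit with HFileBlock.fromBytes(), and on a miss fall back to readBlockData() with addToL2Cache=true so the reader populates the L2 cache as a side effect.

package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;

/** Illustrative only; not part of this patch. */
public class L2ReadPathSketch {
  public static HFileBlock readThroughL2(L2Cache l2Cache, HFileBlock.FSReader fsReader,
      String hfileName, long offset, long onDiskSize, Compression.Algorithm compressAlgo,
      boolean includesMemstoreTS, int minorVersion) throws IOException {
    // Ask the L2 cache for the block exactly as it appears on disk.
    byte[] raw = (l2Cache == null) ? null : l2Cache.getRawBlock(hfileName, offset);
    if (raw != null) {
      // L2 hit: decompress/decode the raw bytes back into an in-memory HFileBlock.
      return HFileBlock.fromBytes(raw, compressAlgo, includesMemstoreTS, offset, minorVersion);
    }
    // L2 miss: read from the filesystem with pread=true; addToL2Cache=true lets
    // the reader store the raw block in the L2 cache as a side effect.
    return fsReader.readBlockData(offset, onDiskSize, -1, true, true);
  }
}

This mirrors what HFileReaderV2.getBlockFromL2Cache() and readBlock() do earlier in this patch, minus the L1 interaction, locking, and metrics.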
@@ -1347,7 +1404,7 @@ public HFileBlock nextBlock() throws IOException { if (offset >= endOffset) return null; - HFileBlock b = readBlockData(offset, -1, -1, false); + HFileBlock b = readBlockData(offset, -1, -1, false, false); offset += b.getOnDiskSizeWithHeader(); return b; } @@ -1449,15 +1506,32 @@ * uncompressed data size, header not included * @throws IOException */ - protected void decompress(byte[] dest, int destOffset, - InputStream bufferedBoundedStream, + protected void decompress(byte[] dest, int destOffset, InputStream bufferedBoundedStream, int uncompressedSize) throws IOException { + decompress(compressAlgo, dest, destOffset, bufferedBoundedStream, uncompressedSize); + } + + /** + * Decompresses a given stream using a specified compression algorithm. + *

+ * This method is static so that it can be used by methods that construct + * an HFileBlock from a raw byte array. + * @param compressAlgo The specified compression algorithm + * @param dest Write decompressed bytes into this byte array + * @param destOffset Offset within the dest byte array + * @param bufferedBoundedStream Input stream from which compressed data is + * read + * @param uncompressedSize The expected un-compressed size of the data + * @throws IOException If there is an error during de-compression + */ + protected static void decompress(Algorithm compressAlgo, byte[] dest, + int destOffset, InputStream bufferedBoundedStream, + int uncompressedSize) throws IOException { Decompressor decompressor = null; try { decompressor = compressAlgo.getDecompressor(); InputStream is = compressAlgo.createDecompressionStream( bufferedBoundedStream, decompressor, 0); - IOUtils.readFully(is, dest, destOffset, uncompressedSize); is.close(); } finally { @@ -1529,7 +1603,7 @@ */ @Override public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic, - int uncompressedSizeWithMagic, boolean pread) throws IOException { + int uncompressedSizeWithMagic, boolean pread, boolean addToL2Cache) throws IOException { if (uncompressedSizeWithMagic <= 0) { throw new IOException("Invalid uncompressedSize=" + uncompressedSizeWithMagic + " for a version 1 block"); @@ -1606,7 +1680,15 @@ /** Reads version 2 blocks from the filesystem. */ static class FSReaderV2 extends AbstractFSReader { + /** L2 cache instance or null if l2 cache is disabled */ + private final L2Cache l2Cache; + /** + * Name of the current hfile. Used to compose the key in for the + * L2 cache if enabled. + */ + private final String hfileNameForL2Cache; + // The configuration states that we should validate hbase checksums private final boolean useHBaseChecksumConfigured; @@ -1635,9 +1717,9 @@ } }; - public FSReaderV2(FSDataInputStream istream, - FSDataInputStream istreamNoFsChecksum, Algorithm compressAlgo, - long fileSize, int minorVersion, HFileSystem hfs, Path path) + public FSReaderV2(FSDataInputStream istream, FSDataInputStream istreamNoFsChecksum, + Algorithm compressAlgo, long fileSize, int minorVersion, HFileSystem hfs, Path path, + L2Cache l2Cache, String hfileNameForL2Cache) throws IOException { super(istream, istreamNoFsChecksum, compressAlgo, fileSize, minorVersion, hfs, path); @@ -1659,6 +1741,8 @@ useHBaseChecksum = false; } this.useHBaseChecksumConfigured = useHBaseChecksum; + this.l2Cache = l2Cache; + this.hfileNameForL2Cache = hfileNameForL2Cache; } /** @@ -1668,7 +1752,7 @@ FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo, long fileSize) throws IOException { this(istream, istream, compressAlgo, fileSize, - HFileReaderV2.MAX_MINOR_VERSION, null, null); + HFileReaderV2.MAX_MINOR_VERSION, null, null, null, null); } /** @@ -1681,10 +1765,12 @@ * @param uncompressedSize the uncompressed size of the the block. Always * expected to be -1. This parameter is only used in version 1. * @param pread whether to use a positional read + * @param addToL2Cache if true, will cache the block on read in the L2 + * cache, if the L2 cache is enabled. */ @Override public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, - int uncompressedSize, boolean pread) throws IOException { + int uncompressedSize, boolean pread, boolean addToL2Cache) throws IOException { // It is ok to get a reference to the stream here without any // locks because it is marked final. 
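Since FSReaderV2 now only depends on the small L2Cache surface exercised in this patch (getRawBlock, cacheRawBlock, evictBlocksByHfileName, shutdown), a map-backed fake is enough to unit test the new read and write paths. The sketch below is hypothetical and not part of the patch (the class name and key format are invented here); assuming the interface declares exactly those four methods, it could be handed to the new FSReaderV2 constructor in tests in place of L2BucketCache.

package org.apache.hadoop.hbase.io.hfile;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical map-backed L2Cache for tests; not part of this patch. */
public class InMemoryL2Cache implements L2Cache {
  // Raw on-disk block bytes keyed by "<hfileName>_<offset>".
  private final Map<String, byte[]> blocks = new ConcurrentHashMap<String, byte[]>();

  private static String key(String hfileName, long offset) {
    return hfileName + "_" + offset;
  }

  @Override
  public byte[] getRawBlock(String hfileName, long dataBlockOffset) {
    return blocks.get(key(hfileName, dataBlockOffset));
  }

  @Override
  public void cacheRawBlock(String hfileName, long dataBlockOffset, byte[] rawBlock) {
    blocks.put(key(hfileName, dataBlockOffset), rawBlock);
  }

  @Override
  public int evictBlocksByHfileName(String hfileName) {
    int evicted = 0;
    String prefix = hfileName + "_";
    for (Iterator<String> it = blocks.keySet().iterator(); it.hasNext();) {
      if (it.next().startsWith(prefix)) {
        it.remove();
        evicted++;
      }
    }
    return evicted;
  }

  @Override
  public void shutdown() {
    blocks.clear();
  }
}

A real implementation also has to bound its memory use and tolerate concurrent readers and writers, which is what the BucketCache-backed L2BucketCache further down in this patch provides.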
@@ -1700,10 +1786,8 @@ is = this.istream; } - HFileBlock blk = readBlockDataInternal(is, offset, - onDiskSizeWithHeaderL, - uncompressedSize, pread, - doVerificationThruHBaseChecksum); + HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, uncompressedSize, + pread, doVerificationThruHBaseChecksum, addToL2Cache); if (blk == null) { HFile.LOG.warn("HBase checksum verification failed for file " + path + " at offset " + @@ -1731,9 +1815,8 @@ this.useHBaseChecksum = false; doVerificationThruHBaseChecksum = false; is = this.istream; - blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, - uncompressedSize, pread, - doVerificationThruHBaseChecksum); + blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, uncompressedSize, pread, + doVerificationThruHBaseChecksum, addToL2Cache); if (blk != null) { HFile.LOG.warn("HDFS checksum verification suceeded for file " + path + " at offset " + @@ -1776,10 +1859,9 @@ * If HBase checksum is switched off, then use HDFS checksum. * @return the HFileBlock or null if there is a HBase checksum mismatch */ - private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, - long onDiskSizeWithHeaderL, - int uncompressedSize, boolean pread, boolean verifyChecksum) - throws IOException { + private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, + long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread, boolean verifyChecksum, + boolean addToL2Cache) throws IOException { if (offset < 0) { throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" + onDiskSizeWithHeaderL @@ -1841,7 +1923,9 @@ + preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); - + if (addToL2Cache) { + cacheBlockInL2Cache(offset, headerAndData.array()); + } b = new HFileBlock(headerAndData, getMinorVersion()); b.assumeUncompressed(); b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader); @@ -1860,9 +1944,9 @@ preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); - if (header == null) + if (header == null) { header = onDiskBlock; - + } try { b = new HFileBlock(ByteBuffer.wrap(header, 0, hdrSize), getMinorVersion()); @@ -1880,7 +1964,15 @@ !validateBlockChecksum(b, onDiskBlock, hdrSize)) { return null; // checksum mismatch } - + if (l2Cache != null && addToL2Cache) { + if (preReadHeaderSize > 0) { + // If we plan to add block to L2 cache, we need to copy the + // header information into the byte array so that it can be + // cached in the L2 cache. + System.arraycopy(header, 0, onDiskBlock, 0, preReadHeaderSize); + } + cacheBlockInL2Cache(offset, onDiskBlock); + } DataInputStream dis = new DataInputStream(new ByteArrayInputStream( onDiskBlock, hdrSize, onDiskSizeWithoutHeader)); @@ -1941,7 +2033,9 @@ !validateBlockChecksum(b, b.buf.array(), hdrSize)) { return null; // checksum mismatch } - + if (addToL2Cache) { + cacheBlockInL2Cache(offset, b.buf.array()); + } if (b.nextBlockOnDiskSizeWithHeader > 0) { setNextBlockHeader(offset, b); } @@ -1949,7 +2043,6 @@ // Allocate enough space for the block's header and compressed data. 
byte[] compressedBytes = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; - b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, compressedBytes, hdrSize, b.onDiskSizeWithoutHeader, true, offset + hdrSize, pread); @@ -1957,6 +2050,13 @@ !validateBlockChecksum(b, compressedBytes, hdrSize)) { return null; // checksum mismatch } + if (l2Cache != null && addToL2Cache) { + // If l2 cache is enabled, we need to copy the header bytes to + // the compressed bytes array, so that they can be cached in the + // L2 cache. + System.arraycopy(headerBuf.array(), 0, compressedBytes, 0, hdrSize); + cacheBlockInL2Cache(offset, compressedBytes); + } DataInputStream dis = new DataInputStream(new ByteArrayInputStream( compressedBytes, hdrSize, b.onDiskSizeWithoutHeader)); @@ -2001,6 +2101,20 @@ } /** + * If L2 cache is enabled, associates current hfile name and + * offset with the specified byte array representing the + * compressed and encoded block. + * @param offset The block's offset within the hfile + * @param blockBytes The block's bytes as they appear on disk (i.e., + * correctly encoded and compressed) + */ + void cacheBlockInL2Cache(long offset, byte[] blockBytes) { + if (l2Cache != null) { + l2Cache.cacheRawBlock(hfileNameForL2Cache, offset, blockBytes); + } + } + + /** * Generates the checksum for the header as well as the data and * then validates that it matches the value stored in the header. * If there is a checksum mismatch, then return false. Otherwise Index: src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java (working copy) @@ -0,0 +1,37 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.conf.Configuration; + +/** + * Abstract factory for {@link BlockCache} + */ +public interface BlockCacheFactory { + + /** + * Given a configuration, create, configure, and return a block cache + * instance. The implementation may cache an already created block + * cache instance and return the cached instance instead. 
+ * @param conf The HBase configuration needed to create the instance + * @return The created and configured instance + */ + public BlockCache getBlockCache(Configuration conf); +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java (working copy) @@ -111,6 +111,11 @@ public long getBlockCount(); /** + * Clear the cache. Used in unit tests. Don't call this in production. + */ + public void clearCache(); + + /** * Performs a BlockCache summary and returns a List of BlockCacheColumnFamilySummary objects. * This method could be fairly heavyweight in that it evaluates the entire HBase file-system * against what is in the RegionServer BlockCache. Index: src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java (working copy) @@ -0,0 +1,156 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.util.DirectMemoryUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; + +/** + * A singleton implementation of {@link L2CacheFactory} that creates + * {@link L2BucketCache} instances. This is a lazy singleton: + * first call to getInstance() will initialize and cache an instance, + * subsequent invocations will return the cached instance. All (static and + * per-instance) class methods are thread-safe. + *

+ * TODO (avf): remove all singleton functionality once Guice or other DI + * mechanism is available + */ +public class L2BucketCacheFactory implements L2CacheFactory { + + private static final Log LOG = LogFactory.getLog(L2BucketCacheFactory.class); + + /** Cached singleton instance or null if not initialized */ + private static L2BucketCacheFactory instance; + + private L2BucketCacheFactory() { } // Private as this is a singleton + + /** + * Returns a single cached instance of this class, initializing if needed. + * @return A cached and instantiated instance of this class + */ + public synchronized static L2BucketCacheFactory getInstance() { + if (instance == null) { + instance = new L2BucketCacheFactory(); + } + return instance; + } + + /** Cached instance of the L2Cache or null if not initialized */ + private L2Cache l2Cache; + + // Allows to short circuit getL2Cache() calls if the cache is disabled + private boolean l2CacheDisabled; + + private static String bucketSizesToString(int[] bucketSizes) { + StringBuilder sb = new StringBuilder(); + sb.append("Configured bucket sizes: "); + for (int bucketSize : bucketSizes) { + sb.append(StringUtils.humanReadableInt(bucketSize)); + sb.append(" "); + } + return sb.toString(); + } + /** + * Returns a cached initialized instance of {@link L2BucketCache}. Follows + * lazy initialization pattern: first invocation creates and initializes the + * instance, subsequent invocations return the cached instance. + * @param conf The configuration to pass to {@link L2BucketCache} + * @return The {@link L2BucketCache} instance + */ + @Override + public synchronized L2Cache getL2Cache(Configuration conf) { + Preconditions.checkNotNull(conf); + + if (l2Cache != null) { + return l2Cache; + } + if (l2CacheDisabled) { + return null; + } + + // If no IOEngine is specified for the L2 cache, assume the L2 cache + // is disabled + String bucketCacheIOEngineName = + conf.get(CacheConfig.L2_BUCKET_CACHE_IOENGINE_KEY, null); + if (bucketCacheIOEngineName == null) { + l2CacheDisabled = true; + return null; + } + + long maxMem = 0; + if (bucketCacheIOEngineName.equals("offheap")) { + maxMem = DirectMemoryUtils.getDirectMemorySize(); + } else if (bucketCacheIOEngineName.equals("heap")) { + MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + maxMem = mu.getMax(); + } + + // Unless a percentage of absolute size is set, assume the cache is + // disabled. This is analogous to default behaviour with LruBlockCache. 
+ float bucketCachePercentage = + conf.getFloat(CacheConfig.L2_BUCKET_CACHE_SIZE_KEY, 0F); + long bucketCacheSize = (long) (bucketCachePercentage < 1 + ?maxMem * bucketCachePercentage : + bucketCachePercentage * 1024 * 1024); + if (bucketCacheSize > 0) { + int writerThreads = conf.getInt( + CacheConfig.L2_BUCKET_CACHE_WRITER_THREADS_KEY, + BucketCache.DEFAULT_WRITER_THREADS); + int writerQueueLen = conf.getInt(CacheConfig.L2_BUCKET_CACHE_QUEUE_KEY, + BucketCache.DEFAULT_WRITER_QUEUE_ITEMS); + int ioErrorsTolerationDuration = + conf.getInt(CacheConfig.L2_BUCKET_CACHE_IOENGINE_ERRORS_TOLERATED_DURATION_KEY, + BucketCache.DEFAULT_ERROR_TOLERATION_DURATION); + try { + int[] bucketSizes = CacheConfig.getL2BucketSizes(conf); + LOG.info(bucketSizesToString(bucketSizes)); + long bucketCacheInitStartMs = EnvironmentEdgeManager.currentTimeMillis(); + BucketCache bucketCache = new BucketCache(bucketCacheIOEngineName, + bucketCacheSize, writerThreads, writerQueueLen, + ioErrorsTolerationDuration, bucketSizes, conf); + l2Cache = new L2BucketCache(bucketCache); + long bucketCacheInitElapsedMs = + EnvironmentEdgeManager.currentTimeMillis() - bucketCacheInitStartMs; + LOG.info("L2BucketCache instantiated in " + bucketCacheInitElapsedMs + + " ms.; bucketCacheIOEngine=" + bucketCacheIOEngineName + + ", bucketCacheSize=" + bucketCacheSize + + ", writerThreads=" + writerThreads + ", writerQueueLen=" + + writerQueueLen + ", ioErrorsTolerationDuration=" + + ioErrorsTolerationDuration); + } catch (IOException ioe) { + LOG.error("Can't instantiate L2 cache with bucket cache engine", + ioe); + throw new RuntimeException(ioe); + } + } + return l2Cache; + } +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCacheFactory.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCacheFactory.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCacheFactory.java (working copy) @@ -0,0 +1,114 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; +import org.apache.hadoop.util.StringUtils; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; + +/** + * A singleton implementation of {@link BlockCacheFactory} that creates + * {@link LruBlockCache} instances. 
This is a lazy singleton: + * first call to getInstance() will initialize and cache an instance, + * subsequent invocations will return the cached instance. All (static + * and per-instance) class methods are thread-safe. + *
This is a lazy singleton: + * first call to getInstance() will initialize and cache an instance, + * subsequent invocations will return the cached instance. All (static + * and per-instance) class methods are thread-safe. + *

+ * TODO (avf): remove all singleton functionality once Guice or other DI + * mechanism is available + */ +public class LruBlockCacheFactory implements BlockCacheFactory { + + private static final Log LOG = LogFactory.getLog(LruBlockCache.class); + + /** Cached singleton instance or null if not initialized */ + private static LruBlockCacheFactory instance; + + private LruBlockCacheFactory() { } // Private as this is a singleton + + public synchronized static LruBlockCacheFactory getInstance() { + if (instance == null) { + instance = new LruBlockCacheFactory(); + } + return instance; + } + + /** Cached instance of the BlockCache or null if not initialized */ + private LruBlockCache blockCache; + + // ALlows to short circuit getBlockCache calls if the cache is disabled + private boolean blockCacheDisabled; + + /** + * Returns the current underlying block cache instance or null if + * block cache is disabled or not initialized. Used by + * {@link SchemaMetrics} + * @return The current block cache instance, null if disabled or + * or not initialized + */ + public synchronized LruBlockCache getCurrentBlockCacheInstance() { + return blockCache; + } + + /** + * Returns a cached initialized instance of {@link LruBlockCache}. + * Follows lazy initialization pattern: first invocation creates and + * initializes the instance, subsequent invocations return the cached + * instance. This replaced a static "instantiateBlockCache()" method + * in CacheConfig. + * * @param conf The HBase configuration needed to create the instance + * @return The {@link LruBlockCache} instance. + */ + @Override + public synchronized BlockCache getBlockCache(Configuration conf) { + Preconditions.checkNotNull(conf); + + if (blockCache != null) + return blockCache; + if (blockCacheDisabled) + return null; + + float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, + HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); + if (cachePercentage == 0L) { + blockCacheDisabled = true; + return null; + } + if (cachePercentage > 1.0) { + throw new IllegalArgumentException(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + + " must be between 0.0 and 1.0, not > 1.0"); + } + + // Calculate the amount of heap to give the heap. 
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + long cacheSize = (long)(mu.getMax() * cachePercentage); + LOG.info("Allocating LruBlockCache with maximum size " + + StringUtils.humanReadableInt(cacheSize)); + blockCache = new LruBlockCache(cacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf); + return blockCache; + } +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (working copy) @@ -171,4 +171,7 @@ return onHeapCache.getBlockCount() + offHeapCache.getBlockCount(); } + @Override + public void clearCache() { + } } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java (working copy) @@ -37,7 +37,7 @@ ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + ClassSize.STRING + ClassSize.BYTE_BUFFER); - static enum BlockPriority { + public static enum BlockPriority { /** * Accessed a single time (used for scan-resistance) */ @@ -50,7 +50,7 @@ * Block from in-memory store */ MEMORY - }; + } private final BlockCacheKey cacheKey; private final Cacheable buf; Index: src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (working copy) @@ -95,4 +95,8 @@ public DataBlockEncoding getDataBlockEncoding() { return encoding; } + + public long getOffset() { + return offset; + } } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java (working copy) @@ -134,5 +134,8 @@ return 0; } + @Override + public void clearCache() { + } } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (working copy) @@ -17,18 +17,16 @@ */ package org.apache.hadoop.hbase.io.hfile; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryUsage; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.hfile.bucket.IOEngine; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; -import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.DirectMemoryUtils; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hbase.util.Strings; /** * Stores all of the cache objects and configuration for a single HFile. 
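Before the CacheConfig changes below, it may help to see how the two factories above are meant to be driven from configuration alone. This is a sketch, not part of the patch, and the concrete key values are examples only. Per L2BucketCacheFactory.getL2Cache() above, selecting an IOEngine is what enables the L2 cache, and a size below 1.0 is interpreted as a fraction of the available direct or heap memory; any other value is treated as an absolute size (multiplied by 1024 * 1024 in that code).

package org.apache.hadoop.hbase.io.hfile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

/** Hypothetical wiring example; not part of this patch. */
public class L2CacheWiringSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // "heap" or "offheap"; leaving the key unset keeps the L2 cache disabled.
    conf.set(CacheConfig.L2_BUCKET_CACHE_IOENGINE_KEY, "offheap");
    // Below 1.0 this is a fraction of the available memory for the chosen engine.
    conf.setFloat(CacheConfig.L2_BUCKET_CACHE_SIZE_KEY, 0.2f);

    BlockCache l1 = LruBlockCacheFactory.getInstance().getBlockCache(conf);
    L2Cache l2 = L2BucketCacheFactory.getInstance().getL2Cache(conf);
    System.out.println("L1 cache: " + l1 + ", L2 cache: " + l2);
  }
}

The CacheConfig constructors in the hunks that follow perform this factory lookup internally, and the new CacheConfigBuilder later in the patch lets tests inject caches directly instead.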
@@ -70,6 +68,86 @@ public static final String EVICT_BLOCKS_ON_CLOSE_KEY = "hbase.rs.evictblocksonclose"; + /** + * IO engine to be used by the L2 cache. Current accepted settings: + * "heap", and "offheap". If omitted, L2 cache will be disabled. + * @see IOEngine + */ + public static final String L2_BUCKET_CACHE_IOENGINE_KEY = + "hfile.l2.bucketcache.ioengine"; + + /** + * Size of the L2 cache. If < 1.0, this is taken as a percentage of available + * (depending on configuration) direct or heap memory, otherwise it is taken + * as an absolute size in kb. If omitted or set to 0, L2 cache will be + * disabled. + */ + public static final String L2_BUCKET_CACHE_SIZE_KEY = + "hfile.l2.bucketcache.size"; + + /** + * Number of writer threads for the BucketCache instance used for the L2 + * cache. + * @see L2BucketCache + */ + public static final String L2_BUCKET_CACHE_WRITER_THREADS_KEY = + "hfile.l2.bucketcache.writer.threads"; + + /** + * Maximum length of the write queue for the BucketCache instance used for + * the L2 cache. + * @see L2BucketCache + */ + public static final String L2_BUCKET_CACHE_QUEUE_KEY = + "hfile.l2.bucketcache.queue.length"; + + /** + * If L2 cache is enabled, this configuration key controls whether or not + * blocks are cached upon writes. Default: true if L2 cache is enabled. + */ + public static final String L2_CACHE_BLOCKS_ON_FLUSH_KEY = + "hfile.l2.cacheblocksonwrite"; + + /** + * Configuration key to enable evicting all blocks associated with an hfile + * after this hfile has been compacted. Default: true. + */ + public static final String L2_EVICT_ON_CLOSE_KEY = + "hfile.l2.evictblocksonclose"; + + /** + * Configuration key to evict keys from the L2 cache once they have been + * cached in the L1 cache (i.e., the regular Lru Block Cache). Default: false. + * Not yet implemented! + * + * TODO (avf): needs to be implemented to be honoured correctly + */ + public static final String L2_EVICT_ON_PROMOTION_KEY = + "hfile.l2.evictblocksonpromotion"; + + /** + * Duration that BucketCache instanced used by the L2 cache will tolerate IO + * errors until the cache is disabled. Default: 1 minute. + */ + public static final String L2_BUCKET_CACHE_IOENGINE_ERRORS_TOLERATED_DURATION_KEY = + "hfile.l2.bucketcache.ioengine.errors.tolerated.duration"; + + /** + * Size of individual buckets for the bucket allocator. This is expected to + * a small (e.g., 5-15 items) list of comma-separated integer values + * centered around the size (in bytes) of an average compressed block. + */ + public static final String L2_BUCKET_CACHE_BUCKET_SIZES_KEY = + "hfile.l2.bucketcache.bucket.sizes"; + + /** + * Size (in bytes) of an individual buffer inside ByteBufferArray which is used + * by the ByteBufferIOEngine. Default is 4mb. Larger byte buffers might mean + * greater lock contention; smaller byte buffer means more operations per + * write and higher memory overhead for maintaining per-buffer locks. 
+ */ + public static final String L2_BUCKET_CACHE_BUFFER_SIZE_KEY = + "hfile.l2.bucketcache.buffer.size"; // Defaults public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; @@ -79,10 +157,21 @@ public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false; public static final boolean DEFAULT_EVICT_ON_CLOSE = false; public static final boolean DEFAULT_COMPRESSED_CACHE = false; + public static final boolean DEFAULT_L2_CACHE_BLOCKS_ON_FLUSH = true; + public static final boolean DEFAULT_L2_EVICT_ON_PROMOTION = false; + public static final boolean DEFAULT_L2_EVICT_ON_CLOSE = true; + public static final int[] DEFAULT_L2_BUCKET_CACHE_BUCKET_SIZES = { 4 * 1024 + 1024, + 8 * 1024 + 1024, 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024, + 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024, 192 * 1024 + 1024, + 256 * 1024 + 1024, 384 * 1024 + 1024, 512 * 1024 + 1024 }; + public static final int DEFAULT_L2_BUCKET_CACHE_BUFFER_SIZE = 4 * 1024 * 1024; /** Local reference to the block cache, null if completely disabled */ private final BlockCache blockCache; + /** Local reference to the l2 cache, null if disabled */ + private final L2Cache l2Cache; + /** * Whether blocks should be cached on read (default is on if there is a * cache but this can be turned off on a per-family or per-request basis) @@ -107,28 +196,67 @@ /** Whether data blocks should be stored in compressed form in the cache */ private final boolean cacheCompressed; + /** Whether to cache all blocks on write in the L2 cache */ + private final boolean l2CacheDataOnWrite; + /** + * TODO (avf): once implemented, whether to evict blocks from the L2 + * cache once they have been cached in the L1 cache + */ + private final boolean l2EvictOnPromotion; + + /** + * Whether blocks of a file should be evicted from the L2 cache when the file + * is closed. + */ + private final boolean l2EvictOnClose; + + /** + * Parses a comma separated list of bucket sizes and returns a list of + * sorted integers representing the various bucket sizes. Bucket sizes should + * roughly correspond to block sizes encountered in production. + * @param conf The configuration + * @return + */ + public static int[] getL2BucketSizes(Configuration conf) { + String[] bucketSizesStr = conf.getStrings(L2_BUCKET_CACHE_BUCKET_SIZES_KEY); + if (bucketSizesStr == null) { + return DEFAULT_L2_BUCKET_CACHE_BUCKET_SIZES; + } + int[] retVal = new int[bucketSizesStr.length]; + for (int i = 0; i < bucketSizesStr.length; ++i) { + try { + retVal[i] = Integer.valueOf(bucketSizesStr[i]); + } catch (NumberFormatException nfe) { + LOG.fatal("Error parsing value " + bucketSizesStr[i], nfe); + throw nfe; + } + } + Arrays.sort(retVal); + return retVal; + } + + /** * Create a cache configuration using the specified configuration object and * family descriptor. * @param conf hbase configuration * @param family column family configuration */ public CacheConfig(Configuration conf, HColumnDescriptor family) { - this(CacheConfig.instantiateBlockCache(conf), - family.isBlockCacheEnabled(), - family.isInMemory(), - // For the following flags we enable them regardless of per-schema settings - // if they are enabled in the global configuration. 
- conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_DATA_ON_WRITE) || family.shouldCacheDataOnWrite(), - conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_INDEXES_ON_WRITE) || family.shouldCacheIndexesOnWrite(), - conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(), - conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, - DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE) - ); + this(LruBlockCacheFactory.getInstance().getBlockCache(conf), L2BucketCacheFactory.getInstance() + .getL2Cache(conf), family.isBlockCacheEnabled(), family.isInMemory(), + // For the following flags we enable them regardless of per-schema settings + // if they are enabled in the global configuration. + conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE) + || family.shouldCacheDataOnWrite(), conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, + DEFAULT_CACHE_INDEXES_ON_WRITE) || family.shouldCacheIndexesOnWrite(), conf.getBoolean( + CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE) + || family.shouldCacheBloomsOnWrite(), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, + DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(), conf.getBoolean( + CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE), conf.getBoolean( + L2_CACHE_BLOCKS_ON_FLUSH_KEY, DEFAULT_L2_CACHE_BLOCKS_ON_FLUSH), conf.getBoolean( + L2_EVICT_ON_PROMOTION_KEY, DEFAULT_L2_EVICT_ON_PROMOTION), conf.getBoolean( + L2_EVICT_ON_CLOSE_KEY, DEFAULT_L2_EVICT_ON_CLOSE)); } /** @@ -137,19 +265,18 @@ * @param conf hbase configuration */ public CacheConfig(Configuration conf) { - this(CacheConfig.instantiateBlockCache(conf), - DEFAULT_CACHE_DATA_ON_READ, - DEFAULT_IN_MEMORY, // This is a family-level setting so can't be set - // strictly from conf - conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), - conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_INDEXES_ON_WRITE), - conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_BLOOMS_ON_WRITE), - conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, - DEFAULT_COMPRESSED_CACHE) - ); + this(LruBlockCacheFactory.getInstance().getBlockCache(conf), L2BucketCacheFactory.getInstance() + .getL2Cache(conf), DEFAULT_CACHE_DATA_ON_READ, DEFAULT_IN_MEMORY, // This is a family-level + // setting so can't be set + // strictly from conf + conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), conf.getBoolean( + CACHE_INDEX_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_INDEXES_ON_WRITE), conf.getBoolean( + CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE), conf.getBoolean( + EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), conf.getBoolean( + CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE), conf.getBoolean( + L2_CACHE_BLOCKS_ON_FLUSH_KEY, DEFAULT_L2_CACHE_BLOCKS_ON_FLUSH), conf.getBoolean( + L2_EVICT_ON_PROMOTION_KEY, DEFAULT_L2_EVICT_ON_PROMOTION), conf.getBoolean( + L2_EVICT_ON_CLOSE_KEY, DEFAULT_L2_EVICT_ON_CLOSE)); } /** @@ -164,12 +291,14 @@ * @param evictOnClose whether blocks should be evicted when HFile is closed * @param cacheCompressed whether to store blocks as compressed in the cache */ - CacheConfig(final BlockCache blockCache, + CacheConfig(final BlockCache blockCache, final L2Cache l2Cache, final boolean cacheDataOnRead, final boolean inMemory, final boolean 
cacheDataOnWrite, final boolean cacheIndexesOnWrite, final boolean cacheBloomsOnWrite, final boolean evictOnClose, - final boolean cacheCompressed) { + final boolean cacheCompressed, final boolean l2CacheDataOnWrite, + final boolean l2EvictOnPromotion, final boolean l2EvictOnClose) { this.blockCache = blockCache; + this.l2Cache = l2Cache; this.cacheDataOnRead = cacheDataOnRead; this.inMemory = inMemory; this.cacheDataOnWrite = cacheDataOnWrite; @@ -177,6 +306,9 @@ this.cacheBloomsOnWrite = cacheBloomsOnWrite; this.evictOnClose = evictOnClose; this.cacheCompressed = cacheCompressed; + this.l2CacheDataOnWrite = l2CacheDataOnWrite; + this.l2EvictOnPromotion = l2EvictOnPromotion; + this.l2EvictOnClose = l2EvictOnClose; } /** @@ -184,10 +316,10 @@ * @param cacheConf */ public CacheConfig(CacheConfig cacheConf) { - this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory, - cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite, - cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose, - cacheConf.cacheCompressed); + this(cacheConf.blockCache, cacheConf.l2Cache, cacheConf.cacheDataOnRead, cacheConf.inMemory, + cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite, cacheConf.cacheBloomsOnWrite, + cacheConf.evictOnClose, cacheConf.cacheCompressed, cacheConf.l2CacheDataOnWrite, + cacheConf.l2EvictOnPromotion, cacheConf.l2EvictOnClose); } /** @@ -290,70 +422,202 @@ return isBlockCacheEnabled() && this.cacheCompressed; } + public boolean shouldL2CacheDataOnWrite() { + return l2CacheDataOnWrite; + } + + public boolean shouldL2EvictOnPromotion() { + return l2EvictOnPromotion; + } + + public boolean shouldL2EvictOnClose() { + return l2EvictOnClose; + } + @Override public String toString() { - if (!isBlockCacheEnabled()) { + if (!isBlockCacheEnabled() && !isL2CacheEnabled()) { return "CacheConfig:disabled"; } - return "CacheConfig:enabled " + - "[cacheDataOnRead=" + shouldCacheDataOnRead() + "] " + - "[cacheDataOnWrite=" + shouldCacheDataOnWrite() + "] " + - "[cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + "] " + - "[cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + "] " + - "[cacheEvictOnClose=" + shouldEvictOnClose() + "] " + - "[cacheCompressed=" + shouldCacheCompressed() + "]"; + StringBuilder sb = new StringBuilder("CacheConfig:enabled"); + if (isL2CacheEnabled()) { + sb = Strings.appendKeyValue(sb, "l2CacheDataOnWrite", + shouldL2CacheDataOnWrite()); + sb = Strings.appendKeyValue(sb, "l2EvictOnClose", shouldL2EvictOnClose()); + sb = Strings.appendKeyValue(sb, "l2EvictOnPromotion", + shouldL2EvictOnPromotion()); + } + if (isBlockCacheEnabled()) { + sb = Strings.appendKeyValue(sb, "cacheDataOnRead", shouldCacheDataOnRead()); + sb = Strings.appendKeyValue(sb, "cacheDataOnWrite", shouldCacheDataOnWrite()); + sb = Strings.appendKeyValue(sb, "cacheIndexesOnWrite", shouldCacheIndexesOnWrite()); + sb = Strings.appendKeyValue(sb, "cacheBloomsOnWrite", shouldCacheBloomsOnWrite()); + sb = Strings.appendKeyValue(sb, "cacheEvictOnClose", shouldEvictOnClose()); + sb = Strings.appendKeyValue(sb, "cacheCompressed", shouldCacheCompressed()); + } + return sb.toString(); } - // Static block cache reference and methods + public boolean isL2CacheEnabled() { + return l2Cache != null; + } + public L2Cache getL2Cache() { + return l2Cache; + } + /** - * Static reference to the block cache, or null if no caching should be used - * at all. + * A builder for the CacheConfig class. Used to avoid having multiple + * constructors. 
*/ - private static BlockCache globalBlockCache; + public static class CacheConfigBuilder { + private final L2CacheFactory l2CacheFactory; + private final BlockCacheFactory blockCacheFactory; + private final Configuration conf; - /** Boolean whether we have disabled the block cache entirely. */ - private static boolean blockCacheDisabled = false; + private BlockCache blockCache; + private L2Cache l2Cache; + private boolean cacheDataOnRead = DEFAULT_CACHE_DATA_ON_READ; + private boolean inMemory = DEFAULT_IN_MEMORY; + private boolean cacheDataOnFlush; + private boolean cacheIndexesOnWrite; + private boolean cacheBloomsOnWrite; + private boolean evictOnClose; + private boolean cacheCompressed; + private boolean l2CacheDataOnWrite; + private boolean l2EvictOnPromotion; + private boolean l2EvictOnClose; - /** - * Returns the block cache or null in case none should be used. - * - * @param conf The current configuration. - * @return The block cache or null. - */ - private static synchronized BlockCache instantiateBlockCache( - Configuration conf) { - if (globalBlockCache != null) return globalBlockCache; - if (blockCacheDisabled) return null; + /** + * Creates an empty builder, to be used for unit tests. + */ + public CacheConfigBuilder() { + this(null, null, null); + } - float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, - HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); - if (cachePercentage == 0L) { - blockCacheDisabled = true; - return null; + /** + * Use {@link LruBlockCacheFactory} to construct the block cache and + * {@link L2BucketCacheFactory} to construct the L2 cache. Reads all + * other cache configuration from the specified configuration object. + * @param conf The HBase configuration that contains cache related settings + */ + public CacheConfigBuilder(Configuration conf) { + this(conf, LruBlockCacheFactory.getInstance(), + L2BucketCacheFactory.getInstance()); } - if (cachePercentage > 1.0) { - throw new IllegalArgumentException(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + - " must be between 0.0 and 1.0, and not > 1.0"); + + /** + * Uses the specified {@link BlockCacheFactory} and {@link L2CacheFactory} + * to constructor the respective caches. Reads all cache configuration + * from the specified configuration object. 
+ * @param conf The HBase configuration that contains cache related settings + * @param blockCacheFactory The factory used to construct the block cache + * @param l2CacheFactory The factory used to construct the l2 cache + */ + public CacheConfigBuilder(Configuration conf, + BlockCacheFactory blockCacheFactory, L2CacheFactory l2CacheFactory) { + this.blockCacheFactory = blockCacheFactory; + this.l2CacheFactory = l2CacheFactory; + this.conf = conf; + cacheDataOnFlush = conf.getBoolean( + CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE); + cacheIndexesOnWrite = conf.getBoolean( + CACHE_INDEX_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_INDEXES_ON_WRITE); + cacheBloomsOnWrite = conf.getBoolean( + CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE); + evictOnClose = conf.getBoolean( + EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE); + cacheCompressed = conf.getBoolean( + CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE); + l2CacheDataOnWrite = conf.getBoolean( + L2_CACHE_BLOCKS_ON_FLUSH_KEY, DEFAULT_L2_CACHE_BLOCKS_ON_FLUSH); + l2EvictOnPromotion = conf.getBoolean( + L2_EVICT_ON_PROMOTION_KEY, DEFAULT_L2_EVICT_ON_PROMOTION); + l2EvictOnClose = conf.getBoolean( + L2_EVICT_ON_CLOSE_KEY, DEFAULT_L2_EVICT_ON_CLOSE); } - // Calculate the amount of heap to give the heap. - MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - long cacheSize = (long)(mu.getMax() * cachePercentage); - int blockSize = conf.getInt("hbase.offheapcache.minblocksize", - HFile.DEFAULT_BLOCKSIZE); - long offHeapCacheSize = - (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) * - DirectMemoryUtils.getDirectMemorySize()); - LOG.info("Allocating LruBlockCache with maximum size " + - StringUtils.humanReadableInt(cacheSize)); - if (offHeapCacheSize <= 0) { - globalBlockCache = new LruBlockCache(cacheSize, - StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf); - } else { - globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize, - StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf); + public CacheConfigBuilder withBlockCache(BlockCache blockCache) { + this.blockCache = blockCache; + return this; } - return globalBlockCache; + + public CacheConfigBuilder withL2Cache(L2Cache l2Cache) { + this.l2Cache = l2Cache; + return this; + } + + public CacheConfigBuilder withCacheDataOnRead(boolean cacheDataOnRead) { + this.cacheDataOnRead = cacheDataOnRead; + return this; + } + + public CacheConfigBuilder withInMemory(boolean inMemory) { + this.inMemory = inMemory; + return this; + } + + public CacheConfigBuilder withCacheDataOnFlush(boolean cacheDataOnFlush) { + this.cacheDataOnFlush = cacheDataOnFlush; + return this; + } + + public CacheConfigBuilder withCacheIndexesOnWrite(boolean cacheIndexesOnWrite) { + this.cacheIndexesOnWrite = cacheIndexesOnWrite; + return this; + } + + public CacheConfigBuilder withCacheBloomsOnWrite(boolean cacheBloomsOnWrite) { + this.cacheBloomsOnWrite = cacheBloomsOnWrite; + return this; + } + + public CacheConfigBuilder withEvictOnClose(boolean evictOnClose) { + this.evictOnClose = evictOnClose; + return this; + } + + public CacheConfigBuilder withCacheCompressed(boolean cacheCompressed){ + this.cacheCompressed = cacheCompressed; + return this; + } + + public CacheConfigBuilder withL2CacheDataOnWrite(boolean l2CacheDataOnFlush) { + this.l2CacheDataOnWrite = l2CacheDataOnFlush; + return this; + } + + public CacheConfigBuilder withL2EvictOnPromotion(boolean l2EvictOnPromotion) { + this.l2EvictOnPromotion = l2EvictOnPromotion; + return this; + } + + public 
CacheConfigBuilder withL2EvictOnClose(boolean l2EvictOnClose){ + this.l2EvictOnClose = l2EvictOnClose; + return this; + } + + /** + * Create a cache configuration based on the current state of this + * builder object. If the block cache and/or L2 cache have not been + * explicitly passed in using the builder methods, use the respective + * factories to create these caches. + * @return The fully instantiated and configured {@link CacheConfig} + * instance + */ + public CacheConfig build() { + if (blockCache == null && blockCacheFactory != null) { + blockCache = blockCacheFactory.getBlockCache(conf); + } + if (l2Cache == null && l2CacheFactory != null) { + l2Cache = l2CacheFactory.getL2Cache(conf); + } + return new CacheConfig(blockCache, l2Cache, + cacheDataOnRead, inMemory, + cacheDataOnFlush, cacheIndexesOnWrite, + cacheBloomsOnWrite, evictOnClose, + cacheCompressed, l2CacheDataOnWrite, + l2EvictOnPromotion, l2EvictOnClose); + } } } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java (working copy) @@ -0,0 +1,103 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; + +/** + * An implementation of L2 cache based on {@link BucketCache} + */ +public class L2BucketCache implements L2Cache { + + private static final Log LOG = LogFactory.getLog(L2BucketCache.class); + + private final BucketCache bucketCache; + + public L2BucketCache(BucketCache bucketCache) { + this.bucketCache = bucketCache; + } + + @Override + public byte[] getRawBlock(String hfileName, long dataBlockOffset) { + long startTimeNs = System.nanoTime(); + BlockCacheKey cacheKey = new BlockCacheKey(hfileName, dataBlockOffset); + byte[] fromCache = bucketCache.getBlock(cacheKey, true); + if (LOG.isTraceEnabled()) { + // Log elapsed time to retrieve a block from the cache + long elapsedNs = System.nanoTime() - startTimeNs; + LOG.trace("getRawBlock() " + (fromCache == null ?"MISS" : "HIT") + + " on hfileName=" + hfileName + ", offset=" + dataBlockOffset + + " in " + elapsedNs + " ns."); + } + return fromCache; + } + + @Override + public void cacheRawBlock(String hfileName, long dataBlockOffset, byte[] block) { + long startTimeNs = System.nanoTime(); + BlockCacheKey cacheKey = new BlockCacheKey(hfileName, dataBlockOffset); + bucketCache.cacheBlock(cacheKey, block); + if (LOG.isTraceEnabled()) { + long elapsedNs = System.nanoTime() - startTimeNs; + LOG.trace("cacheRawBlock() on hfileName=" + hfileName + ", offset=" + + dataBlockOffset + " in " + elapsedNs + " ns."); + } + } + + @Override + public int evictBlocksByHfileName(String hfileName) { + long startTimeNs = System.nanoTime(); // Measure eviction perf + int numEvicted = bucketCache.evictBlocksByHfileName(hfileName); + if (LOG.isTraceEnabled()) { + long elapsedNs = System.nanoTime() - startTimeNs; + LOG.trace("evictBlockByHfileName() on hfileName=" + hfileName + ": " + + numEvicted + " blocks evicted in " + elapsedNs + " ns."); + } + return numEvicted; + } + + @Override + public void shutdown() { + bucketCache.shutdown(); + } + + public CacheStats getStats() { + return bucketCache.getStats(); + } + + public long size() { + return bucketCache.getBlockCount(); + } + + public long getFreeSize() { + return bucketCache.getFreeSize(); + } + + public long getCurrentSize() { + return bucketCache.size(); + } + + public long getEvictedCount() { + return bucketCache.getEvictedCount(); + } +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java (working copy) @@ -423,4 +423,7 @@ throw new UnsupportedOperationException(); } + @Override + public void clearCache() { + } } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java (working copy) @@ -354,4 +354,9 @@ + ClassSize.ATOMIC_LONG); } } + + @Override + public void clearCache() { + backingMap.clear(); + } } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java =================================================================== --- 
src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (working copy) @@ -238,9 +238,8 @@ // Cache Miss, please load. } - HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, - nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block), - true); + HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset - offset, + metaBlockIndexReader.getRootBlockDataSize(block), true, false); passSchemaMetricsTo(hfileBlock); hfileBlock.expectType(BlockType.META); @@ -314,8 +313,8 @@ nextOffset = dataBlockIndexReader.getRootBlockOffset(block + 1); } - HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset - - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread); + HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset - offset, + dataBlockIndexReader.getRootBlockDataSize(block), pread, false); passSchemaMetricsTo(hfileBlock); hfileBlock.expectType(BlockType.DATA); @@ -389,6 +388,12 @@ getSchemaMetrics().flushMetrics(); } + @Override + public void close(boolean evictL1OnClose, boolean evictL2OnClose) + throws IOException { + close(evictL1OnClose); // HFileReaderV1 does not support L2 cache + } + protected abstract static class AbstractScannerV1 extends AbstractHFileReader.Scanner { protected int currBlock; Index: src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java (working copy) @@ -0,0 +1,182 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.nio.ByteBuffer; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; + +/** + * This class manages an array of ByteBuffers, each with a default size of 4MB. + * The buffers are sequential and can be treated as a single large buffer. It + * supports reading and writing data from this large buffer given a position + * and an offset. + */ +public class ByteBufferArray { + + private static final Log LOG = LogFactory.getLog(ByteBufferArray.class); + + private final ByteBuffer buffers[]; + private final Lock locks[]; + private final int bufferSize; + private final int bufferCount; + + /** + * We allocate enough byte buffers to cover the requested capacity. 
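+ * Each buffer holds {@code bufferSize} bytes, and {@code bufferCount} is the + * capacity rounded up to a multiple of the buffer size, divided by the buffer + * size. If the requested buffer size is larger than a sixteenth of the + * capacity, it is shrunk so that the capacity is split across roughly 16 + * buffers; for example, a 1GB capacity with a 4MB buffer size yields 256 + * buffers of 4MB each. 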
To avoid running past + * the end of the array for the last byte (see {@link ByteBufferArray#multiple}), + * we allocate one additional buffer with capacity 0. + * @param capacity total size of the byte buffer array + * @param bufferSize size of each byte buffer + * @param directByteBuffer true to allocate direct byte buffers + */ + public ByteBufferArray(long capacity, int bufferSize, boolean directByteBuffer) { + if (bufferSize > (capacity / 16)) { + bufferSize = (int) roundUp(capacity / 16, 32768); + } + this.bufferSize = bufferSize; + this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize); + LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity) + + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + + bufferCount); + buffers = new ByteBuffer[bufferCount + 1]; + locks = new Lock[bufferCount + 1]; + for (int i = 0; i <= bufferCount; i++) { + locks[i] = new ReentrantLock(); + if (i < bufferCount) { + buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize) : + ByteBuffer.allocate(bufferSize); + } else { + buffers[i] = ByteBuffer.allocate(0); + } + + } + } + + private static long roundUp(long n, long to) { + return ((n + to - 1) / to) * to; + } + + /** + * Transfers bytes from this buffer array into the given destination array + * @param start start offset of this buffer array + * @param len The maximum number of bytes to be written to the given array + * @param dstArray The array into which bytes are to be written + * @param dstOffset The offset within the given array of the first byte to be + * written + */ + public void getMultiple(long start, int len, byte[] dstArray, int dstOffset) { + multiple(start, len, dstArray, dstOffset, new Visitor() { + public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) { + bb.get(array, arrayIdx, len); + } + }); + } + + /** + * Transfers bytes from the given source array into this buffer array + * @param start start offset of this buffer array + * @param len The maximum number of bytes to be read from the given array + * @param srcArray The array from which bytes are to be read + * @param srcOffset The offset within the given array of the first byte to be + * read + */ + public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) { + multiple(start, len, srcArray, srcOffset, new Visitor() { + public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) { + bb.put(array, arrayIdx, len); + } + }); + } + + private interface Visitor { + /** + * Visit the given byte buffer. If this is a read action, we transfer the + * bytes from the buffer to the destination array; if it is a write action, + * we transfer the bytes from the source array into the buffer. + * @param bb byte buffer + * @param array a source or destination byte array + * @param arrayOffset offset of the byte array + * @param len read/write length + */ + void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len); + } + + /** + * Access (read or write) this buffer array with a given position and length, + * using the given array as the source or destination. Only one buffer is + * locked at a time, even though an access may need to visit several buffers; + * consistency across buffers is guaranteed by the caller. 
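+ * A request that crosses buffer boundaries is split into per-buffer pieces; for + * example, with a 4MB buffer size, an access at start = 6MB with len = 4MB + * reads 2MB starting at offset 2MB of buffer 1 and then 2MB from offset 0 of + * buffer 2. 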
+ * @param start start offset of this buffer array + * @param len The maximum number of bytes to be accessed + * @param array The array from/to which bytes are to be read/written + * @param arrayOffset The offset within the given array of the first byte to + * be read or written + * @param visitor implementation of how to visit each byte buffer + */ + void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) { + Preconditions.checkArgument(len >= 0); + Preconditions.checkArgument(len + arrayOffset <= array.length); + long end = start + len; + int startBuffer = Preconditions.checkPositionIndex( + (int) (start / bufferSize), bufferCount); + int endBuffer = (int) (end / bufferSize); + int endOffset = (int) (end % bufferSize); + Preconditions.checkArgument(endBuffer >= 0 && endBuffer < bufferCount + || (endBuffer == bufferCount && endOffset == 0)); + + int startOffset = (int) (start % bufferSize); + if (startBuffer >= locks.length || startBuffer < 0) { + String msg = "Failed multiple, start=" + start + ",startBuffer=" + + startBuffer + ",bufferSize=" + bufferSize; + LOG.error(msg); + throw new RuntimeException(msg); + } + int srcIndex = 0, cnt = -1; + for (int i = startBuffer; i <= endBuffer; ++i) { + Lock lock = locks[i]; + lock.lock(); + try { + ByteBuffer bb = buffers[i]; + if (i == startBuffer) { + cnt = bufferSize - startOffset; + if (cnt > len) { + cnt = len; + } + bb.limit(startOffset + cnt).position(startOffset); + } else if (i == endBuffer) { + cnt = endOffset; + bb.limit(cnt).position(0); + } else { + cnt = bufferSize; + bb.limit(cnt).position(0); + } + visitor.visit(bb, array, srcIndex + arrayOffset, cnt); + srcIndex += cnt; + } finally { + lock.unlock(); + } + } + if (srcIndex != len) { + throw new IllegalStateException("srcIndex(" + srcIndex + ") != len(" + + len + ")!"); + } + } +} Index: src/main/java/org/apache/hadoop/hbase/util/ConditionUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/ConditionUtil.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/ConditionUtil.java (working copy) @@ -0,0 +1,62 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.hadoop.hbase.util; + + + import com.google.common.base.Preconditions; + + /** + * Extends the patterns in {@link Preconditions} + */ + public class ConditionUtil { + + /** + * Checks if a specified offset is >= 0 + * @param offset The offset to check + * @return The specified offset if it is >= 0 + * @throws IndexOutOfBoundsException If specified offset is negative + */ + public static long checkPositiveOffset(long offset) { + return checkOffset(offset, -1); + } + + /** + * Check if an offset is >= 0 but less than a maximum limit (if one is + * specified). + * @see Preconditions#checkPositionIndex(int, int) + * @param offset The offset to check + * @param limit The maximum limit or -1 if none + * @return The specified offset if it is non-negative and, when a limit is + * specified, lower than that limit. + * @throws IndexOutOfBoundsException If the offset is negative, or if a limit + * is specified and the offset is greater than or equal to the limit. + */ + public static long checkOffset(long offset, long limit) { + if (offset < 0) { + throw new IndexOutOfBoundsException("Negative offset: " + offset); + } + if (limit != -1 && offset >= limit) { + throw new IndexOutOfBoundsException("Offset (" + offset + + ") is greater than or equal to limit (" + limit + ")"); + } + return offset; + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheStats; +import org.apache.hadoop.hbase.io.hfile.L2BucketCache; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; @@ -823,6 +824,10 @@ cacheConfig.getBlockCache().shutdown(); } + + if (cacheConfig.isL2CacheEnabled()) { + cacheConfig.getL2Cache().shutdown(); + } + + // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? 
If OOME could have exited already if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); @@ -1633,6 +1638,20 @@ percent = (int) (ratio * 100); this.metrics.blockCacheHitCachingRatioPastNPeriods.set(percent); } + + L2BucketCache l2BucketCache = (L2BucketCache)cacheConfig.getL2Cache(); + if (l2BucketCache != null) { + this.metrics.l2CacheCount.set(l2BucketCache.size()); + this.metrics.l2CacheFree.set(l2BucketCache.getFreeSize()); + this.metrics.l2CacheSize.set(l2BucketCache.getCurrentSize()); + this.metrics.l2CacheEvictedCount.set(l2BucketCache.getEvictedCount()); + CacheStats l2CacheStats = l2BucketCache.getStats(); + this.metrics.l2CacheHitCount.set(l2CacheStats.getHitCount()); + this.metrics.l2CacheMissCount.set(l2CacheStats.getMissCount()); + double ratio = l2CacheStats.getHitRatioPastNPeriods(); + int percent = (int) (ratio * 100); + this.metrics.l2CacheHitRatioPastNPeriods.set(percent); + } float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex( getServerName().getHostname()); int percent = (int) (localityIndex * 100); Index: src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (working copy) @@ -130,6 +130,43 @@ /** Block hit caching ratio for past N periods */ public final MetricsIntValue blockCacheHitCachingRatioPastNPeriods = new MetricsIntValue("blockCacheHitCachingRatioPastNPeriods", registry); + /** + * L2 cache size + */ + public final MetricsLongValue l2CacheSize = new MetricsLongValue("l2CacheSize", registry); + + /** + * L2 cache free size + */ + public final MetricsLongValue l2CacheFree = new MetricsLongValue("l2CacheFree", registry); + + /** + * L2 cache item count + */ + public final MetricsLongValue l2CacheCount = new MetricsLongValue("l2CacheCount", registry); + + /** + * L2 cache hit count + */ + public final MetricsLongValue l2CacheHitCount = new MetricsLongValue("l2CacheHitCount", registry); + + /** + * L2 cache miss count + */ + public final MetricsLongValue l2CacheMissCount = new MetricsLongValue("l2CacheMissCount", + registry); + + /** + * L2 evicted item count + */ + public final MetricsLongValue l2CacheEvictedCount = new MetricsLongValue("l2CacheEvictedCount", + registry); + + /** + * L2 cache hit ratio + */ + public final MetricsIntValue l2CacheHitRatioPastNPeriods = new MetricsIntValue("l2CacheHitRatioPastNPeriods", registry); + /* * Count of requests to the regionservers since last call to metrics update */ @@ -387,6 +424,13 @@ this.hdfsBlocksLocalityIndex.pushMetric(this.metricsRecord); this.blockCacheHitRatioPastNPeriods.pushMetric(this.metricsRecord); this.blockCacheHitCachingRatioPastNPeriods.pushMetric(this.metricsRecord); + this.l2CacheSize.pushMetric(this.metricsRecord); + this.l2CacheFree.pushMetric(this.metricsRecord); + this.l2CacheCount.pushMetric(this.metricsRecord); + this.l2CacheHitCount.pushMetric(this.metricsRecord); + this.l2CacheMissCount.pushMetric(this.metricsRecord); + this.l2CacheEvictedCount.pushMetric(this.metricsRecord); + this.l2CacheHitRatioPastNPeriods.pushMetric(this.metricsRecord); // Mix in HFile and HLog metrics // Be careful. 
Here is code for MTVR from up in hadoop: @@ -573,6 +617,20 @@ Long.valueOf(this.blockCacheHitRatio.get())+"%"); sb = Strings.appendKeyValue(sb, this.blockCacheHitCachingRatio.getName(), Long.valueOf(this.blockCacheHitCachingRatio.get())+"%"); + sb = Strings.appendKeyValue(sb, this.l2CacheSize.getName(), + this.l2CacheSize.get()); + sb = Strings.appendKeyValue(sb, this.l2CacheFree.getName(), + this.l2CacheFree.get()); + sb = Strings.appendKeyValue(sb, this.l2CacheCount.getName(), + this.l2CacheCount.get()); + sb = Strings.appendKeyValue(sb, this.l2CacheHitCount.getName(), + this.l2CacheHitCount.get()); + sb = Strings.appendKeyValue(sb, this.l2CacheMissCount.getName(), + this.l2CacheMissCount.get()); + sb = Strings.appendKeyValue(sb, this.l2CacheEvictedCount.getName(), + this.l2CacheEvictedCount.get()); + sb = Strings.appendKeyValue(sb, this.l2CacheHitRatioPastNPeriods.getName(), + this.l2CacheHitRatioPastNPeriods.get()); sb = Strings.appendKeyValue(sb, this.hdfsBlocksLocalityIndex.getName(), Long.valueOf(this.hdfsBlocksLocalityIndex.get())); sb = Strings.appendKeyValue(sb, "slowHLogAppendCount", Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -108,6 +108,7 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.L2Cache; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseServer; @@ -6032,7 +6033,12 @@ log.close(); // TODO: is this still right? BlockCache bc = new CacheConfig(c).getBlockCache(); - if (bc != null) bc.shutdown(); + L2Cache l2c = new CacheConfig(c).getL2Cache(); + try { + if (bc != null) bc.shutdown(); + } finally { + if (l2c != null) l2c.shutdown(); + } } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1531202) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -690,8 +690,13 @@ */ public synchronized void closeReader(boolean evictOnClose) throws IOException { + closeReader(evictOnClose, evictOnClose); + } + + public synchronized void closeReader(boolean evictL1OnClose, boolean evictL2OnClose) + throws IOException { if (this.reader != null) { - this.reader.close(evictOnClose); + this.reader.close(evictL1OnClose, evictL2OnClose); this.reader = null; } } @@ -1464,6 +1469,10 @@ reader.close(evictOnClose); } + public void close(boolean evictL1OnClose, boolean evictL2OnClose) throws IOException { + reader.close(evictL1OnClose, evictL2OnClose); + } + /** * Check if this storeFile may contain keys within the TimeRange that * have not expired (i.e. not older than oldestUnexpiredTS).