Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (revision 1530525)
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (working copy)
@@ -284,7 +284,7 @@
int numBlocksRead = 0;
long pos = 0;
while (pos < totalSize) {
- b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread);
+ b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread, false);
b.sanityCheck();
pos += block.length;
numBlocksRead++;
@@ -322,7 +322,7 @@
FSDataInputStream is = fs.open(path);
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo,
totalSize);
- HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
+ HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false);
is.close();
assertEquals(0, HFile.getChecksumFailuresCount());
@@ -335,13 +335,15 @@
if (algo == GZ) {
is = fs.open(path);
hbr = new HFileBlock.FSReaderV2(is, algo, totalSize);
- b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS +
- b.totalChecksumBytes(), -1, pread);
+ b = hbr.readBlockData(0,
+ 2173 + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS + b.totalChecksumBytes(), -1, pread,
+ false);
assertEquals(blockStr, b.toString());
int wrongCompressedSize = 2172;
try {
- b = hbr.readBlockData(0, wrongCompressedSize
- + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS, -1, pread);
+ b = hbr.readBlockData(0, wrongCompressedSize + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS,
+ -1, pread, false);
fail("Exception expected");
} catch (IOException ex) {
String expectedPrefix = "On-disk size without header provided is "
@@ -399,7 +401,7 @@
HFileBlock b;
int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) {
- b = hbr.readBlockData(pos, -1, -1, pread);
+ b = hbr.readBlockData(pos, -1, -1, pread, false);
assertEquals(0, HFile.getChecksumFailuresCount());
b.sanityCheck();
pos += b.getOnDiskSizeWithHeader();
@@ -524,7 +526,7 @@
if (detailedLogging) {
LOG.info("Reading block #" + i + " at offset " + curOffset);
}
- HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
+ HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread, false);
if (detailedLogging) {
LOG.info("Block #" + i + ": " + b);
}
@@ -538,8 +540,8 @@
// Now re-load this block knowing the on-disk size. This tests a
// different branch in the loader.
- HFileBlock b2 = hbr.readBlockData(curOffset,
- b.getOnDiskSizeWithHeader(), -1, pread);
+ HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), -1, pread,
+ false);
b2.sanityCheck();
assertEquals(b.getBlockType(), b2.getBlockType());
@@ -647,7 +649,7 @@
HFileBlock b;
try {
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
- b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
+ b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread, false);
} catch (IOException ex) {
LOG.error("Error in client " + clientId + " trying to read block at "
+ offset + ", pread=" + pread + ", withOnDiskSize=" +
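For orientation, a minimal sketch of the five-argument readBlockData call these test changes exercise; fs, path, algo, and totalSize stand for fixtures already set up in the tests above, and the trailing false simply opts out of L2 caching:

    // Open a reader and read one block with a positional read, without
    // populating the L2 cache (signature per this patch).
    FSDataInputStream is = fs.open(path);
    HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo, totalSize);
    HFileBlock b = hbr.readBlockData(0, -1, -1, true, false);
    is.close();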
Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java (revision 1530525)
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java (working copy)
@@ -187,7 +187,7 @@
int numBlocksRead = 0;
long pos = 0;
while (pos < totalSize) {
- b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread);
+ b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread, false);
b.sanityCheck();
pos += block.length;
numBlocksRead++;
@@ -221,8 +221,8 @@
FSDataInputStream is = fs.open(path);
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, is, algo,
- totalSize, MINOR_VERSION, fs, path);
- HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
+ totalSize, MINOR_VERSION, fs, path, null, null);
+ HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false);
is.close();
b.sanityCheck();
@@ -233,15 +233,15 @@
if (algo == GZ) {
is = fs.open(path);
- hbr = new HFileBlock.FSReaderV2(is, is, algo, totalSize, MINOR_VERSION,
- fs, path);
- b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE_NO_CHECKSUM +
- b.totalChecksumBytes(), -1, pread);
+ hbr = new HFileBlock.FSReaderV2(is, is, algo, totalSize, MINOR_VERSION, fs, path, null,
+ null);
+ b = hbr.readBlockData(0,
+ 2173 + HFileBlock.HEADER_SIZE_NO_CHECKSUM + b.totalChecksumBytes(), -1, pread, false);
assertEquals(blockStr, b.toString());
int wrongCompressedSize = 2172;
try {
- b = hbr.readBlockData(0, wrongCompressedSize
- + HFileBlock.HEADER_SIZE_NO_CHECKSUM, -1, pread);
+ b = hbr.readBlockData(0, wrongCompressedSize + HFileBlock.HEADER_SIZE_NO_CHECKSUM, -1,
+ pread, false);
fail("Exception expected");
} catch (IOException ex) {
String expectedPrefix = "On-disk size without header provided is "
@@ -291,15 +291,15 @@
os.close();
FSDataInputStream is = fs.open(path);
- HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, is, algo,
- totalSize, MINOR_VERSION, fs, path);
+ HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, is, algo, totalSize,
+ MINOR_VERSION, fs, path, null, null);
hbr.setDataBlockEncoder(dataBlockEncoder);
hbr.setIncludesMemstoreTS(includesMemstoreTS);
HFileBlock b;
int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) {
- b = hbr.readBlockData(pos, -1, -1, pread);
+ b = hbr.readBlockData(pos, -1, -1, pread, false);
b.sanityCheck();
pos += b.getOnDiskSizeWithHeader();
Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (revision 1530525)
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (working copy)
@@ -158,10 +158,9 @@
assertEquals(useChecksums?1:0, trailer.getMinorVersion());
assertEquals(entryCount, trailer.getEntryCount());
- HFileBlock.FSReader blockReader =
- new HFileBlock.FSReaderV2(fsdis,fsdis, compressAlgo, fileSize,
- this.useChecksums?HFileReaderV2.MAX_MINOR_VERSION:HFileReaderV2.MIN_MINOR_VERSION,
- null, null);
+ HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(fsdis, fsdis, compressAlgo,
+ fileSize, this.useChecksums ? HFileReaderV2.MAX_MINOR_VERSION
+ : HFileReaderV2.MIN_MINOR_VERSION, null, null, null, null);
// Comparator class name is stored in the trailer in version 2.
RawComparator comparator = trailer.createComparator();
HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
@@ -206,7 +205,7 @@
fsdis.seek(0);
long curBlockPos = 0;
while (curBlockPos <= trailer.getLastDataBlockOffset()) {
- HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
+ HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false, false);
assertEquals(BlockType.DATA, block.getBlockType());
ByteBuffer buf = block.getBufferWithoutHeader();
while (buf.hasRemaining()) {
@@ -249,7 +248,7 @@
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
trailer.getLoadOnOpenDataOffset());
- HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
+ HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false, false);
assertEquals(BlockType.META, block.getBlockType());
Text t = new Text();
block.readInto(t);
Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java (working copy)
@@ -0,0 +1,270 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests L2 bucket cache for correctness
+ */
+@Category(MediumTests.class)
+@RunWith(Parameterized.class)
+public class TestL2BucketCache {
+
+ private static final Log LOG = LogFactory.getLog(TestL2BucketCache.class);
+
+ private static final int DATA_BLOCK_SIZE = 2048;
+ private static final int NUM_KV = 25000;
+ private static final int INDEX_BLOCK_SIZE = 512;
+ private static final int BLOOM_BLOCK_SIZE = 4096;
+ private static final StoreFile.BloomType BLOOM_TYPE =
+ StoreFile.BloomType.ROWCOL;
+
+ private static final HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+ private static final HFileDataBlockEncoderImpl ENCODER =
+ new HFileDataBlockEncoderImpl(DataBlockEncoding.PREFIX);
+
+ private BucketCache underlyingCache;
+ private MockedL2Cache mockedL2Cache;
+
+ private Configuration conf;
+ private CacheConfig cacheConf;
+ private FileSystem fs;
+ private Path storeFilePath;
+
+ private final Random rand = new Random(12983177L);
+ private final String ioEngineName;
+
+ public TestL2BucketCache(String ioEngineName) {
+ this.ioEngineName = ioEngineName;
+ }
+
+ @Parameterized.Parameters
+ public static Collection
+ * The reason that this is a static member of HFileBlock is that it needs
+ * to be called by classes that read from {@link L2Cache}, but at the same
+ * time needs to have access to private members of HFileBlock (such as the
+ * internal buffers).
+ *
+ * TODO (avf): refactor HFileBlock such that this method could be
+ * a non-static member of another class
+ * @param rawBytes The compressed and encoded byte array (essentially this
+ * is the block as it would appear on disk or in
+ * {@link L2Cache})
+ * @param compressAlgo Compression algorithm used to encode the block
+ * @param includeMemStoreTs Should memstore timestamp be included?
+ * @param offset Offset within the HFile at which the block is located
+ * @return An instantiated, uncompressed, decoded in-memory representation
+ * of the HFileBlock that can be scanned through or cached in the
+ * L1 block cache
+ * @throws IOException If there is an error decompressing, decoding, or
+ * otherwise parsing the raw byte array encoding the block.
+ */
+ public static HFileBlock fromBytes(byte[] rawBytes, Algorithm compressAlgo,
+ boolean includeMemStoreTs, long offset, int minorVersion) throws IOException {
+ HFileBlock b;
+ if (compressAlgo == Algorithm.NONE) {
+ b = new HFileBlock(ByteBuffer.wrap(rawBytes), minorVersion);
+ b.assumeUncompressed();
+ } else {
+ b = new HFileBlock(ByteBuffer.wrap(rawBytes, 0, headerSize(minorVersion)), minorVersion);
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(rawBytes,
+ headerSize(minorVersion), rawBytes.length - headerSize(minorVersion)));
+ b.allocateBuffer(true);
+ AbstractFSReader.decompress(compressAlgo, b.buf.array(), b.buf.arrayOffset()
+ + headerSize(minorVersion), dis, b.uncompressedSizeWithoutHeader);
+ }
+ b.includesMemstoreTS = includeMemStoreTs;
+ b.offset = offset;
+ return b;
+ }
+
public BlockType getBlockType() {
return blockType;
}
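To make the intended read path concrete, a hedged sketch of reconstructing a block from raw L2-cached bytes; hfileName, compressAlgo, includesMemstoreTS, and minorVersion stand for values the caller already tracks, and getRawBlock is an assumed read-side method on L2Cache, of which this excerpt only shows the write side:

    // Sketch, assuming an L2Cache read method of roughly this shape.
    byte[] rawBytes = l2Cache.getRawBlock(hfileName, offset);  // assumed API
    if (rawBytes != null) {
      // Decompress and parse the on-disk bytes back into an in-memory block
      // that can be scanned or re-cached in the L1 block cache.
      HFileBlock block = HFileBlock.fromBytes(rawBytes, compressAlgo,
          includesMemstoreTS, offset, minorVersion);
    }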
@@ -1102,6 +1144,20 @@
}
/**
+ * Returns the header and the compressed data (or uncompressed data when not
+ * using compression) as a byte array. Can be called in the "writing" state
+ * or in the "block ready" state. If called in the "writing" state,
+ * transitions the writer to the "block ready" state.
+ *
+ * @return header and data as they would be stored on disk in a byte array
+ * @throws IOException
+ */
+ public byte[] getHeaderAndData() throws IOException {
+ ensureBlockReady();
+ return onDiskBytesWithHeader;
+ }
+
+ /**
* Releases the compressor this writer uses to compress blocks into the
* compressor pool. Needs to be called before the writer is discarded.
*/
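The expected caller of getHeaderAndData() is a cache-on-write path; the HFileBlockIndex change later in this patch does exactly this, in sketch form:

    // After the writer has finished a block, hand its exact on-disk bytes
    // (header plus compressed data) to the L2 cache.
    l2Cache.cacheRawBlock(nameForCaching, beginOffset,
        blockWriter.getHeaderAndData());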
@@ -1266,12 +1322,13 @@
* @param offset
* @param onDiskSize the on-disk size of the entire block, including all
* applicable headers, or -1 if unknown
* @param uncompressedSize the uncompressed size of the compressed part of
* the block, or -1 if unknown
+ * @param addToL2Cache if true, add the compressed block to the L2 cache
* @return the newly read block
*/
HFileBlock readBlockData(long offset, long onDiskSize,
- int uncompressedSize, boolean pread) throws IOException;
+ int uncompressedSize, boolean pread, boolean addToL2Cache) throws IOException;
/**
* Creates a block iterator over the given portion of the {@link HFile}.
@@ -1347,7 +1404,7 @@
public HFileBlock nextBlock() throws IOException {
if (offset >= endOffset)
return null;
- HFileBlock b = readBlockData(offset, -1, -1, false);
+ HFileBlock b = readBlockData(offset, -1, -1, false, false);
offset += b.getOnDiskSizeWithHeader();
return b;
}
@@ -1449,15 +1506,32 @@
* uncompressed data size, header not included
* @throws IOException
*/
- protected void decompress(byte[] dest, int destOffset,
- InputStream bufferedBoundedStream,
+ protected void decompress(byte[] dest, int destOffset, InputStream bufferedBoundedStream,
int uncompressedSize) throws IOException {
+ decompress(compressAlgo, dest, destOffset, bufferedBoundedStream, uncompressedSize);
+ }
+
+ /**
+ * Decompresses a given stream using a specified compression algorithm.
+ *
+ * This method is static so that it can be used by methods that construct
+ * an HFileBlock from a raw byte array.
+ * @param compressAlgo The specified compression algorithm
+ * @param dest Write decompressed bytes into this byte array
+ * @param destOffset Offset within the dest byte array
+ * @param bufferedBoundedStream Input stream from which compressed data is
+ * read
+ * @param uncompressedSize The expected un-compressed size of the data
+ * @throws IOException If there is an error during de-compression
+ */
+ protected static void decompress(Algorithm compressAlgo, byte[] dest,
+ int destOffset, InputStream bufferedBoundedStream,
+ int uncompressedSize) throws IOException {
Decompressor decompressor = null;
try {
decompressor = compressAlgo.getDecompressor();
InputStream is = compressAlgo.createDecompressionStream(
bufferedBoundedStream, decompressor, 0);
-
IOUtils.readFully(is, dest, destOffset, uncompressedSize);
is.close();
} finally {
@@ -1529,7 +1603,7 @@
*/
@Override
public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
- int uncompressedSizeWithMagic, boolean pread) throws IOException {
+ int uncompressedSizeWithMagic, boolean pread, boolean addToL2Cache) throws IOException {
if (uncompressedSizeWithMagic <= 0) {
throw new IOException("Invalid uncompressedSize="
+ uncompressedSizeWithMagic + " for a version 1 block");
@@ -1606,7 +1680,15 @@
/** Reads version 2 blocks from the filesystem. */
static class FSReaderV2 extends AbstractFSReader {
+ /** L2 cache instance or null if l2 cache is disabled */
+ private final L2Cache l2Cache;
+ /**
+ * Name of the current hfile. Used to compose the key for the
+ * L2 cache, if enabled.
+ */
+ private final String hfileNameForL2Cache;
+
// The configuration states that we should validate hbase checksums
private final boolean useHBaseChecksumConfigured;
@@ -1635,9 +1717,9 @@
}
};
- public FSReaderV2(FSDataInputStream istream,
- FSDataInputStream istreamNoFsChecksum, Algorithm compressAlgo,
- long fileSize, int minorVersion, HFileSystem hfs, Path path)
+ public FSReaderV2(FSDataInputStream istream, FSDataInputStream istreamNoFsChecksum,
+ Algorithm compressAlgo, long fileSize, int minorVersion, HFileSystem hfs, Path path,
+ L2Cache l2Cache, String hfileNameForL2Cache)
throws IOException {
super(istream, istreamNoFsChecksum, compressAlgo, fileSize,
minorVersion, hfs, path);
@@ -1659,6 +1741,8 @@
useHBaseChecksum = false;
}
this.useHBaseChecksumConfigured = useHBaseChecksum;
+ this.l2Cache = l2Cache;
+ this.hfileNameForL2Cache = hfileNameForL2Cache;
}
/**
@@ -1668,7 +1752,7 @@
FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
long fileSize) throws IOException {
this(istream, istream, compressAlgo, fileSize,
- HFileReaderV2.MAX_MINOR_VERSION, null, null);
+ HFileReaderV2.MAX_MINOR_VERSION, null, null, null, null);
}
/**
@@ -1681,10 +1765,12 @@
* @param uncompressedSize the uncompressed size of the block. Always
* expected to be -1. This parameter is only used in version 1.
* @param pread whether to use a positional read
+ * @param addToL2Cache if true, will cache the block on read in the L2
+ * cache, if the L2 cache is enabled.
*/
@Override
public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
- int uncompressedSize, boolean pread) throws IOException {
+ int uncompressedSize, boolean pread, boolean addToL2Cache) throws IOException {
// It is ok to get a reference to the stream here without any
// locks because it is marked final.
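In sketch form, a read that also populates the L2 cache, assuming a FSReaderV2 constructed with a non-null L2Cache and hfile name:

    // Positional read that also stores the block's raw on-disk bytes in the
    // L2 cache, keyed by (hfileNameForL2Cache, offset).
    HFileBlock blk = reader.readBlockData(offset, -1, -1, true, true);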
@@ -1700,10 +1786,8 @@
is = this.istream;
}
- HFileBlock blk = readBlockDataInternal(is, offset,
- onDiskSizeWithHeaderL,
- uncompressedSize, pread,
- doVerificationThruHBaseChecksum);
+ HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, uncompressedSize,
+ pread, doVerificationThruHBaseChecksum, addToL2Cache);
if (blk == null) {
HFile.LOG.warn("HBase checksum verification failed for file " +
path + " at offset " +
@@ -1731,9 +1815,8 @@
this.useHBaseChecksum = false;
doVerificationThruHBaseChecksum = false;
is = this.istream;
- blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
- uncompressedSize, pread,
- doVerificationThruHBaseChecksum);
+ blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, uncompressedSize, pread,
+ doVerificationThruHBaseChecksum, addToL2Cache);
if (blk != null) {
HFile.LOG.warn("HDFS checksum verification suceeded for file " +
path + " at offset " +
@@ -1776,10 +1859,9 @@
* If HBase checksum is switched off, then use HDFS checksum.
* @return the HFileBlock or null if there is a HBase checksum mismatch
*/
- private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
- long onDiskSizeWithHeaderL,
- int uncompressedSize, boolean pread, boolean verifyChecksum)
- throws IOException {
+ private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
+ long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread, boolean verifyChecksum,
+ boolean addToL2Cache) throws IOException {
if (offset < 0) {
throw new IOException("Invalid offset=" + offset + " trying to read "
+ "block (onDiskSize=" + onDiskSizeWithHeaderL
@@ -1841,7 +1923,9 @@
+ preReadHeaderSize, onDiskSizeWithHeader
- preReadHeaderSize, true, offset + preReadHeaderSize,
pread);
-
+ if (addToL2Cache) {
+ cacheBlockInL2Cache(offset, headerAndData.array());
+ }
b = new HFileBlock(headerAndData, getMinorVersion());
b.assumeUncompressed();
b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
@@ -1860,9 +1944,9 @@
preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
true, offset + preReadHeaderSize, pread);
- if (header == null)
+ if (header == null) {
header = onDiskBlock;
-
+ }
try {
b = new HFileBlock(ByteBuffer.wrap(header, 0, hdrSize),
getMinorVersion());
@@ -1880,7 +1964,15 @@
!validateBlockChecksum(b, onDiskBlock, hdrSize)) {
return null; // checksum mismatch
}
-
+ if (l2Cache != null && addToL2Cache) {
+ if (preReadHeaderSize > 0) {
+ // If we plan to add block to L2 cache, we need to copy the
+ // header information into the byte array so that it can be
+ // cached in the L2 cache.
+ System.arraycopy(header, 0, onDiskBlock, 0, preReadHeaderSize);
+ }
+ cacheBlockInL2Cache(offset, onDiskBlock);
+ }
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
onDiskBlock, hdrSize, onDiskSizeWithoutHeader));
@@ -1941,7 +2033,9 @@
!validateBlockChecksum(b, b.buf.array(), hdrSize)) {
return null; // checksum mismatch
}
-
+ if (addToL2Cache) {
+ cacheBlockInL2Cache(offset, b.buf.array());
+ }
if (b.nextBlockOnDiskSizeWithHeader > 0) {
setNextBlockHeader(offset, b);
}
@@ -1949,7 +2043,6 @@
// Allocate enough space for the block's header and compressed data.
byte[] compressedBytes = new byte[b.getOnDiskSizeWithHeader()
+ hdrSize];
-
b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, compressedBytes,
hdrSize, b.onDiskSizeWithoutHeader, true, offset
+ hdrSize, pread);
@@ -1957,6 +2050,13 @@
!validateBlockChecksum(b, compressedBytes, hdrSize)) {
return null; // checksum mismatch
}
+ if (l2Cache != null && addToL2Cache) {
+ // If l2 cache is enabled, we need to copy the header bytes to
+ // the compressed bytes array, so that they can be cached in the
+ // L2 cache.
+ System.arraycopy(headerBuf.array(), 0, compressedBytes, 0, hdrSize);
+ cacheBlockInL2Cache(offset, compressedBytes);
+ }
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
compressedBytes, hdrSize, b.onDiskSizeWithoutHeader));
@@ -2001,6 +2101,20 @@
}
/**
+ * If L2 cache is enabled, associates current hfile name and
+ * offset with the specified byte array representing the
+ * compressed and encoded block.
+ * @param offset The block's offset within the hfile
+ * @param blockBytes The block's bytes as they appear on disk (i.e.,
+ * correctly encoded and compressed)
+ */
+ void cacheBlockInL2Cache(long offset, byte[] blockBytes) {
+ if (l2Cache != null) {
+ l2Cache.cacheRawBlock(hfileNameForL2Cache, offset, blockBytes);
+ }
+ }
+
+ /**
* Generates the checksum for the header as well as the data and
* then validates that it matches the value stored in the header.
* If there is a checksum mismatch, then return false. Otherwise
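For reference, a hedged sketch of the L2Cache contract this patch codes against; cacheRawBlock matches the calls shown in this diff, while the read-side method is an assumption, since the interface itself is not included in this excerpt:

    public interface L2Cache {
      /** Associate an hfile name and offset with a block's raw on-disk bytes. */
      void cacheRawBlock(String hfileName, long offset, byte[] blockBytes);
      /** Return the raw block bytes, or null on a miss (assumed method). */
      byte[] getRawBlock(String hfileName, long offset);
    }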
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java (revision 1530525)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java (working copy)
@@ -111,6 +111,11 @@
public long getBlockCount();
/**
+ * Clear the cache. Used in unit tests. Don't call this in production.
+ */
+ public void clearCache();
+
+ /**
* Performs a BlockCache summary and returns a List of BlockCacheColumnFamilySummary objects.
* This method could be fairly heavyweight in that it evaluates the entire HBase file-system
* against what is in the RegionServer BlockCache.
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (revision 1530525)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (working copy)
@@ -238,9 +238,8 @@
// Cache Miss, please load.
}
- HFileBlock hfileBlock = fsBlockReader.readBlockData(offset,
- nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block),
- true);
+ HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset - offset,
+ metaBlockIndexReader.getRootBlockDataSize(block), true, false);
passSchemaMetricsTo(hfileBlock);
hfileBlock.expectType(BlockType.META);
@@ -314,8 +313,8 @@
nextOffset = dataBlockIndexReader.getRootBlockOffset(block + 1);
}
- HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset
- - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
+ HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset - offset,
+ dataBlockIndexReader.getRootBlockDataSize(block), pread, false);
passSchemaMetricsTo(hfileBlock);
hfileBlock.expectType(BlockType.DATA);
@@ -389,6 +388,12 @@
getSchemaMetrics().flushMetrics();
}
+ @Override
+ public void close(boolean evictL1OnClose, boolean evictL2OnClose)
+ throws IOException {
+ close(evictL1OnClose); // HFileReaderV1 does not support L2 cache
+ }
+
protected abstract static class AbstractScannerV1
extends AbstractHFileReader.Scanner {
protected int currBlock;
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java (working copy)
@@ -0,0 +1,156 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.util.DirectMemoryUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+
+/**
+ * A singleton implementation of {@link L2CacheFactory} that creates
+ * {@link L2BucketCache} instances. This is a lazy singleton:
+ * first call to getInstance() will initialize and cache an instance,
+ * subsequent invocations will return the cached instance. All (static and
+ * per-instance) class methods are thread-safe.
+ *
+ * TODO (avf): remove all singleton functionality once Guice or other DI
+ * mechanism is available
+ */
+public class L2BucketCacheFactory implements L2CacheFactory {
+
+ private static final Log LOG = LogFactory.getLog(L2BucketCacheFactory.class);
+
+ /** Cached singleton instance or null if not initialized */
+ private static L2BucketCacheFactory instance;
+
+ private L2BucketCacheFactory() { } // Private as this is a singleton
+
+ /**
+ * Returns a single cached instance of this class, initializing if needed.
+ * @return A cached and instantiated instance of this class
+ */
+ public synchronized static L2BucketCacheFactory getInstance() {
+ if (instance == null) {
+ instance = new L2BucketCacheFactory();
+ }
+ return instance;
+ }
+
+ /** Cached instance of the L2Cache or null if not initialized */
+ private L2Cache l2Cache;
+
+ // Allows short-circuiting getL2Cache() calls if the cache is disabled
+ private boolean l2CacheDisabled;
+
+ private static String bucketSizesToString(int[] bucketSizes) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Configured bucket sizes: ");
+ for (int bucketSize : bucketSizes) {
+ sb.append(StringUtils.humanReadableInt(bucketSize));
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+ /**
+ * Returns a cached initialized instance of {@link L2BucketCache}. Follows
+ * lazy initialization pattern: first invocation creates and initializes the
+ * instance, subsequent invocations return the cached instance.
+ * @param conf The configuration to pass to {@link L2BucketCache}
+ * @return The {@link L2BucketCache} instance
+ */
+ @Override
+ public synchronized L2Cache getL2Cache(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+
+ if (l2Cache != null) {
+ return l2Cache;
+ }
+ if (l2CacheDisabled) {
+ return null;
+ }
+
+ // If no IOEngine is specified for the L2 cache, assume the L2 cache
+ // is disabled
+ String bucketCacheIOEngineName =
+ conf.get(CacheConfig.L2_BUCKET_CACHE_IOENGINE_KEY, null);
+ if (bucketCacheIOEngineName == null) {
+ l2CacheDisabled = true;
+ return null;
+ }
+
+ long maxMem = 0;
+ if (bucketCacheIOEngineName.equals("offheap")) {
+ maxMem = DirectMemoryUtils.getDirectMemorySize();
+ } else if (bucketCacheIOEngineName.equals("heap")) {
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ maxMem = mu.getMax();
+ }
+
+ // Unless a percentage or an absolute size is set, assume the cache is
+ // disabled. This is analogous to the default behaviour with LruBlockCache.
+ float bucketCachePercentage =
+ conf.getFloat(CacheConfig.L2_BUCKET_CACHE_SIZE_KEY, 0F);
+ // Values below 1 are a fraction of maxMem; values >= 1 are megabytes.
+ long bucketCacheSize = (long) (bucketCachePercentage < 1
+ ? maxMem * bucketCachePercentage
+ : bucketCachePercentage * 1024 * 1024);
+ if (bucketCacheSize > 0) {
+ int writerThreads = conf.getInt(
+ CacheConfig.L2_BUCKET_CACHE_WRITER_THREADS_KEY,
+ BucketCache.DEFAULT_WRITER_THREADS);
+ int writerQueueLen = conf.getInt(CacheConfig.L2_BUCKET_CACHE_QUEUE_KEY,
+ BucketCache.DEFAULT_WRITER_QUEUE_ITEMS);
+ int ioErrorsTolerationDuration =
+ conf.getInt(CacheConfig.L2_BUCKET_CACHE_IOENGINE_ERRORS_TOLERATED_DURATION_KEY,
+ BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
+ try {
+ int[] bucketSizes = CacheConfig.getL2BucketSizes(conf);
+ LOG.info(bucketSizesToString(bucketSizes));
+ long bucketCacheInitStartMs = EnvironmentEdgeManager.currentTimeMillis();
+ BucketCache bucketCache = new BucketCache(bucketCacheIOEngineName,
+ bucketCacheSize, writerThreads, writerQueueLen,
+ ioErrorsTolerationDuration, bucketSizes, conf);
+ l2Cache = new L2BucketCache(bucketCache);
+ long bucketCacheInitElapsedMs =
+ EnvironmentEdgeManager.currentTimeMillis() - bucketCacheInitStartMs;
+ LOG.info("L2BucketCache instantiated in " + bucketCacheInitElapsedMs +
+ " ms.; bucketCacheIOEngine=" + bucketCacheIOEngineName +
+ ", bucketCacheSize=" + bucketCacheSize +
+ ", writerThreads=" + writerThreads + ", writerQueueLen=" +
+ writerQueueLen + ", ioErrorsTolerationDuration=" +
+ ioErrorsTolerationDuration);
+ } catch (IOException ioe) {
+ LOG.error("Can't instantiate L2 cache with bucket cache engine",
+ ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+ return l2Cache;
+ }
+}
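A hedged usage sketch for this factory: enable an off-heap L2 bucket cache and obtain the lazily created singleton instance. The CacheConfig constants are the ones referenced above (their string values are not shown in this patch), and per the size logic above, values below 1 are a fraction of available memory while values of 1 or more are taken as megabytes:

    Configuration conf = HBaseConfiguration.create();
    conf.set(CacheConfig.L2_BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat(CacheConfig.L2_BUCKET_CACHE_SIZE_KEY, 0.2f);  // 20% of direct memory
    L2Cache l2Cache = L2BucketCacheFactory.getInstance().getL2Cache(conf);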
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCacheFactory.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCacheFactory.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCacheFactory.java (working copy)
@@ -0,0 +1,114 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.util.StringUtils;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+
+/**
+ * A singleton implementation of {@link BlockCacheFactory} that creates
+ * {@link LruBlockCache} instances. This is a lazy singleton:
+ * first call to getInstance() will initialize and cache an instance,
+ * subsequent invocations will return the cached instance. All (static
+ * and per-instance) class methods are thread-safe.
+ *
+ * TODO (avf): remove all singleton functionality once Guice or other DI
+ * mechanism is available
+ */
+public class LruBlockCacheFactory implements BlockCacheFactory {
+
+ private static final Log LOG = LogFactory.getLog(LruBlockCache.class);
+
+ /** Cached singleton instance or null if not initialized */
+ private static LruBlockCacheFactory instance;
+
+ private LruBlockCacheFactory() { } // Private as this is a singleton
+
+ public synchronized static LruBlockCacheFactory getInstance() {
+ if (instance == null) {
+ instance = new LruBlockCacheFactory();
+ }
+ return instance;
+ }
+
+ /** Cached instance of the BlockCache or null if not initialized */
+ private LruBlockCache blockCache;
+
+ // Allows short-circuiting getBlockCache() calls if the cache is disabled
+ private boolean blockCacheDisabled;
+
+ /**
+ * Returns the current underlying block cache instance or null if
+ * block cache is disabled or not initialized. Used by
+ * {@link SchemaMetrics}
+ * @return The current block cache instance, null if disabled or
+ * or not initialized
+ */
+ public synchronized LruBlockCache getCurrentBlockCacheInstance() {
+ return blockCache;
+ }
+
+ /**
+ * Returns a cached initialized instance of {@link LruBlockCache}.
+ * Follows lazy initialization pattern: first invocation creates and
+ * initializes the instance, subsequent invocations return the cached
+ * instance. This replaced a static "instantiateBlockCache()" method
+ * in CacheConfig.
+ * @param conf The HBase configuration needed to create the instance
+ * @return The {@link LruBlockCache} instance.
+ */
+ @Override
+ public synchronized BlockCache getBlockCache(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+
+ if (blockCache != null)
+ return blockCache;
+ if (blockCacheDisabled)
+ return null;
+
+ float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
+ HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+ if (cachePercentage == 0L) {
+ blockCacheDisabled = true;
+ return null;
+ }
+ if (cachePercentage > 1.0) {
+ throw new IllegalArgumentException(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY +
+ " must be between 0.0 and 1.0, not > 1.0");
+ }
+
+ // Calculate the amount of heap to give the block cache.
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ long cacheSize = (long)(mu.getMax() * cachePercentage);
+ LOG.info("Allocating LruBlockCache with maximum size " +
+ StringUtils.humanReadableInt(cacheSize));
+ blockCache = new LruBlockCache(cacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
+ return blockCache;
+ }
+}
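Correspondingly, a minimal sketch of obtaining the L1 cache through this factory:

    // Fetch (and lazily create) the process-wide LruBlockCache; returns null
    // if HConstants.HFILE_BLOCK_CACHE_SIZE_KEY resolves to zero.
    BlockCache blockCache = LruBlockCacheFactory.getInstance().getBlockCache(conf);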
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (revision 1530525)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (working copy)
@@ -773,12 +773,15 @@
/** Block cache, or null if cache-on-write is disabled */
private BlockCache blockCache;
+ /** L2Cache, or null if cache-on-write is disabled */
+ private L2Cache l2Cache;
+
/** Name to use for computing cache keys */
private String nameForCaching;
/** Creates a single-level block index writer */
public BlockIndexWriter() {
- this(null, null, null);
+ this(null, null, null, null);
singleLevelOnly = true;
}
@@ -790,14 +793,14 @@
* on write into this block cache.
*/
public BlockIndexWriter(HFileBlock.Writer blockWriter,
- BlockCache blockCache, String nameForCaching) {
- if ((blockCache == null) != (nameForCaching == null)) {
- throw new IllegalArgumentException("Block cache and file name for " +
- "caching must be both specified or both null");
+ BlockCache blockCache, L2Cache l2Cache, String nameForCaching) {
+ if (nameForCaching == null && (blockCache != null || l2Cache != null)) {
+ throw new IllegalArgumentException("If BlockCache OR L2Cache are " +
+ " not null, then nameForCaching must NOT be null");
}
-
this.blockWriter = blockWriter;
this.blockCache = blockCache;
+ this.l2Cache = l2Cache;
this.nameForCaching = nameForCaching;
this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
}
@@ -958,6 +961,9 @@
beginOffset, DataBlockEncoding.NONE,
blockForCaching.getBlockType()), blockForCaching);
}
+ if (l2Cache != null) {
+ l2Cache.cacheRawBlock(nameForCaching, beginOffset, blockWriter.getHeaderAndData());
+ }
// Add intermediate index block size
totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
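A hedged construction sketch for the four-argument BlockIndexWriter, matching the validation above (nameForCaching must be non-null whenever either cache is supplied):

    // Index writer that caches index blocks on write in both cache tiers.
    HFileBlockIndex.BlockIndexWriter indexWriter =
        new HFileBlockIndex.BlockIndexWriter(blockWriter, blockCache, l2Cache,
            hfileName);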
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java (working copy)
@@ -0,0 +1,59 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A class implementing the IOEngine interface provides raw data storage for
+ * {@link BucketCache}.
+ */
+public interface IOEngine {
+
+ /**
+ * Transfers data from the IOEngine into the given byte array
+ * @param dst the byte array into which bytes are to be written
+ * @param offset the offset in the IO engine of the first byte to be read
+ * @throws IOException
+ */
+ void read(byte[] dst, long offset) throws IOException;
+
+ /**
+ * Transfers data from the given byte array to the IOEngine
+ * @param src the byte array from which bytes are to be read
+ * @param offset the offset in the IO engine at which the first byte is to
+ * be written
+ * @throws IOException
+ */
+ void write(byte[] src, long offset) throws IOException;
+
+ /**
+ * Sync the data to IOEngine after writing
+ * @throws IOException
+ */
+ void sync() throws IOException;
+
+ /**
+ * Shutdown the IOEngine
+ */
+ void shutdown();
+}
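To make the contract concrete, a minimal sketch of an IOEngine backed by a single heap array; real engines such as the ByteBufferIOEngine referenced elsewhere in this patch also manage capacity and direct memory:

    // A deliberately simple heap-backed IOEngine (illustration only).
    public class SimpleArrayIOEngine implements IOEngine {
      private final byte[] store;

      public SimpleArrayIOEngine(int capacity) {
        this.store = new byte[capacity];
      }

      @Override
      public void read(byte[] dst, long offset) throws IOException {
        System.arraycopy(store, (int) offset, dst, 0, dst.length);
      }

      @Override
      public void write(byte[] src, long offset) throws IOException {
        System.arraycopy(src, 0, store, (int) offset, src.length);
      }

      @Override
      public void sync() { }      // heap memory needs no sync

      @Override
      public void shutdown() { }  // nothing to release
    }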
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java (working copy)
@@ -0,0 +1,34 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+
+
+/**
+ * Thrown by {@link BucketAllocator}
+ */
+public class BucketAllocatorException extends IOException {
+ private static final long serialVersionUID = 2479119906660788096L;
+
+ BucketAllocatorException(String reason) {
+ super(reason);
+ }
+}
+
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java (working copy)
@@ -0,0 +1,120 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+
+/**
+ * A memory-bound queue that will grow until an element brings total size larger
+ * than maxSize. From then on, only entries that are sorted larger than the
+ * smallest current entry will be inserted/replaced.
+ * <p>
+ * Use this when you want to find the largest elements (according to their
+ * ordering, not their heap size) that consume as close to the specified maxSize
+ * as possible. Default behavior is to grow just above rather than just below
+ * specified max.
+ */
+public class CachedEntryQueue {
+
+ private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue;
+
+ private long cacheSize;
+ private long maxSize;
+
+ /**
+ * @param maxSize the target size of elements in the queue
+ * @param blockSize expected average size of blocks
+ */
+ public CachedEntryQueue(long maxSize, long blockSize) {
+ int initialSize = (int) (maxSize / blockSize);
+ if (initialSize == 0)
+ initialSize++;
+ queue = MinMaxPriorityQueue
+ .orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
+ public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
+ Entry<BlockCacheKey, BucketEntry> entry2) {
+ return entry1.getValue().compareTo(entry2.getValue());
+ }
+
+ }).expectedSize(initialSize).create();
+ cacheSize = 0;
+ this.maxSize = maxSize;
+ }
+
+ /**
+ * Attempt to add the specified entry to this queue.
+ * <p>
+ * If the queue is smaller than the max size, or if the specified element is
+ * ordered after the smallest element in the queue, the element will be added
+ * to the queue. Otherwise, there is no side effect of this call.
+ * @param entry a bucket entry with key to try to add to the queue
+ */
+ public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) {
+ if (cacheSize < maxSize) {
+ queue.add(entry);
+ cacheSize += entry.getValue().getLength();
+ } else {
+ BucketEntry head = queue.peek().getValue();
+ if (entry.getValue().compareTo(head) > 0) {
+ cacheSize += entry.getValue().getLength();
+ cacheSize -= head.getLength();
+ if (cacheSize > maxSize) {
+ queue.poll();
+ } else {
+ cacheSize += head.getLength();
+ }
+ queue.add(entry);
+ }
+ }
+ }
+
+ /**
+ * @return The next element in this queue, or {@code null} if the queue is
+ * empty.
+ */
+ public Map.Entry<BlockCacheKey, BucketEntry> poll() {
+ return queue.poll();
+ }
+
+ /**
+ * @return The last element in this queue, or {@code null} if the queue is
+ * empty.
+ */
+ public Map.Entry<BlockCacheKey, BucketEntry> pollLast() {
+ return queue.pollLast();
+ }
+
+ /**
+ * Total size of all elements in this queue.
+ * @return size of all elements currently in queue, in bytes
+ */
+ public long cacheSize() {
+ return cacheSize;
+ }
+}
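In sketch form, a hedged example of an eviction pass using this queue: offer every cached entry, then drain the queue and evict the candidates it retained; backingMap and evictBlock refer to the BucketCache internals defined below, and the exact polling end depends on the BucketEntry ordering:

    CachedEntryQueue queue = new CachedEntryQueue(bytesToFree, blockSize);
    for (Map.Entry<BlockCacheKey, BucketEntry> e : backingMap.entrySet()) {
      queue.add(e);
    }
    Map.Entry<BlockCacheKey, BucketEntry> entry;
    while ((entry = queue.poll()) != null) {
      evictBlock(entry.getKey());
    }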
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (working copy)
@@ -0,0 +1,993 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.CachedBlock;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A modified version of BucketCache (by TaoBao/FusionIO) imported from
+ * HBASE-7404 patch. Simplified to only handle byte arrays (for use as a
+ * L2 cache).
+ */
+public class BucketCache implements HeapSize {
+
+ private static final Log LOG = LogFactory.getLog(BucketCache.class);
+
+ // TODO: these must be configurable
+ /** Priority buckets */
+ private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
+ private static final float DEFAULT_MULTI_FACTOR = 0.50f;
+ private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
+ private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
+
+ private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
+ private static final float DEFAULT_MIN_FACTOR = 0.85f;
+
+ /** Statistics thread */
+ private static final int statThreadPeriod = 3 * 60;
+
+ public final static int DEFAULT_WRITER_THREADS = 3;
+ public final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
+
+ // Store/read block data
+ private final IOEngine ioEngine;
+
+ // Store the block in this map before writing it to cache
+ private final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> ramCache;
+ // In this map, store the block's meta data like offset, length
+ private final ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;
+
+ /**
+ * Flag if the cache is enabled or not... We shut it off if there are IO
+ * errors for some time, so that Bucket IO exceptions/errors don't bring down
+ * the HBase server.
+ */
+ private volatile boolean cacheEnabled;
+
+ private final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
+ new ArrayList<BlockingQueue<RAMQueueEntry>>();
+
+ private final WriterThread writerThreads[];
+
+ /** Volatile boolean to track if free space is in process or not */
+ private volatile boolean freeInProgress = false;
+ private final Lock freeSpaceLock = new ReentrantLock();
+
+ private final AtomicLong realCacheSize = new AtomicLong(0);
+ private final AtomicLong heapSize = new AtomicLong(0);
+ /** Current number of cached elements */
+ private final AtomicLong blockNumber = new AtomicLong(0);
+ private final AtomicLong failedBlockAdditions = new AtomicLong(0);
+
+ /** Cache access count (sequential ID) */
+ private final AtomicLong accessCount = new AtomicLong(0);
+
+ private final Object[] cacheWaitSignals;
+ private static final int DEFAULT_CACHE_WAIT_TIME = 50;
+
+
+ // Used in test now. If the flag is false and the cache speed is very fast,
+ // bucket cache will skip some blocks when caching. If the flag is true, we
+ // will wait blocks flushed to IOEngine for some time when caching
+ boolean wait_when_cache = false;
+
+ private BucketCacheStats cacheStats = new BucketCacheStats();
+
+ /** Approximate block size */
+ private final long blockSize;
+
+ /** Duration of IO errors tolerated before we disable cache, 1 min as default */
+ private final int ioErrorsTolerationDuration;
+ // 1 min
+ public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
+ // Start time of first IO error when reading or writing IO Engine, it will be
+ // reset after a successful read/write.
+ private volatile long ioErrorStartTime = -1;
+
+ /** Minimum buffer size for ByteBufferIOEngine */
+ public static final int MIN_BUFFER_SIZE = 4 * 1024 * 1024;
+
+ /**
+ * A "sparse lock" implementation allowing to lock on a particular block
+ * identified by offset. The purpose of this is to avoid freeing the block
+ * which is being read.
+ *
+ * TODO:We could extend the IdLock to IdReadWriteLock for better.
+ */
+ private IdLock offsetLock = new IdLock();
+
+ private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
+ new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
+ @Override
+ public int compare(BlockCacheKey a, BlockCacheKey b) {
+ if (a.getOffset() == b.getOffset()) {
+ return 0;
+ } else if (a.getOffset() < b.getOffset()) {
+ return -1;
+ }
+ return 1;
+ }
+ });
+
+ /** Statistics thread schedule pool (for heavy debugging, could remove) */
+ private final ScheduledExecutorService scheduleThreadPool =
+ Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder()
+ .setNameFormat("BucketCache Statistics #%d")
+ .setDaemon(true)
+ .build());
+
+ private final int[] bucketSizes;
+ // Allocate or free space for the block
+ private final BucketAllocator bucketAllocator;
+
+ // TODO (avf): perhaps use a Builder or a separate config object?
+ public BucketCache(String ioEngineName, long capacity, int writerThreadNum,
+ int writerQLen, int ioErrorsTolerationDuration, int[] bucketSizes,
+ Configuration conf)
+ throws IOException {
+ this.bucketSizes = bucketSizes;
+ this.ioEngine = getIOEngineFromName(ioEngineName, capacity, conf);
+ this.writerThreads = new WriterThread[writerThreadNum];
+ this.cacheWaitSignals = new Object[writerThreadNum];
+ long blockNumCapacity = capacity / 16384;
+ if (blockNumCapacity >= Integer.MAX_VALUE) {
+ // Enough for about 32TB of cache!
+ throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
+ }
+
+ this.blockSize = StoreFile.DEFAULT_BLOCKSIZE_SMALL;
+ this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
+
+ bucketAllocator = new BucketAllocator(bucketSizes, capacity);
+ for (int i = 0; i < writerThreads.length; ++i) {
+ writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
+ this.cacheWaitSignals[i] = new Object();
+ }
+
+ this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();
+
+ this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);
+
+ final String threadName = Thread.currentThread().getName();
+ this.cacheEnabled = true;
+ for (int i = 0; i < writerThreads.length; ++i) {
+ writerThreads[i] = new WriterThread(writerQueues.get(i), i);
+ writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
+ writerThreads[i].start();
+ }
+ // Run the statistics thread periodically to print the cache statistics log
+ this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
+ statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
+ LOG.info("Started bucket cache");
+ }
+
+ /**
+ * Get the IOEngine from the IO engine name
+ * @param ioEngineName Name of the io engine
+ * @param capacity Maximum capacity of the io engine
+ * @param conf Optional configuration object for additional parameters
+ * @return Instance of the correct IOEngine
+ * @throws IllegalArgumentException If the name of the io engine is invalid
+ * @throws IOException If there is an error instantiating the io engine
+ */
+ private IOEngine getIOEngineFromName(String ioEngineName, long capacity,
+ Configuration conf)
+ throws IOException {
+ int requestedBufferSize = conf == null ?
+ CacheConfig.DEFAULT_L2_BUCKET_CACHE_BUFFER_SIZE :
+ conf.getInt(CacheConfig.L2_BUCKET_CACHE_BUFFER_SIZE_KEY,
+ CacheConfig.DEFAULT_L2_BUCKET_CACHE_BUFFER_SIZE);
+ int bufferSize = Math.max(requestedBufferSize, bucketSizes[0]);
+ boolean isDirect;
+ if (ioEngineName.startsWith("offheap"))
+ isDirect = true;
+ else if (ioEngineName.startsWith("heap"))
+ isDirect = false;
+ else
+ throw new IllegalArgumentException(
+ "Don't understand io engine name " + ioEngineName +
+ " for cache. Must be heap or offheap");
+ LOG.info("Initiating ByteBufferIOEngine with " + ioEngineName +
+ " allocation...");
+ if (bufferSize != requestedBufferSize) {
+ LOG.warn("Requested per-buffer size " + requestedBufferSize +
+ ", but actual per-buffer size will be: " + bufferSize);
+ } else {
+ LOG.info("Size per-buffer: " + StringUtils.humanReadableInt(bufferSize));
+ }
+ return new ByteBufferIOEngine(capacity, bufferSize, isDirect);
+ }
+
+ /**
+ * Cache the block with the specified name and buffer.
+ * @param cacheKey block's cache key
+ * @param buf block buffer
+ */
+ public void cacheBlock(BlockCacheKey cacheKey, byte[] buf) {
+ cacheBlock(cacheKey, buf, false);
+ }
+
+ /**
+ * Cache the block with the specified name and buffer.
+ * @param cacheKey block's cache key
+ * @param cachedItem block buffer
+ * @param inMemory if block is in-memory
+ */
+ public void cacheBlock(BlockCacheKey cacheKey, byte[] cachedItem, boolean inMemory) {
+ cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
+ }
+
+ /**
+ * Cache the block to ramCache
+ * @param cacheKey block's cache key
+ * @param cachedItem block buffer
+ * @param inMemory if block is in-memory
+ * @param wait if true, blocking wait when queue is full
+ */
+ public void cacheBlockWithWait(BlockCacheKey cacheKey, byte[] cachedItem,
+ boolean inMemory, boolean wait) {
+ if (!cacheEnabled)
+ return;
+
+ if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
+ return;
+
+ /*
+ * Stuff the entry into the RAM cache so it can get drained to the
+ * persistent store
+ */
+ RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem,
+ accessCount.incrementAndGet(), inMemory);
+ ramCache.put(cacheKey, re);
+ int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
+ BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
+ boolean successfulAddition = bq.offer(re);
+ if (!successfulAddition && wait) {
+ synchronized (cacheWaitSignals[queueNum]) {
+ try {
+ cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ successfulAddition = bq.offer(re);
+ }
+ if (!successfulAddition) {
+ ramCache.remove(cacheKey);
+ failedBlockAdditions.incrementAndGet();
+ } else {
+ this.blockNumber.incrementAndGet();
+ this.heapSize.addAndGet(cachedItem.length);
+ blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
+ }
+ }
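+
+ /*
+ * A minimal usage sketch (key construction illustrative): callers normally
+ * go through cacheBlock(), which delegates here with the configured
+ * wait_when_cache flag:
+ *
+ * cache.cacheBlock(new BlockCacheKey("hfile-1", 0L), rawBytes);
+ *
+ * With wait=true, a full writer queue makes the caller wait up to
+ * DEFAULT_CACHE_WAIT_TIME ms for a writer thread to drain it; if the retry
+ * still fails, the block is dropped and counted as a failed addition.
+ */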
+
+
+ /**
+ * Get the buffer of the block with the specified key.
+ * @param key block's cache key
+ * @param caching true if the caller caches blocks on cache misses
+ */
+ public byte[] getBlock(BlockCacheKey key, boolean caching) {
+ return getBlock(key, caching, false);
+ }
+
+ /**
+ * Get the buffer of the block with the specified key.
+ * @param key block's cache key
+ * @param caching true if the caller caches blocks on cache misses
+ * @param repeat Whether this is a repeat lookup for the same block
+ * @return buffer of specified cache key, or null if not in cache
+ */
+ public byte[] getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
+ if (!cacheEnabled)
+ return null;
+ RAMQueueEntry re = ramCache.get(key);
+ if (re != null) {
+ cacheStats.hit(caching);
+ re.access(accessCount.incrementAndGet());
+ return re.getData();
+ }
+ BucketEntry bucketEntry = backingMap.get(key);
+ if (bucketEntry != null) {
+ long start = System.nanoTime();
+ IdLock.Entry lockEntry = null;
+ try {
+ lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
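+ // Re-check that the key still maps to the same bucket entry: the block
+ // may have been evicted and its offset reused while we waited for the
+ // per-offset lock.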
+ if (bucketEntry.equals(backingMap.get(key))) {
+ int len = bucketEntry.getLength();
+ byte[] bytes = new byte[len];
+ ioEngine.read(bytes, bucketEntry.offset());
+ long timeTaken = System.nanoTime() - start;
+ cacheStats.hit(caching);
+ cacheStats.ioHit(timeTaken);
+ bucketEntry.access(accessCount.incrementAndGet());
+ if (this.ioErrorStartTime > 0) {
+ ioErrorStartTime = -1;
+ }
+ return bytes;
+ }
+ } catch (IOException ioex) {
+ LOG.error("Failed reading block " + key + " from bucket cache", ioex);
+ checkIOErrorIsTolerated();
+ } finally {
+ if (lockEntry != null) {
+ offsetLock.releaseLockEntry(lockEntry);
+ }
+ }
+ }
+ if (!repeat) cacheStats.miss(caching);
+ return null;
+ }
+
+ public void clearCache() {
+ for (BlockCacheKey key: this.backingMap.keySet()) {
+ evictBlock(key);
+ }
+ }
+
+ public boolean evictBlock(BlockCacheKey cacheKey) {
+ if (!cacheEnabled) return false;
+ RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
+ if (removedBlock != null) {
+ this.blockNumber.decrementAndGet();
+ this.heapSize.addAndGet(-1 * removedBlock.getData().length);
+ }
+ BucketEntry bucketEntry = backingMap.get(cacheKey);
+ if (bucketEntry == null) { return false; }
+ IdLock.Entry lockEntry = null;
+ try {
+ lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+ if (bucketEntry.equals(backingMap.remove(cacheKey))) {
+ bucketAllocator.freeBlock(bucketEntry.offset());
+ realCacheSize.addAndGet(-1 * bucketEntry.getLength());
+ blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
+ if (removedBlock == null) {
+ this.blockNumber.decrementAndGet();
+ }
+ } else {
+ return false;
+ }
+ } catch (IOException ie) {
+ LOG.warn("Failed evicting block " + cacheKey);
+ return false;
+ } finally {
+ if (lockEntry != null) {
+ offsetLock.releaseLockEntry(lockEntry);
+ }
+ }
+ cacheStats.evicted(bucketEntry.getPriority());
+ return true;
+ }
+
+ /*
+ * Statistics thread. Periodically prints the cache statistics to the log.
+ */
+ private static class StatisticsThread extends Thread {
+ BucketCache bucketCache;
+
+ public StatisticsThread(BucketCache bucketCache) {
+ super("BucketCache.StatisticsThread");
+ setDaemon(true);
+ this.bucketCache = bucketCache;
+ }
+ @Override
+ public void run() {
+ bucketCache.logStats();
+ }
+ }
+
+ public void logStats() {
+ if (!LOG.isDebugEnabled()) return;
+ // Log size
+ long totalSize = bucketAllocator.getTotalSize();
+ long usedSize = bucketAllocator.getUsedSize();
+ long freeSize = totalSize - usedSize;
+ long cacheSize = this.realCacheSize.get();
+ LOG.debug("BucketCache Stats: " +
+ "failedBlockAdditions=" + this.failedBlockAdditions.get() + ", " +
+ "total=" + StringUtils.byteDesc(totalSize) + ", " +
+ "free=" + StringUtils.byteDesc(freeSize) + ", " +
+ "usedSize=" + StringUtils.byteDesc(usedSize) +", " +
+ "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " +
+ "accesses=" + cacheStats.getRequestCount() + ", " +
+ "hits=" + cacheStats.getHitCount() + ", " +
+ "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
+ "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
+ "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
+ (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
+ "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
+ "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
+ "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
+ (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
+ "evictions=" + cacheStats.getEvictionCount() + ", " +
+ "evicted=" + cacheStats.getEvictedCount() + ", " +
+ "evictedPerRun=" + cacheStats.evictedPerEviction());
+ cacheStats.reset();
+ }
+
+ private long acceptableSize() {
+ return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
+ }
+
+ private long minSize() {
+ return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MIN_FACTOR);
+ }
+
+ private long singleSize() {
+ return (long) Math.floor(bucketAllocator.getTotalSize()
+ * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
+ }
+
+ private long multiSize() {
+ return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR
+ * DEFAULT_MIN_FACTOR);
+ }
+
+ private long memorySize() {
+ return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR
+ * DEFAULT_MIN_FACTOR);
+ }
+
+ /**
+ * Free space if the used size reaches acceptableSize() or a block of a
+ * given size cannot be allocated. Freeing follows the LRU algorithm and
+ * ensures that some blocks are actually evicted.
+ */
+ private void freeSpace() {
+ // Ensure only one freeSpace is in progress at a time
+ if (!freeSpaceLock.tryLock()) return;
+ try {
+ freeInProgress = true;
+ long bytesToFreeWithoutExtra = 0;
+ /*
+ * Calculate the bytes to free for each bucketSizeInfo
+ */
+ StringBuffer msgBuffer = new StringBuffer();
+ BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
+ long[] bytesToFreeForBucket = new long[stats.length];
+ for (int i = 0; i < stats.length; i++) {
+ bytesToFreeForBucket[i] = 0;
+ long freeGoal = (long) Math.floor(stats[i].totalCount()
+ * (1 - DEFAULT_MIN_FACTOR));
+ freeGoal = Math.max(freeGoal, 1);
+ if (stats[i].freeCount() < freeGoal) {
+ bytesToFreeForBucket[i] = stats[i].itemSize()
+ * (freeGoal - stats[i].freeCount());
+ bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
+ msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
+ + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
+ }
+ }
+ msgBuffer.append("Free for total="
+ + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
+
+ if (bytesToFreeWithoutExtra <= 0) {
+ return;
+ }
+ long currentSize = bucketAllocator.getUsedSize();
+ long totalSize = bucketAllocator.getTotalSize();
+ LOG.debug("Bucket cache free space started; Attempting to " + msgBuffer.toString()
+ + " of current used=" + StringUtils.byteDesc(currentSize)
+ + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get())
+ + ",total=" + StringUtils.byteDesc(totalSize));
+
+ long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
+ * (1 + DEFAULT_EXTRA_FREE_FACTOR));
+
+ // Instantiate priority buckets
+ BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
+ blockSize, singleSize());
+ BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
+ blockSize, multiSize());
+ BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
+ blockSize, memorySize());
+
+ // Scan entire map putting bucket entry into appropriate bucket entry
+ // group
+ for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
+ switch (bucketEntryWithKey.getValue().getPriority()) {
+ case SINGLE: {
+ bucketSingle.add(bucketEntryWithKey);
+ break;
+ }
+ case MULTI: {
+ bucketMulti.add(bucketEntryWithKey);
+ break;
+ }
+ case MEMORY: {
+ bucketMemory.add(bucketEntryWithKey);
+ break;
+ }
+ }
+ }
+
+ PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3);
+
+ bucketQueue.add(bucketSingle);
+ bucketQueue.add(bucketMulti);
+ bucketQueue.add(bucketMemory);
+
+ int remainingBuckets = 3;
+ long bytesFreed = 0;
+
+ BucketEntryGroup bucketGroup;
+ while ((bucketGroup = bucketQueue.poll()) != null) {
+ long overflow = bucketGroup.overflow();
+ if (overflow > 0) {
+ long bucketBytesToFree = Math.min(overflow,
+ (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
+ bytesFreed += bucketGroup.free(bucketBytesToFree);
+ }
+ remainingBuckets--;
+ }
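+
+ // Example: to free 30MB across three groups, the group with the smallest
+ // overflow is polled first and frees at most min(its overflow, 10MB);
+ // any shortfall rolls over to the remaining groups, whose per-group share
+ // grows as remainingBuckets shrinks.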
+
+ /**
+ * Check whether an extra round of freeing is needed because some
+ * bucketSizeInfo still lacks free space
+ */
+ stats = bucketAllocator.getIndexStatistics();
+ boolean needFreeForExtra = false;
+ for (int i = 0; i < stats.length; i++) {
+ long freeGoal = (long) Math.floor(stats[i].totalCount()
+ * (1 - DEFAULT_MIN_FACTOR));
+ freeGoal = Math.max(freeGoal, 1);
+ if (stats[i].freeCount() < freeGoal) {
+ needFreeForExtra = true;
+ break;
+ }
+ }
+
+ if (needFreeForExtra) {
+ bucketQueue.clear();
+ remainingBuckets = 2;
+
+ bucketQueue.add(bucketSingle);
+ bucketQueue.add(bucketMulti);
+
+ while ((bucketGroup = bucketQueue.poll()) != null) {
+ long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed)
+ / remainingBuckets;
+ bytesFreed += bucketGroup.free(bucketBytesToFree);
+ remainingBuckets--;
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ long single = bucketSingle.totalSize();
+ long multi = bucketMulti.totalSize();
+ long memory = bucketMemory.totalSize();
+ LOG.debug("Bucket cache free space completed; " + "freed="
+ + StringUtils.byteDesc(bytesFreed) + ", " + "total="
+ + StringUtils.byteDesc(totalSize) + ", " + "single="
+ + StringUtils.byteDesc(single) + ", " + "multi="
+ + StringUtils.byteDesc(multi) + ", " + "memory="
+ + StringUtils.byteDesc(memory));
+ }
+
+ } finally {
+ cacheStats.evict();
+ freeInProgress = false;
+ freeSpaceLock.unlock();
+ }
+ }
+
+ // This handles flushing the RAM cache to IOEngine.
+ private class WriterThread extends HasThread {
+ BlockingQueue<RAMQueueEntry> inputQueue;
+ final int threadIdx;
+ volatile boolean writerEnabled = true;
+
+ WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
+ super();
+ this.inputQueue = queue;
+ this.threadIdx = threadNO;
+ setDaemon(true);
+ }
+
+ // Used for test
+ void disableWriter() {
+ this.writerEnabled = false;
+ }
+
+ @Override
+ public void run() {
+ List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
+ try {
+ while (cacheEnabled && writerEnabled) {
+ try {
+ // Perform a block take first in order to avoid a drainTo()
+ // on an empty queue looping around and causing starvation.
+ entries.add(inputQueue.take());
+ inputQueue.drainTo(entries);
+ synchronized (cacheWaitSignals[threadIdx]) {
+ cacheWaitSignals[threadIdx].notifyAll();
+ }
+ } catch (InterruptedException ie) {
+ if (!cacheEnabled) break;
+ }
+ doDrain(entries);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Failed doing drain", t);
+ }
+ LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
+ }
+
+ /**
+ * Flush the entries in ramCache to IOEngine and add bucket entry to
+ * backingMap
+ * @param entries the RAM queue entries to flush
+ * @throws InterruptedException
+ */
+ private void doDrain(List<RAMQueueEntry> entries)
+ throws InterruptedException {
+ BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
+ RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
+ int done = 0;
+ while (entries.size() > 0 && cacheEnabled) {
+ // Keep going in case we throw...
+ RAMQueueEntry ramEntry = null;
+ try {
+ ramEntry = entries.remove(entries.size() - 1);
+ if (ramEntry == null) {
+ LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
+ continue;
+ }
+ BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
+ bucketAllocator, realCacheSize);
+ ramEntries[done] = ramEntry;
+ bucketEntries[done++] = bucketEntry;
+ if (ioErrorStartTime > 0) {
+ ioErrorStartTime = -1;
+ }
+ } catch (BucketAllocatorException fle) {
+ LOG.warn("Failed allocating for block "
+ + (ramEntry == null ? "" : ramEntry.getKey()), fle);
+ } catch (CacheFullException cfe) {
+ if (!freeInProgress) {
+ freeSpace();
+ } else {
+ Thread.sleep(50);
+ }
+ } catch (IOException ioex) {
+ LOG.error("Failed writing to bucket cache", ioex);
+ checkIOErrorIsTolerated();
+ }
+ }
+
+ // Make sure that the data pages we have written are on the media before
+ // we update the map.
+ try {
+ ioEngine.sync();
+ } catch (IOException ioex) {
+ LOG.error("Faild syncing IO engine", ioex);
+ checkIOErrorIsTolerated();
+ // Since we failed sync, free the blocks in bucket allocator
+ for (int i = 0; i < done; ++i) {
+ if (bucketEntries[i] != null) {
+ bucketAllocator.freeBlock(bucketEntries[i].offset());
+ }
+ }
+ done = 0;
+ }
+
+ for (int i = 0; i < done; ++i) {
+ if (bucketEntries[i] != null) {
+ backingMap.put(ramEntries[i].getKey(), bucketEntries[i]);
+ }
+ RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
+ if (ramCacheEntry != null) {
+ heapSize.addAndGet(-1 * ramEntries[i].getData().length);
+ }
+ }
+
+ if (bucketAllocator.getUsedSize() > acceptableSize()) {
+ freeSpace();
+ }
+ }
+ }
+
+ /**
+ * Check whether this IO error should be tolerated. If the IOEngine has been
+ * throwing errors for longer than ioErrorsTolerationDuration, we disable
+ * the cache.
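+ * (For example, with a toleration duration of 60000 ms, transient errors
+ * that stop within a minute leave the cache enabled.)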
+ */
+ private void checkIOErrorIsTolerated() {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ if (this.ioErrorStartTime > 0) {
+ if (cacheEnabled
+ && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
+ LOG.error("IO errors duration time has exceeded "
+ + ioErrorsTolerationDuration
+ + "ms, disabing cache, please check your IOEngine");
+ disableCache();
+ }
+ } else {
+ this.ioErrorStartTime = now;
+ }
+ }
+
+ /**
+ * Used to shut down the cache -or- turn it off in the case of something
+ * broken.
+ */
+ private void disableCache() {
+ if (!cacheEnabled)
+ return;
+ cacheEnabled = false;
+ ioEngine.shutdown();
+ this.scheduleThreadPool.shutdown();
+ for (int i = 0; i < writerThreads.length; ++i)
+ writerThreads[i].interrupt();
+ this.ramCache.clear();
+ this.backingMap.clear();
+ }
+
+ private void join() throws InterruptedException {
+ for (int i = 0; i < writerThreads.length; ++i)
+ writerThreads[i].join();
+ }
+
+ public void shutdown() {
+ disableCache();
+ LOG.info("Shutting down bucket cache");
+ }
+
+ public CacheStats getStats() {
+ return cacheStats;
+ }
+
+ BucketAllocator getAllocator() {
+ return this.bucketAllocator;
+ }
+
+ public long heapSize() {
+ return this.heapSize.get();
+ }
+
+ /**
+ * Returns the total size of the block cache, in bytes.
+ * @return size of cache, in bytes
+ */
+ public long size() {
+ return this.realCacheSize.get();
+ }
+
+ public long getFreeSize() {
+ return this.bucketAllocator.getTotalSize() - this.bucketAllocator.getUsedSize();
+ }
+
+ public long getBlockCount() {
+ return this.blockNumber.get();
+ }
+
+ public long getEvictedCount() {
+ return cacheStats.getEvictedCount();
+ }
+
+ /**
+ * Evicts all blocks for a specific HFile. This is an expensive operation
+ * implemented as a linear-time search through all blocks in the cache.
+ * Ideally this should be a search in a log-access-time map.
+ *
+ * This is used for evict-on-close to remove all blocks of a specific HFile.
+ *
+ * @return the number of blocks evicted
+ */
+ public int evictBlocksByHfileName(String hfileName) {
+ // Copy the list to avoid ConcurrentModificationException
+ // as evictBlock removes the key from the index
+ Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
+ if (keySet == null) {
+ return 0;
+ }
+ int numEvicted = 0;
+ List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
+ for (BlockCacheKey key : keysForHFile) {
+ if (evictBlock(key)) {
+ ++numEvicted;
+ }
+ }
+ return numEvicted;
+ }
+
+
+ /**
+ * Item in cache. We expect this to be where most memory goes. Java uses 8
+ * bytes just for object headers; after this, we want to use as little as
+ * possible - so we only use 8 bytes, but in order to do so we end up messing
+ * around with all this Java casting stuff. Offset stored as 5 bytes that make
+ * up the long. Doubt we'll see devices this big for ages. Offsets are divided
+ * by 256. So 5 bytes gives us 256TB or so.
+ */
+ static class BucketEntry implements Serializable, Comparable<BucketEntry> {
+ private static final long serialVersionUID = -6741504807982257534L;
+ private int offsetBase;
+ private int length;
+ private byte offset1;
+ private volatile long accessTime;
+ private CachedBlock.BlockPriority priority;
+
+ BucketEntry(long offset, int length, long accessTime, boolean inMemory) {
+ setOffset(offset);
+ this.length = length;
+ this.accessTime = accessTime;
+ if (inMemory) {
+ this.priority = CachedBlock.BlockPriority.MEMORY;
+ } else {
+ this.priority = CachedBlock.BlockPriority.SINGLE;
+ }
+ }
+
+ long offset() { // Java has no unsigned numbers
+ long o = ((long) offsetBase) & 0xFFFFFFFFL; // long mask avoids sign extension
+ o += (((long) (offset1)) & 0xFF) << 32;
+ return o << 8;
+ }
+
+ private void setOffset(long value) {
+ Preconditions.checkArgument((value & 0xFF) == 0);
+ value >>= 8;
+ offsetBase = (int) value;
+ offset1 = (byte) (value >> 32);
+ }
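+
+ // Worked example of the encoding above (value illustrative): for
+ // offset = 0x123456789A00L, setOffset() stores value >> 8 = 0x123456789A
+ // as offsetBase = 0x3456789A (the low 32 bits) and offset1 = 0x12 (bits
+ // 32..39); offset() reassembles the 40 bits and shifts left by 8 to
+ // recover 0x123456789A00.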
+
+ public int getLength() {
+ return length;
+ }
+
+ /**
+ * Block has been accessed. Update its local access time.
+ */
+ public void access(long accessTime) {
+ this.accessTime = accessTime;
+ if (this.priority == CachedBlock.BlockPriority.SINGLE) {
+ this.priority = CachedBlock.BlockPriority.MULTI;
+ }
+ }
+
+ public CachedBlock.BlockPriority getPriority() {
+ return this.priority;
+ }
+
+ @Override
+ public int compareTo(BucketEntry that) {
+ if (this.accessTime == that.accessTime) return 0;
+ return this.accessTime < that.accessTime ? 1 : -1;
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ return this == that;
+ }
+ }
+
+ /**
+ * Used to group bucket entries into priority buckets. There will be a
+ * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
+ * the eviction algorithm takes the appropriate number of elements out of each
+ * according to configuration parameters and their relative sizes.
+ */
+ private class BucketEntryGroup implements Comparable<BucketEntryGroup> {
+
+ private CachedEntryQueue queue;
+ private long totalSize = 0;
+ private long bucketSize;
+
+ public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
+ this.bucketSize = bucketSize;
+ queue = new CachedEntryQueue(bytesToFree, blockSize);
+ totalSize = 0;
+ }
+
+ public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
+ totalSize += block.getValue().getLength();
+ queue.add(block);
+ }
+
+ /**
+ * Free specified number of bytes by freeing the least recently used
+ * bucket entries.
+ * @param toFree Number of bytes we want to free
+ * @return Number of bytes freed
+ */
+ public long free(long toFree) {
+ Map.Entry<BlockCacheKey, BucketEntry> entry;
+ long freedBytes = 0;
+ while ((entry = queue.pollLast()) != null) {
+ evictBlock(entry.getKey());
+ freedBytes += entry.getValue().getLength();
+ if (freedBytes >= toFree) {
+ return freedBytes;
+ }
+ }
+ return freedBytes;
+ }
+
+ public long overflow() {
+ return totalSize - bucketSize;
+ }
+
+ public long totalSize() {
+ return totalSize;
+ }
+
+ @Override
+ public int compareTo(BucketEntryGroup that) {
+ if (this.overflow() == that.overflow())
+ return 0;
+ return this.overflow() > that.overflow() ? 1 : -1;
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ return this == that;
+ }
+
+ }
+
+ /**
+ * Only used in test
+ * @throws InterruptedException
+ */
+ void stopWriterThreads() throws InterruptedException {
+ for (WriterThread writerThread : writerThreads) {
+ writerThread.disableWriter();
+ writerThread.interrupt();
+ writerThread.join();
+ }
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java (working copy)
@@ -0,0 +1,54 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+
+
+/**
+ * Thrown by {@link BucketAllocator#allocateBlock(int)} when cache is full for
+ * the requested size
+ */
+public class CacheFullException extends IOException {
+ private static final long serialVersionUID = 3265127301824638920L;
+ private int requestedSize, bucketIndex;
+
+ CacheFullException(int requestedSize, int bucketIndex) {
+ super();
+ this.requestedSize = requestedSize;
+ this.bucketIndex = bucketIndex;
+ }
+
+ public int bucketIndex() {
+ return bucketIndex;
+ }
+
+ public int requestedSize() {
+ return requestedSize;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(1024);
+ sb.append("Allocator requested size ").append(requestedSize);
+ sb.append(" for bucket ").append(bucketIndex);
+ return sb.toString();
+ }
+}
+
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java (working copy)
@@ -0,0 +1,79 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Block entry stored in memory (the RAM queue), holding the key, the data
+ * and related bookkeeping such as access time and the in-memory flag
+ */
+class RAMQueueEntry {
+ private BlockCacheKey key;
+ private byte[] data;
+ private long accessTime;
+ private boolean inMemory;
+
+ public RAMQueueEntry(BlockCacheKey bck, byte[] data, long accessTime,
+ boolean inMemory) {
+ this.key = bck;
+ this.data = data;
+ this.accessTime = accessTime;
+ this.inMemory = inMemory;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public BlockCacheKey getKey() {
+ return key;
+ }
+
+ public void access(long accessTime) {
+ this.accessTime = accessTime;
+ }
+
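+ /**
+ * Allocate space in the bucket allocator, write this entry's data through
+ * the given IOEngine, and return the resulting BucketEntry (null for a
+ * zero-length entry). If the write fails, the just-allocated space is
+ * freed before the exception is rethrown, so no allocator space leaks.
+ */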
+ public BucketCache.BucketEntry writeToCache(final IOEngine ioEngine,
+ final BucketAllocator bucketAllocator,
+ final AtomicLong realCacheSize) throws CacheFullException, IOException,
+ BucketAllocatorException {
+ int len = data.length;
+ if (len == 0) {
+ return null;
+ }
+ long offset = bucketAllocator.allocateBlock(len);
+ BucketCache.BucketEntry bucketEntry = new BucketCache.BucketEntry(offset, len, accessTime,
+ inMemory);
+ try {
+ ioEngine.write(data, offset);
+ } catch (IOException ioe) {
+ // free it in bucket allocator
+ bucketAllocator.freeBlock(offset);
+ throw ioe;
+ }
+
+ realCacheSize.addAndGet(len);
+ return bucketEntry;
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ConcurrentIndex.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ConcurrentIndex.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ConcurrentIndex.java (working copy)
@@ -0,0 +1,171 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Multiset;
+
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * A simple concurrent map of sets. This is similar in concept to
+ * {@link Multiset}, with the following exceptions:
+ *
+ * <ul>
+ * <li>The set is thread-safe and concurrent: no external locking or
+ * synchronization is required. This is important for the use case where
+ * this class is used to index cached blocks by filename for their
+ * efficient eviction from cache when the file is closed or compacted.</li>
+ * <li>The expectation is that all entries may only be removed for a key
+ * once no more additions of values are being made under that key.</li>
+ * </ul>
+ *
+ * @see BucketCache#evictBlocksByHfileName(String) for example use of this
+ * class.
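+ *
+ * <p>A minimal usage sketch (types and comparator illustrative):
+ * <pre>
+ * ConcurrentIndex&lt;String, BlockCacheKey&gt; index =
+ *     new ConcurrentIndex&lt;String, BlockCacheKey&gt;(comparator);
+ * index.put("hfile-1", key);   // associate a block key with a file
+ * Set&lt;BlockCacheKey&gt; keys = index.values("hfile-1");
+ * index.remove("hfile-1", key);
+ * </pre>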
+ * @param <K> Key type
+ * @param <V> Value type
+ */
+ public class ConcurrentIndex<K, V> {
+
+ /** Container for the sets, indexed by key */
+ private final ConcurrentMap<K, Set<V>> container;
+
+ /**
+ * A factory that constructs new instances of the sets if no set is
+ * associated with a given key.
+ */
+ private final Supplier<Set<V>> valueSetFactory;
+
+ /**
+ * Creates an instance with a specified factory object for sets to be
+ * associated with a given key.
+ * @param valueSetFactory The factory instance
+ */
+ public ConcurrentIndex(Supplier<Set<V>> valueSetFactory) {
+ this.valueSetFactory = valueSetFactory;
+ this.container = new ConcurrentHashMap<K, Set<V>>();
+ }
+
+ /**
+ * Creates an instance using the {@link DefaultValueSetFactory} for sets,
+ * which in turn creates instances of {@link ConcurrentSkipListSet}
+ * @param valueComparator A {@link Comparator} for value types
+ */
+ public ConcurrentIndex(Comparator<V> valueComparator) {
+ this(new DefaultValueSetFactory<V>(valueComparator));
+ }
+
+ /**
+ * Associate a new unique value with a specified key. Under the covers, the
+ * method employs optimistic concurrency: if no set is associated with a
+ * given key, we create a new set; if another thread comes in, creates,
+ * and associates a set with the same key in the mean-time, we simply add
+ * the value to the already created set.
+ * @param key The key
+ * @param value An additional unique value we want to associate with a key
+ */
+ public void put(K key, V value) {
+ Set<V> set = container.get(key);
+ if (set != null) {
+ set.add(value);
+ } else {
+ set = valueSetFactory.get();
+ set.add(value);
+ Set<V> existing = container.putIfAbsent(key, set);
+ if (existing != null) {
+ // If a set is already associated with a key, that means another
+ // writer has already come in and created the set for the given key.
+ // Pursuant to an optimistic concurrency policy, in this case we will
+ // simply add the value to the existing set associated with the key.
+ existing.add(value);
+ }
+ }
+ }
+
+ /**
+ * Get all values associated with a specified key or null if no values are
+ * associated. Note: if the caller wishes to add or remove values
+ * under the specified key while iterating through the returned set,
+ * they should make a defensive copy; otherwise, a
+ * {@link ConcurrentModificationException} may be thrown.
+ * @see BucketCache#evictBlocksByHfileName(String) for example usage
+ * @param key The key
+ * @return All values associated with the specified key or null if no values
+ * are associated with the key.
+ */
+ public Set<V> values(K key) {
+ return container.get(key);
+ }
+
+ /**
+ * Removes the association between a specified key and value. If as a
+ * result of removing a value a set becomes empty, we remove the given
+ * set from the mapping as well.
+ * @param key The specified key
+ * @param value The value to disassociate with the key
+ */
+ public boolean remove(K key, V value) {
+ Set<V> set = container.get(key);
+ boolean success = false;
+ if (set != null) {
+ success = set.remove(value);
+ if (set.isEmpty()) {
+ container.remove(key);
+ }
+ }
+ return success;
+ }
+
+ /**
+ * Default factory class for the sets associated with given keys. Creates
+ * a {@link ConcurrentSkipListSet} using the comparator passed into the
+ * constructor.
+ * @see ConcurrentSkipListSet
+ * @see Supplier
+ * @param <V> The value type. Should match the value type of the
+ * ConcurrentIndex instances this object is passed to.
+ */
+ static class DefaultValueSetFactory<V> implements Supplier<Set<V>> {
+ private final Comparator<V> comparator;
+
+ /**
+ * Creates an instance that passes a specified comparator to the
+ * {@link ConcurrentSkipListSet}
+ * @param comparator The specified comparator
+ */
+ public DefaultValueSetFactory(Comparator<V> comparator) {
+ this.comparator = comparator;
+ }
+
+ /**
+ * Creates a new {@link ConcurrentSkipListSet} instance using the
+ * comparator specified when this factory was constructed.
+ * @return The instantiated {@link ConcurrentSkipListSet} object
+ */
+ @Override
+ public Set<V> get() {
+ return new ConcurrentSkipListSet<V>(comparator);
+ }
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java (working copy)
@@ -0,0 +1,83 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.ByteBufferArray;
+
+/**
+ * IO engine that stores data in memory using an array of ByteBuffers
+ * {@link ByteBufferArray}
+ */
+public class ByteBufferIOEngine implements IOEngine {
+
+ private ByteBufferArray bufferArray;
+
+ /**
+ * Construct the ByteBufferIOEngine with the given capacity
+ * @param capacity total capacity of the engine in bytes
+ * @param bufferSize size of each backing ByteBuffer in bytes
+ * @param direct true to allocate direct (off-heap) buffers
+ * @throws IOException
+ */
+ public ByteBufferIOEngine(long capacity, int bufferSize, boolean direct)
+ throws IOException {
+ bufferArray = new ByteBufferArray(capacity, bufferSize, direct);
+ }
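+
+ // A minimal usage sketch (sizes illustrative): a 64MB engine backed by
+ // 4MB direct buffers, addressed by absolute offset into the logical array:
+ //   IOEngine engine = new ByteBufferIOEngine(64 * 1024 * 1024, 4 * 1024 * 1024, true);
+ //   engine.write(src, 0); // copy src.length bytes in at offset 0
+ //   engine.read(dst, 0);  // read dst.length bytes back from offset 0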
+
+ /**
+ * Transfers data from the buffer array to the given byte array
+ * @param dst the given byte array into which bytes are to be written
+ * @param offset The offset in the ByteBufferArray of the first byte to be
+ * read
+ * @throws IOException
+ */
+ @Override
+ public void read(byte[] dst, long offset) throws IOException {
+ bufferArray.getMultiple(offset, dst.length, dst, 0);
+ }
+
+ /**
+ * Transfers data from the given byte array to the buffer array
+ * @param src the given byte array from which bytes are to be read
+ * @param offset The offset in the ByteBufferArray of the first byte to be
+ * written
+ * @throws IOException
+ */
+ @Override
+ public void write(byte[] src, long offset) throws IOException {
+ bufferArray.putMultiple(offset, src.length, src, 0);
+ }
+
+ /**
+ * No operation for the sync in the memory IO engine
+ */
+ @Override
+ public void sync() {
+
+ }
+
+ /**
+ * No operation for the shutdown in the memory IO engine
+ */
+ @Override
+ public void shutdown() {
+
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java (working copy)
@@ -0,0 +1,450 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.ConditionUtil;
+
+/**
+ * This class is used to allocate a block with specified size and free the block
+ * when evicting. It manages an array of buckets; each bucket is associated
+ * with a size and caches elements up to this size. For a completely empty
+ * bucket, this size can be re-specified dynamically.
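+ *
+ * For example (sizes illustrative), with bucket sizes of 8KB, 16KB and
+ * 64KB, a request for a 10KB block is served by a bucket configured for
+ * the 16KB size, the smallest size that fits.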
+ *
+ * This class is not thread safe.
+ */
+public final class BucketAllocator {
+ static final Log LOG = LogFactory.getLog(BucketAllocator.class);
+
+ final private static class Bucket {
+ private final int[] bucketSizes;
+ private final long bucketCapacity;
+ private long baseOffset;
+ private int itemAllocationSize, sizeIndex;
+ private int itemCount;
+ private int[] freeList;
+ private int freeCount, usedCount;
+
+ public Bucket(int[] bucketSizes, long bucketCapacity, long offset) {
+ this.bucketSizes = bucketSizes;
+ this.bucketCapacity = bucketCapacity;
+ baseOffset = offset;
+ sizeIndex = -1;
+ }
+
+ void reconfigure(int sizeIndex) {
+ this.sizeIndex = Preconditions.checkPositionIndex(sizeIndex,
+ bucketSizes.length);
+ itemAllocationSize = bucketSizes[sizeIndex];
+ itemCount = (int) ((bucketCapacity) / (long) itemAllocationSize);
+ freeCount = itemCount;
+ usedCount = 0;
+ freeList = new int[itemCount];
+ for (int i = 0; i < freeCount; ++i)
+ freeList[i] = i;
+ }
+
+ public boolean isUninstantiated() {
+ return sizeIndex == -1;
+ }
+
+ public int sizeIndex() {
+ return sizeIndex;
+ }
+
+ public int itemAllocationSize() {
+ return itemAllocationSize;
+ }
+
+ public boolean hasFreeSpace() {
+ return freeCount > 0;
+ }
+
+ public boolean isCompletelyFree() {
+ return usedCount == 0;
+ }
+
+ public int freeCount() {
+ return freeCount;
+ }
+
+ public int usedCount() {
+ return usedCount;
+ }
+
+ public int freeBytes() {
+ return freeCount * itemAllocationSize;
+ }
+
+ public int usedBytes() {
+ return usedCount * itemAllocationSize;
+ }
+
+ public long baseOffset() {
+ return baseOffset;
+ }
+
+ /**
+ * Allocate a block in this bucket, return the offset representing the
+ * position in physical space
+ * @return the offset in the IOEngine
+ */
+ public long allocate() {
+ Preconditions.checkState(freeCount > 0, "No space to allocate!");
+ Preconditions.checkState(sizeIndex != -1);
+ ++usedCount;
+ return ConditionUtil
+ .checkPositiveOffset(baseOffset + (freeList[--freeCount] * itemAllocationSize));
+ }
+
+ private void free(long offset) {
+ Preconditions.checkState(usedCount > 0);
+ Preconditions.checkState(freeCount < itemCount,
+ "duplicate free, offset: " + offset);
+ offset = ConditionUtil.checkOffset(offset - baseOffset,
+ itemCount * itemAllocationSize);
+ Preconditions.checkState(offset % itemAllocationSize == 0);
+ int item = (int) (offset / (long) itemAllocationSize);
+ Preconditions.checkState(!freeListContains(item), "Item at " + offset +
+ " already on freelist!");
+
+ --usedCount;
+ freeList[freeCount++] = item;
+ }
+
+ private boolean freeListContains(int blockNo) {
+ for (int i = 0; i < freeCount; ++i) {
+ if (freeList[i] == blockNo) return true;
+ }
+ return false;
+ }
+ }
+
+ public final class BucketSizeInfo {
+ // Free bucket means it has space to allocate a block;
+ // Completely free bucket means it has no block.
+ private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
+ private int sizeIndex;
+
+ BucketSizeInfo(int sizeIndex) {
+ bucketList = new ArrayList<Bucket>();
+ freeBuckets = new ArrayList<Bucket>();
+ completelyFreeBuckets = new ArrayList<Bucket>();
+ this.sizeIndex = sizeIndex;
+ }
+
+ public void instantiateBucket(Bucket b) {
+ Preconditions.checkArgument(b.isUninstantiated() || b.isCompletelyFree());
+ b.reconfigure(sizeIndex);
+ bucketList.add(b);
+ freeBuckets.add(b);
+ completelyFreeBuckets.add(b);
+ }
+
+ public int sizeIndex() {
+ return sizeIndex;
+ }
+
+ /**
+ * Find a bucket to allocate a block
+ * @return the offset in the IOEngine
+ */
+ public long allocateBlock() {
+ Bucket b = null;
+ if (freeBuckets.size() > 0) // Use up an existing one first...
+ b = freeBuckets.get(freeBuckets.size() - 1);
+ if (b == null) {
+ b = grabGlobalCompletelyFreeBucket();
+ if (b != null) instantiateBucket(b);
+ }
+ if (b == null) return -1;
+ long result = b.allocate();
+ blockAllocated(b);
+ return result;
+ }
+
+ void blockAllocated(Bucket b) {
+ if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
+ if (!b.hasFreeSpace()) freeBuckets.remove(b);
+ }
+
+ public Bucket findAndRemoveCompletelyFreeBucket() {
+ Bucket b = null;
+ Preconditions.checkState(bucketList.size() > 0);
+ if (bucketList.size() == 1) {
+ // So we never get complete starvation of a bucket for a size
+ return null;
+ }
+
+ if (completelyFreeBuckets.size() > 0) {
+ b = completelyFreeBuckets.get(0);
+ removeBucket(b);
+ }
+ return b;
+ }
+
+ private void removeBucket(Bucket b) {
+ Preconditions.checkArgument(b.isCompletelyFree());
+ bucketList.remove(b);
+ freeBuckets.remove(b);
+ completelyFreeBuckets.remove(b);
+ }
+
+ public void freeBlock(Bucket b, long offset) {
+ Preconditions.checkArgument(bucketList.contains(b));
+ // else we shouldn't have anything to free...
+ Preconditions.checkArgument(!completelyFreeBuckets.contains(b),
+ "nothing to free!");
+ b.free(offset);
+ if (!freeBuckets.contains(b)) freeBuckets.add(b);
+ if (b.isCompletelyFree()) completelyFreeBuckets.add(b);
+ }
+
+ public IndexStatistics statistics() {
+ long free = 0, used = 0;
+ for (Bucket b : bucketList) {
+ free += b.freeCount();
+ used += b.usedCount();
+ }
+ return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
+ }
+ }
+
+ private final int[] bucketSizes;
+
+ /**
+ * Round up the given block size to bucket size, and get the corresponding
+ * BucketSizeInfo
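+ * (returns null when blockSize exceeds the largest configured bucket size,
+ * in which case allocateBlock fails with a BucketAllocatorException)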
+ * @param blockSize the requested block size in bytes
+ * @return BucketSizeInfo
+ */
+ public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
+ for (int i = 0; i < bucketSizes.length; ++i)
+ if (blockSize <= bucketSizes[i])
+ return bucketSizeInfos[i];
+ return null;
+ }
+
+
+ public static final int FEWEST_ITEMS_IN_BUCKET = 4;
+
+ // The capacity in bytes of each bucket
+ private final long bucketCapacity;
+
+ private final Bucket[] buckets;
+ private final BucketSizeInfo[] bucketSizeInfos;
+ private final long totalSize;
+
+ private long usedSize = 0;
+
+ BucketAllocator(int[] bucketSizes, long availableSpace) throws BucketAllocatorException {
+ this.bucketSizes = bucketSizes;
+ int bigItemSize = bucketSizes[bucketSizes.length - 1];
+ bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
+ buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
+ if (buckets.length < bucketSizes.length)
+ throw new BucketAllocatorException(
+ "Bucket allocator size too small - must have room for at least "
+ + bucketSizes.length + " buckets");
+ bucketSizeInfos = new BucketSizeInfo[bucketSizes.length];
+ for (int i = 0; i < bucketSizes.length; ++i) {
+ bucketSizeInfos[i] = new BucketSizeInfo(i);
+ }
+ for (int i = 0; i < buckets.length; ++i) {
+ buckets[i] = new Bucket(bucketSizes, bucketCapacity, bucketCapacity * i);
+ bucketSizeInfos[i < bucketSizes.length ? i : bucketSizes.length - 1]
+ .instantiateBucket(buckets[i]);
+ }
+ this.totalSize = ((long) buckets.length) * bucketCapacity;
+ }
+
+ public String getInfo() {
+ StringBuilder sb = new StringBuilder(1024);
+ for (int i = 0; i < buckets.length; ++i) {
+ Bucket b = buckets[i];
+ sb.append(" Bucket ").append(i).append(": ").append(b.itemAllocationSize());
+ sb.append(" freeCount=").append(b.freeCount()).append(" used=")
+ .append(b.usedCount());
+ sb.append('\n');
+ }
+ return sb.toString();
+ }
+
+ public long getUsedSize() {
+ return this.usedSize;
+ }
+
+ public long getFreeSize() {
+ return this.totalSize - getUsedSize();
+ }
+
+ public long getTotalSize() {
+ return this.totalSize;
+ }
+
+ /**
+ * Allocate a block with specified size. Return the offset
+ * @param blockSize size of block
+ * @throws CacheFullException if no space is left for a block of this size
+ * @throws BucketAllocatorException if the requested size is larger than the
+ * largest bucket size
+ * @return the offset in the IOEngine
+ */
+ public synchronized long allocateBlock(int blockSize) throws CacheFullException,
+ BucketAllocatorException {
+ Preconditions.checkArgument(blockSize > 0);
+ BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
+ if (bsi == null) {
+ throw new BucketAllocatorException("Allocation too big size=" + blockSize);
+ }
+ long offset = bsi.allocateBlock();
+
+ // Ask caller to free up space and try again!
+ if (offset < 0)
+ throw new CacheFullException(blockSize, bsi.sizeIndex());
+ usedSize += bucketSizes[bsi.sizeIndex()];
+ return offset;
+ }
+
+ private Bucket grabGlobalCompletelyFreeBucket() {
+ for (BucketSizeInfo bsi : bucketSizeInfos) {
+ Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
+ if (b != null) return b;
+ }
+ return null;
+ }
+
+ /**
+ * Free a block with the offset
+ * @param offset block's offset
+ * @return size freed
+ */
+ public synchronized int freeBlock(long offset) {
+ int bucketNo = (int) (offset / bucketCapacity);
+ Preconditions.checkPositionIndex(bucketNo, buckets.length);
+ Bucket targetBucket = buckets[bucketNo];
+ bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
+ usedSize -= targetBucket.itemAllocationSize();
+ return targetBucket.itemAllocationSize();
+ }
+
+ public int sizeOfAllocation(long offset) {
+ int bucketNo = (int) (offset / bucketCapacity);
+ Preconditions.checkPositionIndex(bucketNo, buckets.length);
+ Bucket targetBucket = buckets[bucketNo];
+ return targetBucket.itemAllocationSize();
+ }
+
+ public static class IndexStatistics {
+ private long freeCount, usedCount, itemSize, totalCount;
+
+ public long freeCount() {
+ return freeCount;
+ }
+
+ public long usedCount() {
+ return usedCount;
+ }
+
+ public long totalCount() {
+ return totalCount;
+ }
+
+ public long freeBytes() {
+ return freeCount * itemSize;
+ }
+
+ public long usedBytes() {
+ return usedCount * itemSize;
+ }
+
+ public long totalBytes() {
+ return totalCount * itemSize;
+ }
+
+ public long itemSize() {
+ return itemSize;
+ }
+
+ public IndexStatistics(long free, long used, long itemSize) {
+ setTo(free, used, itemSize);
+ }
+
+ public IndexStatistics() {
+ setTo(-1, -1, 0);
+ }
+
+ public void setTo(long free, long used, long itemSize) {
+ this.itemSize = itemSize;
+ this.freeCount = free;
+ this.usedCount = used;
+ this.totalCount = free + used;
+ }
+ }
+
+ public void dumpToLog() {
+ logStatistics();
+ StringBuilder sb = new StringBuilder();
+ for (Bucket b : buckets) {
+ sb.append("Bucket:").append(b.baseOffset).append('\n');
+ sb.append(" Size index: " + b.sizeIndex() + "; Free:" + b.freeCount
+ + "; used:" + b.usedCount + "; freelist\n");
+ for (int i = 0; i < b.freeCount(); ++i)
+ sb.append(b.freeList[i]).append(',');
+ sb.append('\n');
+ }
+ LOG.info(sb);
+ }
+
+ public void logStatistics() {
+ IndexStatistics total = new IndexStatistics();
+ IndexStatistics[] stats = getIndexStatistics(total);
+ LOG.info("Bucket allocator statistics follow:\n");
+ LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes="
+ + total.usedBytes() + "; total bytes=" + total.totalBytes());
+ for (IndexStatistics s : stats) {
+ LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount()
+ + "; free=" + s.freeCount() + "; total=" + s.totalCount());
+ }
+ }
+
+ public IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
+ IndexStatistics[] stats = getIndexStatistics();
+ long totalfree = 0, totalused = 0;
+ for (IndexStatistics stat : stats) {
+ totalfree += stat.freeBytes();
+ totalused += stat.usedBytes();
+ }
+ grandTotal.setTo(totalfree, totalused, 1);
+ return stats;
+ }
+
+ public IndexStatistics[] getIndexStatistics() {
+ IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
+ for (int i = 0; i < stats.length; ++i)
+ stats[i] = bucketSizeInfos[i].statistics();
+ return stats;
+ }
+}
+
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java (working copy)
@@ -0,0 +1,61 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Class that implements cache metrics for bucket cache.
+ */
+public class BucketCacheStats extends CacheStats {
+ private static final int NS_PER_MS = 1000000;
+
+ private final AtomicLong ioHitCount = new AtomicLong(0);
+ private final AtomicLong ioHitTime = new AtomicLong(0);
+
+ private volatile long lastLogTime = EnvironmentEdgeManager.currentTimeMillis();
+
+ public void ioHit(long time) {
+ ioHitCount.incrementAndGet();
+ ioHitTime.addAndGet(time);
+ }
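+
+ // Note: callers pass a System.nanoTime() delta (see BucketCache.getBlock),
+ // so ioHitTime accumulates nanoseconds and getIOTimePerHit() reports
+ // milliseconds per hit.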
+
+ public long getIOHitsPerSecond() {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long took = (now - lastLogTime) / 1000;
+ lastLogTime = now;
+ return took == 0 ? 0 : ioHitCount.get() / took;
+ }
+
+ public double getIOTimePerHit() {
+ long time = ioHitTime.get() / NS_PER_MS;
+ long count = ioHitCount.get();
+ return count == 0 ? 0.0 : ((double) time / (double) count);
+ }
+
+ public void reset() {
+ ioHitCount.set(0);
+ ioHitTime.set(0);
+ }
+}
+
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java (working copy)
@@ -0,0 +1,59 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+/**
+ * Interface for a secondary level block cache that deals with byte arrays
+ * identical to what is written to disk (i.e., usually encoded and compressed),
+ * as opposed to HFile objects.
+ */
+public interface L2Cache {
+ /**
+ * Retrieve a block from the L2Cache. The block is retrieved as a byte
+ * array, in the same exact format as it is stored on disk.
+ * @param hfileName Filename associated with the block
+ * @param dataBlockOffset Offset in the file
+ * @return The raw on-disk representation of the block, or null if not cached
+ */
+ public byte[] getRawBlock(String hfileName, long dataBlockOffset);
+
+
+ /**
+ * Add a block to the L2Cache. The block must be represented by a
+ * byte array identical to what would be written to disk.
+ * @param hfileName Filename associated with the block
+ * @param dataBlockOffset Offset in the file
+ * @param rawBlock The exact byte representation of the block
+ */
+ public void cacheRawBlock(String hfileName, long dataBlockOffset,
+ byte[] rawBlock);
+
+ /**
+ * Evict all blocks matching a given filename. This operation should be
+ * efficient and can be called on each close of a store file.
+ * @param hfileName Filename whose blocks to evict
+ */
+ public int evictBlocksByHfileName(String hfileName);
+
+ /**
+ * Shutdown the cache
+ */
+ public void shutdown();
+}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (revision 1530525)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (working copy)
@@ -171,4 +171,7 @@
return onHeapCache.getBlockCount() + offHeapCache.getBlockCount();
}
+ @Override
+ public void clearCache() {
+ }
}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (revision 1530525)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (working copy)
@@ -105,9 +105,10 @@
closeIStream, cacheConf, hfs);
trailer.expectMajorVersion(2);
validateMinorVersion(path, trailer.getMinorVersion());
- HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
- fsdisNoFsChecksum,
- compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path);
+ HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis, fsdisNoFsChecksum,
+ compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path,
+ cacheConf.isL2CacheEnabled() ? cacheConf.getL2Cache() : null,
+ cacheConf.isL2CacheEnabled() ? name : null);
this.fsBlockReader = fsBlockReaderV2; // upcast
// Comparator class name is stored in the trailer in version 2.
@@ -219,6 +220,7 @@
BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
DataBlockEncoding.NONE, BlockType.META);
+ boolean cacheInL2 = cacheBlock && cacheConf.isL2CacheEnabled();
cacheBlock &= cacheConf.shouldCacheDataOnRead();
if (cacheConf.isBlockCacheEnabled()) {
HFileBlock cachedBlock =
@@ -233,7 +235,7 @@
}
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
- blockSize, -1, true);
+ blockSize, -1, true, cacheInL2);
passSchemaMetricsTo(metaBlock);
final long delta = System.nanoTime() - startTimeNs;
@@ -292,6 +294,7 @@
boolean useLock = false;
IdLock.Entry lockEntry = null;
+ HFileBlock cachedBlock = null;
try {
while (true) {
@@ -304,7 +307,7 @@
if (cacheConf.isBlockCacheEnabled()) {
// Try and get the block from the block cache. If the useLock variable is true then this
// is the second time through the loop and it should not be counted as a block cache miss.
- HFileBlock cachedBlock = (HFileBlock)
+ cachedBlock = (HFileBlock)
cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, useLock);
if (cachedBlock != null) {
BlockCategory blockCategory =
@@ -337,10 +340,39 @@
continue;
}
- // Load block from filesystem.
+ // First, check if the block exists in L2 cache
+ cachedBlock = null;
+ try {
+ cachedBlock = getBlockFromL2Cache(name, dataBlockOffset,
+ expectedBlockType, isCompaction);
+ } catch (Throwable t) {
+ // If exception is encountered when attempting to read from the L2
+ // cache, we should go on to try to read from disk and log the
+ // exception.
+ LOG.warn("Error occured attempting to retrieve from the L2 cache! " +
+ "[ hfileName = " + name + ", offset = " + dataBlockOffset +
+ ", expectedBlockType =" + expectedBlockType + ", isCompaction = " +
+ isCompaction + " ]", t);
+ }
+ if (cachedBlock != null) {
+ if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
+ cachedBlock.getBlockType().getCategory())) {
+ // If L1 BlockCache is configured to cache blocks on read, then
+ // cache the block in the L1 cache. Updates to the L1 cache need to
+ // happen under a lock, which is why this logic is located here.
+ // TODO (avf): implement "evict on promotion" to avoid double caching
+ cacheConf.getBlockCache().cacheBlock(cacheKey, cachedBlock,
+ cacheConf.isInMemory());
+ }
+ getSchemaMetrics().updateOnCacheHit(cachedBlock.getBlockType().getCategory(),
+ isCompaction);
+ // Return early if a block exists in the L2 cache
+ return cachedBlock;
+ }
+ // In case of an L2 cache miss, load block from filesystem.
long startTimeNs = System.nanoTime();
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset,
- onDiskBlockSize, -1, pread);
+ onDiskBlockSize, -1, pread, cacheBlock && !isCompaction);
hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock,
isCompaction);
validateBlockType(hfileBlock, expectedBlockType);
@@ -372,6 +404,42 @@
}
/**
+ * If the L2 cache is enabled, retrieve the on-disk representation of a
+ * block (i.e., the compressed and encoded byte array) from the L2 cache,
+ * decompress and decode it, and then construct an in-memory representation
+ * of the block.
+ * @param hfileName Name of the HFile that contains the block (used as part
+ * of the cache key)
+ * @param offset Offset in the HFile containing the block (used as another
+ * part of the cache key)
+ * @param expectedBlockType Expected type of the block
+ * @param isCompaction Indicates if this is a compaction-related read. This
+ * value is passed along to
+ * {@link HFileDataBlockEncoder#diskToCacheFormat(
+ * HFileBlock, boolean)}
+ *
+ * @return The constructed and initialized block, or null if the L2 cache is
+ * disabled or if no block is associated with the given filename and
+ * offset in the L2 cache.
+ * @throws IOException If we are unable to decompress and decode the block.
+ */
+ public HFileBlock getBlockFromL2Cache(String hfileName, long offset,
+ BlockType expectedBlockType, boolean isCompaction) throws IOException {
+ if (cacheConf.isL2CacheEnabled()) {
+ byte[] bytes = cacheConf.getL2Cache().getRawBlock(hfileName, offset);
+ if (bytes != null) {
+ HFileBlock hfileBlock = HFileBlock.fromBytes(bytes, compressAlgo, includesMemstoreTS,
+ offset, super.getTrailer().getMinorVersion());
+ hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock, isCompaction);
+ validateBlockType(hfileBlock, expectedBlockType);
+ passSchemaMetricsTo(hfileBlock);
+ return hfileBlock;
+ }
+ }
+ return null;
+ }
+
+ /**
* Compares the actual type of a block retrieved from cache or disk with its
* expected type and throws an exception in case of a mismatch. Expected
* block type of {@link BlockType#DATA} is considered to match the actual
@@ -424,13 +492,26 @@
}
public void close(boolean evictOnClose) throws IOException {
- if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
+ close(evictOnClose, cacheConf.shouldL2EvictOnClose());
+ }
+
+ @Override
+ public void close(boolean evictL1OnClose, boolean evictL2OnClose)
+ throws IOException {
+ if (evictL1OnClose && cacheConf.isBlockCacheEnabled()) {
int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
if (LOG.isTraceEnabled()) {
LOG.trace("On close, file=" + name + " evicted=" + numEvicted
- + " block(s)");
+ + " block(s) from L1 cache");
}
}
+ if (cacheConf.isL2CacheEnabled() && evictL2OnClose) {
+ int numEvicted = cacheConf.getL2Cache().evictBlocksByHfileName(name);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("On close, file=" + name + " evicted=" + numEvicted
+ + " block(s) from L2 cache");
+ }
+ }
if (closeIStream) {
if (istream != istreamNoFsChecksum && istreamNoFsChecksum != null) {
istreamNoFsChecksum.close();
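For context, the hunks above turn readBlock() into a three-level read path: L1 block cache, then L2 raw-block cache, then the filesystem. A condensed sketch of that control flow (illustrative only; it reuses names from this patch and elides the offset lock, metrics, cache-on-read promotion, and the diskToCacheFormat step shown above):

    // Illustrative sketch of the read path after this patch; not the actual
    // method. Locking, metrics, and cache-on-read handling are elided.
    private HFileBlock readBlockSketch(BlockCacheKey cacheKey, long offset,
        long onDiskSize, boolean cacheBlock, boolean pread,
        boolean isCompaction, BlockType expectedType) throws IOException {
      // 1. L1: decompressed, decoded blocks in the in-memory block cache.
      HFileBlock b = (HFileBlock) cacheConf.getBlockCache()
          .getBlock(cacheKey, cacheBlock, false);
      if (b != null) {
        return b;
      }
      // 2. L2: raw on-disk bytes; getBlockFromL2Cache() decompresses,
      //    decodes, and validates the block on a hit.
      b = getBlockFromL2Cache(name, offset, expectedType, isCompaction);
      if (b != null) {
        return b; // optionally promoted into L1 first, as in the hunk above
      }
      // 3. Disk: the new fifth argument asks the FSReader to also populate
      //    the L2 cache with the raw bytes it reads.
      return fsBlockReader.readBlockData(offset, onDiskSize, -1, pread,
          cacheBlock && !isCompaction);
    }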
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (revision 1530525)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (working copy)
@@ -521,6 +521,9 @@
/** Close method with optional evictOnClose */
void close(boolean evictOnClose) throws IOException;
+ /** Close method with independent eviction flags for the L1 and L2 caches */
+ void close(boolean evictL1OnClose, boolean evictL2OnClose) throws IOException;
+
DataBlockEncoding getEncodingOnDisk();
}
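The two-argument overload lets callers evict from the L1 and L2 caches independently; the existing close(boolean) derives the L2 flag from cacheConf.shouldL2EvictOnClose(), as shown in the HFileReaderV2 hunk above. A hypothetical call site:

    // Hypothetical call site: drop this file's blocks from the L1 cache on
    // close, but keep its raw blocks resident in the L2 cache.
    reader.close(true /* evictL1OnClose */, false /* evictL2OnClose */);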
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java (revision 1530525)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java (working copy)
@@ -134,5 +134,8 @@
return 0;
}
+ @Override
+ public void clearCache() {
+ }
}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (revision 1530525)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (working copy)
@@ -137,9 +137,11 @@
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
+ boolean cacheL2IndexesOnWrite = cacheConf.shouldL2CacheDataOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
cacheIndexesOnWrite ? cacheConf.getBlockCache(): null,
- cacheIndexesOnWrite ? name : null);
+ cacheL2IndexesOnWrite ? cacheConf.getL2Cache() : null,
+ (cacheIndexesOnWrite || cacheL2IndexesOnWrite) ? name : null);
dataBlockIndexWriter.setMaxChunkSize(
HFileBlockIndex.getMaxChunkSize(conf));
inlineBlockWriters.add(dataBlockIndexWriter);
@@ -196,10 +198,14 @@
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
-
+ // If a write is successful, cache the written block in the L1 and L2
+ // caches
if (cacheConf.shouldCacheDataOnWrite()) {
doCacheOnWrite(lastDataBlockOffset);
}
+ if (cacheConf.isL2CacheEnabled() && cacheConf.shouldL2CacheDataOnWrite()) {
+ doCacheInL2Cache(lastDataBlockOffset);
+ }
}
/** Gives inline block writers an opportunity to contribute blocks. */
@@ -214,10 +220,13 @@
ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
fsBlockWriter.getUncompressedSizeWithoutHeader());
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
-
+ // If a write is successful, cache the block in the L1 and L2 caches
if (cacheThisBlock) {
doCacheOnWrite(offset);
}
+ if (cacheConf.isL2CacheEnabled() && cacheConf.shouldL2CacheDataOnWrite()) {
+ doCacheInL2Cache(offset);
+ }
}
}
}
@@ -240,6 +249,15 @@
}
/**
+ * Caches the compressed and encoded (on-disk format) block written at the
+ * given offset in the current HFile in the L2 cache.
+ * @param offset Offset at which the block has been written
+ * @throws IOException If the block's bytes cannot be retrieved or cached
+ */
+ private void doCacheInL2Cache(long offset) throws IOException {
+ cacheConf.getL2Cache().cacheRawBlock(name, offset, fsBlockWriter.getHeaderAndData());
+ }
+
+ /**
* Ready a new block for writing.
*
* @throws IOException
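These hunks exercise three methods of the L2 cache: cacheRawBlock(name, offset, bytes) on the write path, getRawBlock(name, offset) on the read path (HFileReaderV2 above), and evictBlocksByHfileName(name) on close. A toy in-memory implementation of just that surface, for illustration only (the real L2Cache interface and implementation are defined elsewhere in this patch):

    // Toy stand-in for the L2 cache, keyed by (hfileName, offset);
    // illustration only, not the implementation this patch ships.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ToyL2Cache {
      private final Map<String, byte[]> blocks =
          new ConcurrentHashMap<String, byte[]>();

      private static String key(String hfileName, long offset) {
        return hfileName + "_" + offset;
      }

      /** Stores the raw on-disk (compressed, encoded) bytes of a block. */
      public void cacheRawBlock(String hfileName, long offset, byte[] raw) {
        blocks.put(key(hfileName, offset), raw);
      }

      /** Returns the raw bytes cached for this block, or null on a miss. */
      public byte[] getRawBlock(String hfileName, long offset) {
        return blocks.get(key(hfileName, offset));
      }

      /** Evicts all blocks of the given file; returns the eviction count. */
      public int evictBlocksByHfileName(String hfileName) {
        int evicted = 0;
        String prefix = hfileName + "_";
        for (String k : blocks.keySet()) {
          if (k.startsWith(prefix) && blocks.remove(k) != null) {
            evicted++;
          }
        }
        return evicted;
      }
    }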
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (revision 1530525)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (working copy)
@@ -788,6 +788,7 @@
}
/** Clears the cache. Used in tests. */
+ @Override
public void clearCache() {
map.clear();
}
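With clearCache() promoted to the BlockCache interface (hence the @Override here and the no-op stubs added to the other implementations), tests can reset any cache in place. A hypothetical test fragment:

    // Hypothetical test fragment: reset the cache between test cases
    // through the BlockCache interface rather than rebuilding it.
    BlockCache cache = cacheConf.getBlockCache();
    cache.clearCache();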