.../apache/hadoop/hbase/io/hfile/HFileContext.java | 19 +- .../hadoop/hbase/io/hfile/HFileContextBuilder.java | 9 +- .../hadoop/hbase/io/HalfStoreFileReader.java | 5 + .../apache/hadoop/hbase/io/hfile/BlockCache.java | 4 + .../hadoop/hbase/io/hfile/CombinedBlockCache.java | 14 + .../org/apache/hadoop/hbase/io/hfile/HFile.java | 10 +- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 15 + .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 124 +++--- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 432 +++++++++++++-------- .../apache/hadoop/hbase/io/hfile/HFileScanner.java | 4 + .../hadoop/hbase/io/hfile/LruBlockCache.java | 12 +- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 50 ++- .../hadoop/hbase/regionserver/StoreFile.java | 12 +- .../hbase/regionserver/StoreFileScanner.java | 1 + .../hadoop/hbase/util/CompoundBloomFilter.java | 4 +- .../apache/hadoop/hbase/io/hfile/TestHFile.java | 2 +- .../hadoop/hbase/io/hfile/TestHFileBlockIndex.java | 11 + .../hbase/regionserver/TestHeapMemoryManager.java | 6 + 18 files changed, 491 insertions(+), 243 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java index 5f43444..8d757a0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java @@ -57,6 +57,7 @@ public class HFileContext implements HeapSize, Cloneable { /** Encryption algorithm and key used */ private Encryption.Context cryptoContext = Encryption.Context.NONE; private long fileCreateTime; + private String hfileName; //Empty constructor. Go with setters public HFileContext() { @@ -78,12 +79,13 @@ public class HFileContext implements HeapSize, Cloneable { this.encoding = context.encoding; this.cryptoContext = context.cryptoContext; this.fileCreateTime = context.fileCreateTime; + this.hfileName = context.hfileName; } public HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags, Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType, int bytesPerChecksum, int blockSize, DataBlockEncoding encoding, - Encryption.Context cryptoContext, long fileCreateTime) { + Encryption.Context cryptoContext, long fileCreateTime, String hfileName) { this.usesHBaseChecksum = useHBaseChecksum; this.includesMvcc = includesMvcc; this.includesTags = includesTags; @@ -97,6 +99,7 @@ public class HFileContext implements HeapSize, Cloneable { } this.cryptoContext = cryptoContext; this.fileCreateTime = fileCreateTime; + this.hfileName = hfileName; } /** @@ -188,6 +191,15 @@ public class HFileContext implements HeapSize, Cloneable { this.cryptoContext = cryptoContext; } + // TODO take the file name in constructors. 
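+ // The hfile name is kept in the context so that a reader can rebuild the BlockCacheKey (file name + block offset) when handing a block back to the block cache via returnBlock().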
+ public String getHFileName(){ + return this.hfileName; + } + + public void setHFileName(String name) { + this.hfileName = name; + } + /** * HeapSize implementation * NOTE : The heapsize should be altered as and when new state variable are added @@ -199,6 +211,7 @@ public class HFileContext implements HeapSize, Cloneable { // Algorithm reference, encodingon, checksumtype, Encryption.Context reference 4 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 1 * ClassSize.STRING + // usesHBaseChecksum, includesMvcc, includesTags and compressTags 4 * Bytes.SIZEOF_BOOLEAN + Bytes.SIZEOF_LONG); @@ -228,6 +241,10 @@ public class HFileContext implements HeapSize, Cloneable { sb.append(" compressAlgo="); sb.append(compressAlgo); sb.append(" compressTags="); sb.append(compressTags); sb.append(" cryptoContext=[ "); sb.append(cryptoContext); sb.append(" ]"); + if (hfileName != null) { + sb.append(" name="); + sb.append(hfileName); + } sb.append(" ]"); return sb.toString(); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java index 0d1e6ef..669b562 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java @@ -54,6 +54,8 @@ public class HFileContextBuilder { private Encryption.Context cryptoContext = Encryption.Context.NONE; private long fileCreateTime = 0; + private String hfileName = null; + public HFileContextBuilder withHBaseCheckSum(boolean useHBaseCheckSum) { this.usesHBaseChecksum = useHBaseCheckSum; return this; @@ -109,9 +111,14 @@ public class HFileContextBuilder { return this; } + public HFileContextBuilder withPathName(String name) { + this.hfileName = name; + return this; + } + public HFileContext build() { return new HFileContext(usesHBaseChecksum, includesMvcc, includesTags, compression, compressTags, checksumType, bytesPerChecksum, blocksize, encoding, cryptoContext, - fileCreateTime); + fileCreateTime, hfileName); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index 43bbab5..0c55435 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -322,6 +322,11 @@ public class HalfStoreFileReader extends StoreFile.Reader { public Cell getNextIndexedKey() { return null; } + + @Override + public void close() { + this.delegate.close(); + } }; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 57c4be9..34155c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -116,4 +116,8 @@ public interface BlockCache extends Iterable { * @return The list of sub blockcaches that make up this one; returns null if no sub caches. */ BlockCache [] getBlockCaches(); + + // TODO : Check if HFileBlock is needed here + // Call when this block usage is over. 
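+ // Caches that reference count their blocks (for example the off-heap bucket cache) decrement the block's ref count here; an on-heap cache such as LruBlockCache can treat this as a no-op and simply return true.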
+ boolean returnBlock(BlockCacheKey cacheKey, HFileBlock block); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 7725cf9..61af413 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -23,6 +23,7 @@ import java.util.Iterator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; +import org.apache.hadoop.hbase.io.hfile.HFileBlock.CacheType; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; @@ -219,4 +220,17 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { public void setMaxSize(long size) { this.lruCache.setMaxSize(size); } + + @Override + public boolean returnBlock(BlockCacheKey cacheKey, HFileBlock block) { + assert block.getCacheType() != CacheType.NOT_CACHED; + if (block.getCacheType() == CacheType.L1_CACHED) { + // At the time when this block was served, it was in L1 cache. Even if it is transferred to L2 + // cache by this time, no need to contact L2 cache. + return this.lruCache.returnBlock(cacheKey, block); + } else if (block.getCacheType() == CacheType.L2_CACHED) { + return this.l2Cache.returnBlock(cacheKey, block); + } + return true; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 09233a2..3bc0a9d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -26,7 +26,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.SequenceInputStream; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -39,7 +38,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -51,6 +49,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.compress.Compression; @@ -376,6 +375,11 @@ public class HFile { final boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException; + /** + * Return the given block back to the cache, if it was obtained from cache. + * @param block Block to be returned. + */ + void returnBlock(HFileBlock block); } /** An interface used by clients to open and iterate an {@link HFile}. 
*/ @@ -391,7 +395,7 @@ public class HFile { HFileScanner getScanner(boolean cacheBlocks, final boolean pread, final boolean isCompaction); - ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException; + HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException; Map loadFileInfo() throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index a64bb94..5a262a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -192,6 +192,8 @@ public class HFileBlock implements Cacheable { */ private int nextBlockOnDiskSizeWithHeader = -1; + private CacheType cacheType = CacheType.NOT_CACHED; + /** * Creates a new {@link HFile} block from the given fields. This constructor * is mostly used when the block data has already been read and uncompressed, @@ -1871,6 +1873,19 @@ public class HFileBlock implements Cacheable { return this.fileContext; } + // TODO need setter or only via constructor arg. To avoid lot of changes just add with setter. + public void setCacheType(CacheType cacheType) { + this.cacheType = cacheType; + } + + public CacheType getCacheType() { + return this.cacheType; + } + + public static enum CacheType { + L1_CACHED, L2_CACHED, NOT_CACHED; + } + /** * Convert the contents of the block header into a human readable string. * This is mostly helpful for debugging. This assumes that the block diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 5b54807..bb615bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.Cell; @@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader; @@ -233,76 +233,83 @@ public class HFileBlockIndex { int lookupLevel = 1; // How many levels deep we are in our lookup. int index = -1; - HFileBlock block; + HFileBlock block = null; + boolean dataBlock = false; while (true) { - - if (currentBlock != null && currentBlock.getOffset() == currentOffset) - { - // Avoid reading the same block again, even with caching turned off. - // This is crucial for compaction-type workload which might have - // caching turned off. This is like a one-block cache inside the - // scanner. - block = currentBlock; - } else { - // Call HFile's caching block reader API. We always cache index - // blocks, otherwise we might get terrible performance. 
- boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel); - BlockType expectedBlockType; - if (lookupLevel < searchTreeLevel - 1) { - expectedBlockType = BlockType.INTERMEDIATE_INDEX; - } else if (lookupLevel == searchTreeLevel - 1) { - expectedBlockType = BlockType.LEAF_INDEX; + try { + if (currentBlock != null && currentBlock.getOffset() == currentOffset) { + // Avoid reading the same block again, even with caching turned off. + // This is crucial for compaction-type workload which might have + // caching turned off. This is like a one-block cache inside the + // scanner. + block = currentBlock; } else { - // this also accounts for ENCODED_DATA - expectedBlockType = BlockType.DATA; + // Call HFile's caching block reader API. We always cache index + // blocks, otherwise we might get terrible performance. + boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel); + BlockType expectedBlockType; + if (lookupLevel < searchTreeLevel - 1) { + expectedBlockType = BlockType.INTERMEDIATE_INDEX; + } else if (lookupLevel == searchTreeLevel - 1) { + expectedBlockType = BlockType.LEAF_INDEX; + } else { + // this also accounts for ENCODED_DATA + expectedBlockType = BlockType.DATA; + } + block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, + pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding); } - block = cachingBlockReader.readBlock(currentOffset, - currentOnDiskSize, shouldCache, pread, isCompaction, true, - expectedBlockType, expectedDataBlockEncoding); - } - if (block == null) { - throw new IOException("Failed to read block at offset " + - currentOffset + ", onDiskSize=" + currentOnDiskSize); - } + if (block == null) { + throw new IOException("Failed to read block at offset " + currentOffset + + ", onDiskSize=" + currentOnDiskSize); + } - // Found a data block, break the loop and check our level in the tree. - if (block.getBlockType().isData()) { - break; - } + // Found a data block, break the loop and check our level in the tree. + if (block.getBlockType().isData()) { + dataBlock = true; + break; + } - // Not a data block. This must be a leaf-level or intermediate-level - // index block. We don't allow going deeper than searchTreeLevel. - if (++lookupLevel > searchTreeLevel) { - throw new IOException("Search Tree Level overflow: lookupLevel="+ - lookupLevel + ", searchTreeLevel=" + searchTreeLevel); - } + // Not a data block. This must be a leaf-level or intermediate-level + // index block. We don't allow going deeper than searchTreeLevel. + if (++lookupLevel > searchTreeLevel) { + throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel + + ", searchTreeLevel=" + searchTreeLevel); + } - // Locate the entry corresponding to the given key in the non-root - // (leaf or intermediate-level) index block. - ByteBuffer buffer = block.getBufferWithoutHeader(); - index = locateNonRootIndexEntry(buffer, key, comparator); - if (index == -1) { - // This has to be changed - // For now change this to key value - KeyValue kv = KeyValueUtil.ensureKeyValue(key); - throw new IOException("The key " - + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength()) - + " is before the" + " first key of the non-root index block " - + block); - } + // Locate the entry corresponding to the given key in the non-root + // (leaf or intermediate-level) index block. 
+ ByteBuffer buffer = block.getBufferWithoutHeader(); + index = locateNonRootIndexEntry(buffer, key, comparator); + if (index == -1) { + // This has to be changed + // For now change this to key value + KeyValue kv = KeyValueUtil.ensureKeyValue(key); + throw new IOException("The key " + + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength()) + + " is before the" + " first key of the non-root index block " + block); + } - currentOffset = buffer.getLong(); - currentOnDiskSize = buffer.getInt(); + currentOffset = buffer.getLong(); + currentOnDiskSize = buffer.getInt(); - // Only update next indexed key if there is a next indexed key in the current level - byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1); - if (tmpNextIndexedKey != null) { - nextIndexedKey = new KeyValue.KeyOnlyKeyValue(tmpNextIndexedKey); + // Only update next indexed key if there is a next indexed key in the + // current level + byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1); + if (tmpNextIndexedKey != null) { + nextIndexedKey = new KeyValue.KeyOnlyKeyValue(tmpNextIndexedKey); + } + } finally { + if (!dataBlock) { + cachingBlockReader.returnBlock(block); + } } } if (lookupLevel != searchTreeLevel) { + assert dataBlock == true; + cachingBlockReader.returnBlock(block); throw new IOException("Reached a data block at level " + lookupLevel + " but the number of levels is " + searchTreeLevel); } @@ -347,6 +354,7 @@ public class HFileBlockIndex { int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset + SECONDARY_INDEX_ENTRY_OVERHEAD; targetMidKey = ByteBufferUtils.toBytes(b, keyOffset, keyLen); + cachingBlockReader.returnBlock(midLeafBlock); } else { // The middle of the root-level index. targetMidKey = blockKeys[rootCount / 2]; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 933ad22..52e1353 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.io.WritableUtils; import org.apache.htrace.Trace; +import org.apache.hadoop.hbase.io.hfile.HFileBlock.CacheType; import org.apache.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; @@ -253,6 +254,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, null, null); + // No need to update the current block with the fetcher. Ideally readBlock won't find the + // block in the cache here; we call readBlock so that the block data is read from the FS and + // cached in the block cache. We still have to return the block so that its reference is + // released, so return it immediately.
+ returnBlock(block); prevBlock = block; offset += block.getOnDiskSizeWithHeader(); } @@ -332,6 +341,15 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return fileSize; } + @Override + public void returnBlock(HFileBlock block) { + BlockCache blockCache = this.cacheConf.getBlockCache(); + if (blockCache != null && block != null && block.getCacheType() != CacheType.NOT_CACHED) { + BlockCacheKey cacheKey = new BlockCacheKey(this.getFileContext().getHFileName(), + block.getOffset()); + blockCache.returnBlock(cacheKey, block); + } + } /** * @return the first key in the file. May be null if file has no entries. Note * that this is not the first row key, but rather the byte form of the @@ -443,7 +461,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { protected final HFile.Reader reader; private int currTagsLen; - protected HFileBlock block; + protected CurrentBlockHolder blockHolder; /** * The next indexed key is to keep track of the indexed key of the next data block. @@ -460,6 +478,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { this.cacheBlocks = cacheBlocks; this.pread = pread; this.isCompaction = isCompaction; + blockHolder = new CurrentBlockHolder(reader); } @Override @@ -522,8 +541,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { || currValueLen > blockBuffer.limit()) { throw new IllegalStateException("Invalid currKeyLen " + currKeyLen + " or currValueLen " + currValueLen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " - + blockBuffer.position() + " (without header)."); + + blockHolder.getCurrentBlock().getOffset() + + ", block length: " + blockBuffer.limit() + ", position: " + blockBuffer.position() + + " (without header)."); } ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen); if (this.reader.getFileContext().isIncludesTags()) { @@ -531,8 +551,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff); if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) { throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " - + blockBuffer.position() + " (without header)."); + + blockHolder.getCurrentBlock().getOffset() + ", block length: " + + blockBuffer.limit() + ", position: " + blockBuffer.position() + + " (without header)."); } ByteBufferUtils.skip(blockBuffer, currTagsLen); } @@ -569,7 +590,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { || vlen > blockBuffer.limit()) { throw new IllegalStateException("Invalid klen " + klen + " or vlen " + vlen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockHolder.getCurrentBlock().getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + blockBuffer.position() + " (without header)."); } ByteBufferUtils.skip(blockBuffer, klen + vlen); @@ -578,7 +599,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff); if (tlen < 0 || tlen > blockBuffer.limit()) { throw new IllegalStateException("Invalid tlen " + tlen + ". 
Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockHolder.getCurrentBlock().getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + blockBuffer.position() + " (without header)."); } ByteBufferUtils.skip(blockBuffer, tlen); @@ -604,8 +625,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { throw new IllegalStateException("blockSeek with seekBefore " + "at the first key of the block: key=" + CellUtil.getCellKeyAsString(key) - + ", blockOffset=" + block.getOffset() + ", onDiskSize=" - + block.getOnDiskSizeWithHeader()); + + ", blockOffset=" + blockHolder.getCurrentBlock().getOffset() + ", onDiskSize=" + + blockHolder.getCurrentBlock().getOnDiskSizeWithHeader()); } blockBuffer.position(blockBuffer.position() - lastKeyValueSize); readKeyValueLen(); @@ -678,17 +699,22 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // don't do anything. return compared; } else { - // The comparison with no_next_index_key has to be checked - if (this.nextIndexedKey != null && - (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader - .getComparator().compareOnlyKeyPortion(key, nextIndexedKey) < 0)) { - // The reader shall continue to scan the current data block instead - // of querying the - // block index as long as it knows the target key is strictly - // smaller than - // the next indexed key or the current data block is the last data - // block. - return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false); + try { + // The comparison with no_next_index_key has to be checked + if (this.nextIndexedKey != null + && (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader.getComparator() + .compareOnlyKeyPortion(key, nextIndexedKey) < 0)) { + // The reader shall continue to scan the current data block + // instead of querying the + // block index as long as it knows the target key is strictly smaller than + // the next indexed key or the current data block is the last data + // block. + return loadBlockAndSeekToKey(blockHolder.getCurrentBlock(), nextIndexedKey, false, + key, false); + } + } catch (Exception e) { + this.reader.returnBlock(this.blockHolder.getCurrentBlock()); + throw e; } } } @@ -712,15 +738,22 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * @throws IOException */ public int seekTo(Cell key, boolean rewind) throws IOException { - HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader(); - BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block, - cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding()); - if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) { - // This happens if the key e.g. falls before the beginning of the file. - return -1; + try { + HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader(); + BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, + blockHolder.getCurrentBlock(), cacheBlocks, pread, isCompaction, + getEffectiveDataBlockEncoding()); + if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) { + // This happens if the key e.g. falls before the beginning of the + // file. 
+ return -1; + } + return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(), + blockWithScanInfo.getNextIndexedKey(), rewind, key, false); + } catch (Exception e) { + this.reader.returnBlock(this.blockHolder.getCurrentBlock()); + throw e; } - return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(), - blockWithScanInfo.getNextIndexedKey(), rewind, key, false); } @Override @@ -730,36 +763,43 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public boolean seekBefore(Cell key) throws IOException { - HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block, - cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction)); - if (seekToBlock == null) { - return false; - } - ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock); - - if (reader.getComparator() - .compareOnlyKeyPortion( - new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(), - firstKey.limit()), key) >= 0) { - long previousBlockOffset = seekToBlock.getPrevBlockOffset(); - // The key we are interested in - if (previousBlockOffset == -1) { - // we have a 'problem', the key we want is the first of the file. + try { + HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, + blockHolder.getCurrentBlock(), cacheBlocks, pread, isCompaction, + reader.getEffectiveEncodingInCache(isCompaction)); + if (seekToBlock == null) { return false; } + ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock); + + if (reader.getComparator() + .compareOnlyKeyPortion( + new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(), + firstKey.limit()), key) >= 0) { + long previousBlockOffset = seekToBlock.getPrevBlockOffset(); + // The key we are interested in + if (previousBlockOffset == -1) { + // we have a 'problem', the key we want is the first of the file. + return false; + } - // It is important that we compute and pass onDiskSize to the block - // reader so that it does not have to read the header separately to - // figure out the size. - seekToBlock = reader.readBlock(previousBlockOffset, - seekToBlock.getOffset() - previousBlockOffset, cacheBlocks, - pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); - // TODO shortcut: seek forward in this block to the last key of the - // block. + // It is important that we compute and pass onDiskSize to the block + // reader so that it does not have to read the header separately to + // figure out the size. + reader.returnBlock(seekToBlock); + seekToBlock = reader.readBlock(previousBlockOffset, seekToBlock.getOffset() + - previousBlockOffset, cacheBlocks, pread, isCompaction, true, BlockType.DATA, + getEffectiveDataBlockEncoding()); + // TODO shortcut: seek forward in this block to the last key of the + // block. 
+ } + Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey)); + loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true); + return true; + } catch (Exception e) { + this.reader.returnBlock(this.blockHolder.getCurrentBlock()); + throw e; } - Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey)); - loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true); - return true; } /** @@ -771,17 +811,17 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { */ protected HFileBlock readNextDataBlock() throws IOException { long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset(); - if (block == null) + if (blockHolder.getCurrentBlock() == null) return null; - HFileBlock curBlock = block; + HFileBlock curBlock = blockHolder.getCurrentBlock(); do { if (curBlock.getOffset() >= lastDataBlockOffset) return null; if (curBlock.getOffset() < 0) { - throw new IOException("Invalid block file offset: " + block); + throw new IOException("Invalid block file offset: " + blockHolder.getCurrentBlock()); } // We are reading the next block without block type validation, because @@ -790,6 +830,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { + curBlock.getOnDiskSizeWithHeader(), curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread, isCompaction, true, null, getEffectiveDataBlockEncoding()); + if (curBlock != null && !curBlock.getBlockType().isData()) { + reader.returnBlock(curBlock); + } } while (!curBlock.getBlockType().isData()); return curBlock; @@ -836,7 +879,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } protected void setNonSeekedState() { - block = null; + blockHolder.reset(); blockBuffer = null; currKeyLen = 0; currValueLen = 0; @@ -855,41 +898,45 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public boolean next() throws IOException { assertSeeked(); - try { - blockBuffer.position(getNextCellStartPosition()); - } catch (IllegalArgumentException e) { - LOG.error("Current pos = " + blockBuffer.position() - + "; currKeyLen = " + currKeyLen + "; currValLen = " - + currValueLen + "; block limit = " + blockBuffer.limit() - + "; HFile name = " + reader.getName() - + "; currBlock currBlockOffset = " + block.getOffset()); - throw e; - } + try { + blockBuffer.position(getNextCellStartPosition()); + } catch (IllegalArgumentException e) { + LOG.error("Current pos = " + blockBuffer.position() + + "; currKeyLen = " + currKeyLen + "; currValLen = " + + currValueLen + "; block limit = " + blockBuffer.limit() + + "; HFile name = " + reader.getName() + + "; currBlock currBlockOffset = " + blockHolder.getCurrentBlock().getOffset()); + throw e; + } - if (blockBuffer.remaining() <= 0) { - long lastDataBlockOffset = - reader.getTrailer().getLastDataBlockOffset(); + if (blockBuffer.remaining() <= 0) { + long lastDataBlockOffset = + reader.getTrailer().getLastDataBlockOffset(); - if (block.getOffset() >= lastDataBlockOffset) { - setNonSeekedState(); - return false; - } + if (blockHolder.getCurrentBlock().getOffset() >= lastDataBlockOffset) { + setNonSeekedState(); + return false; + } - // read the next block - HFileBlock nextBlock = readNextDataBlock(); - if (nextBlock == null) { - setNonSeekedState(); - return false; - } + // read the next block + HFileBlock nextBlock = readNextDataBlock(); + if (nextBlock == null) { + setNonSeekedState(); + return false; + } - updateCurrBlock(nextBlock); - return true; + 
updateCurrentBlock(nextBlock); + return true; } - // We are still in the same block. - readKeyValueLen(); - return true; + // We are still in the same block. + readKeyValueLen(); + return true; + } catch (Exception e) { + this.reader.returnBlock(this.blockHolder.getCurrentBlock()); + throw e; + } } /** @@ -901,36 +948,51 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { */ @Override public boolean seekTo() throws IOException { - if (reader == null) { - return false; - } + try { + if (reader == null) { + return false; + } - if (reader.getTrailer().getEntryCount() == 0) { - // No data blocks. - return false; - } + if (reader.getTrailer().getEntryCount() == 0) { + // No data blocks. + return false; + } - long firstDataBlockOffset = - reader.getTrailer().getFirstDataBlockOffset(); - if (block != null && block.getOffset() == firstDataBlockOffset) { - blockBuffer.rewind(); - readKeyValueLen(); - return true; - } + long firstDataBlockOffset = + reader.getTrailer().getFirstDataBlockOffset(); + if (blockHolder.getCurrentBlock() != null + && blockHolder.getCurrentBlock().getOffset() == firstDataBlockOffset) { + return processFirstDataBlock(); + } - block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, + readAndUpdateNewBlock(firstDataBlockOffset); + return true; + } catch (Exception e) { + this.reader.returnBlock(this.blockHolder.getCurrentBlock()); + throw e; + } + } + + protected boolean processFirstDataBlock() throws IOException { + blockBuffer.rewind(); + readKeyValueLen(); + return true; + } + + protected void readAndUpdateNewBlock(long firstDataBlockOffset) throws IOException, + CorruptHFileException { + HFileBlock newBlock = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); - if (block.getOffset() < 0) { - throw new IOException("Invalid block offset: " + block.getOffset()); + if (newBlock.getOffset() < 0) { + throw new IOException("Invalid block offset: " + newBlock.getOffset()); } - updateCurrBlock(block); - return true; + updateCurrentBlock(newBlock); } - + protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind, Cell key, boolean seekBefore) throws IOException { - if (block == null || block.getOffset() != seekToBlock.getOffset()) { - updateCurrBlock(seekToBlock); + if (blockHolder.getCurrentBlock() == null || blockHolder.getCurrentBlock().getOffset() != seekToBlock.getOffset()) { + updateCurrentBlock(seekToBlock); } else if (rewind) { blockBuffer.rewind(); } @@ -946,19 +1008,18 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * * @param newBlock the block to make current */ - protected void updateCurrBlock(HFileBlock newBlock) { - block = newBlock; - + protected void updateCurrentBlock(HFileBlock newBlock) throws IOException { + // Set the active block on the reader // sanity check - if (block.getBlockType() != BlockType.DATA) { - throw new IllegalStateException("Scanner works only on data " + - "blocks, got " + block.getBlockType() + "; " + - "fileName=" + reader.getName() + ", " + - "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + - "isCompaction=" + isCompaction); + if (newBlock.getBlockType() != BlockType.DATA) { + throw new IllegalStateException("Scanner works only on data " + "blocks, got " + + newBlock.getBlockType() + "; " + "fileName=" + reader.getName() + + ", " + "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + "isCompaction=" + + isCompaction); } -
blockBuffer = block.getBufferWithoutHeader(); + blockHolder.updateCurrentBlock(newBlock); + blockBuffer = newBlock.getBufferWithoutHeader(); readKeyValueLen(); blockFetches++; @@ -1011,6 +1072,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen)); } + + @Override + public void close() { + if (this.blockHolder.getCurrentBlock() != null) { + this.blockHolder.reset(); + } + } } public Path getPath() { @@ -1065,6 +1133,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException { + // Ideally if we can make the scanner to work at this level it is much easier because this method + // surely knows what was fetched from the cache and based on that we could keep returning the prev + // blocks + // Check cache for block. If found return. if (cacheConf.isBlockCacheEnabled()) { BlockCache cache = cacheConf.getBlockCache(); @@ -1072,7 +1144,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { updateCacheMetrics); if (cachedBlock != null) { if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { - cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader); + HFileBlock compressedBlock = cachedBlock; + cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader); + if (compressedBlock != cachedBlock) cache.returnBlock(cacheKey, compressedBlock); } validateBlockType(cachedBlock, expectedBlockType); @@ -1108,6 +1182,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { " because of a data block encoding mismatch" + "; expected: " + expectedDataBlockEncoding + ", actual: " + actualDataBlockEncoding); + // This is an error scenario. so here we need to decrement the + // count. + cache.returnBlock(cacheKey, cachedBlock); cache.evictBlock(cacheKey); } return null; @@ -1125,7 +1202,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * @throws IOException */ @Override - public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) + public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException { if (trailer.getMetaIndexCount() == 0) { return null; // there are no meta blocks @@ -1157,7 +1234,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { assert cachedBlock.isUnpacked() : "Packed block leak."; // Return a distinct 'shallow copy' of the block, // so pos does not get messed by the scanner - return cachedBlock.getBufferWithoutHeader(); + return cachedBlock; } // Cache Miss, please load. 
} @@ -1171,7 +1248,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); } - return metaBlock.getBufferWithoutHeader(); + return metaBlock; } } @@ -1347,6 +1424,42 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } /** + * Handles block reference under usage and ensures the prev Block under use is + * returned back in case of Bucket Cache + */ + + protected static class CurrentBlockHolder { + private HFileBlock curBlock; + private HFile.Reader reader; + + public CurrentBlockHolder(HFile.Reader reader) { + this.reader = reader; + } + + // Better name + public void updateCurrentBlock(HFileBlock block) { + if (block != null && this.curBlock != null) { + if (block.getOffset() != this.curBlock.getOffset()) { + returnBlockToCache(this.curBlock); + } + } + this.curBlock = block; + } + + public HFileBlock getCurrentBlock() { + return this.curBlock; + } + + public void reset() { + returnBlockToCache(curBlock); + this.curBlock = null; + } + + private void returnBlockToCache(HFileBlock block) { + this.reader.returnBlock(block); + } + } + /** * Scanner that operates on encoded data blocks. */ protected static class EncodedScanner extends HFileScannerImpl { @@ -1368,7 +1481,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public boolean isSeeked(){ - return this.block != null; + return blockHolder.getCurrentBlock() != null; + } + + public void setNonSeekedState() { + this.blockHolder.reset(); } /** @@ -1378,15 +1495,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * @param newBlock the block to make current * @throws CorruptHFileException */ - private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException { - block = newBlock; - + @Override + protected void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException { // sanity checks - if (block.getBlockType() != BlockType.ENCODED_DATA) { - throw new IllegalStateException( - "EncodedScanner works only on encoded data blocks"); + if (newBlock.getBlockType() != BlockType.ENCODED_DATA) { + throw new IllegalStateException("EncodedScanner works only on encoded data blocks"); } - short dataBlockEncoderId = block.getDataBlockEncodingId(); + short dataBlockEncoderId = newBlock.getDataBlockEncodingId(); if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) { String encoderCls = dataBlockEncoder.getClass().getName(); throw new CorruptHFileException("Encoder " + encoderCls @@ -1410,45 +1525,31 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { DataBlockEncoding.ID_SIZE).slice(); return encodedBlock; } - + @Override - public boolean seekTo() throws IOException { - if (reader == null) { - return false; - } - - if (reader.getTrailer().getEntryCount() == 0) { - // No data blocks. 
- return false; - } - - long firstDataBlockOffset = - reader.getTrailer().getFirstDataBlockOffset(); - if (block != null && block.getOffset() == firstDataBlockOffset) { - seeker.rewind(); - return true; - } - - block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, - isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); - if (block.getOffset() < 0) { - throw new IOException("Invalid block offset: " + block.getOffset()); - } - updateCurrentBlock(block); + protected boolean processFirstDataBlock() throws IOException { + seeker.rewind(); return true; } @Override public boolean next() throws IOException { - boolean isValid = seeker.next(); - if (!isValid) { - block = readNextDataBlock(); - isValid = block != null; - if (isValid) { - updateCurrentBlock(block); + try { + boolean isValid = seeker.next(); + if (!isValid) { + HFileBlock newBlock = readNextDataBlock(); + isValid = newBlock != null; + if (isValid) { + updateCurrentBlock(newBlock); + } else { + setNonSeekedState(); + } } + return isValid; + } catch (Exception e) { + this.reader.returnBlock(this.blockHolder.getCurrentBlock()); + throw e; } - return isValid; } @Override @@ -1469,7 +1570,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public Cell getKeyValue() { - if (block == null) { + if (blockHolder.getCurrentBlock() == null) { return null; } return seeker.getKeyValue(); @@ -1490,7 +1591,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } private void assertValidSeek() { - if (block == null) { + if (blockHolder.getCurrentBlock() == null) { throw new NotSeekedException(); } } @@ -1499,9 +1600,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock)); } + @Override protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind, Cell key, boolean seekBefore) throws IOException { - if (block == null || block.getOffset() != seekToBlock.getOffset()) { + if (blockHolder.getCurrentBlock() == null + || blockHolder.getCurrentBlock().getOffset() != seekToBlock.getOffset()) { updateCurrentBlock(seekToBlock); } else if (rewind) { seeker.rewind(); @@ -1582,6 +1685,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { HFileContextBuilder builder = new HFileContextBuilder() .withIncludesMvcc(this.includesMemstoreTS) .withHBaseCheckSum(true) + .withPathName(this.getName()) .withCompression(this.compressAlgo); // Check for any key material available diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index deaa2c0..e3ef8e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -161,4 +161,8 @@ public interface HFileScanner { * @return the next key in the index (the key to seek to the next block) */ Cell getNextIndexedKey(); + /** + * Helps to return any block that was opened as part of this scanner + */ + void close(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index bf46bcf..3f84795 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java 
@@ -34,13 +34,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.HFileBlock.CacheType; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -49,6 +49,7 @@ import org.apache.hadoop.util.StringUtils; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -1090,4 +1091,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { public BlockCache[] getBlockCaches() { return null; } + + @Override + public boolean returnBlock(BlockCacheKey cacheKey, HFileBlock block) { + assert block.getCacheType() == CacheType.L1_CACHED; + // Do nothing + return true; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 7dda0e6..db0fe64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileBlock.CacheType; import org.apache.hadoop.hbase.util.ConcurrentIndex; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; @@ -423,6 +425,10 @@ public class BucketCache implements BlockCache, HeapSize { cacheStats.hit(caching); cacheStats.ioHit(timeTaken); } + if (cachedBlock instanceof HFileBlock) { + ((HFileBlock) cachedBlock).setCacheType(CacheType.L2_CACHED); + bucketEntry.refCount.incrementAndGet(); + } bucketEntry.access(accessCount.incrementAndGet()); if (this.ioErrorStartTime > 0) { ioErrorStartTime = -1; @@ -444,6 +450,10 @@ public class BucketCache implements BlockCache, HeapSize { @Override public boolean evictBlock(BlockCacheKey cacheKey) { + return evictBlock(cacheKey, false); + } + + public boolean evictBlock(BlockCacheKey cacheKey, boolean withInUseCheck) { if (!cacheEnabled) return false; RAMQueueEntry removedBlock = ramCache.remove(cacheKey); if (removedBlock != null) { @@ -462,14 +472,20 @@ public class BucketCache implements BlockCache, HeapSize { IdLock.Entry lockEntry = null; try { lockEntry = 
offsetLock.getLockEntry(bucketEntry.offset()); - if (bucketEntry.equals(backingMap.remove(cacheKey))) { - bucketAllocator.freeBlock(bucketEntry.offset()); - realCacheSize.addAndGet(-1 * bucketEntry.getLength()); - blocksByHFile.remove(cacheKey.getHfileName(), cacheKey); - if (removedBlock == null) { - this.blockNumber.decrementAndGet(); + int refCount = bucketEntry.refCount.get(); + if (refCount == 0) { + if (bucketEntry.equals(backingMap.remove(cacheKey))) { + bucketAllocator.freeBlock(bucketEntry.offset()); + realCacheSize.addAndGet(-1 * bucketEntry.getLength()); + blocksByHFile.remove(cacheKey.getHfileName(), cacheKey); + if (removedBlock == null) { + this.blockNumber.decrementAndGet(); + } + } else { + return false; + } } else { + LOG.info("This block is still referenced by " + refCount + " readers. It cannot be freed now"); return false; } } catch (IOException ie) { @@ -1063,6 +1079,8 @@ public class BucketCache implements BlockCache, HeapSize { byte deserialiserIndex; private volatile long accessTime; private BlockPriority priority; + private AtomicInteger refCount = new AtomicInteger(0); + /** * Time this block was cached. Presumes we are created just before we are added to the cache. */ @@ -1165,9 +1183,12 @@ public class BucketCache implements BlockCache, HeapSize { public long free(long toFree) { Map.Entry entry; long freedBytes = 0; + // TODO avoid a cycling situation: we may find no block that is not in use and so have no way to free. + // What to do then? Fail the caching attempt? Need some changes in the cacheBlock API? while ((entry = queue.pollLast()) != null) { - evictBlock(entry.getKey()); - freedBytes += entry.getValue().getLength(); + if (evictBlock(entry.getKey(), true)) { + freedBytes += entry.getValue().getLength(); + } if (freedBytes >= toFree) { return freedBytes; } @@ -1371,4 +1392,17 @@ public class BucketCache implements BlockCache, HeapSize { public BlockCache[] getBlockCaches() { return null; } + + @Override + public boolean returnBlock(BlockCacheKey cacheKey, HFileBlock block) { + assert block.getCacheType() == CacheType.L2_CACHED; + // When the block was served from the RAMQueueEntry, its CacheType is not yet set to L2_CACHED; it is + // still on its way into the L2 cache, so it is effectively still NOT_CACHED.
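+ // If there is no BucketEntry in the backing map yet, there is no ref count to decrement, so just return false.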
+ BucketEntry bucketEntry = backingMap.get(cacheKey); + if (bucketEntry != null) { + bucketEntry.refCount.decrementAndGet(); + return true; + } + return false; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 345dd9b..acf7c10 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -1030,9 +1031,11 @@ public class StoreFile { private byte[] lastBloomKey; private long deleteFamilyCnt = -1; private boolean bulkLoadResult = false; + private CacheConfig cacheConf; public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException { + this.cacheConf = cacheConf; reader = HFile.createReader(fs, path, cacheConf, conf); bloomFilterType = BloomType.NONE; } @@ -1261,7 +1264,7 @@ public class StoreFile { // Empty file if (reader.getTrailer().getEntryCount() == 0) return false; - + HFileBlock bloomBlock = null; try { boolean shouldCheckBloom; ByteBuffer bloom; @@ -1269,8 +1272,8 @@ public class StoreFile { bloom = null; shouldCheckBloom = true; } else { - bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, - true); + bloomBlock = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, true); + bloom = bloomBlock.getBufferWithoutHeader(); shouldCheckBloom = bloom != null; } @@ -1315,8 +1318,9 @@ public class StoreFile { } catch (IllegalArgumentException e) { LOG.error("Bad bloom filter data -- proceeding without", e); setGeneralBloomFilterFaulty(); + } finally { + reader.returnBlock(bloomBlock); } - return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index a8ee091..8ab3fa0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -233,6 +233,7 @@ public class StoreFileScanner implements KeyValueScanner { public void close() { // Nothing to close on HFileScanner? 
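+ // Now there is: the underlying HFileScanner may still hold a reference to a cached block, which its close() returns to the block cache.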
cur = null; + this.hfs.close(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java index beda805..c0af890 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java @@ -23,8 +23,8 @@ import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer; import org.apache.hadoop.hbase.io.hfile.HFile; @@ -111,6 +111,8 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase result = ByteBloomFilter.contains(key, keyOffset, keyLength, bloomBuf, bloomBlock.headerSize(), bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount); + // After the use return back the block if it was served from a cache. + reader.returnBlock(bloomBlock); } if (numQueriesPerChunk != null && block >= 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index 9e4b1c7..6edcc67 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -322,7 +322,7 @@ public class TestHFile extends HBaseTestCase { private void readNumMetablocks(Reader reader, int n) throws IOException { for (int i = 0; i < n; i++) { - ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); + ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false).getBufferWithoutHeader(); ByteBuffer expected = ByteBuffer.wrap(("something to test" + i).getBytes()); assertEquals("failed to match metadata", diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java index 0ee9d14..6476553 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -163,6 +163,11 @@ public class TestHFileBlockIndex { public BlockReaderWrapper(HFileBlock.FSReader realReader) { this.realReader = realReader; } + + @Override + public void returnBlock(HFileBlock block) { + + } @Override public HFileBlock readBlock(long offset, long onDiskSize, @@ -185,6 +190,12 @@ public class TestHFileBlockIndex { return prevBlock; } + + @Override + public CacheConfig getCacheConfig() { + // TODO Auto-generated method stub + return null; + } } private void readIndex(boolean useTags) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 2965071..03e6df7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheStats; import 
org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.CachedBlock; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext; @@ -400,6 +401,11 @@ public class TestHeapMemoryManager { public BlockCache[] getBlockCaches() { return null; } + + @Override + public boolean returnBlock(BlockCacheKey cacheKey, HFileBlock block) { + return true; + } } private static class MemstoreFlusherStub implements FlushRequester {