.../org/apache/hadoop/hbase/client/HTable.java | 2 +- .../hadoop/hbase/SharedMemoryBackedCell.java | 30 + .../apache/hadoop/hbase/io/hfile/HFileContext.java | 26 +- .../hadoop/hbase/io/hfile/HFileContextBuilder.java | 9 +- .../apache/hadoop/hbase/io/hfile/BlockCache.java | 21 + .../hbase/io/hfile/CacheableDeserializer.java | 6 +- .../hadoop/hbase/io/hfile/CombinedBlockCache.java | 18 + .../hadoop/hbase/io/hfile/CompoundBloomFilter.java | 14 +- .../org/apache/hadoop/hbase/io/hfile/HFile.java | 10 +- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 34 +- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 152 ++- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 314 +++-- .../hadoop/hbase/io/hfile/LruBlockCache.java | 10 +- .../hadoop/hbase/io/hfile/MemcachedBlockCache.java | 9 +- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 73 +- .../hbase/io/hfile/bucket/ByteBufferIOEngine.java | 19 +- .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 15 +- .../hadoop/hbase/io/hfile/bucket/IOEngine.java | 10 +- .../org/apache/hadoop/hbase/ipc/RpcCallback.java | 2 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 106 +- .../hadoop/hbase/regionserver/KeyValueHeap.java | 8 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 124 +- .../hadoop/hbase/regionserver/StoreFile.java | 13 +- .../hadoop/hbase/regionserver/StoreScanner.java | 26 +- .../apache/hadoop/hbase/HBaseTestingUtility.java | 18 + .../hbase/client/TestBlockEvictionFromClient.java | 1439 ++++++++++++++++++++ .../hadoop/hbase/io/hfile/CacheTestUtils.java | 3 +- .../hadoop/hbase/io/hfile/TestCacheConfig.java | 3 +- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 30 + .../apache/hadoop/hbase/io/hfile/TestHFile.java | 2 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 4 +- .../hadoop/hbase/io/hfile/TestHFileBlockIndex.java | 4 + .../io/hfile/bucket/TestByteBufferIOEngine.java | 6 +- .../hbase/io/hfile/bucket/TestFileIOEngine.java | 6 +- .../hbase/regionserver/TestHeapMemoryManager.java | 12 +- 35 files changed, 2291 insertions(+), 287 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index f446bb7..a0485dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -800,7 +800,7 @@ public class HTable implements HTableInterface { new RegionServerCallable(connection, getName(), row) { @Override public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SharedMemoryBackedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SharedMemoryBackedCell.java new file mode 100644 index 0000000..7699bce --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SharedMemoryBackedCell.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Any Cell implementing this interface means its data is in shared + * common cache memory. When we read data from such cache, we will make a cell + * referring to the same memory and will avoid any copy. + */ +@InterfaceAudience.Private +public interface SharedMemoryBackedCell extends Cloneable { + public Cell clone() throws CloneNotSupportedException; +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java index 5edd47d..5b06626 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java @@ -56,6 +56,7 @@ public class HFileContext implements HeapSize, Cloneable { /** Encryption algorithm and key used */ private Encryption.Context cryptoContext = Encryption.Context.NONE; private long fileCreateTime; + private String hfileName; //Empty constructor. Go with setters public HFileContext() { @@ -77,12 +78,13 @@ public class HFileContext implements HeapSize, Cloneable { this.encoding = context.encoding; this.cryptoContext = context.cryptoContext; this.fileCreateTime = context.fileCreateTime; + this.hfileName = context.hfileName; } public HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags, Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType, int bytesPerChecksum, int blockSize, DataBlockEncoding encoding, - Encryption.Context cryptoContext, long fileCreateTime) { + Encryption.Context cryptoContext, long fileCreateTime, String hfileName) { this.usesHBaseChecksum = useHBaseChecksum; this.includesMvcc = includesMvcc; this.includesTags = includesTags; @@ -96,6 +98,7 @@ public class HFileContext implements HeapSize, Cloneable { } this.cryptoContext = cryptoContext; this.fileCreateTime = fileCreateTime; + this.hfileName = hfileName; } /** @@ -119,10 +122,6 @@ public class HFileContext implements HeapSize, Cloneable { return compressAlgo; } - public void setCompression(Compression.Algorithm compressAlgo) { - this.compressAlgo = compressAlgo; - } - public boolean isUseHBaseChecksum() { return usesHBaseChecksum; } @@ -175,10 +174,6 @@ public class HFileContext implements HeapSize, Cloneable { return encoding; } - public void setDataBlockEncoding(DataBlockEncoding encoding) { - this.encoding = encoding; - } - public Encryption.Context getEncryptionContext() { return cryptoContext; } @@ -187,6 +182,10 @@ public class HFileContext implements HeapSize, Cloneable { this.cryptoContext = cryptoContext; } + public String getHFileName() { + return this.hfileName; + } + /** * HeapSize implementation * NOTE : The heapsize should be altered as and when new state variable are added @@ -196,11 +195,14 @@ public class HFileContext implements HeapSize, Cloneable { public long heapSize() { long size = ClassSize.align(ClassSize.OBJECT + // Algorithm reference, encodingon, 
checksumtype, Encryption.Context reference - 4 * ClassSize.REFERENCE + + 5 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + // usesHBaseChecksum, includesMvcc, includesTags and compressTags 4 * Bytes.SIZEOF_BOOLEAN + Bytes.SIZEOF_LONG); + if (this.hfileName != null) { + size += ClassSize.STRING + this.hfileName.length(); + } return size; } @@ -227,6 +229,10 @@ public class HFileContext implements HeapSize, Cloneable { sb.append(" compressAlgo="); sb.append(compressAlgo); sb.append(" compressTags="); sb.append(compressTags); sb.append(" cryptoContext=[ "); sb.append(cryptoContext); sb.append(" ]"); + if (hfileName != null) { + sb.append(" name="); + sb.append(hfileName); + } sb.append(" ]"); return sb.toString(); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java index a903974..19a323a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java @@ -53,6 +53,8 @@ public class HFileContextBuilder { private Encryption.Context cryptoContext = Encryption.Context.NONE; private long fileCreateTime = 0; + private String hfileName = null; + public HFileContextBuilder withHBaseCheckSum(boolean useHBaseCheckSum) { this.usesHBaseChecksum = useHBaseCheckSum; return this; @@ -108,9 +110,14 @@ public class HFileContextBuilder { return this; } + public HFileContextBuilder withPathName(String name) { + this.hfileName = name; + return this; + } + public HFileContext build() { return new HFileContext(usesHBaseChecksum, includesMvcc, includesTags, compression, compressTags, checksumType, bytesPerChecksum, blocksize, encoding, cryptoContext, - fileCreateTime); + fileCreateTime, hfileName); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 57c4be9..3f7a747 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -116,4 +116,25 @@ public interface BlockCache extends Iterable { * @return The list of sub blockcaches that make up this one; returns null if no sub caches. */ BlockCache [] getBlockCaches(); + + /** + * Called when the caller has finished using a block and it can be released by the cache. + * @param cacheKey the cache key of the block being returned + * @return true if the block was successfully returned, false otherwise + */ + boolean returnBlock(BlockCacheKey cacheKey); + + /** + * An enum that indicates whether the block retrieved from the cache is backed by shared + * memory or not. With shared memory, reads are served directly from the + * cache's memory without any copy, so all the + * blocks coming out of such a cache implementation cannot be evicted till the results are + * consumed.
For the non-shared memory case, we don't have any such constraint + * because either there is no such shared memory or the blocks are already + * copied like in the case of + * {@link org.apache.hadoop.hbase.io.hfile.bucket.FileIOEngine} + */ + public static enum MemoryType { + SHARED_MEMORY, NON_SHARED_MEMORY; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java index f56a921..25d9a9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType; /** * Interface for a deserializer. Throws an IOException if the serialized data is @@ -36,14 +37,15 @@ public interface CacheableDeserializer { T deserialize(ByteBuffer b) throws IOException; /** - * + * * @param b * @param reuse true if Cacheable object can use the given buffer as its * content + * @param memType the {@link MemoryType} of the buffer * @return T the deserialized object. * @throws IOException */ - T deserialize(ByteBuffer b, boolean reuse) throws IOException; + T deserialize(ByteBuffer b, boolean reuse, MemoryType memType) throws IOException; /** * Get the identifier of this deserialiser. Identifier is unique for each diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 7725cf9..19a16a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import com.google.common.annotations.VisibleForTesting; + /** * CombinedBlockCache is an abstraction layer that combines @@ -219,4 +221,20 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { public void setMaxSize(long size) { this.lruCache.setMaxSize(size); } + + @Override + public boolean returnBlock(BlockCacheKey cacheKey) { + // The caller would be calling this API only for the SHARED_MEMORY block + return this.l2Cache.returnBlock(cacheKey); + } + + @VisibleForTesting + public int getRefCount(BlockCacheKey cacheKey) { + // Check this + if (this.l2Cache != null) { + return ((BucketCache) this.l2Cache).getRefCount(cacheKey); + } else { + return 0; + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java index 11436ce..c8ea9ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java @@ -119,11 +119,15 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase "Failed to load Bloom block for key " + Bytes.toStringBinary(key, keyOffset, keyLength), ex); } - - ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly(); - result = BloomFilterUtil.contains(key, keyOffset, keyLength, - 
bloomBuf, bloomBlock.headerSize(), - bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount); + try { + ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly(); + result = BloomFilterUtil + .contains(key, keyOffset, keyLength, bloomBuf, bloomBlock.headerSize(), + bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount); + } finally { + // After the use return back the block if it was served from a cache. + reader.returnBlock(bloomBlock); + } } if (numQueriesPerChunk != null && block >= 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 6653c23..22603b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -26,7 +26,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.SequenceInputStream; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -40,7 +39,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -55,6 +53,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; @@ -374,6 +373,11 @@ public class HFile { final boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException; + /** + * Return the given block back to the cache, if it was obtained from cache. + * @param block Block to be returned. + */ + void returnBlock(HFileBlock block); } /** An interface used by clients to open and iterate an {@link HFile}. 
*/ @@ -389,7 +393,7 @@ public class HFile { HFileScanner getScanner(boolean cacheBlocks, final boolean pread, final boolean isCompaction); - ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException; + HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException; Map loadFileInfo() throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index f3bf0b7..12152a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; @@ -118,7 +119,8 @@ public class HFileBlock implements Cacheable { static final CacheableDeserializer blockDeserializer = new CacheableDeserializer() { - public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{ + public HFileBlock deserialize(ByteBuffer buf, boolean reuse, + MemoryType memType) throws IOException{ buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); ByteBuffer newByteBuffer; if (reuse) { @@ -130,7 +132,7 @@ public class HFileBlock implements Cacheable { buf.position(buf.limit()); buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE); boolean usesChecksum = buf.get() == (byte)1; - HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum); + HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum, memType); hFileBlock.offset = buf.getLong(); hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt(); if (hFileBlock.hasNextBlockHeader()) { @@ -146,7 +148,7 @@ public class HFileBlock implements Cacheable { @Override public HFileBlock deserialize(ByteBuffer b) throws IOException { - return deserialize(b, false); + return deserialize(b, false, MemoryType.NON_SHARED_MEMORY); } }; private static final int deserializerIdentifier; @@ -192,6 +194,8 @@ public class HFileBlock implements Cacheable { */ private int nextBlockOnDiskSizeWithHeader = -1; + private MemoryType memType = MemoryType.NON_SHARED_MEMORY; + /** * Creates a new {@link HFile} block from the given fields. This constructor * is mostly used when the block data has already been read and uncompressed, @@ -248,6 +252,17 @@ public class HFileBlock implements Cacheable { * indicate the format inside a HFileBlock. */ HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException { + this(b, usesHBaseChecksum, MemoryType.NON_SHARED_MEMORY); + } + /** + * Creates a block from an existing buffer starting with a header. Rewinds + * and takes ownership of the buffer. By definition of rewind, ignores the + * buffer position, but if you slice the buffer beforehand, it will rewind + * to that point. The reason this has a minorNumber and not a majorNumber is + * because majorNumbers indicate the format of a HFile whereas minorNumbers + * indicate the format inside a HFileBlock. 
+ */ + HFileBlock(ByteBuffer b, boolean usesHBaseChecksum, MemoryType memType) throws IOException { b.rewind(); blockType = BlockType.read(b); onDiskSizeWithoutHeader = b.getInt(); @@ -266,6 +281,7 @@ public class HFileBlock implements Cacheable { HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; } this.fileContext = contextBuilder.build(); + this.memType = memType; buf = b; buf.rewind(); } @@ -416,8 +432,10 @@ public class HFileBlock implements Cacheable { sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlocKOffset"); if (this.fileContext.isUseHBaseChecksum()) { - sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); - sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum"); + sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), + "checksumType"); + sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), + "bytesPerChecksum"); sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); } @@ -637,7 +655,7 @@ public class HFileBlock implements Cacheable { long size = ClassSize.align( ClassSize.OBJECT + // Block type, byte buffer and meta references - 3 * ClassSize.REFERENCE + + 4 * ClassSize.REFERENCE + // On-disk size, uncompressed size, and next block's on-disk size // bytePerChecksum and onDiskDataSize 4 * Bytes.SIZEOF_INT + @@ -1871,6 +1889,10 @@ public class HFileBlock implements Cacheable { return this.fileContext; } + public MemoryType getMemoryType() { + return this.memType; + } + /** * Convert the contents of the block header into a human readable string. * This is mostly helpful for debugging. This assumes that the block diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 86b5e15..8d4c689 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.Cell; @@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader; @@ -294,78 +294,90 @@ public class HFileBlockIndex { int lookupLevel = 1; // How many levels deep we are in our lookup. int index = -1; - HFileBlock block; + HFileBlock block = null; + boolean dataBlock = false; KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue(); while (true) { - - if (currentBlock != null && currentBlock.getOffset() == currentOffset) - { - // Avoid reading the same block again, even with caching turned off. - // This is crucial for compaction-type workload which might have - // caching turned off. This is like a one-block cache inside the - // scanner. 
- block = currentBlock; - } else { - // Call HFile's caching block reader API. We always cache index - // blocks, otherwise we might get terrible performance. - boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel); - BlockType expectedBlockType; - if (lookupLevel < searchTreeLevel - 1) { - expectedBlockType = BlockType.INTERMEDIATE_INDEX; - } else if (lookupLevel == searchTreeLevel - 1) { - expectedBlockType = BlockType.LEAF_INDEX; + try { + if (currentBlock != null && currentBlock.getOffset() == currentOffset) { + // Avoid reading the same block again, even with caching turned off. + // This is crucial for compaction-type workload which might have + // caching turned off. This is like a one-block cache inside the + // scanner. + block = currentBlock; } else { - // this also accounts for ENCODED_DATA - expectedBlockType = BlockType.DATA; + // Call HFile's caching block reader API. We always cache index + // blocks, otherwise we might get terrible performance. + boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel); + BlockType expectedBlockType; + if (lookupLevel < searchTreeLevel - 1) { + expectedBlockType = BlockType.INTERMEDIATE_INDEX; + } else if (lookupLevel == searchTreeLevel - 1) { + expectedBlockType = BlockType.LEAF_INDEX; + } else { + // this also accounts for ENCODED_DATA + expectedBlockType = BlockType.DATA; + } + block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, + pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding); } - block = cachingBlockReader.readBlock(currentOffset, - currentOnDiskSize, shouldCache, pread, isCompaction, true, - expectedBlockType, expectedDataBlockEncoding); - } - if (block == null) { - throw new IOException("Failed to read block at offset " + - currentOffset + ", onDiskSize=" + currentOnDiskSize); - } + if (block == null) { + throw new IOException("Failed to read block at offset " + currentOffset + + ", onDiskSize=" + currentOnDiskSize); + } - // Found a data block, break the loop and check our level in the tree. - if (block.getBlockType().isData()) { - break; - } + // Found a data block, break the loop and check our level in the tree. + if (block.getBlockType().isData()) { + dataBlock = true; + break; + } - // Not a data block. This must be a leaf-level or intermediate-level - // index block. We don't allow going deeper than searchTreeLevel. - if (++lookupLevel > searchTreeLevel) { - throw new IOException("Search Tree Level overflow: lookupLevel="+ - lookupLevel + ", searchTreeLevel=" + searchTreeLevel); - } + // Not a data block. This must be a leaf-level or intermediate-level + // index block. We don't allow going deeper than searchTreeLevel. + if (++lookupLevel > searchTreeLevel) { + throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel + + ", searchTreeLevel=" + searchTreeLevel); + } - // Locate the entry corresponding to the given key in the non-root - // (leaf or intermediate-level) index block. 
- ByteBuffer buffer = block.getBufferWithoutHeader(); - index = locateNonRootIndexEntry(buffer, key, comparator); - if (index == -1) { - // This has to be changed - // For now change this to key value - KeyValue kv = KeyValueUtil.ensureKeyValue(key); - throw new IOException("The key " - + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength()) - + " is before the" + " first key of the non-root index block " - + block); - } + // Locate the entry corresponding to the given key in the non-root + // (leaf or intermediate-level) index block. + ByteBuffer buffer = block.getBufferWithoutHeader(); + index = locateNonRootIndexEntry(buffer, key, comparator); + if (index == -1) { + // This has to be changed + // For now change this to key value + KeyValue kv = KeyValueUtil.ensureKeyValue(key); + throw new IOException("The key " + + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength()) + + " is before the" + " first key of the non-root index block " + block); + } - currentOffset = buffer.getLong(); - currentOnDiskSize = buffer.getInt(); + currentOffset = buffer.getLong(); + currentOnDiskSize = buffer.getInt(); - // Only update next indexed key if there is a next indexed key in the current level - byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1); - if (nonRootIndexedKey != null) { - tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length); - nextIndexedKey = tmpNextIndexKV; + // Only update next indexed key if there is a next indexed key in the + // current level + byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1); + if (nonRootIndexedKey != null) { + tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length); + nextIndexedKey = tmpNextIndexKV; + } + } finally { + if (!dataBlock) { + // Return the block immediately if it is not the + // data block + cachingBlockReader.returnBlock(block); + } } } if (lookupLevel != searchTreeLevel) { + assert dataBlock == true; + // Though we have retrieved a data block we have found an issue + // in the retrieved data block. 
Hence return the block so that + // its ref count can be decremented + cachingBlockReader.returnBlock(block); throw new IOException("Reached a data block at level " + lookupLevel + " but the number of levels is " + searchTreeLevel); } @@ -395,16 +407,18 @@ public class HFileBlockIndex { HFileBlock midLeafBlock = cachingBlockReader.readBlock( midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true, BlockType.LEAF_INDEX, null); - - ByteBuffer b = midLeafBlock.getBufferWithoutHeader(); - int numDataBlocks = b.getInt(); - int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1)); - int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - - keyRelOffset; - int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset - + SECONDARY_INDEX_ENTRY_OVERHEAD; - byte[] bytes = ByteBufferUtils.toBytes(b, keyOffset, keyLen); - targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length); + try { + ByteBuffer b = midLeafBlock.getBufferWithoutHeader(); + int numDataBlocks = b.getInt(); + int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1)); + int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset; + int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset + + SECONDARY_INDEX_ENTRY_OVERHEAD; + byte[] bytes = ByteBufferUtils.toBytes(b, keyOffset, keyLen); + targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length); + } finally { + cachingBlockReader.returnBlock(midLeafBlock); + } } else { // The middle of the root-level index. targetMidKey = blockKeys[rootCount / 2]; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 13836ae..4323bc8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -35,6 +35,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.NoTagsKeyValue; +import org.apache.hadoop.hbase.SharedMemoryBackedCell; import org.apache.hadoop.hbase.SizeCachedKeyValue; import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue; import org.apache.hadoop.hbase.HConstants; @@ -47,6 +49,7 @@ import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; @@ -256,6 +259,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, null, null); + // No need to update the current block. Ideally the + // readBlock here won't find the + // block in cache. We call readBlock only so that the block data is + // read from the FS and + // cached in the block cache. Since this prefetch does not use the block, + // return it to the cache + // immediately so that its reference is released.
+ returnBlock(block); prevBlock = block; offset += block.getOnDiskSizeWithHeader(); } @@ -337,6 +348,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return fileSize; } + @Override + public void returnBlock(HFileBlock block) { + BlockCache blockCache = this.cacheConf.getBlockCache(); + if (blockCache != null && block != null + && block.getMemoryType() == MemoryType.SHARED_MEMORY) { + BlockCacheKey cacheKey = new BlockCacheKey(this.getFileContext().getHFileName(), + block.getOffset()); + blockCache.returnBlock(cacheKey); + } + } /** * @return the first key in the file. May be null if file has no entries. Note * that this is not the first row key, but rather the byte form of the @@ -449,7 +470,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { protected final HFile.Reader reader; private int currTagsLen; private KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue(); - protected HFileBlock block; /** * The next indexed key is to keep track of the indexed key of the next data block. @@ -459,6 +479,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet. */ protected Cell nextIndexedKey; + // Current block being used + protected HFileBlock curBlock; + // Previous blocks that were used in the course of the read + protected ArrayList prevBlocks = null; public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks, final boolean pread, final boolean isCompaction) { @@ -466,8 +490,44 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { this.cacheBlocks = cacheBlocks; this.pread = pread; this.isCompaction = isCompaction; + prevBlocks = new ArrayList(); + } + + void updateCurrBlock(HFileBlock block) { + if (block != null && this.curBlock != null && + block.getOffset() == this.curBlock.getOffset()) { + return; + } + if (this.curBlock != null) { + prevBlocks.add(this.curBlock); + } + this.curBlock = block; + } + + void reset() { + if (this.curBlock != null) { + this.prevBlocks.add(this.curBlock); + } + this.curBlock = null; + } + + private void returnBlockToCache(HFileBlock block) { + if (LOG.isTraceEnabled()) { + LOG.trace("Returning the block : " + block); + } + this.reader.returnBlock(block); } + void returnBlocks(boolean returnAll) { + for (int i = 0; i < this.prevBlocks.size(); i++) { + returnBlockToCache(this.prevBlocks.get(i)); + } + this.prevBlocks.clear(); + if (returnAll && this.curBlock != null) { + returnBlockToCache(this.curBlock); + this.curBlock = null; + } + } @Override public boolean isSeeked(){ return blockBuffer != null; @@ -496,6 +556,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return kvBufSize; } + @Override + public void close() { + this.returnBlocks(true); + } + protected int getNextCellStartPosition() { int nextKvPos = blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen + currMemstoreTSLen; @@ -532,7 +597,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { private final void checkTagsLen() { if (checkLen(this.currTagsLen)) { throw new IllegalStateException("Invalid currTagsLen " + this.currTagsLen + - ". Block offset: " + block.getOffset() + ", block length: " + this.blockBuffer.limit() + + ". 
Block offset: " + curBlock.getOffset() + ", block length: " + + this.blockBuffer.limit() + ", position: " + this.blockBuffer.position() + " (without header)."); } } @@ -609,7 +675,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { || vlen > blockBuffer.limit()) { throw new IllegalStateException("Invalid klen " + klen + " or vlen " + vlen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + blockBuffer.position() + " (without header)."); } ByteBufferUtils.skip(blockBuffer, klen + vlen); @@ -618,7 +684,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff); if (tlen < 0 || tlen > blockBuffer.limit()) { throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + blockBuffer.position() + " (without header)."); } ByteBufferUtils.skip(blockBuffer, tlen); @@ -645,8 +711,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { throw new IllegalStateException("blockSeek with seekBefore " + "at the first key of the block: key=" + CellUtil.getCellKeyAsString(key) - + ", blockOffset=" + block.getOffset() + ", onDiskSize=" - + block.getOnDiskSizeWithHeader()); + + ", blockOffset=" + curBlock.getOffset() + ", onDiskSize=" + + curBlock.getOnDiskSizeWithHeader()); } blockBuffer.position(blockBuffer.position() - lastKeyValueSize); readKeyValueLen(); @@ -717,8 +783,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // smaller than // the next indexed key or the current data block is the last data // block. - return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false); + return loadBlockAndSeekToKey(this.curBlock, nextIndexedKey, false, key, + false); } + } } // Don't rewind on a reseek operation, because reseek implies that we are @@ -742,10 +810,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { */ public int seekTo(Cell key, boolean rewind) throws IOException { HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader(); - BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block, + BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, curBlock, cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding()); if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) { - // This happens if the key e.g. falls before the beginning of the file. + // This happens if the key e.g. falls before the beginning of the + // file. 
return -1; } return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(), @@ -754,7 +823,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public boolean seekBefore(Cell key) throws IOException { - HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block, + HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, curBlock, cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction)); if (seekToBlock == null) { return false; @@ -769,6 +838,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return false; } + // The current block 'seekToBlock' is less than the first key in the + // block. + // We will go ahead by reading the next block that satisfies the + // given key. + // Return the current block before reading the next one. + reader.returnBlock(seekToBlock); // It is important that we compute and pass onDiskSize to the block // reader so that it does not have to read the header separately to // figure out the size. @@ -791,17 +866,17 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { */ protected HFileBlock readNextDataBlock() throws IOException { long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset(); - if (block == null) + if (curBlock == null) return null; - HFileBlock curBlock = block; + HFileBlock curBlock = this.curBlock; do { if (curBlock.getOffset() >= lastDataBlockOffset) return null; if (curBlock.getOffset() < 0) { - throw new IOException("Invalid block file offset: " + block); + throw new IOException("Invalid block file offset: " + curBlock); } // We are reading the next block without block type validation, because @@ -810,6 +885,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { + curBlock.getOnDiskSizeWithHeader(), curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread, isCompaction, true, null, getEffectiveDataBlockEncoding()); + if (curBlock != null && !curBlock.getBlockType().isData()) { + // Whatever block we read we will be returning it unless + // it is a datablock. 
Just in case the blocks are non data blocks + reader.returnBlock(curBlock); + } } while (!curBlock.getBlockType().isData()); return curBlock; @@ -826,11 +906,23 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { KeyValue ret; if (currTagsLen > 0) { - ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position(), getCellBufSize()); + if (this.curBlock.getMemoryType() == BlockCache.MemoryType.SHARED_MEMORY) { + // If the block is of shared_memory type, create a cell that is of type SharedMemoryCell + ret = new SharedMemorySizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position(), getCellBufSize()); + } else { + ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position(), getCellBufSize()); + } } else { - ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position(), getCellBufSize()); + if (this.curBlock.getMemoryType() == BlockCache.MemoryType.SHARED_MEMORY) { + // If the block is of shared_memory type, create a cell that is of type SharedMemoryCell + ret = new SharedMemorySizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position(), getCellBufSize()); + } else { + ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position(), getCellBufSize()); + } } if (this.reader.shouldIncludeMemstoreTS()) { ret.setSequenceId(currMemstoreTS); @@ -846,6 +938,32 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { + KEY_VALUE_LEN_SIZE, currKeyLen); } + private static class SharedMemorySizeCachedKeyValue extends SizeCachedKeyValue implements + SharedMemoryBackedCell { + public SharedMemorySizeCachedKeyValue(byte[] bytes, int offset, int length) { + super(bytes, offset, length); + } + + @Override + public KeyValue clone() { + byte[] copy = Bytes.copy(this.bytes, this.offset, this.length); + return new SizeCachedKeyValue(copy, 0, copy.length); + } + } + + private static class SharedMemorySizeCachedNoTagsKeyValue extends SizeCachedNoTagsKeyValue + implements SharedMemoryBackedCell { + public SharedMemorySizeCachedNoTagsKeyValue(byte[] bytes, int offset, int length) { + super(bytes, offset, length); + } + + @Override + public KeyValue clone() { + byte[] copy = Bytes.copy(this.bytes, this.offset, this.length); + return new SizeCachedNoTagsKeyValue(copy, 0, copy.length); + } + } + @Override public ByteBuffer getValue() { assertSeeked(); @@ -856,7 +974,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } protected void setNonSeekedState() { - block = null; + reset(); blockBuffer = null; currKeyLen = 0; currValueLen = 0; @@ -876,7 +994,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { + "; currKeyLen = " + currKeyLen + "; currValLen = " + currValueLen + "; block limit = " + blockBuffer.limit() + "; HFile name = " + reader.getName() - + "; currBlock currBlockOffset = " + block.getOffset()); + + "; currBlock currBlockOffset = " + this.curBlock.getOffset()); throw e; } } @@ -889,7 +1007,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { private boolean positionForNextBlock() throws IOException { // Methods are small so they get inlined because they are 'hot'. 
long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset(); - if (block.getOffset() >= lastDataBlockOffset) { + if (this.curBlock.getOffset() >= lastDataBlockOffset) { setNonSeekedState(); return false; } @@ -904,7 +1022,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { setNonSeekedState(); return false; } - updateCurrBlock(nextBlock); + updateCurrentBlock(nextBlock); return true; } @@ -953,27 +1071,37 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return false; } - long firstDataBlockOffset = - reader.getTrailer().getFirstDataBlockOffset(); - if (block != null && block.getOffset() == firstDataBlockOffset) { - blockBuffer.rewind(); - readKeyValueLen(); - return true; + long firstDataBlockOffset = reader.getTrailer().getFirstDataBlockOffset(); + if (curBlock != null + && curBlock.getOffset() == firstDataBlockOffset) { + return processFirstDataBlock(); } - block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, + readAndUpdateNewBlock(firstDataBlockOffset); + return true; + } + + protected boolean processFirstDataBlock() throws IOException{ + blockBuffer.rewind(); + readKeyValueLen(); + return true; + } + + protected void readAndUpdateNewBlock(long firstDataBlockOffset) throws IOException, + CorruptHFileException { + HFileBlock newBlock = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); - if (block.getOffset() < 0) { - throw new IOException("Invalid block offset: " + block.getOffset()); + if (newBlock.getOffset() < 0) { + throw new IOException("Invalid block offset: " + newBlock.getOffset()); } - updateCurrBlock(block); - return true; + updateCurrentBlock(newBlock); } protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind, Cell key, boolean seekBefore) throws IOException { - if (block == null || block.getOffset() != seekToBlock.getOffset()) { - updateCurrBlock(seekToBlock); + if (this.curBlock == null + || this.curBlock.getOffset() != seekToBlock.getOffset()) { + updateCurrentBlock(seekToBlock); } else if (rewind) { blockBuffer.rewind(); } @@ -996,10 +1124,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { */ protected final void checkKeyValueLen() { if (checkLen(this.currKeyLen) || checkLen(this.currValueLen)) { - throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen + - " or currValueLen " + this.currValueLen + ". Block offset: " + block.getOffset() + - ", block length: " + this.blockBuffer.limit() + ", position: " + - this.blockBuffer.position() + " (without header)."); + throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen + + " or currValueLen " + this.currValueLen + ". 
Block offset: " + + this.curBlock.getOffset() + ", block length: " + + this.blockBuffer.limit() + ", position: " + this.blockBuffer.position() + + " (without header)."); } } @@ -1009,19 +1138,18 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * * @param newBlock the block to make current */ - protected void updateCurrBlock(HFileBlock newBlock) { - block = newBlock; - + protected void updateCurrentBlock(HFileBlock newBlock) throws IOException { + // Set the active block on the reader // sanity check - if (block.getBlockType() != BlockType.DATA) { - throw new IllegalStateException("Scanner works only on data " + - "blocks, got " + block.getBlockType() + "; " + - "fileName=" + reader.getName() + ", " + - "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + - "isCompaction=" + isCompaction); + if (newBlock.getBlockType() != BlockType.DATA) { + throw new IllegalStateException("ScannerV2 works only on data " + "blocks, got " + + newBlock.getBlockType() + "; " + "fileName=" + reader.getName() + + ", " + "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + "isCompaction=" + + isCompaction); } - blockBuffer = block.getBufferWithoutHeader(); + updateCurrBlock(newBlock); + blockBuffer = newBlock.getBufferWithoutHeader(); readKeyValueLen(); blockFetches++; @@ -1066,13 +1194,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } @Override - public void close() { - // HBASE-12295 will add code here. - } - - @Override public void shipped() throws IOException { - // HBASE-12295 will add code here. + this.returnBlocks(false); } } @@ -1135,8 +1258,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { updateCacheMetrics); if (cachedBlock != null) { if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { - cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader); - } + HFileBlock compressedBlock = cachedBlock; + cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader); + // In case of compressed block after unpacking we can return the compressed block + if (compressedBlock != cachedBlock + && (compressedBlock.getMemoryType() == MemoryType.SHARED_MEMORY)) { + cache.returnBlock(cacheKey); + } + } validateBlockType(cachedBlock, expectedBlockType); if (expectedDataBlockEncoding == null) { @@ -1171,6 +1300,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { " because of a data block encoding mismatch" + "; expected: " + expectedDataBlockEncoding + ", actual: " + actualDataBlockEncoding); + // This is an error scenario. so here we need to decrement the + // count. + if (cachedBlock.getMemoryType() == MemoryType.SHARED_MEMORY) { + cache.returnBlock(cacheKey); + } cache.evictBlock(cacheKey); } return null; @@ -1188,7 +1322,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * @throws IOException */ @Override - public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) + public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException { if (trailer.getMetaIndexCount() == 0) { return null; // there are no meta blocks @@ -1221,7 +1355,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { assert cachedBlock.isUnpacked() : "Packed block leak."; // Return a distinct 'shallow copy' of the block, // so pos does not get messed by the scanner - return cachedBlock.getBufferWithoutHeader(); + return cachedBlock; } // Cache Miss, please load. 
} @@ -1235,7 +1369,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); } - return metaBlock.getBufferWithoutHeader(); + return metaBlock; } } @@ -1432,7 +1566,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public boolean isSeeked(){ - return this.block != null; + return curBlock != null; + } + + public void setNonSeekedState() { + reset(); } /** @@ -1442,22 +1580,20 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * @param newBlock the block to make current * @throws CorruptHFileException */ - private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException { - block = newBlock; - + @Override + protected void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException { // sanity checks - if (block.getBlockType() != BlockType.ENCODED_DATA) { - throw new IllegalStateException( - "EncodedScanner works only on encoded data blocks"); + if (newBlock.getBlockType() != BlockType.ENCODED_DATA) { + throw new IllegalStateException("EncodedScanner works only on encoded data blocks"); } - short dataBlockEncoderId = block.getDataBlockEncodingId(); + short dataBlockEncoderId = newBlock.getDataBlockEncodingId(); if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) { String encoderCls = dataBlockEncoder.getClass().getName(); throw new CorruptHFileException("Encoder " + encoderCls + " doesn't support data block encoding " + DataBlockEncoding.getNameFromId(dataBlockEncoderId)); } - + updateCurrBlock(newBlock); seeker.setCurrentBuffer(getEncodedBuffer(newBlock)); blockFetches++; @@ -1476,29 +1612,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } @Override - public boolean seekTo() throws IOException { - if (reader == null) { - return false; - } - - if (reader.getTrailer().getEntryCount() == 0) { - // No data blocks. 
- return false; - } - - long firstDataBlockOffset = - reader.getTrailer().getFirstDataBlockOffset(); - if (block != null && block.getOffset() == firstDataBlockOffset) { - seeker.rewind(); - return true; - } - - block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, - isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); - if (block.getOffset() < 0) { - throw new IOException("Invalid block offset: " + block.getOffset()); - } - updateCurrentBlock(block); + protected boolean processFirstDataBlock() throws IOException { + seeker.rewind(); return true; } @@ -1506,10 +1621,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { public boolean next() throws IOException { boolean isValid = seeker.next(); if (!isValid) { - block = readNextDataBlock(); - isValid = block != null; + HFileBlock newBlock = readNextDataBlock(); + isValid = newBlock != null; if (isValid) { - updateCurrentBlock(block); + updateCurrentBlock(newBlock); + } else { + setNonSeekedState(); } } return isValid; @@ -1529,7 +1646,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public Cell getCell() { - if (block == null) { + if (this.curBlock == null) { return null; } return seeker.getKeyValue(); @@ -1548,7 +1665,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } private void assertValidSeek() { - if (block == null) { + if (this.curBlock == null) { throw new NotSeekedException(); } } @@ -1557,9 +1674,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return dataBlockEncoder.getFirstKeyCellInBlock(getEncodedBuffer(curBlock)); } + @Override protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind, Cell key, boolean seekBefore) throws IOException { - if (block == null || block.getOffset() != seekToBlock.getOffset()) { + if (this.curBlock == null + || this.curBlock.getOffset() != seekToBlock.getOffset()) { updateCurrentBlock(seekToBlock); } else if (rewind) { seeker.rewind(); @@ -1640,6 +1759,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { HFileContextBuilder builder = new HFileContextBuilder() .withIncludesMvcc(shouldIncludeMemstoreTS()) .withHBaseCheckSum(true) + .withPathName(this.getName()) .withCompression(this.compressAlgo); // Check for any key material available diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 806ddc9..da76e19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -34,11 +34,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; @@ -49,6 +48,7 @@ import org.apache.hadoop.util.StringUtils; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import com.google.common.annotations.VisibleForTesting; +import 
com.google.common.base.Objects; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -1090,4 +1090,10 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { public BlockCache[] getBlockCaches() { return null; } + + @Override + public boolean returnBlock(BlockCacheKey cacheKey) { + // There is no SHARED_MEMORY type here. Just return true. + return true; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index 57e7f28..a93d44b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -256,7 +256,8 @@ public class MemcachedBlockCache implements BlockCache { public HFileBlock decode(CachedData d) { try { ByteBuffer buf = ByteBuffer.wrap(d.getData()); - return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true); + return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true, + MemoryType.NON_SHARED_MEMORY); } catch (IOException e) { LOG.warn("Error deserializing data from memcached",e); } @@ -269,4 +270,10 @@ public class MemcachedBlockCache implements BlockCache { } } + @Override + public boolean returnBlock(BlockCacheKey cacheKey) { + // Not handling reference counting + return true; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index dfada87..8c1842f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.util.ConcurrentIndex; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; @@ -417,19 +419,23 @@ public class BucketCache implements BlockCache, HeapSize { // existence here. 
if (bucketEntry.equals(backingMap.get(key))) { int len = bucketEntry.getLength(); - ByteBuffer bb = ByteBuffer.allocate(len); - int lenRead = ioEngine.read(bb, bucketEntry.offset()); + Pair pair = ioEngine.read(bucketEntry.offset(), len); + ByteBuffer bb = pair.getFirst(); + int lenRead = bb.limit(); if (lenRead != len) { throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected"); } CacheableDeserializer deserializer = bucketEntry.deserializerReference(this.deserialiserMap); - Cacheable cachedBlock = deserializer.deserialize(bb, true); + Cacheable cachedBlock = deserializer.deserialize(bb, true, pair.getSecond()); long timeTaken = System.nanoTime() - start; if (updateCacheMetrics) { cacheStats.hit(caching); cacheStats.ioHit(timeTaken); } + if (pair.getSecond() == MemoryType.SHARED_MEMORY) { + bucketEntry.refCount.incrementAndGet(); + } bucketEntry.access(accessCount.incrementAndGet()); if (this.ioErrorStartTime > 0) { ioErrorStartTime = -1; @@ -463,6 +469,10 @@ public class BucketCache implements BlockCache, HeapSize { @Override public boolean evictBlock(BlockCacheKey cacheKey) { + return evictBlock(cacheKey, true); + } + + public boolean evictBlock(BlockCacheKey cacheKey, boolean forceful) { if (!cacheEnabled) { return false; } @@ -483,10 +493,28 @@ public class BucketCache implements BlockCache, HeapSize { IdLock.Entry lockEntry = null; try { lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); - if (backingMap.remove(cacheKey, bucketEntry)) { - blockEvicted(cacheKey, bucketEntry, removedBlock == null); + int refCount = bucketEntry.refCount.get(); + if(refCount == 0) { + if (backingMap.remove(cacheKey, bucketEntry)) { + blockEvicted(cacheKey, bucketEntry, removedBlock == null); + } else { + return false; + } } else { - return false; + if(!forceful) { + if (LOG.isDebugEnabled()) { + LOG.debug("This block " + cacheKey + " is still referred by " + refCount + + " readers. Can not be freed now"); + } + return false; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("This block " + cacheKey + " is still referred by " + refCount + + " readers. Can not be freed now. Hence will mark this" + + " for evicting at a later point"); + } + bucketEntry.markedForEvict = true; + } } } catch (IOException ie) { LOG.warn("Failed evicting block " + cacheKey); @@ -1102,6 +1130,10 @@ public class BucketCache implements BlockCache, HeapSize { byte deserialiserIndex; private volatile long accessCounter; private BlockPriority priority; + // Set this when we were not able to forcefully evict the block + private volatile boolean markedForEvict; + private AtomicInteger refCount = new AtomicInteger(0); + /** * Time this block was cached. Presumes we are created just before we are added to the cache. */ @@ -1193,9 +1225,12 @@ public class BucketCache implements BlockCache, HeapSize { public long free(long toFree) { Map.Entry entry; long freedBytes = 0; + // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free + // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 
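Condensed, the eviction decision introduced above behaves like the sketch below. This is an illustrative restatement only: tryEvict and removeAndFree are hypothetical names, the return value is simplified, and the real evictBlock() additionally handles the RAMCache entry, the offset lock and eviction stats.

    // Hypothetical restatement of the rules above; not the real method.
    boolean tryEvict(BlockCacheKey key, BucketEntry entry, boolean forceful) {
      if (entry.refCount.get() == 0) {
        return removeAndFree(key, entry);   // hypothetical helper: backingMap + bucket space
      }
      if (!forceful) {
        return false;                       // space-freeing path skips in-use blocks
      }
      entry.markedForEvict = true;          // explicit eviction is deferred until the
      return true;                          // last reader calls returnBlock()
    }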
while ((entry = queue.pollLast()) != null) { - evictBlock(entry.getKey()); - freedBytes += entry.getValue().getLength(); + if (evictBlock(entry.getKey(), false)) { + freedBytes += entry.getValue().getLength(); + } if (freedBytes >= toFree) { return freedBytes; } @@ -1399,4 +1434,26 @@ public class BucketCache implements BlockCache, HeapSize { public BlockCache[] getBlockCaches() { return null; } + + @Override + public boolean returnBlock(BlockCacheKey cacheKey) { + BucketEntry bucketEntry = backingMap.get(cacheKey); + if (bucketEntry != null) { + int refCount = bucketEntry.refCount.decrementAndGet(); + if (bucketEntry.markedForEvict && refCount == 0) { + evictBlock(cacheKey); + } + return true; + } + return false; + } + + @VisibleForTesting + public int getRefCount(BlockCacheKey cacheKey) { + BucketEntry bucketEntry = backingMap.get(cacheKey); + if (bucketEntry != null) { + return bucketEntry.refCount.get(); + } + return 0; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index de10667..d132f4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType; import org.apache.hadoop.hbase.util.ByteBufferArray; +import org.apache.hadoop.hbase.util.Pair; /** * IO engine that stores data in memory using an array of ByteBuffers @@ -65,17 +67,26 @@ public class ByteBufferIOEngine implements IOEngine { /** * Transfers data from the buffer array to the given byte buffer - * @param dstBuffer the given byte buffer into which bytes are to be written * @param offset The offset in the ByteBufferArray of the first byte to be * read + * @param length The length of the ByteBuffer that should be allocated for + * reading from the underlying ByteBufferArray. * @return number of bytes read * @throws IOException */ @Override - public int read(ByteBuffer dstBuffer, long offset) throws IOException { - assert dstBuffer.hasArray(); - return bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(), + public Pair read(long offset, int length) throws IOException { + // TODO : this allocate and copy will go away + ByteBuffer dstBuffer = ByteBuffer.allocate(length); + bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(), dstBuffer.arrayOffset()); + // Here the buffer that is created directly refers to the buffer in the actual buckets. + // When any cell is referring to the blocks created out of these buckets then it means that + // those cells are referring to a shared memory area which if evicted by the BucketCache would lead + // to corruption of results. 
Hence we set the type of the buffer as SHARED_MEMORY so that the + // readers using this block are aware of this fact and do the necessary action to prevent eviction + // till the results are either consumed or copied + return new Pair(dstBuffer, MemoryType.SHARED_MEMORY); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 7b6b25f..b50b9f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -26,6 +26,8 @@ import java.nio.channels.FileChannel; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; /** @@ -79,14 +81,21 @@ public class FileIOEngine implements IOEngine { /** * Transfers data from file to the given byte buffer - * @param dstBuffer the given byte buffer into which bytes are to be written * @param offset The offset in the file where the first byte to be read + * @param length The length of buffer that should be allocated for reading + * from the file channel * @return number of bytes read * @throws IOException */ @Override - public int read(ByteBuffer dstBuffer, long offset) throws IOException { - return fileChannel.read(dstBuffer, offset); + public Pair read(long offset, int length) throws IOException { + ByteBuffer dstBuffer = ByteBuffer.allocate(length); + fileChannel.read(dstBuffer, offset); + // The buffer created out of the fileChannel is formed by copying the data from the file + // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts + // this buffer from the file the data is already copied and there is no need to ensure that + // the results are not corrupted before consuming them. 
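For contrast with the two engines above, a third-party engine written against the new read(offset, length) contract might look like the sketch below. HeapCopyIOEngine is hypothetical and not part of the patch; because it hands back a private copy it reports NON_SHARED_MEMORY, so its blocks never need ref-counting.

    // Hypothetical engine, shown only to illustrate the new IOEngine contract.
    class HeapCopyIOEngine implements IOEngine {
      private final byte[] store;

      HeapCopyIOEngine(int capacity) {
        this.store = new byte[capacity];
      }

      @Override
      public Pair<ByteBuffer, MemoryType> read(long offset, int length) throws IOException {
        ByteBuffer dst = ByteBuffer.allocate(length);
        dst.put(store, (int) offset, length);  // copy out of the engine's own storage
        dst.flip();
        // The copy is private to the caller, so eviction cannot corrupt it.
        return new Pair<ByteBuffer, MemoryType>(dst, MemoryType.NON_SHARED_MEMORY);
      }

      @Override
      public void write(ByteBuffer srcBuffer, long offset) throws IOException {
        srcBuffer.get(store, (int) offset, srcBuffer.remaining());
      }

      @Override
      public boolean isPersistent() {
        return false;      // purely in-memory
      }

      @Override
      public void sync() throws IOException {
        // nothing to flush for a heap-backed engine
      }

      @Override
      public void shutdown() {
        // no resources to release
      }
    }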
+ return new Pair(dstBuffer, MemoryType.NON_SHARED_MEMORY); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java index 430c5af..e0fe14d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType; +import org.apache.hadoop.hbase.util.Pair; /** * A class implementing IOEngine interface supports data services for @@ -35,13 +37,13 @@ public interface IOEngine { boolean isPersistent(); /** - * Transfers data from IOEngine to the given byte buffer - * @param dstBuffer the given byte buffer into which bytes are to be written + * Transfers data from IOEngine to a byte buffer * @param offset The offset in the IO engine where the first byte to be read - * @return number of bytes read + * @param length How many bytes to be read from the offset + * @return Pair of ByteBuffer where data is read and its MemoryType ({@link MemoryType}) * @throws IOException */ - int read(ByteBuffer dstBuffer, long offset) throws IOException; + Pair read(long offset, int length) throws IOException; /** * Transfers data from the given byte buffer to IOEngine diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java index 90b7a87..81a4252 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; /** * Denotes a callback action that has to be executed at the end of an Rpc Call. 
* - * @see RpcCallContext#setCallBack(RpcCallback) + * @see RpcCallContext#setCallBack(RpcCallback callback) */ @InterfaceAudience.Private public interface RpcCallback { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c139296..41be94f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; @@ -87,6 +88,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.SharedMemoryBackedCell; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; @@ -143,6 +145,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; +import org.apache.hadoop.hbase.regionserver.RSRpcServices.RegionScannersCloseCallBack; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; @@ -2422,7 +2425,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ////////////////////////////////////////////////////////////////////////////// @Override - public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException { + public Result getClosestRowBefore(final byte[] row, final byte[] family) throws IOException { if (coprocessorHost != null) { Result result = new Result(); if (coprocessorHost.preGetClosestRowBefore(row, family, result)) { @@ -2458,8 +2461,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return getScanner(scan, null); } - protected RegionScanner getScanner(Scan scan, - List additionalScanners) throws IOException { + protected RegionScanner getScanner(Scan scan, List additionalScanners) + throws IOException { startRegionOperation(Operation.SCAN); try { // Verify families are all valid @@ -5241,6 +5244,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected final byte[] stopRow; protected final HRegion region; protected final CellComparator comparator; + protected boolean copyCellsFromSharedMem = false; private final long readPt; private final long maxResultSize; @@ -5252,6 +5256,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return region.getRegionInfo(); } + public void setCopyCellsFromSharedMem(boolean copyCells) { + this.copyCellsFromSharedMem = copyCells; + } + RegionScannerImpl(Scan scan, List additionalScanners, HRegion region) throws IOException { this.region = region; @@ -5262,13 +5270,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.filter = 
null; } this.comparator = region.getCellCompartor(); - /** * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default * scanner context that can be used to enforce the batch limit in the event that a * ScannerContext is not specified during an invocation of next/nextRaw */ - defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build(); + defaultScannerContext = ScannerContext.newBuilder() + .setBatchLimit(scan.getBatch()).build(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) { this.stopRow = null; @@ -5384,24 +5392,52 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // scanner is closed throw new UnknownScannerException("Scanner was closed"); } - boolean moreValues; - if (outResults.isEmpty()) { - // Usually outResults is empty. This is true when next is called - // to handle scan or get operation. - moreValues = nextInternal(outResults, scannerContext); - } else { - List tmpList = new ArrayList(); - moreValues = nextInternal(tmpList, scannerContext); - outResults.addAll(tmpList); - } + boolean moreValues = false; + try { + if (outResults.isEmpty()) { + // Usually outResults is empty. This is true when next is called + // to handle scan or get operation. + moreValues = nextInternal(outResults, scannerContext); + } else { + List tmpList = new ArrayList(); + moreValues = nextInternal(tmpList, scannerContext); + outResults.addAll(tmpList); + } + + // If the size limit was reached it means a partial Result is being + // returned. Returning a + // partial Result means that we should not reset the filters; filters + // should only be reset in + // between rows + if (!scannerContext.partialResultFormed()) resetFilters(); - // If the size limit was reached it means a partial Result is being returned. Returning a - // partial Result means that we should not reset the filters; filters should only be reset in - // between rows - if (!scannerContext.partialResultFormed()) resetFilters(); + if (isFilterDoneInternal()) { + moreValues = false; + } - if (isFilterDoneInternal()) { - moreValues = false; + // If copyCellsFromSharedMem = true, then we need to copy the cells. Otherwise + // it is a call coming from the RsRpcServices.scan(). + if (copyCellsFromSharedMem && !outResults.isEmpty()) { + // Do the copy of the results here. + ListIterator listItr = outResults.listIterator(); + Cell cell = null; + while (listItr.hasNext()) { + cell = listItr.next(); + if (cell instanceof SharedMemoryBackedCell) { + try { + listItr.set(((SharedMemoryBackedCell) cell).clone()); + } catch (CloneNotSupportedException e) { + // will not happen + } + } + } + } + } finally { + if (copyCellsFromSharedMem) { + // In case of copyCellsFromSharedMem==true (where the CPs wrap a scanner) we return + // the blocks then and there (for wrapped CPs) + this.shipped(); + } } return moreValues; } @@ -6396,6 +6432,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public Result get(final Get get) throws IOException { + prepareGet(get); + List results = get(get, true); + boolean stale = this.getRegionInfo().getReplicaId() != 0; + return Result.create(results, get.isCheckExistenceOnly() ? 
!results.isEmpty() : null, stale); + } + + void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException { checkRow(get.getRow(), "Get"); // Verify families are all valid if (get.hasFamilies()) { @@ -6407,9 +6450,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi get.addFamily(family); } } - List results = get(get, true); - boolean stale = this.getRegionInfo().getReplicaId() != 0; - return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); } @Override @@ -6419,9 +6459,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // pre-get CP hook if (withCoprocessor && (coprocessorHost != null)) { - if (coprocessorHost.preGet(get, results)) { - return results; - } + if (coprocessorHost.preGet(get, results)) { + return results; + } } Scan scan = new Scan(get); @@ -6441,15 +6481,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // do after lock + metricsUpdate(results); + + return results; + } + + void metricsUpdate(List results) { if (this.metricsRegion != null) { long totalSize = 0L; for (Cell cell : results) { + // This should give an estimate of the cell in the result. Why do we need + // to know the serialization of how the codec works with it?? totalSize += CellUtil.estimatedSerializedSizeOf(cell); } this.metricsRegion.updateGet(totalSize); } - - return results; } @Override @@ -7220,7 +7266,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // New HBASE-880 Helpers // - private void checkFamily(final byte [] family) + void checkFamily(final byte [] family) throws NoSuchColumnFamilyException { if (!this.htableDescriptor.hasFamily(family)) { throw new NoSuchColumnFamilyException("Column family " + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 7483568..8a933f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.Comparator; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; import java.util.Set; @@ -47,6 +48,10 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner { protected PriorityQueue heap = null; + // Holds the scanners when a ever a eager close() happens. All such eagerly closed + // scans are collected and when the final scanner.close() happens will perform the + // actual close. + protected Set scannersForDelayedClose = new HashSet(); /** * The current sub-scanner, i.e. the one that contains the next key/value @@ -62,8 +67,6 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner protected KVScannerComparator comparator; - protected Set scannersForDelayedClose = new HashSet(); - /** * Constructor. This KeyValueHeap will handle closing of passed in * KeyValueScanners. 
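The effect of the delayed-close set is sketched below; this is a simplified illustration, and the real close() in this class also closes the current scanner and every scanner still queued in the heap. Exhausted scanners are parked rather than closed inline, so the cached blocks behind their last cells stay referenced until the results have actually been shipped.

    // Simplified sketch of the delayed-close idea; not the complete method.
    public void close() {
      for (KeyValueScanner scanner : this.scannersForDelayedClose) {
        scanner.close();               // the deferred close; blocks are returned here
      }
      this.scannersForDelayedClose.clear();
      // ... then close 'current' and the scanners remaining in the heap ...
    }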
@@ -160,6 +163,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner */ if (pee == null || !moreCells) { + // add the scanner that is to be closed this.scannersForDelayedClose.add(this.current); } else { this.heap.add(this.current); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d7be4b4..cd66410 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -154,7 +155,9 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Leases.Lease; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Region.FlushResult; @@ -243,7 +246,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * An Rpc callback for closing a RegionScanner. */ - private static class RegionScannerCloseCallBack implements RpcCallback { + static class RegionScannerCloseCallBack implements RpcCallback { private final RegionScanner scanner; @@ -284,6 +287,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } /** + * An Rpccallback that works with multiget() where all the scanners that are + * created is collected and at the end of the multiget() call the callback is + * set with the multi() RpcCallContext so that all the required callback is + * performed after the creation of the cellblock in the Rpc layer + */ + static class RegionScannersCloseCallBack implements RpcCallback { + private final List scanners = new ArrayList(); + + public void addScanner(RegionScanner scanner) { + this.scanners.add(scanner); + } + + @Override + public void run() { + for (RegionScanner scanner : scanners) { + try { + scanner.close(); + } catch (IOException e) { + LOG.error("Exception while closing the scanner " + scanner, e); + } + } + } + } + + /** * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 
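The intended server-side sequence for a batch of gets is roughly the one below, condensed from the doNonAtomicRegionMutation() and get() changes further down; 'gets' and 'results' are stand-in locals. The callback is registered once per call, every scanner opened for the batch is added to it, and the scanners are closed only after the RPC layer has encoded the response cells.

    // Condensed, illustrative sequence; not the actual method bodies.
    RpcCallContext context = RpcServer.getCurrentCall();
    RegionScannersCloseCallBack closeCallBack = new RegionScannersCloseCallBack();
    context.setCallBack(closeCallBack);          // registered once for the whole multi()
    for (Get get : gets) {                       // 'gets' stands in for the batched actions
      List<Cell> results = new ArrayList<Cell>();
      RegionScanner scanner = region.getScanner(new Scan(get));
      scanner.next(results);                     // cells may still reference cached blocks
      closeCallBack.addScanner(scanner);         // closed only after the cellblock is built
      // ... convert 'results' into the Result for this action ...
    }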
*/ private static class RegionScannerHolder { @@ -337,7 +365,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().preScannerClose(s); } - s.close(); if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(s); @@ -418,8 +445,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return context != null && context.isClientCellBlockSupport(); } - private void addResult(final MutateResponse.Builder builder, - final Result result, final PayloadCarryingRpcController rpcc) { + private void addResult(final MutateResponse.Builder builder, final Result result, + final PayloadCarryingRpcController rpcc) { if (result == null) return; if (isClientCellBlockSupport()) { builder.setResult(ProtobufUtil.toResultNoData(result)); @@ -616,6 +643,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param builder * @param cellsToReturn Could be null. May be allocated in this method. This is what this * method returns as a 'result'. + * @param controller * @return Return the cellScanner passed */ private List doNonAtomicRegionMutation(final Region region, @@ -626,13 +654,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // ResultOrException instance that matches each Put or Delete is then added down in the // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched List mutations = null; - for (ClientProtos.Action action: actions.getActionList()) { + RpcCallContext context = RpcServer.getCurrentCall(); + // An RpcCallBack that creates a list of scanners that needs to perform callBack operation on completion of multiGets + RegionScannersCloseCallBack closeCallBack = null; + for (ClientProtos.Action action : actions.getActionList()) { ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null; try { Result r = null; if (action.hasGet()) { + if (closeCallBack == null) { + // Initialize only once + closeCallBack = new RegionScannersCloseCallBack(); + // Set the call back here itself. 
+ context.setCallBack(closeCallBack); + } Get get = ProtobufUtil.toGet(action.getGet()); - r = region.get(get); + r = get(get, ((HRegion) region), closeCallBack, context); } else if (action.hasServiceCall()) { resultOrExceptionBuilder = ResultOrException.newBuilder(); try { @@ -661,7 +698,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, r = append(region, quota, action.getMutation(), cellScanner, nonceGroup); break; case INCREMENT: - r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup); + r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup); break; case PUT: case DELETE: @@ -1930,7 +1967,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ClientProtos.Get get = request.getGet(); Boolean existence = null; Result r = null; - + RpcCallContext context = RpcServer.getCurrentCall(); quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); if (get.hasClosestRowBefore() && get.getClosestRowBefore()) { @@ -1948,7 +1985,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, existence = region.getCoprocessorHost().preExists(clientGet); } if (existence == null) { - r = region.get(clientGet); + if (isClientCellBlockSupport(context)) { + r = get(clientGet, ((HRegion) region), null, context); + } else { + r = region.get(clientGet); + } if (get.getExistenceOnly()) { boolean exists = r.getExists(); if (region.getCoprocessorHost() != null) { @@ -1983,6 +2024,52 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack, + RpcCallContext context) throws IOException { + ((HRegion) region).prepareGet(get); + List results = new ArrayList(); + boolean stale = region.getRegionInfo().getReplicaId() != 0; + // pre-get CP hook + if (region.getCoprocessorHost() != null) { + if (region.getCoprocessorHost().preGet(get, results)) { + return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); + } + } + + Scan scan = new Scan(get); + + RegionScanner scanner = null; + try { + scanner = region.getScanner(scan); + scanner.next(results); + } finally { + if (scanner != null) { + if (closeCallBack == null) { + // If there is a context then the scanner can be added to the current + // RpcCallContext. The rpc callback will take care of closing the + // scanner, for eg in case + // of get() + assert scanner instanceof org.apache.hadoop.hbase.ipc.RpcCallback; + context.setCallBack((RegionScannerImpl) scanner); + } else { + // The call is from multi() where the results from the get() are + // aggregated and then send out to the + // rpc. The rpccall back will close all such scanners created as part + // of multi(). + closeCallBack.addScanner(scanner); + } + } + } + + // post-get CP hook + if (region.getCoprocessorHost() != null) { + region.getCoprocessorHost().postGet(get, results); + } + // do after lock + region.metricsUpdate(results); + return Result.create(results, get.isCheckExistenceOnly() ? 
!results.isEmpty() : null, stale); + } + /** * Execute multiple actions on a table: get, mutate, and/or execCoprocessor * @@ -2242,6 +2329,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean moreResults = true; boolean closeScanner = false; boolean isSmallScan = false; + Exception exception = null; + RegionScanner actualRegionScanner = null; ScanResponse.Builder builder = ScanResponse.newBuilder(); if (request.hasCloseScanner()) { closeScanner = request.getCloseScanner(); @@ -2288,15 +2377,25 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (scanner == null) { scanner = region.getScanner(scan); } + actualRegionScanner = scanner; if (region.getCoprocessorHost() != null) { scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); } + if (actualRegionScanner != scanner) { + // It means the RegionScanner has been wrapped + if (actualRegionScanner instanceof RegionScannerImpl) { + // Copy the results when nextRaw is called from the CP so that + // CP can have a cloned version of the results without bothering + // about the eviction. Ugly, yes!!! + ((RegionScannerImpl) actualRegionScanner).setCopyCellsFromSharedMem(true); + } + } scannerId = this.scannerIdGen.incrementAndGet(); scannerName = String.valueOf(scannerId); rsh = addScanner(scannerName, scanner, region); ttl = this.scannerLeaseTimeoutPeriod; } - + assert scanner != null; RpcCallContext context = RpcServer.getCurrentCall(); quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); @@ -2307,9 +2406,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // performed even before checking of Lease. // See HBASE-5974 if (request.hasNextCallSeq()) { - if (rsh == null) { - rsh = scanners.get(scannerName); - } if (rsh != null) { if (request.getNextCallSeq() != rsh.getNextCallSeq()) { throw new OutOfOrderScannerNextException( @@ -2423,7 +2519,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, contextBuilder.setTimeLimit(timeScope, timeLimit); contextBuilder.setTrackMetrics(trackMetrics); ScannerContext scannerContext = contextBuilder.build(); - boolean limitReached = false; while (i < rows) { // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. 
The @@ -2500,7 +2595,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } finally { region.closeRegionOperation(); } - // coprocessor postNext hook if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 891a59d..170a997 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -1047,6 +1048,7 @@ public class StoreFile { private long deleteFamilyCnt = -1; private boolean bulkLoadResult = false; private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null; + private CacheConfig cacheConf; public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException { @@ -1056,6 +1058,7 @@ public class StoreFile { public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, Configuration conf) throws IOException { + this.cacheConf = cacheConf; reader = HFile.createReader(fs, path, in, size, cacheConf, conf); bloomFilterType = BloomType.NONE; } @@ -1282,7 +1285,7 @@ public class StoreFile { // Empty file if (reader.getTrailer().getEntryCount() == 0) return false; - + HFileBlock bloomBlock = null; try { boolean shouldCheckBloom; ByteBuffer bloom; @@ -1290,8 +1293,8 @@ public class StoreFile { bloom = null; shouldCheckBloom = true; } else { - bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, - true); + bloomBlock = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, true); + bloom = bloomBlock.getBufferWithoutHeader(); shouldCheckBloom = bloom != null; } @@ -1343,8 +1346,10 @@ public class StoreFile { } catch (IllegalArgumentException e) { LOG.error("Bad bloom filter data -- proceeding without", e); setGeneralBloomFilterFaulty(); + } finally { + // Return the bloom block so that its ref count can be decremented. + reader.returnBlock(bloomBlock); } - return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 701cf8a..dbe875d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -55,8 +55,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; * into List<KeyValue> for a single row. 
*/ @InterfaceAudience.Private -public class StoreScanner extends NonReversedNonLazyKeyValueScanner - implements KeyValueScanner, InternalScanner, ChangedReadersObserver { +public class StoreScanner extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, + InternalScanner, ChangedReadersObserver { private static final Log LOG = LogFactory.getLog(StoreScanner.class); protected Store store; protected ScanQueryMatcher matcher; @@ -86,6 +86,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected final long maxRowSize; protected final long cellsPerHeartbeatCheck; + // Collects all the KVHeap that are eagerly getting closed during the + // course of a scan protected Set heapsForDelayedClose = new HashSet(); /** @@ -375,9 +377,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } - protected void resetKVHeap(List scanners, - CellComparator comparator) throws IOException { - // Combine all seeked scanners with a heap + protected void resetKVHeap(List scanners, CellComparator comparator) + throws IOException { heap = new KeyValueHeap(scanners, comparator); } @@ -444,10 +445,19 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner close(true); } - private void close(boolean withHeapClose){ + private void close(boolean withHeapClose) { lock.lock(); try { - if (this.closing) return; + if (this.closing) { + if (withHeapClose) { + try { + shipped(); + } catch (Exception e) { + LOG.error("Exception while closing "+e); + } + } + return; + } this.closing = true; // under test, we dont have a this.store if (this.store != null) this.store.deleteChangedReaderObserver(this); @@ -510,6 +520,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // if the heap was left null, then the scanners had previously run out anyways, close and // return. if (this.heap == null) { + // By this time partial close should happened because already heap is null close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -556,7 +567,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. 
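StoreScanner mirrors the KeyValueHeap pattern: heaps that would otherwise be closed eagerly are parked in heapsForDelayedClose and torn down only once the current call's results have been shipped. The body of shipped() is not visible in these hunks, so the sketch below is only a plausible shape for it, not the patch's code.

    // Guessed shape only; the actual shipped() implementation is outside these hunks.
    public void shipped() throws IOException {
      for (KeyValueHeap heap : this.heapsForDelayedClose) {
        heap.close();                  // closes the parked scanners, returning their blocks
      }
      this.heapsForDelayedClose.clear();
    }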
checkScanOrder(prevCell, cell, comparator); prevCell = cell; - ScanQueryMatcher.MatchCode qcode = matcher.match(cell); qcode = optimize(qcode, cell); switch(qcode) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index e8b79a8..d711d95 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1687,6 +1687,24 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return (HTable) getConnection().getTable(tableName); } + public HTable createTable(TableName tableName, byte[][] families, + int numVersions, int blockSize, String cpName) throws IOException { + HTableDescriptor desc = new HTableDescriptor(tableName); + for (byte[] family : families) { + HColumnDescriptor hcd = new HColumnDescriptor(family) + .setMaxVersions(numVersions) + .setBlocksize(blockSize); + desc.addFamily(hcd); + } + if(cpName != null) { + desc.addCoprocessor(cpName); + } + getHBaseAdmin().createTable(desc); + // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned + waitUntilAllRegionsAssigned(tableName); + return (HTable) getConnection().getTable(tableName); + } + /** * Create a table. * @param tableName diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java new file mode 100644 index 0000000..9234453 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java @@ -0,0 +1,1439 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.CachedBlock; +import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ LargeTests.class, ClientTests.class }) +@SuppressWarnings("deprecation") +public class TestBlockEvictionFromClient { + private static final Log LOG = LogFactory.getLog(TestBlockEvictionFromClient.class); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static byte[][] ROWS = new byte[2][]; + private static int NO_OF_THREADS = 3; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static byte[] ROW1 = Bytes.toBytes("testRow1"); + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[][] FAMILIES_1 = new byte[1][0]; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + private static byte[] data = new byte[1000]; + private static byte[] data2 = Bytes.add(data, data); + protected static int SLAVES = 1; + private static CountDownLatch latch; + private static CountDownLatch getLatch; + private static CountDownLatch compactionLatch; + private static CountDownLatch exceptionLatch; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + ROWS[0] = ROW; + ROWS[1] = ROW1; + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + 
MultiRowMutationEndpoint.class.getName()); + conf.setBoolean("hbase.table.sanity.checks", true); // enable for below + // tests + conf.setInt("hbase.regionserver.handler.count", 20); + conf.setInt("hbase.bucketcache.size", 400); + conf.setStrings("hbase.bucketcache.ioengine", "heap"); + conf.setFloat("hfile.block.cache.size", 0.2f); + conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); + FAMILIES_1[0] = FAMILY; + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + CustomInnerRegionObserver.waitForGets.set(false); + CustomInnerRegionObserver.countOfNext.set(0); + CustomInnerRegionObserver.countOfGets.set(0); + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + if (latch != null) { + while (latch.getCount() > 0) { + latch.countDown(); + } + } + if (getLatch != null) { + getLatch.countDown(); + } + if (compactionLatch != null) { + compactionLatch.countDown(); + } + if (exceptionLatch != null) { + exceptionLatch.countDown(); + } + latch = null; + getLatch = null; + compactionLatch = null; + exceptionLatch = null; + CustomInnerRegionObserver.throwException.set(false); + // Clean up the tables for every test case + TableName[] listTableNames = TEST_UTIL.getHBaseAdmin().listTableNames(); + for (TableName tableName : listTableNames) { + if (!tableName.isSystemTable()) { + TEST_UTIL.getHBaseAdmin().disableTable(tableName); + TEST_UTIL.getHBaseAdmin().deleteTable(tableName); + } + } + } + + @Test + public void testBlockEvictionWithParallelScans() throws Exception { + HTable table = null; + try { + latch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testBlockEvictionWithParallelScans"); + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + // insert data. 
2 Rows are added + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); + // data was in memstore so don't expect any changes + // flush the data + System.out.println("Flushing cache in problematic area"); + // Should create one Hfile with 2 blocks + region.flush(true); + // Load cache + // Create three sets of scan + ScanThread[] scanThreads = initiateScan(table, false); + Thread.sleep(100); + checkForBlockEviction(cache, false, false, false); + for (ScanThread thread : scanThreads) { + thread.join(); + } + // CustomInnerRegionObserver.sleepTime.set(0); + Iterator iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // read the data and expect same blocks, one new hit, no misses + assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // Check how this miss is happening + // insert a second column, read the row, no new blocks, 3 new hits + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + byte[] data2 = Bytes.add(data, data); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + Result r = table.get(new Get(ROW)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // flush, one new block + System.out.println("Flushing cache"); + region.flush(true); + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // compact, net minus two blocks, two hits, no misses + System.out.println("Compacting"); + assertEquals(2, store.getStorefilesCount()); + store.triggerMajorCompaction(); + region.compact(true); + waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max + assertEquals(1, store.getStorefilesCount()); + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // read the row, this should be a cache miss because we don't cache data + // blocks on compaction + r = table.get(new Get(ROW)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testParallelGetsAndScans() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(2); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testParallelGetsAndScans"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + insertData(table); + // flush the data + 
System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + region.flush(true); + // Create three sets of scan + CustomInnerRegionObserver.waitForGets.set(true); + ScanThread[] scanThreads = initiateScan(table, false); + // Create three sets of gets + GetThread[] getThreads = initiateGet(table, false, false); + checkForBlockEviction(cache, false, false, false); + CustomInnerRegionObserver.waitForGets.set(false); + checkForBlockEviction(cache, false, false, false); + for (GetThread thread : getThreads) { + thread.join(); + } + // Verify whether the gets have returned the blocks that it had + CustomInnerRegionObserver.waitForGets.set(true); + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false, false); + getLatch.countDown(); + for (ScanThread thread : scanThreads) { + thread.join(); + } + System.out.println("Scans should have returned the bloks"); + // Check with either true or false + CustomInnerRegionObserver.waitForGets.set(false); + // The scan should also have released the blocks by now + checkForBlockEviction(cache, true, true, false); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testGetWithCellsInDifferentFiles"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + CustomInnerRegionObserver.waitForGets.set(true); + // Create three sets of gets + GetThread[] getThreads = initiateGet(table, false, false); + Thread.sleep(200); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (GetThread thread : getThreads) { + thread.join(); + } + // Verify whether the gets have returned the blocks that it had + CustomInnerRegionObserver.waitForGets.set(true); + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false, false); + getLatch.countDown(); + System.out.println("Gets should have returned the bloks"); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + // TODO : check how block index works here + public void testGetsWithMultiColumnsAndExplicitTracker() throws IOException, InterruptedException { + HTable table = null; + try { + latch = 
new CountDownLatch(1); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testGetsWithMultiColumnsAndExplicitTracker"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + BlockCache cache = setCacheProperties(region); + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + for (int i = 1; i < 10; i++) { + put = new Put(ROW); + put.add(FAMILY, Bytes.toBytes("testQualifier" + i), data2); + table.put(put); + if (i % 2 == 0) { + region.flush(true); + } + } + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + CustomInnerRegionObserver.waitForGets.set(true); + // Create three sets of gets + GetThread[] getThreads = initiateGet(table, true, false); + Thread.sleep(200); + Iterator iterator = cache.iterator(); + boolean usedBlocksFound = false; + int refCount = 0; + int noOfBlocksWithRef = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + System.out.println("The refCount is " + refCount); + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + noOfBlocksWithRef++; + } + } + assertTrue(usedBlocksFound); + // the number of blocks referred + assertEquals(10, noOfBlocksWithRef); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (GetThread thread : getThreads) { + thread.join(); + } + // Verify whether the gets have returned the blocks that it had + CustomInnerRegionObserver.waitForGets.set(true); + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false, false); + getLatch.countDown(); + System.out.println("Gets should have returned the bloks"); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testGetWithMultipleColumnFamilies"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + byte[][] fams = new byte[10][]; + fams[0] = FAMILY; + for (int i = 1; i < 10; i++) { + fams[i] = (Bytes.toBytes("testFamily" + i)); + } + table = TEST_UTIL.createTable(tableName, fams, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the 
block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + BlockCache cache = setCacheProperties(region); + + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + for (int i = 1; i < 10; i++) { + put = new Put(ROW); + put.add(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); + table.put(put); + if (i % 2 == 0) { + region.flush(true); + } + } + region.flush(true); + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + CustomInnerRegionObserver.waitForGets.set(true); + // Create three sets of gets + GetThread[] getThreads = initiateGet(table, true, true); + Thread.sleep(200); + Iterator iterator = cache.iterator(); + boolean usedBlocksFound = false; + int refCount = 0; + int noOfBlocksWithRef = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + System.out.println("The refCount is " + refCount); + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + noOfBlocksWithRef++; + } + } + assertTrue(usedBlocksFound); + // the number of blocks referred + assertEquals(3, noOfBlocksWithRef); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (GetThread thread : getThreads) { + thread.join(); + } + // Verify whether the gets have returned the blocks that it had + CustomInnerRegionObserver.waitForGets.set(true); + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false, false); + getLatch.countDown(); + System.out.println("Gets should have returned the bloks"); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testMultiGets() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(2); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testMultiGets"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + Put put = new Put(ROW); + put.add(FAMILY, 
QUALIFIER, data); + table.put(put); + region.flush(true); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + CustomInnerRegionObserver.waitForGets.set(true); + // Create three sets of gets + MultiGetThread[] getThreads = initiateMultiGet(table); + Thread.sleep(200); + int refCount; + Iterator iterator = cache.iterator(); + boolean foundNonZeroBlock = false; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + assertEquals(NO_OF_THREADS, refCount); + foundNonZeroBlock = true; + } + } + assertTrue("Should have found nonzero ref count block",foundNonZeroBlock); + CustomInnerRegionObserver.getCdl().get().countDown(); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (MultiGetThread thread : getThreads) { + thread.join(); + } + // Verify whether the gets have returned the blocks that it had + CustomInnerRegionObserver.waitForGets.set(true); + // giving some time for the block to be decremented + iterateBlockCache(cache, iterator); + getLatch.countDown(); + System.out.println("Gets should have returned the bloks"); + } finally { + if (table != null) { + table.close(); + } + } + } + @Test + public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + // Check if get() returns blocks on its close() itself + TableName tableName = TableName.valueOf("testScanWithMultipleColumnFamilies"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + byte[][] fams = new byte[10][]; + fams[0] = FAMILY; + for (int i = 1; i < 10; i++) { + fams[i] = (Bytes.toBytes("testFamily" + i)); + } + table = TEST_UTIL.createTable(tableName, fams, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + BlockCache cache = setCacheProperties(region); + + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + for (int i = 1; i < 10; i++) { + put = new Put(ROW); + put.add(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); + table.put(put); + if (i % 2 == 0) { + region.flush(true); + } + } + region.flush(true); + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + // Create three sets of gets + ScanThread[] scanThreads = initiateScan(table, true); + Thread.sleep(200); + 
Iterator iterator = cache.iterator(); + boolean usedBlocksFound = false; + int refCount = 0; + int noOfBlocksWithRef = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + System.out.println("The refCount is " + refCount); + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + noOfBlocksWithRef++; + } + } + assertTrue(usedBlocksFound); + // the number of blocks referred + assertEquals(12, noOfBlocksWithRef); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (ScanThread thread : scanThreads) { + thread.join(); + } + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false, false); + } finally { + if (table != null) { + table.close(); + } + } + } + + private BlockCache setCacheProperties(Region region) { + Iterator strItr = region.getStores().iterator(); + BlockCache cache = null; + while (strItr.hasNext()) { + Store store = strItr.next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + // Use the last one + cache = cacheConf.getBlockCache(); + } + return cache; + } + + @Test + public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException, + InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(2); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testParallelGetsAndScanWithWrappedRegionScanner"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserverWrapper.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + // insert data. 2 Rows are added + insertData(table); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + region.flush(true); + // CustomInnerRegionObserver.sleepTime.set(5000); + // Create three sets of scan + CustomInnerRegionObserver.waitForGets.set(true); + ScanThread[] scanThreads = initiateScan(table, false); + // Create three sets of gets + GetThread[] getThreads = initiateGet(table, false, false); + // The block would have been decremented for the scan case as it was + // wrapped + // before even the postNext hook gets executed. 
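The iterate-the-cache-and-check-refCount loop above recurs almost verbatim in every test of this class. A minimal sketch of how it could be factored into one shared helper, assuming only APIs that already appear in this patch (iterating the BlockCache over CachedBlock, BlockCacheKey, and getRefCount(BlockCacheKey) on BucketCache and CombinedBlockCache); the helper name countBlocksWithNonZeroRef is hypothetical and not part of the patch:

  // Hypothetical helper, not part of the patch: walks the block cache and counts
  // blocks that are still referenced, asserting each one is pinned by all reader threads.
  private int countBlocksWithNonZeroRef(BlockCache cache) {
    int blocksWithRef = 0;
    Iterator<CachedBlock> iterator = cache.iterator();
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      int refCount;
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue; // other cache implementations do not expose a ref count
      }
      if (refCount != 0) {
        assertEquals(NO_OF_THREADS, refCount);
        blocksWithRef++;
      }
    }
    return blocksWithRef;
  }

Each test could then assert on the returned count (for example assertEquals(10, countBlocksWithNonZeroRef(cache)) in the explicit-tracker test) instead of repeating the loop.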
+ // giving some time for the block to be decremented + Thread.sleep(100); + CustomInnerRegionObserver.waitForGets.set(false); + checkForBlockEviction(cache, false, false, true); + // countdown the latch + CustomInnerRegionObserver.getCdl().get().countDown(); + for (GetThread thread : getThreads) { + thread.join(); + } + getLatch.countDown(); + for (ScanThread thread : scanThreads) { + thread.join(); + } + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testScanWithCompaction() throws IOException, InterruptedException { + testScanWithCompactionInternals("testScanWithCompaction", false); + } + + @Test + public void testReverseScanWithCompaction() throws IOException, InterruptedException { + testScanWithCompactionInternals("testReverseScanWithCompaction", true); + } + + private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) + throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + compactionLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf(tableNameStr); + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserverWrapper.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + // insert data. 
2 Rows are added + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); + // Should create one Hfile with 2 blocks + region.flush(true); + // read the data and expect same blocks, one new hit, no misses + int refCount = 0; + // Check how this miss is happening + // insert a second column, read the row, no new blocks, 3 new hits + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + byte[] data2 = Bytes.add(data, data); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + // flush, one new block + System.out.println("Flushing cache"); + region.flush(true); + Iterator iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // Create three sets of scan + ScanThread[] scanThreads = initiateScan(table, reversed); + Thread.sleep(100); + iterator = cache.iterator(); + boolean usedBlocksFound = false; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + } + } + assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); + usedBlocksFound = false; + System.out.println("Compacting"); + assertEquals(2, store.getStorefilesCount()); + store.triggerMajorCompaction(); + region.compact(true); + waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max + assertEquals(1, store.getStorefilesCount()); + // Even after compaction is done we will have some blocks that cannot + // be evicted this is because the scan is still referencing them + iterator = cache.iterator(); + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 as they are not yet cleared + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + } + } + assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); + // Should not throw exception + compactionLatch.countDown(); + latch.countDown(); + for (ScanThread thread : scanThreads) { + thread.join(); + } + // by this time all blocks should have been evicted + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + Result r = table.get(new Get(ROW)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); + // The gets would be working on new blocks + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testScanWithException() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + exceptionLatch = new CountDownLatch(1); + TableName tableName = 
TableName.valueOf("testScanWithException"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserverWrapper.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + // insert data. 2 Rows are added + insertData(table); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + region.flush(true); + // CustomInnerRegionObserver.sleepTime.set(5000); + CustomInnerRegionObserver.throwException.set(true); + ScanThread[] scanThreads = initiateScan(table, false); + // The block would have been decremented for the scan case as it was + // wrapped + // before even the postNext hook gets executed. + // giving some time for the block to be decremented + Thread.sleep(100); + Iterator iterator = cache.iterator(); + boolean usedBlocksFound = false; + int refCount = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + } + } + assertTrue(usedBlocksFound); + exceptionLatch.countDown(); + // countdown the latch + CustomInnerRegionObserver.getCdl().get().countDown(); + for (ScanThread thread : scanThreads) { + thread.join(); + } + iterator = cache.iterator(); + usedBlocksFound = false; + refCount = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + } + } + assertTrue(usedBlocksFound); + // Sleep till the scan lease would expire? Can we reduce this value? 
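On the question raised in the comment above: the 5100 ms sleep presumably just outlasts a scanner lease that the test setup lowers elsewhere (not shown in this excerpt). A sketch of deriving the wait from configuration instead of hard-coding it, assuming the lease is driven by the standard hbase.client.scanner.timeout.period setting and that TEST_UTIL exposes the cluster Configuration:

  // Hypothetical replacement for the hard-coded sleep: wait slightly longer than
  // the configured scanner lease so expired-scanner cleanup has definitely run.
  long scannerTimeout = TEST_UTIL.getConfiguration().getLong(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
  Thread.sleep(scannerTimeout + 100);

This keeps the test in step with whatever lease value the setup chooses, rather than encoding it in two places.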
+ Thread.sleep(5100); + iterator = cache.iterator(); + refCount = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + assertEquals(0, refCount); + } + } finally { + if (table != null) { + table.close(); + } + } + } + + private void iterateBlockCache(BlockCache cache, Iterator iterator) { + int refCount; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + assertEquals(0, refCount); + } + } + + private void insertData(HTable table) throws IOException { + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + } + + private ScanThread[] initiateScan(HTable table, boolean reverse) throws IOException, + InterruptedException { + ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; + for (int i = 0; i < NO_OF_THREADS; i++) { + scanThreads[i] = new ScanThread(table, reverse); + } + for (ScanThread thread : scanThreads) { + thread.start(); + } + return scanThreads; + } + + private GetThread[] initiateGet(HTable table, boolean tracker, boolean multipleCFs) + throws IOException, InterruptedException { + GetThread[] getThreads = new GetThread[NO_OF_THREADS]; + for (int i = 0; i < NO_OF_THREADS; i++) { + getThreads[i] = new GetThread(table, tracker, multipleCFs); + } + for (GetThread thread : getThreads) { + thread.start(); + } + return getThreads; + } + + private MultiGetThread[] initiateMultiGet(HTable table) + throws IOException, InterruptedException { + MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; + for (int i = 0; i < NO_OF_THREADS; i++) { + multiGetThreads[i] = new MultiGetThread(table); + } + for (MultiGetThread thread : multiGetThreads) { + thread.start(); + } + return multiGetThreads; + } + + private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero, + boolean wrappedCp) throws InterruptedException { + int counter = NO_OF_THREADS; + if (CustomInnerRegionObserver.waitForGets.get()) { + // Because only one row is selected, it has only 2 blocks + counter = counter - 1; + while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { + Thread.sleep(100); + } + } else { + while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { + Thread.sleep(100); + } + } + Iterator iterator = cache.iterator(); + int refCount = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + 
System.out.println(" the refcount is " + refCount + " block is " + cacheKey); + if (CustomInnerRegionObserver.waitForGets.get()) { + if (expectOnlyZero) { + assertTrue(refCount == 0); + } + if (refCount != 0) { + // Because the scan would have also touched up on these blocks but + // it + // would have touched + // all 3 + if (getClosed) { + // If get has closed only the scan's blocks would be available + assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); + } else { + assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); + } + } + } else { + // Because the get would have also touched up on these blocks but it + // would have touched + // upon only 2 additionally + if (expectOnlyZero) { + assertTrue(refCount == 0); + } + if (refCount != 0) { + if (getLatch == null || wrappedCp) { + assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); + } else { + assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); + } + } + } + } + CustomInnerRegionObserver.getCdl().get().countDown(); + } + + private static class MultiGetThread extends Thread { + private final HTable table; + private final List gets = new ArrayList(); + public MultiGetThread(HTable table) { + this.table = table; + } + @Override + public void run() { + gets.add(new Get(ROW)); + gets.add(new Get(ROW1)); + try { + CustomInnerRegionObserver.getCdl().set(latch); + Result[] r = table.get(gets); + assertTrue(Bytes.equals(r[0].getRow(), ROW)); + assertTrue(Bytes.equals(r[1].getRow(), ROW1)); + } catch (IOException e) { + } + } + } + + private static class GetThread extends Thread { + private final HTable table; + private final boolean tracker; + private final boolean multipleCFs; + + public GetThread(HTable table, boolean tracker, boolean multipleCFs) { + this.table = table; + this.tracker = tracker; + this.multipleCFs = multipleCFs; + } + + @Override + public void run() { + try { + initiateGet(table); + } catch (IOException e) { + // do nothing + } + } + + private void initiateGet(HTable table) throws IOException { + Get get = new Get(ROW); + if (tracker) { + // Change this + if (!multipleCFs) { + get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); + get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); + get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); + // Unknown key + get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); + } else { + get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); + get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); + get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); + // Unknown key + get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); + } + } + CustomInnerRegionObserver.getCdl().set(latch); + Result r = table.get(get); + System.out.println(r); + if (!tracker) { + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); + } else { + if (!multipleCFs) { + assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); + assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); + assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); + } else { + assertTrue(Bytes.equals( + r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), + data2)); + assertTrue(Bytes.equals( + 
r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), + data2)); + assertTrue(Bytes.equals( + r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), + data2)); + } + } + } + } + + private static class ScanThread extends Thread { + private final HTable table; + private final boolean reverse; + + public ScanThread(HTable table, boolean reverse) { + this.table = table; + this.reverse = reverse; + } + + @Override + public void run() { + try { + initiateScan(table); + } catch (IOException e) { + // do nothing + } + } + + private void initiateScan(HTable table) throws IOException { + Scan scan = new Scan(); + if (reverse) { + scan.setReversed(true); + } + CustomInnerRegionObserver.getCdl().set(latch); + ResultScanner resScanner = table.getScanner(scan); + int i = (reverse ? ROWS.length - 1 : 0); + boolean resultFound = false; + for (Result result : resScanner) { + resultFound = true; + System.out.println(result); + if (!reverse) { + assertTrue(Bytes.equals(result.getRow(), ROWS[i])); + i++; + } else { + assertTrue(Bytes.equals(result.getRow(), ROWS[i])); + i--; + } + } + assertTrue(resultFound); + } + } + + private void waitForStoreFileCount(Store store, int count, int timeout) + throws InterruptedException { + long start = System.currentTimeMillis(); + while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) { + Thread.sleep(100); + } + System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" + + store.getStorefilesCount()); + assertEquals(count, store.getStorefilesCount()); + } + + private static class CustomScanner implements RegionScanner { + + private RegionScanner delegate; + + public CustomScanner(RegionScanner delegate) { + this.delegate = delegate; + } + + @Override + public boolean next(List results) throws IOException { + return delegate.next(results); + } + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + return delegate.next(result, scannerContext); + } + + @Override + public boolean nextRaw(List result) throws IOException { + return delegate.nextRaw(result); + } + + @Override + public boolean nextRaw(List result, ScannerContext context) throws IOException { + boolean nextRaw = delegate.nextRaw(result, context); + if (compactionLatch != null && compactionLatch.getCount() > 0) { + try { + compactionLatch.await(); + } catch (InterruptedException ie) { + } + } + + if (CustomInnerRegionObserver.throwException.get()) { + if (exceptionLatch.getCount() > 0) { + try { + exceptionLatch.await(); + } catch (InterruptedException e) { + } + throw new IOException("throw exception"); + } + } + return nextRaw; + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public HRegionInfo getRegionInfo() { + return delegate.getRegionInfo(); + } + + @Override + public boolean isFilterDone() throws IOException { + return delegate.isFilterDone(); + } + + @Override + public boolean reseek(byte[] row) throws IOException { + return false; + } + + @Override + public long getMaxResultSize() { + return delegate.getMaxResultSize(); + } + + @Override + public long getMvccReadPoint() { + return delegate.getMvccReadPoint(); + } + + @Override + public int getBatch() { + return delegate.getBatch(); + } + + @Override + public void shipped() throws IOException { + this.delegate.shipped(); + } + } + + public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { + @Override + public 
RegionScanner postScannerOpen(ObserverContext e, + Scan scan, RegionScanner s) throws IOException { + return new CustomScanner(s); + } + } + + public static class CustomInnerRegionObserver extends BaseRegionObserver { + static final AtomicLong sleepTime = new AtomicLong(0); + static final AtomicBoolean slowDownNext = new AtomicBoolean(false); + static final AtomicInteger countOfNext = new AtomicInteger(0); + static final AtomicInteger countOfGets = new AtomicInteger(0); + static final AtomicBoolean waitForGets = new AtomicBoolean(false); + static final AtomicBoolean throwException = new AtomicBoolean(false); + private static final AtomicReference cdl = new AtomicReference( + new CountDownLatch(0)); + + @Override + public boolean postScannerNext(ObserverContext e, + InternalScanner s, List results, int limit, boolean hasMore) throws IOException { + slowdownCode(e, false); + if (getLatch != null && getLatch.getCount() > 0) { + try { + getLatch.await(); + } catch (InterruptedException e1) { + } + } + return super.postScannerNext(e, s, results, limit, hasMore); + } + + @Override + public void postGetOp(ObserverContext e, Get get, + List results) throws IOException { + slowdownCode(e, true); + super.postGetOp(e, get, results); + } + + public static AtomicReference getCdl() { + return cdl; + } + + private void slowdownCode(final ObserverContext e, boolean isGet) { + CountDownLatch latch = getCdl().get(); + try { + System.out.println(latch.getCount() + " is the count " + isGet); + if (latch.getCount() > 0) { + if (isGet) { + countOfGets.incrementAndGet(); + } else { + countOfNext.incrementAndGet(); + } + LOG.info("Waiting for the counterCountDownLatch"); + latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. + if (latch.getCount() > 0) { + throw new RuntimeException("Can't wait more"); + } + } + } catch (InterruptedException e1) { + LOG.error(e1); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index b0a2ba2..4808679 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.util.ChecksumType; @@ -267,7 +268,7 @@ public class CacheTestUtils { } @Override - public Cacheable deserialize(ByteBuffer b, boolean reuse) + public Cacheable deserialize(ByteBuffer b, boolean reuse, MemoryType memType) throws IOException { return deserialize(b); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java index ce78a37..f854e6b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; 
+import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.util.Threads; import org.junit.After; @@ -71,7 +72,7 @@ public class TestCacheConfig { } @Override - public Cacheable deserialize(ByteBuffer b, boolean reuse) throws IOException { + public Cacheable deserialize(ByteBuffer b, boolean reuse, MemoryType type) throws IOException { LOG.info("Deserialized " + b + ", reuse=" + reuse); return cacheable; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index 77311bc..ff87bae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -28,8 +28,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.EnumMap; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -290,6 +295,8 @@ public class TestCacheOnWrite { DataBlockEncoding encodingInCache = encoderType.getEncoder().getDataBlockEncoding(); + List cachedBlocksOffset = new ArrayList(); + Map cachedBlocks = new HashMap(); while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { long onDiskSize = -1; if (prevBlock != null) { @@ -303,6 +310,8 @@ public class TestCacheOnWrite { offset); HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); boolean isCached = fromCache != null; + cachedBlocksOffset.add(offset); + cachedBlocks.put(offset, fromCache); boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); assertTrue("shouldBeCached: " + shouldBeCached+ "\n" + "isCached: " + isCached + "\n" + @@ -355,6 +364,27 @@ public class TestCacheOnWrite { while (scanner.next()) { scanner.getCell(); } + Iterator iterator = cachedBlocksOffset.iterator(); + while(iterator.hasNext()) { + Long entry = iterator.next(); + BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), + entry); + HFileBlock hFileBlock = cachedBlocks.get(entry); + if (hFileBlock != null) { + // call return twice because for the isCache cased the counter would have got incremented twice + blockCache.returnBlock(blockCacheKey); + if(cacheCompressedData) { + if (this.compress == Compression.Algorithm.NONE + || cowType == CacheOnWriteType.INDEX_BLOCKS + || cowType == CacheOnWriteType.BLOOM_BLOCKS) { + blockCache.returnBlock(blockCacheKey); + } + } else { + blockCache.returnBlock(blockCacheKey); + } + } + } + scanner.shipped(); reader.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index fa0cfec..e7fe98c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -329,7 +329,7 @@ public class TestHFile extends HBaseTestCase { private void readNumMetablocks(Reader reader, int n) throws IOException { for (int i = 0; i < n; i++) { - ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); + ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false).getBufferWithoutHeader(); 
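The TestCacheOnWrite hunk above is the clearest illustration of the contract this patch introduces: every getBlock() that pins a block must eventually be paired with returnBlock() on the same BlockCacheKey so the ref count can drop back to zero and the block becomes evictable again. A minimal usage sketch of that contract, using only the BlockCache calls that appear in this patch; blockCache, hfileName and offset are caller-supplied placeholders:

  // Sketch of the get/return pairing introduced by this patch.
  BlockCacheKey key = new BlockCacheKey(hfileName, offset);
  Cacheable block = blockCache.getBlock(key, true, false, true);
  try {
    if (block != null) {
      // read from the block while its ref count keeps it pinned in the cache
    }
  } finally {
    if (block != null) {
      blockCache.returnBlock(key); // release the reference taken by getBlock
    }
  }

Note that in this patch returnBlock takes only the BlockCacheKey, as shown by the TestCacheOnWrite loop and the TestHeapMemoryManager stub below.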
 ByteBuffer expected = ByteBuffer.wrap(("something to test" + i).getBytes());
 assertEquals("failed to match metadata",
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 1bfd18c..6d645ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -461,7 +462,8 @@ public class TestHFileBlock {
     ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
     blockFromHFile.serialize(serialized);
     HFileBlock deserialized =
-        (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer);
+        (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer,
+          MemoryType.NON_SHARED_MEMORY);
     assertEquals(
       "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
       blockFromHFile, deserialized);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 279c4ea..caff960 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -167,6 +167,10 @@ public class TestHFileBlockIndex {
     }

     @Override
+    public void returnBlock(HFileBlock block) {
+    }
+
+    @Override
     public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread, boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index 511f942..9aa23c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -22,8 +22,10 @@ import static org.junit.Assert.assertTrue;
 import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -63,8 +65,8 @@ public class TestByteBufferIOEngine {
         offset = (int) (Math.random() * (capacity - maxBlockSize));
       }
       ioEngine.write(srcBuffer, offset);
-      ByteBuffer dstBuffer = ByteBuffer.allocate(blockSize);
-      ioEngine.read(dstBuffer, offset);
+      Pair<ByteBuffer, MemoryType> pair = ioEngine.read(offset, blockSize);
+      ByteBuffer dstBuffer = pair.getFirst();
       byte[] byteArray2 = dstBuffer.array();
       for (int j = 0; j < byteArray.length; ++j) {
         assertTrue(byteArray[j] == byteArray2[j]);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 8306114..e8ea8cc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -24,8 +24,10 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.io.hfile.BlockCache.MemoryType;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -47,9 +49,9 @@ public class TestFileIOEngine {
       for (int j = 0; j < data1.length; ++j) {
         data1[j] = (byte) (Math.random() * 255);
       }
-      byte[] data2 = new byte[len];
       fileIOEngine.write(ByteBuffer.wrap(data1), offset);
-      fileIOEngine.read(ByteBuffer.wrap(data2), offset);
+      Pair<ByteBuffer, MemoryType> pair = fileIOEngine.read(offset, len);
+      byte[] data2 = pair.getFirst().array();
       for (int j = 0; j < data1.length; ++j) {
         assertTrue(data1[j] == data2[j]);
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index fbda6b8..d943415 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
@@ -578,9 +579,14 @@ public class TestHeapMemoryManager {
       return null;
     }
-    public void setTestBlockSize(long testBlockSize) {
-      this.testBlockSize = testBlockSize;
-    }
+    @Override
+    public boolean returnBlock(BlockCacheKey cacheKey) {
+      return true;
+    }
+
+    public void setTestBlockSize(long testBlockSize) {
+      this.testBlockSize = testBlockSize;
+    }
   }

   private static class MemstoreFlusherStub implements FlushRequester {