diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 37d3508ab18..dd5b77890fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -556,6 +556,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         this.curBlock = null;
       }
     }
+
     @Override
     public boolean isSeeked(){
       return blockBuffer != null;
@@ -877,13 +878,15 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       // The key we are interested in
       if (previousBlockOffset == -1) {
         // we have a 'problem', the key we want is the first of the file.
+        releaseIfNotCurBlock(seekToBlock);
         return false;
       }

       // The first key in the current block 'seekToBlock' is greater than the given
       // seekBefore key. We will go ahead by reading the next block that satisfies the
       // given key. Return the current block before reading the next one.
-      reader.returnBlock(seekToBlock);
+      releaseIfNotCurBlock(seekToBlock);
+
       // It is important that we compute and pass onDiskSize to the block
       // reader so that it does not have to read the header separately to
       // figure out the size. Currently, we do not have a way to do this
@@ -900,6 +903,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       return true;
     }

+    /**
+     * The curBlock will be released by the shipping or close method, so we only need to release
+     * the block here if it was read from the HFile before and is not referenced by curBlock.
+     */
+    protected void releaseIfNotCurBlock(HFileBlock block) {
+      if (curBlock != block) {
+        reader.returnBlock(block);
+      }
+    }
+
     /**
      * Scans blocks in the "scanned" section of the {@link HFile} until the next
      * data block is found.
@@ -1136,6 +1149,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       HFileBlock newBlock = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
           isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
       if (newBlock.getOffset() < 0) {
+        releaseIfNotCurBlock(newBlock);
         throw new IOException(
             "Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath());
       }
@@ -1192,19 +1206,23 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      * @param newBlock the block to make current
      */
     protected void updateCurrentBlock(HFileBlock newBlock) throws IOException {
-      // Set the active block on the reader
-      // sanity check
-      if (newBlock.getBlockType() != BlockType.DATA) {
-        throw new IllegalStateException("ScannerV2 works only on data " + "blocks, got "
-            + newBlock.getBlockType() + "; " + "HFileName=" + reader.getPath()
-            + ", " + "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + "isCompaction="
-            + isCompaction);
-      }
+      try {
+        // Set the active block on the reader
+        // sanity check
+        if (newBlock.getBlockType() != BlockType.DATA) {
+          throw new IllegalStateException(
+            "ScannerV2 works only on data " + "blocks, got " + newBlock.getBlockType() + "; "
+              + "HFileName=" + reader.getPath() + ", " + "dataBlockEncoder=" + reader
+              .getDataBlockEncoding() + ", " + "isCompaction=" + isCompaction);
+        }
-      updateCurrBlockRef(newBlock);
-      blockBuffer = newBlock.getBufferWithoutHeader();
-      readKeyValueLen();
-      blockFetches.incrementAndGet();
+        updateCurrBlockRef(newBlock);
+        blockBuffer = newBlock.getBufferWithoutHeader();
+        readKeyValueLen();
+        blockFetches.incrementAndGet();
+      } finally {
+        releaseIfNotCurBlock(newBlock);
+      }

       // Reset the next indexed key
       this.nextIndexedKey = null;
@@ -1299,69 +1317,78 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
    * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType}
    * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
    */
-  private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
-      boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
-      DataBlockEncoding expectedDataBlockEncoding) throws IOException {
-    // Check cache for block. If found return.
-    if (cacheConf.isBlockCacheEnabled()) {
-      BlockCache cache = cacheConf.getBlockCache();
-      HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
-        updateCacheMetrics);
-      if (cachedBlock != null) {
-        if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
-          HFileBlock compressedBlock = cachedBlock;
-          cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
-          // In case of compressed block after unpacking we can return the compressed block
+  private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
+      boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
+      DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+    // Check cache for block. If found return.
+    if (cacheConf.isBlockCacheEnabled()) {
+      BlockCache cache = cacheConf.getBlockCache();
+      HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
+          updateCacheMetrics);
+      if (cachedBlock != null) {
+        if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
+          HFileBlock compressedBlock = cachedBlock;
+          cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
+          // In case of compressed block after unpacking we can return the compressed block
           if (compressedBlock != cachedBlock) {
             cache.returnBlock(cacheKey, compressedBlock);
           }
         }
-        validateBlockType(cachedBlock, expectedBlockType);
+        try {
+          validateBlockType(cachedBlock, expectedBlockType);
+        } catch (IOException e) {
+          returnAndEvictBlock(cache, cacheKey, cachedBlock);
+          throw e;
+        }
-        if (expectedDataBlockEncoding == null) {
-          return cachedBlock;
-        }
-        DataBlockEncoding actualDataBlockEncoding =
+        if (expectedDataBlockEncoding == null) {
+          return cachedBlock;
+        }
+        DataBlockEncoding actualDataBlockEncoding =
             cachedBlock.getDataBlockEncoding();
-        // Block types other than data blocks always have
-        // DataBlockEncoding.NONE. To avoid false negative cache misses, only
-        // perform this check if cached block is a data block.
-        if (cachedBlock.getBlockType().isData() &&
-            !actualDataBlockEncoding.equals(expectedDataBlockEncoding)) {
-          // This mismatch may happen if a Scanner, which is used for say a
-          // compaction, tries to read an encoded block from the block cache.
-          // The reverse might happen when an EncodedScanner tries to read
-          // un-encoded blocks which were cached earlier.
+          //
+          // Because returning a data block with an implicit BlockType mismatch
+          // will cause the requesting scanner to throw a disk read should be
+          // forced here. This will potentially cause a significant number of
+          // cache misses, so update so we should keep track of this as it might
+          // justify the work on a CompoundScanner.
+          if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) &&
+              !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) {
+            // If the block is encoded but the encoding does not match the
+            // expected encoding it is likely the encoding was changed but the
+            // block was not yet evicted. Evictions on file close happen async
+            // so blocks with the old encoding still linger in cache for some
+            // period of time. This event should be rare as it only happens on
+            // schema definition change.
+            LOG.info("Evicting cached block with key " + cacheKey
+                + " because of a data block encoding mismatch" + "; expected: "
+                + expectedDataBlockEncoding + ", actual: " + actualDataBlockEncoding + ", path="
+                + path);
+            // This is an error scenario. so here we need to decrement the
+            // count.
+            returnAndEvictBlock(cache, cacheKey, cachedBlock);
+          }
+          return null;
+        }
+        return cachedBlock;
+      }
+    }
+    return null;
+  }
+
+  private void returnAndEvictBlock(BlockCache cache, BlockCacheKey cacheKey, Cacheable block) {
+    cache.returnBlock(cacheKey, block);
+    cache.evictBlock(cacheKey);
+  }

   /**
    * @param metaBlockName
@@ -1472,10 +1499,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       // Validate encoding type for data blocks. We include encoding
       // type in the cache key, and we expect it to match on a cache hit.
       if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
-        throw new IOException("Cached block under key " + cacheKey + " "
-            + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
-            + dataBlockEncoder.getDataBlockEncoding() + ")"
-            + ", path=" + path);
+        Optional.ofNullable(cacheConf.getBlockCache()).ifPresent(cache -> {
+          returnAndEvictBlock(cache, cacheKey, cachedBlock);
+        });
+        throw new IOException(
+          "Cached block under key " + cacheKey + " " + "has wrong encoding: "
+            + cachedBlock.getDataBlockEncoding() + " (expected: " + dataBlockEncoder
+            .getDataBlockEncoding() + ")" + ", path=" + path);
       }
     }
     // Cache-hit. Return!
@@ -1635,23 +1665,25 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      */
     @Override
     protected void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
-
-      // sanity checks
-      if (newBlock.getBlockType() != BlockType.ENCODED_DATA) {
-        throw new IllegalStateException("EncodedScanner works only on encoded data blocks");
-      }
-      short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
-      if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
-        String encoderCls = dataBlockEncoder.getClass().getName();
-        throw new CorruptHFileException("Encoder " + encoderCls
-            + " doesn't support data block encoding "
-            + DataBlockEncoding.getNameFromId(dataBlockEncoderId)
-            + ", path=" + reader.getPath());
-      }
+      try {
+        // sanity checks
+        if (newBlock.getBlockType() != BlockType.ENCODED_DATA) {
+          throw new IllegalStateException("EncodedScanner works only on encoded data blocks");
+        }
+        short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
+        if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
+          String encoderCls = dataBlockEncoder.getClass().getName();
+          throw new CorruptHFileException(
+            "Encoder " + encoderCls + " doesn't support data block encoding " + DataBlockEncoding
+              .getNameFromId(dataBlockEncoderId) + ", path=" + reader.getPath());
+        }
+        updateCurrBlockRef(newBlock);
+        ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
+        seeker.setCurrentBuffer(encodedBuffer);
+        blockFetches.incrementAndGet();
+      } finally {
+        releaseIfNotCurBlock(newBlock);
+      }
-      updateCurrBlockRef(newBlock);
-      ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
-      seeker.setCurrentBuffer(encodedBuffer);
-      blockFetches.incrementAndGet();

       // Reset the next indexed key
       this.nextIndexedKey = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java
new file mode 100644
index 00000000000..867b58011dd
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+
+/**
+ * Test that blocks read by {@link HFileReaderImpl} scanners are released, i.e. their ref count drops back to zero.
+ */
+@Category({ IOTests.class, SmallTests.class})
+public class TestHFileReaderImpl {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHFileReaderImpl.class);
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static KeyValue toKV(String row) {
+    return new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
+        Bytes.toBytes("value"));
+  }
+
+  static String toRowStr(Cell c) {
+    return Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
+  }
+
+  Path makeNewFile() throws IOException {
+    Path ncTFile = new Path(TEST_UTIL.getDataTestDir(), "basic.hfile");
+    FSDataOutputStream fout = TEST_UTIL.getTestFileSystem().create(ncTFile);
+    int blocksize = toKV("a").getLength() * 3;
+    HFileContext context =
+        new HFileContextBuilder().withBlockSize(blocksize).withIncludesTags(true).build();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    HFile.Writer writer =
+        HFile.getWriterFactoryNoCache(conf).withOutputStream(fout).withFileContext(context)
+            .withComparator(CellComparatorImpl.COMPARATOR).create();
+    // 4 bytes * 3 * 2 for each key/value +
+    // 3 for keys, 15 for values = 42 (woot)
+    writer.append(toKV("c"));
+    writer.append(toKV("e"));
+    writer.append(toKV("g"));
+    // block transition
+    writer.append(toKV("i"));
+    writer.append(toKV("k"));
+    writer.close();
+    fout.close();
+    return ncTFile;
+  }
+
+  @Test
+  public void testSeekBefore() throws Exception {
+    Path p = makeNewFile();
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    int[] bucketSizes = { 512, 2048, 4096, 64 * 1024, 128 * 1024 };
+    BucketCache bucketcache =
+        new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
+
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(bucketcache,
+        CacheConfig.DEFAULT_CACHE_DATA_ON_READ,
+        CacheConfig.DEFAULT_IN_MEMORY,
+        CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE,
+        CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE,
+        CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE,
+        CacheConfig.DEFAULT_EVICT_ON_CLOSE,
+        CacheConfig.DEFAULT_CACHE_DATA_COMPRESSED,
+        CacheConfig.DEFAULT_PREFETCH_ON_OPEN,
+        true), true, conf);
+    reader.loadFileInfo();
+
+    // warm cache
+    HFileScanner scanner = reader.getScanner(true, true);
+    scanner.seekTo(toKV("i"));
+    assertEquals("i", toRowStr(scanner.getCell()));
+    scanner.close();
+
+    while (bucketcache.getBlockCount() <= 0) {
+      Thread.sleep(10);
+    }
+
+    // reopen again.
+    scanner = reader.getScanner(true, true);
+    scanner.seekTo(toKV("i"));
+    assertEquals("i", toRowStr(scanner.getCell()));
+    scanner.seekBefore(toKV("i"));
+    assertEquals("g", toRowStr(scanner.getCell()));
+    scanner.close();
+
+    // no cached block may still be referenced once the scanner is closed
+    for (CachedBlock cachedBlock : Lists.newArrayList(bucketcache)) {
+      BlockCacheKey cacheKey =
+          new BlockCacheKey(cachedBlock.getFilename(), cachedBlock.getOffset());
+      int refCount = bucketcache.getRefCount(cacheKey);
+      assertEquals(0, refCount);
+    }
+
+    // case 2: seekBefore the first key of the file, the requested block must still be released
+    scanner = reader.getScanner(true, true);
+    scanner.seekTo(toKV("i"));
+    assertEquals("i", toRowStr(scanner.getCell()));
+    scanner.seekBefore(toKV("c"));
+    scanner.close();
+    for (CachedBlock cachedBlock : Lists.newArrayList(bucketcache)) {
+      BlockCacheKey cacheKey =
+          new BlockCacheKey(cachedBlock.getFilename(), cachedBlock.getOffset());
+      int refCount = bucketcache.getRefCount(cacheKey);
+      assertEquals(0, refCount);
+    }
+
+    reader.close();
+
+    // clear bucketcache
+    for (CachedBlock cachedBlock : Lists.newArrayList(bucketcache)) {
+      BlockCacheKey cacheKey =
+          new BlockCacheKey(cachedBlock.getFilename(), cachedBlock.getOffset());
+      bucketcache.evictBlock(cacheKey);
+    }
+    bucketcache.shutdown();
+
+    deleteTestDir(fs);
+  }
+
+  protected void deleteTestDir(FileSystem fs) throws IOException {
+    Path dataTestDir = TEST_UTIL.getDataTestDir();
+    if (fs.exists(dataTestDir)) {
+      fs.delete(dataTestDir, true);
+    }
+  }
+
+}
\ No newline at end of file
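
Note for reviewers: the invariant this patch enforces is that every block handed out by the reader must be returned exactly once, except the block currently installed as curBlock, which is returned later by the shipped()/close() path. Below is a minimal, self-contained sketch of that acquire/release discipline. ScanBlock, BlockSource and SimpleScanner are hypothetical stand-ins for HFileBlock, HFile.Reader and HFileScannerImpl; it illustrates the pattern only and is not HBase code.

// Illustrative sketch only: ScanBlock, BlockSource and SimpleScanner are hypothetical
// stand-ins for HFileBlock, HFile.Reader and HFileScannerImpl.
interface ScanBlock {
  boolean isData();
}

interface BlockSource {
  ScanBlock readBlock(long offset); // increments the block's reference count
  void returnBlock(ScanBlock block); // decrements it again
}

final class SimpleScanner implements AutoCloseable {
  private final BlockSource source;
  private ScanBlock curBlock; // returned by close(), never by callers

  SimpleScanner(BlockSource source) {
    this.source = source;
  }

  void seekTo(long offset) {
    // Every block obtained from the source must either become curBlock or be returned.
    updateCurrentBlock(source.readBlock(offset));
  }

  private void updateCurrentBlock(ScanBlock newBlock) {
    try {
      if (!newBlock.isData()) {
        // A failed sanity check must not leak the freshly read block.
        throw new IllegalStateException("works only on data blocks");
      }
      ScanBlock prev = curBlock;
      curBlock = newBlock; // install as the current block
      if (prev != null && prev != newBlock) {
        source.returnBlock(prev); // release the previously current block
      }
    } finally {
      // No-op when newBlock became curBlock; returns it if the check above threw.
      releaseIfNotCurBlock(newBlock);
    }
  }

  private void releaseIfNotCurBlock(ScanBlock block) {
    if (block != null && block != curBlock) {
      source.returnBlock(block);
    }
  }

  @Override
  public void close() {
    if (curBlock != null) {
      source.returnBlock(curBlock);
      curBlock = null;
    }
  }
}

With this discipline, a ref-counting cache such as the BucketCache used in the new test ends up with a reference count of zero for every block once the scanner is closed, which is exactly what testSeekBefore asserts.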