From 9d6c48a8a8576806da1fbddde18b9300b4629b9b Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 9 Sep 2014 11:08:28 -0700 Subject: [PATCH] HBASE-11331 [blockcache] lazy block decompression When hbase.block.data.cachecompressed=true, DATA (and ENCODED_DATA) blocks are cached in the BlockCache in their on-disk format. This is different from the default behavior, which decompresses and decrypts a block before caching. Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java --- .../apache/hadoop/hbase/io/hfile/HFileContext.java | 17 + .../hbase/tmpl/regionserver/BlockCacheTmpl.jamon | 6 +- .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 81 ++-- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 441 ++++++++++++--------- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 30 +- .../hadoop/hbase/io/hfile/HFileReaderV2.java | 18 +- .../hadoop/hbase/io/hfile/HFileWriterV2.java | 6 +- .../hadoop/hbase/io/hfile/LruBlockCache.java | 102 ++++- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 62 ++- .../apache/hadoop/hbase/io/hfile/TestChecksum.java | 10 +- .../io/hfile/TestForceCacheImportantBlocks.java | 16 +- .../apache/hadoop/hbase/io/hfile/TestHFile.java | 3 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 95 +++-- .../io/hfile/TestHFileBlockCompatibility.java | 16 +- .../hadoop/hbase/io/hfile/TestHFileEncryption.java | 10 +- .../hadoop/hbase/io/hfile/TestHFileWriterV2.java | 12 +- .../hadoop/hbase/io/hfile/TestHFileWriterV3.java | 13 +- .../io/hfile/TestLazyDataBlockDecompression.java | 231 +++++++++++ 18 files changed, 835 insertions(+), 334 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java index 3299e41..8cfc8a7 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java @@ -96,6 +96,23 @@ public class HFileContext implements HeapSize, Cloneable { this.cryptoContext = cryptoContext; } + /** + * @return true when on-disk blocks from this file are compressed, and/or encrypted; + * false otherwise. 
+ */ + public boolean isCompressedOrEncrypted() { + Compression.Algorithm compressAlgo = getCompression(); + boolean compressed = + compressAlgo != null + && compressAlgo != Compression.Algorithm.NONE; + + Encryption.Context cryptoContext = getEncryptionContext(); + boolean encrypted = cryptoContext != null + && cryptoContext != Encryption.Context.NONE; + + return compressed || encrypted; + } + public Compression.Algorithm getCompression() { return compressAlgo; } diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon index d0bf9ea..be0b5e0 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon @@ -157,9 +157,9 @@ org.apache.hadoop.util.StringUtils; reader is closed - Compress blocks - <% cacheConfig.shouldCacheCompressed() %> - True if blocks are compressed in cache + Cache DATA in compressed format + <% cacheConfig.shouldCacheDataCompressed() %> + True if DATA blocks are cached in their compressed form Prefetch on Open diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 9153c8a..fea5243 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -29,7 +30,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; -import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.DirectMemoryUtils; import org.apache.hadoop.util.StringUtils; @@ -61,11 +61,10 @@ public class CacheConfig { "hfile.block.bloom.cacheonwrite"; /** - * TODO: Implement this (jgray) - * Configuration key to cache data blocks in compressed format. + * Configuration key to cache data blocks in compressed and/or encrypted format. */ public static final String CACHE_DATA_BLOCKS_COMPRESSED_KEY = - "hbase.rs.blockcache.cachedatacompressed"; + "hbase.block.data.cachecompressed"; /** * Configuration key to evict all blocks of a given file from the block cache @@ -109,6 +108,14 @@ public class CacheConfig { public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = "hbase.rs.prefetchblocksonopen"; + /** + * The target block size used by blockcache instances. Defaults to + * {@link HConstants#DEFAULT_BLOCKSIZE}. + * TODO: this config point is completely wrong, as it's used to determine the + * target block size of BlockCache instances. Rename. 
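The configuration point renamed above ("hbase.block.data.cachecompressed", exposed as CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY) is the only switch a caller needs to flip to opt into lazy block decompression. A minimal sketch, assuming an ordinary client Configuration and the patched CacheConfig; nothing below beyond these two classes comes from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    public class LazyBlockDecompressionExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Keep DATA (and ENCODED_DATA) blocks in their on-disk, packed form in the block cache.
        conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
        CacheConfig cacheConf = new CacheConfig(conf);
        // With the block cache enabled (the default), this now reports true.
        System.out.println("cacheDataCompressed=" + cacheConf.shouldCacheDataCompressed());
      }
    }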
+ */ + public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize"; + // Defaults public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; @@ -117,7 +124,7 @@ public class CacheConfig { public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false; public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false; public static final boolean DEFAULT_EVICT_ON_CLOSE = false; - public static final boolean DEFAULT_COMPRESSED_CACHE = false; + public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false; public static final boolean DEFAULT_PREFETCH_ON_OPEN = false; /** Local reference to the block cache, null if completely disabled */ @@ -144,8 +151,8 @@ public class CacheConfig { /** Whether blocks of a file should be evicted when the file is closed */ private boolean evictOnClose; - /** Whether data blocks should be stored in compressed form in the cache */ - private final boolean cacheCompressed; + /** Whether data blocks should be stored in compressed and/or encrypted form in the cache */ + private final boolean cacheDataCompressed; /** Whether data blocks should be prefetched into the cache */ private final boolean prefetchOnOpen; @@ -170,7 +177,7 @@ public class CacheConfig { DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE), + conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED), conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen() ); @@ -187,15 +194,12 @@ public class CacheConfig { DEFAULT_IN_MEMORY, // This is a family-level setting so can't be set // strictly from conf conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), - conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_INDEXES_ON_WRITE), - conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_BLOOMS_ON_WRITE), + conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_INDEXES_ON_WRITE), + conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, - DEFAULT_COMPRESSED_CACHE), + conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED), conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) - ); + ); } /** @@ -208,14 +212,14 @@ public class CacheConfig { * @param cacheIndexesOnWrite whether index blocks should be cached on write * @param cacheBloomsOnWrite whether blooms should be cached on write * @param evictOnClose whether blocks should be evicted when HFile is closed - * @param cacheCompressed whether to store blocks as compressed in the cache + * @param cacheDataCompressed whether to store blocks as compressed in the cache * @param prefetchOnOpen whether to prefetch blocks upon open */ CacheConfig(final BlockCache blockCache, final boolean cacheDataOnRead, final boolean inMemory, final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite, final boolean cacheBloomsOnWrite, final boolean evictOnClose, - final boolean cacheCompressed, final boolean prefetchOnOpen) { + final boolean cacheDataCompressed, final boolean prefetchOnOpen) { this.blockCache = blockCache; this.cacheDataOnRead = cacheDataOnRead; this.inMemory = inMemory; @@ -223,7 +227,7 @@ public class CacheConfig 
{ this.cacheIndexesOnWrite = cacheIndexesOnWrite; this.cacheBloomsOnWrite = cacheBloomsOnWrite; this.evictOnClose = evictOnClose; - this.cacheCompressed = cacheCompressed; + this.cacheDataCompressed = cacheDataCompressed; this.prefetchOnOpen = prefetchOnOpen; } @@ -235,7 +239,7 @@ public class CacheConfig { this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory, cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite, cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose, - cacheConf.cacheCompressed, cacheConf.prefetchOnOpen); + cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen); } /** @@ -267,14 +271,13 @@ public class CacheConfig { * available. */ public boolean shouldCacheBlockOnRead(BlockCategory category) { - boolean shouldCache = isBlockCacheEnabled() + return isBlockCacheEnabled() && (cacheDataOnRead || category == BlockCategory.INDEX || category == BlockCategory.BLOOM || (prefetchOnOpen && (category != BlockCategory.META && category != BlockCategory.UNKNOWN))); - return shouldCache; } /** @@ -335,10 +338,23 @@ public class CacheConfig { } /** - * @return true if blocks should be compressed in the cache, false if not + * @return true if data blocks should be compressed in the cache, false if not */ - public boolean shouldCacheCompressed() { - return isBlockCacheEnabled() && this.cacheCompressed; + public boolean shouldCacheDataCompressed() { + return isBlockCacheEnabled() && this.cacheDataCompressed; + } + + /** + * @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise + */ + public boolean shouldCacheCompressed(BlockCategory category) { + if (!isBlockCacheEnabled()) return false; + switch (category) { + case DATA: + return this.cacheDataCompressed; + default: + return false; + } } /** @@ -359,7 +375,7 @@ public class CacheConfig { "[cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + "] " + "[cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + "] " + "[cacheEvictOnClose=" + shouldEvictOnClose() + "] " + - "[cacheCompressed=" + shouldCacheCompressed() + "]" + + "[cacheDataCompressed=" + shouldCacheDataCompressed() + "] " + "[prefetchOnOpen=" + shouldPrefetchOnOpen() + "]"; } @@ -369,7 +385,8 @@ public class CacheConfig { * Static reference to the block cache, or null if no caching should be used * at all. */ - private static BlockCache globalBlockCache; + @VisibleForTesting + static BlockCache GLOBAL_BLOCK_CACHE_INSTANCE; /** Boolean whether we have disabled the block cache entirely. */ private static boolean blockCacheDisabled = false; @@ -381,7 +398,7 @@ public class CacheConfig { * @return The block cache or null. */ private static synchronized BlockCache instantiateBlockCache(Configuration conf) { - if (globalBlockCache != null) return globalBlockCache; + if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE; if (blockCacheDisabled) return null; float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, @@ -398,7 +415,7 @@ public class CacheConfig { // Calculate the amount of heap to give the heap. 
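The new shouldCacheCompressed(BlockCategory) above deliberately limits packed caching to the DATA category; index and bloom blocks keep being cached in decompressed form. Reusing the cacheConf built in the earlier sketch (block cache enabled, hbase.block.data.cachecompressed=true), the expected answers are:

    // BlockCategory is org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory
    assert cacheConf.shouldCacheCompressed(BlockCategory.DATA);    // DATA/ENCODED_DATA: cached packed
    assert !cacheConf.shouldCacheCompressed(BlockCategory.INDEX);  // index blocks: cached unpacked
    assert !cacheConf.shouldCacheCompressed(BlockCategory.BLOOM);  // bloom blocks: cached unpacked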
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); long lruCacheSize = (long) (mu.getMax() * cachePercentage); - int blockSize = conf.getInt("hbase.offheapcache.minblocksize", HConstants.DEFAULT_BLOCKSIZE); + int blockSize = conf.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); long offHeapCacheSize = (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) * DirectMemoryUtils.getDirectMemorySize()); @@ -450,15 +467,15 @@ public class CacheConfig { LruBlockCache lruCache = new LruBlockCache(lruCacheSize, blockSize, true, conf); lruCache.setVictimCache(bucketCache); if (bucketCache != null && combinedWithLru) { - globalBlockCache = new CombinedBlockCache(lruCache, bucketCache); + GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(lruCache, bucketCache); } else { - globalBlockCache = lruCache; + GLOBAL_BLOCK_CACHE_INSTANCE = lruCache; } } else { LOG.warn("SlabCache is deprecated. Consider BucketCache as a replacement."); - globalBlockCache = new DoubleBlockCache( + GLOBAL_BLOCK_CACHE_INSTANCE = new DoubleBlockCache( lruCacheSize, offHeapCacheSize, blockSize, blockSize, conf); } - return globalBlockCache; + return GLOBAL_BLOCK_CACHE_INSTANCE; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index c43b036..cfe92f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -35,9 +35,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; -import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; @@ -63,25 +60,26 @@ import com.google.common.base.Preconditions; * information from the block index are required to read a block. *
  • In version 2 a block is structured as follows: * * - * The version 2 block representation in the block cache is the same as above, - * except that the data section is always uncompressed in the cache. */ @InterfaceAudience.Private public class HFileBlock implements Cacheable { @@ -110,7 +108,7 @@ public class HFileBlock implements Cacheable { ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false); // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader - public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG; /** @@ -135,6 +133,9 @@ public class HFileBlock implements Cacheable { HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum); ourBuffer.offset = buf.getLong(); ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt(); + if (ourBuffer.hasNextBlockHeader()) { + ourBuffer.buf.limit(ourBuffer.buf.limit() - ourBuffer.headerSize()); + } return ourBuffer; } @@ -154,23 +155,28 @@ public class HFileBlock implements Cacheable { .registerDeserializer(blockDeserializer); } + /** Type of block. Header field 0. */ private BlockType blockType; - /** Size on disk without the header. It includes checksum data too. */ + /** Size on disk excluding header, including checksum. Header field 1. */ private int onDiskSizeWithoutHeader; - /** Size of pure data. Does not include header or checksums */ + /** Size of pure data. Does not include header or checksums. Header field 2. */ private final int uncompressedSizeWithoutHeader; - /** The offset of the previous block on disk */ + /** The offset of the previous block on disk. Header field 3. */ private final long prevBlockOffset; - /** Size on disk of header and data. Does not include checksum data */ + /** + * Size on disk of header + data. Excludes checksum. Header field 6, + * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum. + */ private final int onDiskDataSizeWithHeader; /** The in-memory representation of the hfile block */ private ByteBuffer buf; - /** Meta data that holds meta information on the hfileblock**/ + + /** Meta data that holds meta information on the hfileblock */ private HFileContext fileContext; /** @@ -192,27 +198,18 @@ public class HFileBlock implements Cacheable { * and is sitting in a byte buffer. * * @param blockType the type of this block, see {@link BlockType} - * @param onDiskSizeWithoutHeader compressed size of the block if compression - * is used, otherwise uncompressed size, header size not included - * @param uncompressedSizeWithoutHeader uncompressed size of the block, - * header size not included. Equals onDiskSizeWithoutHeader if - * compression is disabled. - * @param prevBlockOffset the offset of the previous block in the - * {@link HFile} + * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader} + * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader} + * @param prevBlockOffset see {@link #prevBlockOffset} * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by * uncompressed data. This - * @param fillHeader true to fill in the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of - * the buffer based on the header fields provided + * @param fillHeader when true, parse {@code buf} and override the first 4 header fields. 
* @param offset the file offset the block was read from - * @param bytesPerChecksum the number of bytes per checksum chunk - * @param checksumType the checksum algorithm to use - * @param onDiskDataSizeWithHeader size of header and data on disk not - * including checksum data + * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader} * @param fileContext HFile meta data */ - HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, - int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf, - boolean fillHeader, long offset, + HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, + long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset, int onDiskDataSizeWithHeader, HFileContext fileContext) { this.blockType = blockType; this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; @@ -224,6 +221,22 @@ public class HFileBlock implements Cacheable { this.offset = offset; this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader; this.fileContext = fileContext; + this.buf.rewind(); + } + + /** + * Copy constructor. Creates a shallow copy of {@code that}'s buffer. + */ + HFileBlock(HFileBlock that) { + this.blockType = that.blockType; + this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader; + this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader; + this.prevBlockOffset = that.prevBlockOffset; + this.buf = that.buf.duplicate(); + this.offset = that.offset; + this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader; + this.fileContext = that.fileContext; + this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader; } /** @@ -271,28 +284,21 @@ public class HFileBlock implements Cacheable { } /** - * @return the on-disk size of the block with header size included. This - * includes the header, the data and the checksum data. + * @return the on-disk size of header + data part + checksum. */ public int getOnDiskSizeWithHeader() { return onDiskSizeWithoutHeader + headerSize(); } /** - * Returns the size of the compressed part of the block in case compression - * is used, or the uncompressed size of the data part otherwise. Header size - * and checksum data size is not included. - * - * @return the on-disk size of the data part of the block, header and - * checksum not included. + * @return the on-disk size of the data part + checksum (header excluded). */ public int getOnDiskSizeWithoutHeader() { return onDiskSizeWithoutHeader; } /** - * @return the uncompressed size of the data part of the block, header not - * included + * @return the uncompressed size of data part (header and checksum excluded). */ public int getUncompressedSizeWithoutHeader() { return uncompressedSizeWithoutHeader; @@ -307,8 +313,8 @@ public class HFileBlock implements Cacheable { } /** - * Writes header fields into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the - * buffer. Resets the buffer position to the end of header as side effect. + * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position + * is modified as side-effect. */ private void overwriteHeader() { buf.rewind(); @@ -319,11 +325,9 @@ public class HFileBlock implements Cacheable { } /** - * Returns a buffer that does not include the header. The array offset points - * to the start of the block data right after the header. The underlying data - * array is not copied. Checksum data is not included in the returned buffer. + * Returns a buffer that does not include the header or checksum. 
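The field comments above refer to "Header field 0" through "Header field 6". For orientation, the v2 block header they index into is laid out as follows when HBase checksums are in use (HConstants.HFILEBLOCK_HEADER_SIZE, 33 bytes):

    field 0: block magic (BlockType)            8 bytes
    field 1: onDiskSizeWithoutHeader            4 bytes (int)
    field 2: uncompressedSizeWithoutHeader      4 bytes (int)
    field 3: prevBlockOffset                    8 bytes (long)
    field 4: checksumType                       1 byte
    field 5: bytesPerChecksum                   4 bytes (int)
    field 6: onDiskDataSizeWithHeader           4 bytes (int)

Without HBase checksums only fields 0-3 are present (HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, 24 bytes).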
* - * @return the buffer with header skipped + * @return the buffer with header skipped and checksum omitted. */ public ByteBuffer getBufferWithoutHeader() { return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(), @@ -335,7 +339,7 @@ public class HFileBlock implements Cacheable { * modify the buffer object. This method has to be public because it is * used in {@link CompoundBloomFilter} to avoid object creation on every * Bloom filter lookup, but has to be used with caution. Checksum data - * is not included in the returned buffer. + * is not included in the returned buffer but header data is. * * @return the buffer of this block for read-only operations */ @@ -349,17 +353,17 @@ public class HFileBlock implements Cacheable { * not modify the buffer object. This method has to be public because it is * used in {@link BucketCache} to avoid buffer copy. * - * @return the byte buffer with header included for read-only operations + * @return the buffer with header and checksum included for read-only operations */ public ByteBuffer getBufferReadOnlyWithHeader() { return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice(); } /** - * Returns a byte buffer of this block, including header data, positioned at + * Returns a byte buffer of this block, including header data and checksum, positioned at * the beginning of header. The underlying data array is not copied. * - * @return the byte buffer with header included + * @return the byte buffer with header and checksum included */ ByteBuffer getBufferWithHeader() { ByteBuffer dupBuf = buf.duplicate(); @@ -375,22 +379,25 @@ public class HFileBlock implements Cacheable { } } + private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) + throws IOException { + if (valueFromBuf != valueFromField) { + throw new IOException("Block type stored in the buffer: " + + valueFromBuf + ", block type field: " + valueFromField); + } + } + /** * Checks if the block is internally consistent, i.e. the first - * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent - * with the fields. This function is primary for testing and debugging, and - * is not thread-safe, because it alters the internal buffer pointer. + * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a + * valid header consistent with the fields. Assumes a packed block structure. + * This function is primary for testing and debugging, and is not + * thread-safe, because it alters the internal buffer pointer. 
*/ void sanityCheck() throws IOException { buf.rewind(); - { - BlockType blockTypeFromBuf = BlockType.read(buf); - if (blockTypeFromBuf != blockType) { - throw new IOException("Block type stored in the buffer: " + - blockTypeFromBuf + ", block type field: " + blockType); - } - } + sanityCheckAssertion(BlockType.read(buf), blockType); sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); @@ -402,45 +409,65 @@ public class HFileBlock implements Cacheable { if (this.fileContext.isUseHBaseChecksum()) { sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum"); - sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, - "onDiskDataSizeWithHeader"); + sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); } int cksumBytes = totalChecksumBytes(); - int hdrSize = headerSize(); - int expectedBufLimit = uncompressedSizeWithoutHeader + headerSize() + - cksumBytes; + int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes; if (buf.limit() != expectedBufLimit) { throw new AssertionError("Expected buffer limit " + expectedBufLimit + ", got " + buf.limit()); } // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next - // block's, header, so there are two sensible values for buffer capacity. - int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes; - if (buf.capacity() != size && - buf.capacity() != size + hdrSize) { + // block's header, so there are two sensible values for buffer capacity. + int hdrSize = headerSize(); + if (buf.capacity() != expectedBufLimit && + buf.capacity() != expectedBufLimit + hdrSize) { throw new AssertionError("Invalid buffer capacity: " + buf.capacity() + - ", expected " + size + " or " + (size + hdrSize)); + ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); } } @Override public String toString() { - return "blockType=" - + blockType - + ", onDiskSizeWithoutHeader=" - + onDiskSizeWithoutHeader - + ", uncompressedSizeWithoutHeader=" - + uncompressedSizeWithoutHeader - + ", prevBlockOffset=" - + prevBlockOffset - + ", dataBeginsWith=" - + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), - Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())) - + ", fileOffset=" + offset; + StringBuilder sb = new StringBuilder() + .append("HFileBlock [") + .append(" fileOffset=").append(offset) + .append(" headerSize()=").append(headerSize()) + .append(" blockType=").append(blockType) + .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) + .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) + .append(" prevBlockOffset=").append(prevBlockOffset) + .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum()); + if (fileContext.isUseHBaseChecksum()) { + sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24))) + .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1)) + .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); + } else { + sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader) + .append("(").append(onDiskSizeWithoutHeader) + .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); + } + sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader()) + .append(" totalChecksumBytes()=").append(totalChecksumBytes()) + .append(" 
isUnpacked()=").append(isUnpacked()) + .append(" buf=[ ") + .append(buf) + .append(", array().length=").append(buf.array().length) + .append(", arrayOffset()=").append(buf.arrayOffset()) + .append(" ]") + .append(" dataBeginsWith=") + .append(Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), + Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()))) + .append(" fileContext=").append(fileContext) + .append(" ]"); + return sb.toString(); } + /** + * Called after reading a block with provided onDiskSizeWithHeader. + */ private void validateOnDiskSizeWithoutHeader( int expectedOnDiskSizeWithoutHeader) throws IOException { if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) { @@ -456,32 +483,80 @@ public class HFileBlock implements Cacheable { } /** + * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its + * encoded structure. Internal structures are shared between instances where applicable. + */ + HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { + if (!fileContext.isCompressedOrEncrypted()) { + // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean), + // which is used for block serialization to L2 cache, does not preserve encoding and + // encryption details. + return this; + } + + HFileBlock unpacked = new HFileBlock(this); + unpacked.allocateBuffer(); // allocates space for the decompressed block + + HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? + reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); + ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), + unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), + this.getBufferReadOnlyWithHeader().array(), this.headerSize()); + + // Preserve the next block's header bytes in the new block if we have them. + if (unpacked.hasNextBlockHeader()) { + System.arraycopy(this.buf.array(), this.buf.arrayOffset() + this.onDiskDataSizeWithHeader, + unpacked.buf.array(), unpacked.buf.arrayOffset() + unpacked.headerSize() + + unpacked.uncompressedSizeWithoutHeader + unpacked.totalChecksumBytes(), + unpacked.headerSize()); + } + return unpacked; + } + + /** + * Return true when this buffer includes next block's header. + */ + private boolean hasNextBlockHeader() { + return nextBlockOnDiskSizeWithHeader > 0; + } + + /** * Always allocates a new buffer of the correct size. Copies header bytes * from the existing buffer. Does not change header fields. * Reserve room to keep checksum bytes too. - * - * @param extraBytes whether to reserve room in the buffer to read the next - * block's header */ - private void allocateBuffer(boolean extraBytes) { + private void allocateBuffer() { int cksumBytes = totalChecksumBytes(); - int capacityNeeded = headerSize() + uncompressedSizeWithoutHeader + - cksumBytes + - (extraBytes ? headerSize() : 0); + int headerSize = headerSize(); + int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + + cksumBytes + (hasNextBlockHeader() ? headerSize : 0); ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded); // Copy header bytes. 
System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(), - newBuf.arrayOffset(), headerSize()); + newBuf.arrayOffset(), headerSize); buf = newBuf; - buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes); + // set limit to exclude next block's header + buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes); + } + + /** + * Return true when this block's buffer has been unpacked, false otherwise. Note this is a + * calculated heuristic, not tracked attribute of the block. + */ + public boolean isUnpacked() { + final int cksumBytes = totalChecksumBytes(); + final int headerSize = headerSize(); + final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes; + final int bufCapacity = buf.capacity(); + return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; } - /** An additional sanity-check in case no compression is being used. */ + /** An additional sanity-check in case no compression or encryption is being used. */ public void assumeUncompressed() throws IOException { - if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + + if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " @@ -511,7 +586,7 @@ public class HFileBlock implements Cacheable { } /** - * @return a byte stream reading the data section of this block + * @return a byte stream reading the data + checksum of this block */ public DataInputStream getByteStream() { return new DataInputStream(new ByteArrayInputStream(buf.array(), @@ -587,7 +662,6 @@ public class HFileBlock implements Cacheable { return nextBlockOnDiskSizeWithHeader; } - /** * Unified version 2 {@link HFile} block writer. The intended usage pattern * is as follows: @@ -630,7 +704,7 @@ public class HFileBlock implements Cacheable { /** * Current block type. Set in {@link #startWriting(BlockType)}. Could be - * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} + * changed in {@link #finishBlock()} from {@link BlockType#DATA} * to {@link BlockType#ENCODED_DATA}. */ private BlockType blockType; @@ -643,7 +717,7 @@ public class HFileBlock implements Cacheable { /** * Bytes to be written to the file system, including the header. Compressed - * if compression is turned on. It also includes the checksum data that + * if compression is turned on. It also includes the checksum data that * immediately follows the block data. (header + data + checksums) */ private byte[] onDiskBytesWithHeader; @@ -990,6 +1064,19 @@ public class HFileBlock implements Cacheable { return ByteBuffer.wrap(uncompressedBytesWithHeader); } + /** + * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is + * needed for storing packed blocks in the block cache. Expects calling semantics identical to + * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data, + * Does not include checksum data. + * + * @return packed block bytes for caching on write + */ + ByteBuffer getOnDiskBufferWithHeader() { + expectState(State.BLOCK_READY); + return ByteBuffer.wrap(onDiskBytesWithHeader); + } + private void expectState(State expectedState) { if (state != expectedState) { throw new IllegalStateException("Expected state: " + expectedState + @@ -1020,7 +1107,7 @@ public class HFileBlock implements Cacheable { * version is MINOR_VERSION_WITH_CHECKSUM. 
This is indicated by setting a * 0 value in bytesPerChecksum. */ - public HFileBlock getBlockForCaching() { + public HFileBlock getBlockForCaching(CacheConfig cacheConf) { HFileContext newContext = new HFileContextBuilder() .withBlockSize(fileContext.getBlocksize()) .withBytesPerCheckSum(0) @@ -1033,7 +1120,10 @@ public class HFileBlock implements Cacheable { .withIncludesTags(fileContext.isIncludesTags()) .build(); return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), - getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedBufferWithHeader(), + getUncompressedSizeWithoutHeader(), prevOffset, + cacheConf.shouldCacheCompressed(blockType.getCategory()) ? + getOnDiskBufferWithHeader() : + getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset, onDiskBytesWithHeader.length + onDiskChecksum.length, newContext); } @@ -1091,7 +1181,7 @@ public class HFileBlock implements Cacheable { /** * Creates a block iterator over the given portion of the {@link HFile}. * The iterator returns blocks starting with offset such that offset <= - * startOffset < endOffset. + * startOffset < endOffset. Returned blocks are always unpacked. * * @param startOffset the offset of the block to start iteration with * @param endOffset the offset to end iteration at (exclusive) @@ -1101,6 +1191,12 @@ public class HFileBlock implements Cacheable { /** Closes the backing streams */ void closeStreams() throws IOException; + + /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */ + HFileBlockDecodingContext getBlockDecodingContext(); + + /** Get the default decoder for blocks from this file. */ + HFileBlockDecodingContext getDefaultBlockDecodingContext(); } /** @@ -1141,6 +1237,7 @@ public class HFileBlock implements Cacheable { @Override public BlockIterator blockRange(final long startOffset, final long endOffset) { + final FSReader owner = this; // handle for inner class return new BlockIterator() { private long offset = startOffset; @@ -1150,7 +1247,7 @@ public class HFileBlock implements Cacheable { return null; HFileBlock b = readBlockData(offset, -1, -1, false); offset += b.getOnDiskSizeWithHeader(); - return b; + return b.unpack(fileContext, owner); } @Override @@ -1256,7 +1353,8 @@ public class HFileBlock implements Cacheable { private HFileBlockDecodingContext encodedBlockDecodingCtx; - private HFileBlockDefaultDecodingContext defaultDecodingCtx; + /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ + private final HFileBlockDefaultDecodingContext defaultDecodingCtx; private ThreadLocal prefetchedHeaderForThread = new ThreadLocal() { @@ -1272,10 +1370,8 @@ public class HFileBlock implements Cacheable { this.streamWrapper = stream; // Older versions of HBase didn't support checksum. this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); - defaultDecodingCtx = - new HFileBlockDefaultDecodingContext(fileContext); - encodedBlockDecodingCtx = - new HFileBlockDefaultDecodingContext(fileContext); + defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext); + encodedBlockDecodingCtx = defaultDecodingCtx; } /** @@ -1416,9 +1512,8 @@ public class HFileBlock implements Cacheable { HFileBlock b = null; if (onDiskSizeWithHeader > 0) { - // We know the total on-disk size but not the uncompressed size. Read - // the entire block into memory, then parse the header and decompress - // from memory if using compression. This code path is used when + // We know the total on-disk size. 
Read the entire block into memory, + // then parse the header. This code path is used when // doing a random read operation relying on the block index, as well as // when the client knows the on-disk size from peeking into the next // block's header (e.g. this block's header) when reading the previous @@ -1426,7 +1521,8 @@ public class HFileBlock implements Cacheable { // Size that we have to skip in case we have already read the header. int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; - onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; + onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the + // next block's header nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); @@ -1439,11 +1535,10 @@ public class HFileBlock implements Cacheable { headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); } // We know the total on-disk size but not the uncompressed size. Read - // the entire block into memory, then parse the header and decompress - // from memory if using compression. Here we have already read the - // block's header + // the entire block into memory, then parse the header. Here we have + // already read the block's header try { - b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); + b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); } catch (IOException ex) { // Seen in load testing. Provide comprehensive debug info. throw new IOException("Failed to read compressed block at " @@ -1481,66 +1576,34 @@ public class HFileBlock implements Cacheable { readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false, offset, pread); } - b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); + b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; - System.arraycopy(headerBuf.array(), - headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); + System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader() - hdrSize, true, offset + hdrSize, pread); onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize; } - Algorithm compressAlgo = fileContext.getCompression(); - boolean isCompressed = - compressAlgo != null - && compressAlgo != Compression.Algorithm.NONE; - - Encryption.Context cryptoContext = fileContext.getEncryptionContext(); - boolean isEncrypted = cryptoContext != null - && cryptoContext != Encryption.Context.NONE; - - if (!isCompressed && !isEncrypted) { + if (!fileContext.isCompressedOrEncrypted()) { b.assumeUncompressed(); } - if (verifyChecksum && - !validateBlockChecksum(b, onDiskBlock, hdrSize)) { + if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) { return null; // checksum mismatch } - if (isCompressed || isEncrypted) { - // This will allocate a new buffer but keep header bytes. 
- b.allocateBuffer(nextBlockOnDiskSize > 0); - if (b.blockType == BlockType.ENCODED_DATA) { - encodedBlockDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), - b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, - hdrSize); - } else { - defaultDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), - b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, - hdrSize); - } - if (nextBlockOnDiskSize > 0) { - // Copy next block's header bytes into the new block if we have them. - System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(), - b.buf.arrayOffset() + hdrSize - + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), - hdrSize); - } - } else { - // The onDiskBlock will become the headerAndDataBuffer for this block. - // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already - // contains the header of next block, so no need to set next - // block's header in it. - b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, - onDiskSizeWithHeader), this.fileContext.isUseHBaseChecksum()); - } + // The onDiskBlock will become the headerAndDataBuffer for this block. + // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already + // contains the header of next block, so no need to set next + // block's header in it. + b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader), + this.fileContext.isUseHBaseChecksum()); b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; // Set prefetched header - if (b.nextBlockOnDiskSizeWithHeader > 0) { + if (b.hasNextBlockHeader()) { prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader(); System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize); @@ -1560,37 +1623,53 @@ public class HFileBlock implements Cacheable { encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext); } + @Override + public HFileBlockDecodingContext getBlockDecodingContext() { + return this.encodedBlockDecodingCtx; + } + + @Override + public HFileBlockDecodingContext getDefaultBlockDecodingContext() { + return this.defaultDecodingCtx; + } + /** * Generates the checksum for the header as well as the data and * then validates that it matches the value stored in the header. * If there is a checksum mismatch, then return false. Otherwise * return true. */ - protected boolean validateBlockChecksum(HFileBlock block, - byte[] data, int hdrSize) throws IOException { - return ChecksumUtil.validateBlockChecksum(path, block, - data, hdrSize); + protected boolean validateBlockChecksum(HFileBlock block, byte[] data, int hdrSize) + throws IOException { + return ChecksumUtil.validateBlockChecksum(path, block, data, hdrSize); } @Override public void closeStreams() throws IOException { streamWrapper.close(); } + + @Override + public String toString() { + return "FSReaderV2 [ hfs=" + hfs + " path=" + path + " fileContext=" + fileContext + " ]"; + } } @Override public int getSerializedLength() { if (buf != null) { - return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE; + // include extra bytes for the next header when it's available. + int extraSpace = hasNextBlockHeader() ? 
headerSize() : 0; + return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE; } return 0; } @Override public void serialize(ByteBuffer destination) { - ByteBuffer dupBuf = this.buf.duplicate(); - dupBuf.rewind(); - destination.put(dupBuf); + // assumes HeapByteBuffer + destination.put(this.buf.array(), this.buf.arrayOffset(), + getSerializedLength() - EXTRA_SERIALIZATION_SPACE); serializeExtraInfo(destination); } @@ -1638,13 +1717,9 @@ public class HFileBlock implements Cacheable { if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { return false; } - if (this.buf.compareTo(castedComparison.buf) != 0) { - return false; - } - if (this.buf.position() != castedComparison.buf.position()){ - return false; - } - if (this.buf.limit() != castedComparison.buf.limit()){ + if (Bytes.compareTo(this.buf.array(), this.buf.arrayOffset(), this.buf.limit(), + castedComparison.buf.array(), castedComparison.buf.arrayOffset(), + castedComparison.buf.limit()) != 0) { return false; } return true; @@ -1665,6 +1740,7 @@ public class HFileBlock implements Cacheable { return this.fileContext.getBytesPerChecksum(); } + /** @return the size of data on disk + header. Excludes checksum. */ int getOnDiskDataSizeWithHeader() { return this.onDiskDataSizeWithHeader; } @@ -1718,6 +1794,10 @@ public class HFileBlock implements Cacheable { return DUMMY_HEADER_NO_CHECKSUM; } + /** + * @return the HFileContext used to create this HFileBlock. Not necessary the + * fileContext for the file from which this block's data was originally read. + */ public HFileContext getHFileContext() { return this.fileContext; } @@ -1730,7 +1810,7 @@ public class HFileBlock implements Cacheable { static String toStringHeader(ByteBuffer buf) throws IOException { int offset = buf.arrayOffset(); byte[] b = buf.array(); - long magic = Bytes.toLong(b, offset); + long magic = Bytes.toLong(b, offset); BlockType bt = BlockType.read(buf); offset += Bytes.SIZEOF_LONG; int compressedBlockSizeNoHeader = Bytes.toInt(b, offset); @@ -1757,4 +1837,3 @@ public class HFileBlock implements Cacheable { " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader; } } - diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 2992466..5d20cdf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -740,7 +740,7 @@ public class HFileBlockIndex { * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The * initial value accounts for the root level, and will be increased to two * as soon as we find out there is a leaf-level in - * {@link #blockWritten(long, int)}. + * {@link #blockWritten(long, int, int)}. */ private int numLevels = 1; @@ -766,8 +766,8 @@ public class HFileBlockIndex { /** Whether we require this block index to always be single-level. */ private boolean singleLevelOnly; - /** Block cache, or null if cache-on-write is disabled */ - private BlockCache blockCache; + /** CacheConfig, or null if cache-on-write is disabled */ + private CacheConfig cacheConf; /** Name to use for computing cache keys */ private String nameForCaching; @@ -782,18 +782,17 @@ public class HFileBlockIndex { * Creates a multi-level block index writer. 
* * @param blockWriter the block writer to use to write index blocks - * @param blockCache if this is not null, index blocks will be cached - * on write into this block cache. + * @param cacheConf used to determine when and how a block should be cached-on-write. */ public BlockIndexWriter(HFileBlock.Writer blockWriter, - BlockCache blockCache, String nameForCaching) { - if ((blockCache == null) != (nameForCaching == null)) { + CacheConfig cacheConf, String nameForCaching) { + if ((cacheConf == null) != (nameForCaching == null)) { throw new IllegalArgumentException("Block cache and file name for " + "caching must be both specified or both null"); } this.blockWriter = blockWriter; - this.blockCache = blockCache; + this.cacheConf = cacheConf; this.nameForCaching = nameForCaching; this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE; } @@ -947,11 +946,11 @@ public class HFileBlockIndex { byte[] curFirstKey = curChunk.getBlockKey(0); blockWriter.writeHeaderAndData(out); - if (blockCache != null) { - HFileBlock blockForCaching = blockWriter.getBlockForCaching(); - blockCache.cacheBlock(new BlockCacheKey(nameForCaching, - beginOffset, DataBlockEncoding.NONE, - blockForCaching.getBlockType()), blockForCaching); + if (cacheConf != null) { + HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf); + cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching, + beginOffset, DataBlockEncoding.NONE, + blockForCaching.getBlockType()), blockForCaching); } // Add intermediate index block size @@ -1059,8 +1058,7 @@ public class HFileBlockIndex { * entry referring to that block to the parent-level index. */ @Override - public void blockWritten(long offset, int onDiskSize, int uncompressedSize) - { + public void blockWritten(long offset, int onDiskSize, int uncompressedSize) { // Add leaf index block size totalBlockOnDiskSize += onDiskSize; totalBlockUncompressedSize += uncompressedSize; @@ -1125,7 +1123,7 @@ public class HFileBlockIndex { */ @Override public boolean getCacheOnWrite() { - return blockCache != null; + return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 41d5062..12565fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -283,6 +283,7 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false, true); if (cachedBlock != null) { + assert cachedBlock.isUnpacked() : "Packed block leak."; // Return a distinct 'shallow copy' of the block, // so pos does not get messed by the scanner return cachedBlock.getBufferWithoutHeader(); @@ -291,7 +292,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, - blockSize, -1, true); + blockSize, -1, true).unpack(hfileContext, fsBlockReader); // Cache the block if (cacheBlock) { @@ -359,7 +360,10 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics); if (cachedBlock != null) { - validateBlockType(cachedBlock, expectedBlockType); + if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) 
{ + cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader); + } + assert cachedBlock.isUnpacked() : "Packed block leak."; if (cachedBlock.getBlockType().isData()) { HFile.dataBlockReadCnt.incrementAndGet(); @@ -387,17 +391,21 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, pread); validateBlockType(hfileBlock, expectedBlockType); + HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); + BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); // Cache the block if necessary - if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) { - cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory()); + if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + cacheConf.getBlockCache().cacheBlock(cacheKey, + cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked, + cacheConf.isInMemory()); } if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { HFile.dataBlockReadCnt.incrementAndGet(); } - return hfileBlock; + return unpacked; } } finally { traceScope.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index fdea542..21f0147 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -119,7 +119,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, - cacheIndexesOnWrite ? cacheConf.getBlockCache(): null, + cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null); dataBlockIndexWriter.setMaxChunkSize( HFileBlockIndex.getMaxChunkSize(conf)); @@ -144,7 +144,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { newBlock(); } - /** Clean up the current block */ + /** Clean up the current data block */ private void finishBlock() throws IOException { if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return; @@ -192,7 +192,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { * the cache key. 
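Putting the reader-side changes above together: readBlock() now always hands its caller the unpacked (decompressed/decrypted) view, while the block cache stores either the packed or the unpacked form depending on shouldCacheCompressed(). A condensed sketch of the cache-miss path, using the identifiers from the patched HFileReaderV2 (an outline, not the literal method body):

    HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, pread);
    HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
    BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
    if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
      // keep the on-disk (packed) form only when the category is configured for it
      cacheConf.getBlockCache().cacheBlock(cacheKey,
          cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
          cacheConf.isInMemory());
    }
    return unpacked; // scanners never see a packed block

On a cache hit the inverse happens: a block cached in packed form is unpacked on the way out, which is the CPU-for-memory trade this feature makes.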
*/ private void doCacheOnWrite(long offset) { - HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(); + HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf); cacheConf.getBlockCache().cacheBlock( new BlockCacheKey(name, offset, blockEncoder.getDataBlockEncoding(), cacheFormatBlock.getBlockType()), cacheFormatBlock); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index bc3f8d3..a5fd71d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -325,12 +326,35 @@ public class LruBlockCache implements BlockCache, HeapSize { cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); long newSize = updateSizeMetrics(cb, false); map.put(cacheKey, cb); - elements.incrementAndGet(); - if(newSize > acceptableSize() && !evictionInProgress) { + long val = elements.incrementAndGet(); + if (LOG.isTraceEnabled()) { + long size = map.size(); + assertCounterSanity(size, val); + } + if (newSize > acceptableSize() && !evictionInProgress) { runEviction(); } } + /** + * Sanity-checking for parity between actual block cache content and metrics. + * Intended only for use with TRACE level logging and -ea JVM. + */ + private static void assertCounterSanity(long mapSize, long counterVal) { + if (counterVal < 0) { + LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal + + ", mapSize=" + mapSize); + return; + } + if (mapSize < Integer.MAX_VALUE) { + double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); + if (pct_diff > 0.05) { + LOG.trace("delta between reported and actual size > 5%. 
counterVal=" + counterVal + + ", mapSize=" + mapSize); + } + } + } + private int compare(Cacheable left, Cacheable right) { ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength()); left.serialize(l); @@ -447,7 +471,11 @@ public class LruBlockCache implements BlockCache, HeapSize { protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { map.remove(block.getCacheKey()); updateSizeMetrics(block, true); - elements.decrementAndGet(); + long val = elements.decrementAndGet(); + if (LOG.isTraceEnabled()) { + long size = map.size(); + assertCounterSanity(size, val); + } stats.evicted(); if (evictedByEvictionProcess && victimHandler != null) { boolean wait = getCurrentSize() < acceptableSize(); @@ -491,9 +519,12 @@ public class LruBlockCache implements BlockCache, HeapSize { if(bytesToFree <= 0) return; // Instantiate priority buckets - BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize()); - BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize()); - BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize()); + BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, + singleSize()); + BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, + multiSize()); + BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, + memorySize()); // Scan entire map putting into appropriate buckets for(LruCachedBlock cachedBlock : map.values()) { @@ -522,7 +553,15 @@ public class LruBlockCache implements BlockCache, HeapSize { // so the single and multi buckets will be emptied bytesFreed = bucketSingle.free(s); bytesFreed += bucketMulti.free(m); + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + + " from single and multi buckets"); + } bytesFreed += bucketMemory.free(bytesToFree - bytesFreed); + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + + " total from all three buckets "); + } } else { // this means no need to evict block in memory bucket, // and we try best to make the ratio between single-bucket and @@ -584,6 +623,23 @@ public class LruBlockCache implements BlockCache, HeapSize { } } + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("blockCount", getBlockCount()) + .add("currentSize", getCurrentSize()) + .add("freeSize", getFreeSize()) + .add("maxSize", getMaxSize()) + .add("heapSize", heapSize()) + .add("minSize", minSize()) + .add("minFactor", minFactor) + .add("multiSize", multiSize()) + .add("multiFactor", multiFactor) + .add("singleSize", singleSize()) + .add("singleFactor", singleFactor) + .toString(); + } + /** * Used to group blocks into priority buckets. There will be a BlockBucket * for each priority (single, multi, memory). Once bucketed, the eviction @@ -591,11 +647,14 @@ public class LruBlockCache implements BlockCache, HeapSize { * to configuration parameters and their relatives sizes. 
*/ private class BlockBucket implements Comparable { + + private final String name; private LruCachedBlockQueue queue; private long totalSize = 0; private long bucketSize; - public BlockBucket(long bytesToFree, long blockSize, long bucketSize) { + public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { + this.name = name; this.bucketSize = bucketSize; queue = new LruCachedBlockQueue(bytesToFree, blockSize); totalSize = 0; @@ -607,6 +666,9 @@ public class LruBlockCache implements BlockCache, HeapSize { } public long free(long toFree) { + if (LOG.isTraceEnabled()) { + LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this); + } LruCachedBlock cb; long freedBytes = 0; while ((cb = queue.pollLast()) != null) { @@ -615,6 +677,9 @@ public class LruBlockCache implements BlockCache, HeapSize { return freedBytes; } } + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this); + } return freedBytes; } @@ -636,13 +701,22 @@ public class LruBlockCache implements BlockCache, HeapSize { if (that == null || !(that instanceof BlockBucket)){ return false; } + return compareTo((BlockBucket)that) == 0; } @Override public int hashCode() { - // Nothing distingushing about each instance unless I pass in a 'name' or something - return super.hashCode(); + return Objects.hashCode(name, bucketSize, queue, totalSize); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("name", name) + .add("totalSize", StringUtils.byteDesc(totalSize)) + .add("bucketSize", StringUtils.byteDesc(bucketSize)) + .toString(); } } @@ -757,7 +831,7 @@ public class LruBlockCache implements BlockCache, HeapSize { LruBlockCache.LOG.debug("Total=" + StringUtils.byteDesc(totalSize) + ", " + "free=" + StringUtils.byteDesc(freeSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", " + - "blocks=" + size() +", " + + "blockCount=" + getBlockCount() + ", " + "accesses=" + stats.getRequestCount() + ", " + "hits=" + stats.getHitCount() + ", " + "hitRatio=" + @@ -920,6 +994,7 @@ public class LruBlockCache implements BlockCache, HeapSize { } /** Clears the cache. Used in tests. */ + @VisibleForTesting public void clearCache() { map.clear(); } @@ -928,6 +1003,7 @@ public class LruBlockCache implements BlockCache, HeapSize { * Used in testing. May be very inefficient. 
* @return the set of cached file names */ + @VisibleForTesting SortedSet getCachedFileNamesForTest() { SortedSet fileNames = new TreeSet(); for (BlockCacheKey cacheKey : map.keySet()) { @@ -948,6 +1024,7 @@ public class LruBlockCache implements BlockCache, HeapSize { return counts; } + @VisibleForTesting public Map getEncodingCountsForTest() { Map counts = new EnumMap(DataBlockEncoding.class); @@ -964,6 +1041,11 @@ public class LruBlockCache implements BlockCache, HeapSize { victimHandler = handler; } + @VisibleForTesting + Map getMapForTests() { + return map; + } + @Override public BlockCache[] getBlockCaches() { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index 1bac569..c0e5ade 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -84,6 +85,7 @@ public class TestCacheOnWrite { private final Compression.Algorithm compress; private final BlockEncoderTestType encoderType; private final HFileDataBlockEncoder encoder; + private final boolean cacheCompressedData; private static final int DATA_BLOCK_SIZE = 2048; private static final int NUM_KV = 25000; @@ -154,14 +156,15 @@ public class TestCacheOnWrite { } } - public TestCacheOnWrite(CacheOnWriteType cowType, - Compression.Algorithm compress, BlockEncoderTestType encoderType) { + public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress, + BlockEncoderTestType encoderType, boolean cacheCompressedData) { this.cowType = cowType; this.compress = compress; this.encoderType = encoderType; this.encoder = encoderType.getEncoder(); - testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + - ", encoderType=" + encoderType + "]"; + this.cacheCompressedData = cacheCompressedData; + testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + + ", encoderType=" + encoderType + ", cacheCompressedData=" + cacheCompressedData + "]"; System.out.println(testDescription); } @@ -173,7 +176,9 @@ public class TestCacheOnWrite { HBaseTestingUtility.COMPRESSION_ALGORITHMS) { for (BlockEncoderTestType encoderType : BlockEncoderTestType.values()) { - cowTypes.add(new Object[] { cowType, compress, encoderType }); + for (boolean cacheCompressedData : new boolean[] { false, true }) { + cowTypes.add(new Object[] { cowType, compress, encoderType, cacheCompressedData }); + } } } } @@ -189,11 +194,12 @@ public class TestCacheOnWrite { conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, - cowType.shouldBeCached(BlockType.DATA)); + cowType.shouldBeCached(BlockType.DATA)); conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.LEAF_INDEX)); conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.BLOOM_CHUNK)); + conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData); cowType.modifyConf(conf); fs = HFileSystem.get(conf); cacheConf = new CacheConfig(conf); @@ -225,6 +231,10 @@ public class TestCacheOnWrite { 
reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf); } LOG.info("HFile information: " + reader); + HFileContext meta = new HFileContextBuilder().withCompression(compress) + .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) + .withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding()) + .withIncludesTags(useTags).build(); final boolean cacheBlocks = false; final boolean pread = false; HFileScanner scanner = reader.getScanner(cacheBlocks, pread); @@ -247,17 +257,37 @@ public class TestCacheOnWrite { HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, false, true, null); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), - offset, encodingInCache, block.getBlockType()); - boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null; + offset, encodingInCache, block.getBlockType()); + HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); + boolean isCached = fromCache != null; boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); - if (shouldBeCached != isCached) { - throw new AssertionError( - "shouldBeCached: " + shouldBeCached+ "\n" + - "isCached: " + isCached + "\n" + - "Test description: " + testDescription + "\n" + - "block: " + block + "\n" + - "encodingInCache: " + encodingInCache + "\n" + - "blockCacheKey: " + blockCacheKey); + assertTrue("shouldBeCached: " + shouldBeCached+ "\n" + + "isCached: " + isCached + "\n" + + "Test description: " + testDescription + "\n" + + "block: " + block + "\n" + + "encodingInCache: " + encodingInCache + "\n" + + "blockCacheKey: " + blockCacheKey, + shouldBeCached == isCached); + if (isCached) { + if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) { + if (compress != Compression.Algorithm.NONE) { + assertFalse(fromCache.isUnpacked()); + } + fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader()); + } else { + assertTrue(fromCache.isUnpacked()); + } + // block we cached at write-time and block read from file should be identical + assertEquals(block.getChecksumType(), fromCache.getChecksumType()); + assertEquals(block.getBlockType(), fromCache.getBlockType()); + if (block.getBlockType() == BlockType.ENCODED_DATA) { + assertEquals(block.getDataBlockEncodingId(), fromCache.getDataBlockEncodingId()); + assertEquals(block.getDataBlockEncoding(), fromCache.getDataBlockEncoding()); + } + assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader()); + assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader()); + assertEquals( + block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader()); } prevBlock = block; offset += block.getOnDiskSizeWithHeader(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index 020a293..f70976f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -125,7 +125,7 @@ public class TestChecksum { assertEquals(algo == GZ ? 
2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); // read data back from the hfile, exclude header and checksum - ByteBuffer bb = b.getBufferWithoutHeader(); // read back data + ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data DataInputStream in = new DataInputStream( new ByteArrayInputStream( bb.array(), bb.arrayOffset(), bb.limit())); @@ -164,6 +164,7 @@ public class TestChecksum { b = hbr.readBlockData(0, -1, -1, pread); is.close(); b.sanityCheck(); + b = b.unpack(meta, hbr); assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); @@ -274,12 +275,7 @@ public class TestChecksum { // validate data for (int i = 0; i < 1234; i++) { int val = in.readInt(); - if (val != i) { - String msg = "testChecksumCorruption: data mismatch at index " + - i + " expected " + i + " found " + val; - LOG.warn(msg); - assertEquals(i, val); - } + assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java index 665a7f3..d7eb0c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java @@ -74,14 +74,15 @@ public class TestForceCacheImportantBlocks { @Parameters public static Collection parameters() { // HFile versions - return Arrays.asList(new Object[][] { - new Object[] { new Integer(2), false }, - new Object[] { new Integer(2), true } - }); + return Arrays.asList( + new Object[] { 2, true }, + new Object[] { 2, false }, + new Object[] { 3, true }, + new Object[] { 3, false } + ); } - public TestForceCacheImportantBlocks(int hfileVersion, - boolean cfCacheEnabled) { + public TestForceCacheImportantBlocks(int hfileVersion, boolean cfCacheEnabled) { this.hfileVersion = hfileVersion; this.cfCacheEnabled = cfCacheEnabled; TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, @@ -119,7 +120,6 @@ public class TestForceCacheImportantBlocks { } } - private void writeTestData(HRegion region) throws IOException { for (int i = 0; i < NUM_ROWS; ++i) { Put put = new Put(Bytes.toBytes("row" + i)); @@ -135,6 +135,4 @@ public class TestForceCacheImportantBlocks { } } } - } - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index 20463f7..f46bf13 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -301,7 +301,8 @@ public class TestHFile extends HBaseTestCase { ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); ByteBuffer expected = ByteBuffer.wrap(("something to test" + i).getBytes()); - assertTrue("failed to match metadata", actual.compareTo(expected) == 0); + assertEquals("failed to match metadata", + Bytes.toStringBinary(expected), Bytes.toStringBinary(actual)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 59deb49..71c0c0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java 
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -73,6 +71,7 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import org.mockito.Mockito; @Category(MediumTests.class) @RunWith(Parameterized.class) @@ -253,8 +252,14 @@ public class TestHFileBlock { @Test public void testNoCompression() throws IOException { - assertEquals(4000, createTestV2Block(NONE, includesMemstoreTS, false). - getBlockForCaching().getUncompressedSizeWithoutHeader()); + CacheConfig cacheConf = Mockito.mock(CacheConfig.class); + Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false); + + HFileBlock block = + createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf); + assertEquals(4000, block.getUncompressedSizeWithoutHeader()); + assertEquals(4004, block.getOnDiskSizeWithoutHeader()); + assertTrue(block.isUnpacked()); } @Test @@ -336,14 +341,14 @@ public class TestHFileBlock { assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); - String blockStr = b.toString(); + HFileBlock expected = b; if (algo == GZ) { is = fs.open(path); hbr = new HFileBlock.FSReaderV2(is, totalSize, meta); b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), -1, pread); - assertEquals(blockStr, b.toString()); + assertEquals(expected, b); int wrongCompressedSize = 2172; try { b = hbr.readBlockData(0, wrongCompressedSize @@ -416,20 +421,33 @@ public class TestHFileBlock { HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, meta); hbr.setDataBlockEncoder(dataBlockEncoder); hbr.setIncludesMemstoreTS(includesMemstoreTS); - HFileBlock b; + HFileBlock blockFromHFile, blockUnpacked; int pos = 0; for (int blockId = 0; blockId < numBlocks; ++blockId) { - b = hbr.readBlockData(pos, -1, -1, pread); + blockFromHFile = hbr.readBlockData(pos, -1, -1, pread); assertEquals(0, HFile.getChecksumFailuresCount()); - b.sanityCheck(); - pos += b.getOnDiskSizeWithHeader(); - assertEquals((int) encodedSizes.get(blockId), - b.getUncompressedSizeWithoutHeader()); - ByteBuffer actualBuffer = b.getBufferWithoutHeader(); + blockFromHFile.sanityCheck(); + pos += blockFromHFile.getOnDiskSizeWithHeader(); + assertEquals((int) encodedSizes.get(blockId), blockFromHFile.getUncompressedSizeWithoutHeader()); + assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked()); + long packedHeapsize = blockFromHFile.heapSize(); + blockUnpacked = blockFromHFile.unpack(meta, hbr); + assertTrue(blockUnpacked.isUnpacked()); + if (meta.isCompressedOrEncrypted()) { + LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeadsize=" + blockUnpacked.heapSize()); + assertFalse(packedHeapsize == blockUnpacked.heapSize()); + assertTrue("Packed heapSize should be < unpacked heapSize", + packedHeapsize < blockUnpacked.heapSize()); + } + ByteBuffer actualBuffer = 
blockUnpacked.getBufferWithoutHeader(); if (encoding != DataBlockEncoding.NONE) { // We expect a two-byte big-endian encoding id. - assertEquals(0, actualBuffer.get(0)); - assertEquals(encoding.getId(), actualBuffer.get(1)); + assertEquals( + "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread), + Long.toHexString(0), Long.toHexString(actualBuffer.get(0))); + assertEquals( + "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread), + Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1))); actualBuffer.position(2); actualBuffer = actualBuffer.slice(); } @@ -438,8 +456,23 @@ public class TestHFileBlock { expectedBuffer.rewind(); // test if content matches, produce nice message - assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, - pread); + assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread); + + // test serialized blocks + for (boolean reuseBuffer : new boolean[] { false, true }) { + ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength()); + blockFromHFile.serialize(serialized); + HFileBlock deserialized = + (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer); + assertEquals( + "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer, + blockFromHFile, deserialized); + // intentional reference comparison + if (blockFromHFile != blockUnpacked) { + assertEquals("Deserializaed block cannot be unpacked correctly.", + blockUnpacked, deserialized.unpack(meta, hbr)); + } + } } is.close(); } @@ -501,6 +534,11 @@ public class TestHFileBlock { encodedBlocks.add(encodedBuf); } + static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding, + boolean pread) { + return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread); + } + static void assertBuffersEqual(ByteBuffer expectedBuffer, ByteBuffer actualBuffer, Compression.Algorithm compression, DataBlockEncoding encoding, boolean pread) { @@ -513,9 +551,8 @@ public class TestHFileBlock { } fail(String.format( - "Content mismath for compression %s, encoding %s, " + - "pread %s, commonPrefix %d, expected %s, got %s", - compression, encoding, pread, prefix, + "Content mismatch for %s, commonPrefix %d, expected %s, got %s", + buildMessageDetails(compression, encoding, pread), prefix, nextBytesToStr(expectedBuffer, prefix), nextBytesToStr(actualBuffer, prefix))); } @@ -538,6 +575,7 @@ public class TestHFileBlock { } protected void testPreviousOffsetInternals() throws IOException { + // TODO: parameterize these nested loops. for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { for (boolean pread : BOOLEAN_VALUES) { for (boolean cacheOnWrite : BOOLEAN_VALUES) { @@ -607,8 +645,10 @@ public class TestHFileBlock { curOffset += b.getOnDiskSizeWithHeader(); if (cacheOnWrite) { - // In the cache-on-write mode we store uncompressed bytes so we - // can compare them to what was read by the block reader. + // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply + // verifies that the unpacked value read back off disk matches the unpacked value + // generated before writing to disk. 
+ b = b.unpack(meta, hbr); // b's buffer has header + data + checksum while // expectedContents have header + data only ByteBuffer bufRead = b.getBufferWithHeader(); @@ -627,11 +667,10 @@ public class TestHFileBlock { + algo + ", pread=" + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n"; wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(), - bufExpected.arrayOffset(), Math.min(32, - bufExpected.limit())) + bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n" + Bytes.toStringBinary(bufRead.array(), - bufRead.arrayOffset(), Math.min(32, bufRead.limit())); + bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit())); if (detailedLogging) { LOG.warn("expected header" + HFileBlock.toStringHeader(bufExpected) + @@ -821,6 +860,7 @@ public class TestHFileBlock { if (detailedLogging) { LOG.info("Written block #" + i + " of type " + bt + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader() + + ", packed size " + hbw.getOnDiskSizeWithoutHeader() + " at offset " + pos); } } @@ -869,7 +909,4 @@ public class TestHFileBlock { block.heapSize()); } } - - } - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java index 1d48509..e7cbbd4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -212,7 +210,7 @@ public class TestHFileBlockCompatibility { assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); - String blockStr = b.toString(); + HFileBlock expected = b; if (algo == GZ) { is = fs.open(path); @@ -220,7 +218,7 @@ public class TestHFileBlockCompatibility { meta); b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + b.totalChecksumBytes(), -1, pread); - assertEquals(blockStr, b.toString()); + assertEquals(expected, b); int wrongCompressedSize = 2172; try { b = hbr.readBlockData(0, wrongCompressedSize @@ -295,6 +293,10 @@ public class TestHFileBlockCompatibility { for (int blockId = 0; blockId < numBlocks; ++blockId) { b = hbr.readBlockData(pos, -1, -1, pread); b.sanityCheck(); + if (meta.isCompressedOrEncrypted()) { + assertFalse(b.isUnpacked()); + b = b.unpack(meta, hbr); + } pos += b.getOnDiskSizeWithHeader(); assertEquals((int) encodedSizes.get(blockId), @@ -410,10 +412,6 @@ public class TestHFileBlockCompatibility { private HFileContext meta; - /** - * @param compressionAlgorithm compression algorithm to use - * @param dataBlockEncoderAlgo data block encoding algorithm to use - */ public Writer(Compression.Algorithm compressionAlgorithm, HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) { compressAlgo = compressionAlgorithm == null ? 
NONE : compressionAlgorithm; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java index 3556b79..191e8e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import static org.junit.Assert.*; - import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -49,6 +47,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.*; + @Category(SmallTests.class) public class TestHFileEncryption { private static final Log LOG = LogFactory.getLog(TestHFileEncryption.class); @@ -91,11 +91,13 @@ public class TestHFileEncryption { return hbw.getOnDiskSizeWithHeader(); } - private long readAndVerifyBlock(long pos, HFileBlock.FSReaderV2 hbr, int size) + private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderV2 hbr, int size) throws IOException { HFileBlock b = hbr.readBlockData(pos, -1, -1, false); assertEquals(0, HFile.getChecksumFailuresCount()); b.sanityCheck(); + assertFalse(b.isUnpacked()); + b = b.unpack(ctx, hbr); LOG.info("Read a block at " + pos + " with" + " onDiskSizeWithHeader=" + b.getOnDiskSizeWithHeader() + " uncompressedSizeWithoutHeader=" + b.getOnDiskSizeWithoutHeader() + @@ -138,7 +140,7 @@ public class TestHFileEncryption { HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, fileContext); long pos = 0; for (int i = 0; i < blocks; i++) { - pos += readAndVerifyBlock(pos, hbr, blockSizes[i]); + pos += readAndVerifyBlock(pos, fileContext, hbr, blockSizes[i]); } } finally { is.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java index 27e7051..b27f5b3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; @@ -170,8 +171,8 @@ public class TestHFileWriterV2 { // Meta index. 
metaBlockIndexReader.readRootIndex( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(), - trailer.getMetaIndexCount()); + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX) + .getByteStream(), trailer.getMetaIndexCount()); // File info FileInfo fileInfo = new FileInfo(); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); @@ -191,6 +192,10 @@ public class TestHFileWriterV2 { while (curBlockPos <= trailer.getLastDataBlockOffset()) { HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); assertEquals(BlockType.DATA, block.getBlockType()); + if (meta.isCompressedOrEncrypted()) { + assertFalse(block.isUnpacked()); + block = block.unpack(meta, blockReader); + } ByteBuffer buf = block.getBufferWithoutHeader(); while (buf.hasRemaining()) { int keyLen = buf.getInt(); @@ -232,7 +237,8 @@ public class TestHFileWriterV2 { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + .unpack(meta, blockReader); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); ByteBuffer buf = block.getBufferWithoutHeader(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java index 2fc701a..29df78a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java @@ -191,8 +191,7 @@ public class TestHFileWriterV3 { // Data index. We also read statistics about the block index written after // the root level. dataBlockIndexReader.readMultiLevelIndexRoot( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), - trailer.getDataIndexCount()); + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount()); if (findMidKey) { byte[] midkey = dataBlockIndexReader.midkey(); @@ -201,8 +200,8 @@ public class TestHFileWriterV3 { // Meta index. 
metaBlockIndexReader.readRootIndex( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(), - trailer.getMetaIndexCount()); + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX) + .getByteStream(), trailer.getMetaIndexCount()); // File info FileInfo fileInfo = new FileInfo(); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); @@ -220,7 +219,8 @@ public class TestHFileWriterV3 { fsdis.seek(0); long curBlockPos = 0; while (curBlockPos <= trailer.getLastDataBlockOffset()) { - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + .unpack(context, blockReader); assertEquals(BlockType.DATA, block.getBlockType()); ByteBuffer buf = block.getBufferWithoutHeader(); int keyLen = -1; @@ -278,7 +278,8 @@ public class TestHFileWriterV3 { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + .unpack(context, blockReader); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); ByteBuffer buf = block.getBufferWithoutHeader(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java new file mode 100644 index 0000000..0b051ff --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + +import com.google.common.collect.Iterables; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.*; + +/** + * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig}, + * and {@link LruBlockCache}. + */ +@Category(SmallTests.class) +@RunWith(Parameterized.class) +public class TestLazyDataBlockDecompression { + private static final Log LOG = LogFactory.getLog(TestLazyDataBlockDecompression.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private FileSystem fs; + + @Parameterized.Parameter(0) + public boolean cacheOnWrite; + + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(new Object[][] { + { false }, + { true } + }); + } + + @Before + public void setUp() throws IOException { + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; + fs = FileSystem.get(TEST_UTIL.getConfiguration()); + } + + @After + public void tearDown() { + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; + fs = null; + } + + /** + * Write {@code entryCount} random keyvalues to a new HFile at {@code path}. Returns the row + * bytes of the KeyValues written, in the order they were written. + */ + private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path, + HFileContext cxt, int entryCount) throws IOException { + HFileWriterV2 writer = (HFileWriterV2) + new HFileWriterV2.WriterFactoryV2(conf, cc) + .withPath(fs, path) + .withFileContext(cxt) + .create(); + + // write a bunch of random kv's + Random rand = new Random(9713312); // some seed. + final byte[] family = Bytes.toBytes("f"); + final byte[] qualifier = Bytes.toBytes("q"); + + for (int i = 0; i < entryCount; i++) { + byte[] keyBytes = TestHFileWriterV2.randomOrderedKey(rand, i); + byte[] valueBytes = TestHFileWriterV2.randomValue(rand); + // make a real keyvalue so that hfile tool can examine it + writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes)); + } + writer.close(); + } + + /** + * Read all blocks from {@code path} to populate {@code blockCache}. 
+ */ + private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs, + Path path, HFileContext cxt) throws IOException { + FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path); + long fileSize = fs.getFileStatus(path).getLen(); + FixedFileTrailer trailer = + FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize); + HFileReaderV2 reader = new HFileReaderV2(path, trailer, fsdis, fileSize, cacheConfig, + fsdis.getHfs(), conf); + reader.loadFileInfo(); + long offset = trailer.getFirstDataBlockOffset(), + max = trailer.getLastDataBlockOffset(); + List blocks = new ArrayList(4); + HFileBlock block; + while (offset <= max) { + block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false, + /* isCompaction */ false, /* updateCacheMetrics */ true, null); + offset += block.getOnDiskSizeWithHeader(); + blocks.add(block); + } + LOG.info("read " + Iterables.toString(blocks)); + } + + @Test + public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception { + // enough room for 2 uncompressed block + int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1); + Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), + "testCompressionIncreasesEffectiveBlockcacheSize"); + HFileContext context = new HFileContextBuilder() + .withCompression(Compression.Algorithm.GZ) + .build(); + LOG.info("context=" + context); + + // setup cache with lazy-decompression disabled. + Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = + new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled); + CacheConfig cc = new CacheConfig(lazyCompressDisabled); + assertFalse(cc.shouldCacheDataCompressed()); + assertTrue(cc.getBlockCache() instanceof LruBlockCache); + LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache(); + LOG.info("disabledBlockCache=" + disabledBlockCache); + assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize()); + assertTrue("eviction thread spawned unintentionally.", + disabledBlockCache.getEvictionThread() == null); + assertEquals("freshly created blockcache contains blocks.", + 0, disabledBlockCache.getBlockCount()); + + // 2000 kv's is ~3.6 full unencoded data blocks. + // Requires a conf and CacheConfig but should not be specific to this instance's cache settings + writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000); + + // populate the cache + cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context); + long disabledBlockCount = disabledBlockCache.getBlockCount(); + assertTrue("blockcache should contain blocks. 
disabledBlockCount=" + disabledBlockCount, + disabledBlockCount > 0); + long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount(); + for (Map.Entry e : + disabledBlockCache.getMapForTests().entrySet()) { + HFileBlock block = (HFileBlock) e.getValue().getBuffer(); + assertTrue("found a packed block, block=" + block, block.isUnpacked()); + } + + // count blocks with lazy decompression + Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = + new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled); + cc = new CacheConfig(lazyCompressEnabled); + assertTrue("test improperly configured.", cc.shouldCacheDataCompressed()); + assertTrue(cc.getBlockCache() instanceof LruBlockCache); + LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache(); + LOG.info("enabledBlockCache=" + enabledBlockCache); + assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize()); + assertTrue("eviction thread spawned unintentionally.", + enabledBlockCache.getEvictionThread() == null); + assertEquals("freshly created blockcache contains blocks.", + 0, enabledBlockCache.getBlockCount()); + + cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context); + long enabledBlockCount = enabledBlockCache.getBlockCount(); + assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount, + enabledBlockCount > 0); + long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount(); + int candidatesFound = 0; + for (Map.Entry e : + enabledBlockCache.getMapForTests().entrySet()) { + candidatesFound++; + HFileBlock block = (HFileBlock) e.getValue().getBuffer(); + if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) { + assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" + + block.getBufferWithoutHeader().capacity(), block.isUnpacked()); + } + } + assertTrue("did not find any candidates for compressed caching. Invalid test.", + candidatesFound > 0); + + LOG.info("disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + + enabledBlockCount); + assertTrue("enabling compressed data blocks should increase the effective cache size. " + + "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + + enabledBlockCount, disabledBlockCount < enabledBlockCount); + + LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" + + enabledEvictedCount); + assertTrue("enabling compressed data blocks should reduce the number of evictions. " + + "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" + + enabledEvictedCount, enabledEvictedCount < disabledEvictedCount); + } +} -- 1.9.0