From 391266ffb8cc415b0a9acb33e089c708e02bf591 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 9 Sep 2014 15:50:53 -0700 Subject: [PATCH] HBASE-11331 [blockcache] lazy block decompression When hbase.block.data.cachecompressed=true, DATA (and ENCODED_DATA) blocks are cached in the BlockCache in their on-disk format. This is different from the default behavior, which decompresses and decrypts a block before caching. --- .../apache/hadoop/hbase/io/hfile/HFileContext.java | 17 + .../hbase/tmpl/regionserver/BlockCacheTmpl.jamon | 6 +- .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 66 +-- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 441 ++++++++++++--------- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 26 +- .../hadoop/hbase/io/hfile/HFileReaderV2.java | 28 +- .../hadoop/hbase/io/hfile/HFileWriterV2.java | 6 +- .../hadoop/hbase/io/hfile/LruBlockCache.java | 100 ++++- .../hadoop/hbase/io/hfile/TestCacheConfig.java | 3 +- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 60 ++- .../apache/hadoop/hbase/io/hfile/TestChecksum.java | 10 +- .../io/hfile/TestForceCacheImportantBlocks.java | 19 +- .../apache/hadoop/hbase/io/hfile/TestHFile.java | 3 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 92 +++-- .../io/hfile/TestHFileBlockCompatibility.java | 18 +- .../hadoop/hbase/io/hfile/TestHFileEncryption.java | 13 +- .../hadoop/hbase/io/hfile/TestHFileWriterV2.java | 12 +- .../hadoop/hbase/io/hfile/TestHFileWriterV3.java | 13 +- .../io/hfile/TestLazyDataBlockDecompression.java | 231 +++++++++++ 19 files changed, 834 insertions(+), 330 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java index 3299e41..8cfc8a7 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java @@ -96,6 +96,23 @@ public class HFileContext implements HeapSize, Cloneable { this.cryptoContext = cryptoContext; } + /** + * @return true when on-disk blocks from this file are compressed, and/or encrypted; + * false otherwise. 
+ */ + public boolean isCompressedOrEncrypted() { + Compression.Algorithm compressAlgo = getCompression(); + boolean compressed = + compressAlgo != null + && compressAlgo != Compression.Algorithm.NONE; + + Encryption.Context cryptoContext = getEncryptionContext(); + boolean encrypted = cryptoContext != null + && cryptoContext != Encryption.Context.NONE; + + return compressed || encrypted; + } + public Compression.Algorithm getCompression() { return compressAlgo; } diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon index 162ac46..a6a3cf9 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon @@ -157,9 +157,9 @@ org.apache.hadoop.util.StringUtils; reader is closed - Compress blocks - <% cacheConfig.shouldCacheCompressed() %> - True if blocks are compressed in cache + Cache DATA in compressed format + <% cacheConfig.shouldCacheDataCompressed() %> + True if DATA blocks are cached in their compressed form Prefetch on Open diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 82bbeee..3ffac52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -64,11 +64,10 @@ public class CacheConfig { "hfile.block.bloom.cacheonwrite"; /** - * TODO: Implement this (jgray) - * Configuration key to cache data blocks in compressed format. + * Configuration key to cache data blocks in compressed and/or encrypted format. */ public static final String CACHE_DATA_BLOCKS_COMPRESSED_KEY = - "hbase.rs.blockcache.cachedatacompressed"; + "hbase.block.data.cachecompressed"; /** * Configuration key to evict all blocks of a given file from the block cache @@ -119,6 +118,14 @@ public class CacheConfig { public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = "hbase.rs.prefetchblocksonopen"; + /** + * The target block size used by blockcache instances. Defaults to + * {@link HConstants#DEFAULT_BLOCKSIZE}. + * TODO: this config point is completely wrong, as it's used to determine the + * target block size of BlockCache instances. Rename. 
+ */ + public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize"; + // Defaults public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; @@ -127,7 +134,7 @@ public class CacheConfig { public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false; public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false; public static final boolean DEFAULT_EVICT_ON_CLOSE = false; - public static final boolean DEFAULT_COMPRESSED_CACHE = false; + public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false; public static final boolean DEFAULT_PREFETCH_ON_OPEN = false; /** Local reference to the block cache, null if completely disabled */ @@ -156,8 +163,8 @@ public class CacheConfig { /** Whether blocks of a file should be evicted when the file is closed */ private boolean evictOnClose; - /** Whether data blocks should be stored in compressed form in the cache */ - private final boolean cacheCompressed; + /** Whether data blocks should be stored in compressed and/or encrypted form in the cache */ + private final boolean cacheDataCompressed; /** Whether data blocks should be prefetched into the cache */ private final boolean prefetchOnOpen; @@ -189,7 +196,7 @@ public class CacheConfig { DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE), + conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED), conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen(), conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1, @@ -208,13 +215,10 @@ public class CacheConfig { DEFAULT_IN_MEMORY, // This is a family-level setting so can't be set // strictly from conf conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), - conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_INDEXES_ON_WRITE), - conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_BLOOMS_ON_WRITE), + conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_INDEXES_ON_WRITE), + conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, - DEFAULT_COMPRESSED_CACHE), + conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED), conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN), conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1, HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) @@ -232,7 +236,7 @@ public class CacheConfig { * @param cacheIndexesOnWrite whether index blocks should be cached on write * @param cacheBloomsOnWrite whether blooms should be cached on write * @param evictOnClose whether blocks should be evicted when HFile is closed - * @param cacheCompressed whether to store blocks as compressed in the cache + * @param cacheDataCompressed whether to store blocks as compressed in the cache * @param prefetchOnOpen whether to prefetch blocks upon open * @param cacheDataInL1 If more than one cache tier deployed, if true, cache this column families * data blocks up in the L1 tier. 
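A minimal sketch (not part of this patch) of turning the feature on from configuration, using the renamed key and the CacheConfig accessors changed above. Configuration and HBaseConfiguration are the standard client classes, and the block cache is assumed to be enabled:

    // Assumed imports: org.apache.hadoop.conf.Configuration,
    // org.apache.hadoop.hbase.HBaseConfiguration,
    // org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory,
    // org.apache.hadoop.hbase.io.hfile.CacheConfig
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); // "hbase.block.data.cachecompressed"
    CacheConfig cacheConf = new CacheConfig(conf);
    // Only the DATA category is ever cached in its packed (on-disk) form:
    boolean dataPacked  = cacheConf.shouldCacheCompressed(BlockCategory.DATA);  // true
    boolean indexPacked = cacheConf.shouldCacheCompressed(BlockCategory.INDEX); // false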
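The read path only pays the unpack cost when the new HFileContext#isCompressedOrEncrypted() (added at the top of this patch) reports true. A minimal sketch, assuming the standard HFileContextBuilder and Compression classes:

    // GZ compression alone makes the predicate true; with Algorithm.NONE and
    // no encryption context it stays false, so unpack() returns the block as-is.
    HFileContext gzContext = new HFileContextBuilder()
        .withCompression(Compression.Algorithm.GZ)
        .build();
    HFileContext plainContext = new HFileContextBuilder()
        .withCompression(Compression.Algorithm.NONE)
        .build();
    assert gzContext.isCompressedOrEncrypted();     // needs unpacking before use
    assert !plainContext.isCompressedOrEncrypted(); // cached form == unpacked form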
@@ -241,7 +245,7 @@ public class CacheConfig { final boolean cacheDataOnRead, final boolean inMemory, final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite, final boolean cacheBloomsOnWrite, final boolean evictOnClose, - final boolean cacheCompressed, final boolean prefetchOnOpen, + final boolean cacheDataCompressed, final boolean prefetchOnOpen, final boolean cacheDataInL1) { this.blockCache = blockCache; this.cacheDataOnRead = cacheDataOnRead; @@ -250,7 +254,7 @@ public class CacheConfig { this.cacheIndexesOnWrite = cacheIndexesOnWrite; this.cacheBloomsOnWrite = cacheBloomsOnWrite; this.evictOnClose = evictOnClose; - this.cacheCompressed = cacheCompressed; + this.cacheDataCompressed = cacheDataCompressed; this.prefetchOnOpen = prefetchOnOpen; this.cacheDataInL1 = cacheDataInL1; LOG.info(this); @@ -264,7 +268,7 @@ public class CacheConfig { this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory, cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite, cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose, - cacheConf.cacheCompressed, cacheConf.prefetchOnOpen, + cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen, cacheConf.cacheDataInL1); } @@ -298,14 +302,13 @@ public class CacheConfig { * available. */ public boolean shouldCacheBlockOnRead(BlockCategory category) { - boolean shouldCache = isBlockCacheEnabled() + return isBlockCacheEnabled() && (cacheDataOnRead || category == BlockCategory.INDEX || category == BlockCategory.BLOOM || (prefetchOnOpen && (category != BlockCategory.META && category != BlockCategory.UNKNOWN))); - return shouldCache; } /** @@ -384,10 +387,23 @@ public class CacheConfig { } /** - * @return true if blocks should be compressed in the cache, false if not + * @return true if data blocks should be compressed in the cache, false if not */ - public boolean shouldCacheCompressed() { - return isBlockCacheEnabled() && this.cacheCompressed; + public boolean shouldCacheDataCompressed() { + return isBlockCacheEnabled() && this.cacheDataCompressed; + } + + /** + * @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise + */ + public boolean shouldCacheCompressed(BlockCategory category) { + if (!isBlockCacheEnabled()) return false; + switch (category) { + case DATA: + return this.cacheDataCompressed; + default: + return false; + } } /** @@ -408,7 +424,7 @@ public class CacheConfig { ", cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + ", cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + ", cacheEvictOnClose=" + shouldEvictOnClose() + - ", cacheCompressed=" + shouldCacheCompressed() + + ", cacheDataCompressed=" + shouldCacheDataCompressed() + ", prefetchOnOpen=" + shouldPrefetchOnOpen(); } @@ -449,7 +465,7 @@ public class CacheConfig { */ private static LruBlockCache getL1(final Configuration c, final MemoryUsage mu) { long lruCacheSize = getLruCacheSize(c, mu); - int blockSize = c.getInt("hbase.offheapcache.minblocksize", HConstants.DEFAULT_BLOCKSIZE); + int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); LOG.info("Allocating LruBlockCache size=" + StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); return new LruBlockCache(lruCacheSize, blockSize, true, c); @@ -466,7 +482,7 @@ public class CacheConfig { String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null); if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null; - int blockSize = c.getInt("hbase.offheapcache.minblocksize", 
HConstants.DEFAULT_BLOCKSIZE); + int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); float bucketCachePercentage = c.getFloat(BUCKET_CACHE_SIZE_KEY, 0F); long bucketCacheSize = (long) (bucketCachePercentage < 1? mu.getMax() * bucketCachePercentage: bucketCachePercentage * 1024 * 1024); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 3e26107..75a87a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -36,9 +36,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; -import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; @@ -64,25 +61,26 @@ import com.google.common.base.Preconditions; * information from the block index are required to read a block. *
  • In version 2 a block is structured as follows: * * - * The version 2 block representation in the block cache is the same as above, - * except that the data section is always uncompressed in the cache. */ @InterfaceAudience.Private public class HFileBlock implements Cacheable { @@ -111,7 +109,7 @@ public class HFileBlock implements Cacheable { ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false); // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader - public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG; /** @@ -136,6 +134,9 @@ public class HFileBlock implements Cacheable { HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum); ourBuffer.offset = buf.getLong(); ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt(); + if (ourBuffer.hasNextBlockHeader()) { + ourBuffer.buf.limit(ourBuffer.buf.limit() - ourBuffer.headerSize()); + } return ourBuffer; } @@ -155,23 +156,28 @@ public class HFileBlock implements Cacheable { .registerDeserializer(blockDeserializer); } + /** Type of block. Header field 0. */ private BlockType blockType; - /** Size on disk without the header. It includes checksum data too. */ + /** Size on disk excluding header, including checksum. Header field 1. */ private int onDiskSizeWithoutHeader; - /** Size of pure data. Does not include header or checksums */ + /** Size of pure data. Does not include header or checksums. Header field 2. */ private final int uncompressedSizeWithoutHeader; - /** The offset of the previous block on disk */ + /** The offset of the previous block on disk. Header field 3. */ private final long prevBlockOffset; - /** Size on disk of header and data. Does not include checksum data */ + /** + * Size on disk of header + data. Excludes checksum. Header field 6, + * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum. + */ private final int onDiskDataSizeWithHeader; /** The in-memory representation of the hfile block */ private ByteBuffer buf; - /** Meta data that holds meta information on the hfileblock**/ + + /** Meta data that holds meta information on the hfileblock */ private HFileContext fileContext; /** @@ -193,27 +199,18 @@ public class HFileBlock implements Cacheable { * and is sitting in a byte buffer. * * @param blockType the type of this block, see {@link BlockType} - * @param onDiskSizeWithoutHeader compressed size of the block if compression - * is used, otherwise uncompressed size, header size not included - * @param uncompressedSizeWithoutHeader uncompressed size of the block, - * header size not included. Equals onDiskSizeWithoutHeader if - * compression is disabled. - * @param prevBlockOffset the offset of the previous block in the - * {@link HFile} + * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader} + * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader} + * @param prevBlockOffset see {@link #prevBlockOffset} * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by * uncompressed data. This - * @param fillHeader true to fill in the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of - * the buffer based on the header fields provided + * @param fillHeader when true, parse {@code buf} and override the first 4 header fields. 
* @param offset the file offset the block was read from - * @param bytesPerChecksum the number of bytes per checksum chunk - * @param checksumType the checksum algorithm to use - * @param onDiskDataSizeWithHeader size of header and data on disk not - * including checksum data + * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader} * @param fileContext HFile meta data */ - HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, - int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf, - boolean fillHeader, long offset, + HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, + long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset, int onDiskDataSizeWithHeader, HFileContext fileContext) { this.blockType = blockType; this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; @@ -225,6 +222,22 @@ public class HFileBlock implements Cacheable { this.offset = offset; this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader; this.fileContext = fileContext; + this.buf.rewind(); + } + + /** + * Copy constructor. Creates a shallow copy of {@code that}'s buffer. + */ + HFileBlock(HFileBlock that) { + this.blockType = that.blockType; + this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader; + this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader; + this.prevBlockOffset = that.prevBlockOffset; + this.buf = that.buf.duplicate(); + this.offset = that.offset; + this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader; + this.fileContext = that.fileContext; + this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader; } /** @@ -272,28 +285,21 @@ public class HFileBlock implements Cacheable { } /** - * @return the on-disk size of the block with header size included. This - * includes the header, the data and the checksum data. + * @return the on-disk size of header + data part + checksum. */ public int getOnDiskSizeWithHeader() { return onDiskSizeWithoutHeader + headerSize(); } /** - * Returns the size of the compressed part of the block in case compression - * is used, or the uncompressed size of the data part otherwise. Header size - * and checksum data size is not included. - * - * @return the on-disk size of the data part of the block, header and - * checksum not included. + * @return the on-disk size of the data part + checksum (header excluded). */ public int getOnDiskSizeWithoutHeader() { return onDiskSizeWithoutHeader; } /** - * @return the uncompressed size of the data part of the block, header not - * included + * @return the uncompressed size of data part (header and checksum excluded). */ public int getUncompressedSizeWithoutHeader() { return uncompressedSizeWithoutHeader; @@ -308,8 +314,8 @@ public class HFileBlock implements Cacheable { } /** - * Writes header fields into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the - * buffer. Resets the buffer position to the end of header as side effect. + * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position + * is modified as side-effect. */ private void overwriteHeader() { buf.rewind(); @@ -320,11 +326,9 @@ public class HFileBlock implements Cacheable { } /** - * Returns a buffer that does not include the header. The array offset points - * to the start of the block data right after the header. The underlying data - * array is not copied. Checksum data is not included in the returned buffer. + * Returns a buffer that does not include the header or checksum. 
* - * @return the buffer with header skipped + * @return the buffer with header skipped and checksum omitted. */ public ByteBuffer getBufferWithoutHeader() { return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(), @@ -336,7 +340,7 @@ public class HFileBlock implements Cacheable { * modify the buffer object. This method has to be public because it is * used in {@link CompoundBloomFilter} to avoid object creation on every * Bloom filter lookup, but has to be used with caution. Checksum data - * is not included in the returned buffer. + * is not included in the returned buffer but header data is. * * @return the buffer of this block for read-only operations */ @@ -350,17 +354,17 @@ public class HFileBlock implements Cacheable { * not modify the buffer object. This method has to be public because it is * used in {@link BucketCache} to avoid buffer copy. * - * @return the byte buffer with header included for read-only operations + * @return the buffer with header and checksum included for read-only operations */ public ByteBuffer getBufferReadOnlyWithHeader() { return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice(); } /** - * Returns a byte buffer of this block, including header data, positioned at + * Returns a byte buffer of this block, including header data and checksum, positioned at * the beginning of header. The underlying data array is not copied. * - * @return the byte buffer with header included + * @return the byte buffer with header and checksum included */ ByteBuffer getBufferWithHeader() { ByteBuffer dupBuf = buf.duplicate(); @@ -376,22 +380,25 @@ public class HFileBlock implements Cacheable { } } + private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) + throws IOException { + if (valueFromBuf != valueFromField) { + throw new IOException("Block type stored in the buffer: " + + valueFromBuf + ", block type field: " + valueFromField); + } + } + /** * Checks if the block is internally consistent, i.e. the first - * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent - * with the fields. This function is primary for testing and debugging, and - * is not thread-safe, because it alters the internal buffer pointer. + * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a + * valid header consistent with the fields. Assumes a packed block structure. + * This function is primary for testing and debugging, and is not + * thread-safe, because it alters the internal buffer pointer. 
*/ void sanityCheck() throws IOException { buf.rewind(); - { - BlockType blockTypeFromBuf = BlockType.read(buf); - if (blockTypeFromBuf != blockType) { - throw new IOException("Block type stored in the buffer: " + - blockTypeFromBuf + ", block type field: " + blockType); - } - } + sanityCheckAssertion(BlockType.read(buf), blockType); sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); @@ -403,45 +410,65 @@ public class HFileBlock implements Cacheable { if (this.fileContext.isUseHBaseChecksum()) { sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum"); - sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, - "onDiskDataSizeWithHeader"); + sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); } int cksumBytes = totalChecksumBytes(); - int hdrSize = headerSize(); - int expectedBufLimit = uncompressedSizeWithoutHeader + headerSize() + - cksumBytes; + int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes; if (buf.limit() != expectedBufLimit) { throw new AssertionError("Expected buffer limit " + expectedBufLimit + ", got " + buf.limit()); } // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next - // block's, header, so there are two sensible values for buffer capacity. - int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes; - if (buf.capacity() != size && - buf.capacity() != size + hdrSize) { + // block's header, so there are two sensible values for buffer capacity. + int hdrSize = headerSize(); + if (buf.capacity() != expectedBufLimit && + buf.capacity() != expectedBufLimit + hdrSize) { throw new AssertionError("Invalid buffer capacity: " + buf.capacity() + - ", expected " + size + " or " + (size + hdrSize)); + ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); } } @Override public String toString() { - return "blockType=" - + blockType - + ", onDiskSizeWithoutHeader=" - + onDiskSizeWithoutHeader - + ", uncompressedSizeWithoutHeader=" - + uncompressedSizeWithoutHeader - + ", prevBlockOffset=" - + prevBlockOffset - + ", dataBeginsWith=" - + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), - Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())) - + ", fileOffset=" + offset; + StringBuilder sb = new StringBuilder() + .append("HFileBlock [") + .append(" fileOffset=").append(offset) + .append(" headerSize()=").append(headerSize()) + .append(" blockType=").append(blockType) + .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) + .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) + .append(" prevBlockOffset=").append(prevBlockOffset) + .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum()); + if (fileContext.isUseHBaseChecksum()) { + sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24))) + .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1)) + .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); + } else { + sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader) + .append("(").append(onDiskSizeWithoutHeader) + .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); + } + sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader()) + .append(" totalChecksumBytes()=").append(totalChecksumBytes()) + .append(" 
isUnpacked()=").append(isUnpacked()) + .append(" buf=[ ") + .append(buf) + .append(", array().length=").append(buf.array().length) + .append(", arrayOffset()=").append(buf.arrayOffset()) + .append(" ]") + .append(" dataBeginsWith=") + .append(Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), + Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()))) + .append(" fileContext=").append(fileContext) + .append(" ]"); + return sb.toString(); } + /** + * Called after reading a block with provided onDiskSizeWithHeader. + */ private void validateOnDiskSizeWithoutHeader( int expectedOnDiskSizeWithoutHeader) throws IOException { if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) { @@ -457,32 +484,80 @@ public class HFileBlock implements Cacheable { } /** + * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its + * encoded structure. Internal structures are shared between instances where applicable. + */ + HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { + if (!fileContext.isCompressedOrEncrypted()) { + // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean), + // which is used for block serialization to L2 cache, does not preserve encoding and + // encryption details. + return this; + } + + HFileBlock unpacked = new HFileBlock(this); + unpacked.allocateBuffer(); // allocates space for the decompressed block + + HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? + reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); + ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), + unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), + this.getBufferReadOnlyWithHeader().array(), this.headerSize()); + + // Preserve the next block's header bytes in the new block if we have them. + if (unpacked.hasNextBlockHeader()) { + System.arraycopy(this.buf.array(), this.buf.arrayOffset() + this.onDiskDataSizeWithHeader, + unpacked.buf.array(), unpacked.buf.arrayOffset() + unpacked.headerSize() + + unpacked.uncompressedSizeWithoutHeader + unpacked.totalChecksumBytes(), + unpacked.headerSize()); + } + return unpacked; + } + + /** + * Return true when this buffer includes next block's header. + */ + private boolean hasNextBlockHeader() { + return nextBlockOnDiskSizeWithHeader > 0; + } + + /** * Always allocates a new buffer of the correct size. Copies header bytes * from the existing buffer. Does not change header fields. * Reserve room to keep checksum bytes too. - * - * @param extraBytes whether to reserve room in the buffer to read the next - * block's header */ - private void allocateBuffer(boolean extraBytes) { + private void allocateBuffer() { int cksumBytes = totalChecksumBytes(); - int capacityNeeded = headerSize() + uncompressedSizeWithoutHeader + - cksumBytes + - (extraBytes ? headerSize() : 0); + int headerSize = headerSize(); + int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + + cksumBytes + (hasNextBlockHeader() ? headerSize : 0); ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded); // Copy header bytes. 
System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(), - newBuf.arrayOffset(), headerSize()); + newBuf.arrayOffset(), headerSize); buf = newBuf; - buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes); + // set limit to exclude next block's header + buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes); + } + + /** + * Return true when this block's buffer has been unpacked, false otherwise. Note this is a + * calculated heuristic, not tracked attribute of the block. + */ + public boolean isUnpacked() { + final int cksumBytes = totalChecksumBytes(); + final int headerSize = headerSize(); + final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes; + final int bufCapacity = buf.capacity(); + return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; } - /** An additional sanity-check in case no compression is being used. */ + /** An additional sanity-check in case no compression or encryption is being used. */ public void assumeUncompressed() throws IOException { - if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + + if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " @@ -512,7 +587,7 @@ public class HFileBlock implements Cacheable { } /** - * @return a byte stream reading the data section of this block + * @return a byte stream reading the data + checksum of this block */ public DataInputStream getByteStream() { return new DataInputStream(new ByteArrayInputStream(buf.array(), @@ -588,7 +663,6 @@ public class HFileBlock implements Cacheable { return nextBlockOnDiskSizeWithHeader; } - /** * Unified version 2 {@link HFile} block writer. The intended usage pattern * is as follows: @@ -631,7 +705,7 @@ public class HFileBlock implements Cacheable { /** * Current block type. Set in {@link #startWriting(BlockType)}. Could be - * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} + * changed in {@link #finishBlock()} from {@link BlockType#DATA} * to {@link BlockType#ENCODED_DATA}. */ private BlockType blockType; @@ -648,7 +722,7 @@ public class HFileBlock implements Cacheable { /** * Bytes to be written to the file system, including the header. Compressed - * if compression is turned on. It also includes the checksum data that + * if compression is turned on. It also includes the checksum data that * immediately follows the block data. (header + data + checksums) */ private byte[] onDiskBytesWithHeader; @@ -1008,6 +1082,19 @@ public class HFileBlock implements Cacheable { return ByteBuffer.wrap(uncompressedBytesWithHeader); } + /** + * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is + * needed for storing packed blocks in the block cache. Expects calling semantics identical to + * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data, + * Does not include checksum data. + * + * @return packed block bytes for caching on write + */ + ByteBuffer getOnDiskBufferWithHeader() { + expectState(State.BLOCK_READY); + return ByteBuffer.wrap(onDiskBytesWithHeader); + } + private void expectState(State expectedState) { if (state != expectedState) { throw new IllegalStateException("Expected state: " + expectedState + @@ -1038,7 +1125,7 @@ public class HFileBlock implements Cacheable { * version is MINOR_VERSION_WITH_CHECKSUM. 
This is indicated by setting a * 0 value in bytesPerChecksum. */ - public HFileBlock getBlockForCaching() { + public HFileBlock getBlockForCaching(CacheConfig cacheConf) { HFileContext newContext = new HFileContextBuilder() .withBlockSize(fileContext.getBlocksize()) .withBytesPerCheckSum(0) @@ -1051,7 +1138,10 @@ public class HFileBlock implements Cacheable { .withIncludesTags(fileContext.isIncludesTags()) .build(); return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), - getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedBufferWithHeader(), + getUncompressedSizeWithoutHeader(), prevOffset, + cacheConf.shouldCacheCompressed(blockType.getCategory()) ? + getOnDiskBufferWithHeader() : + getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset, onDiskBytesWithHeader.length + onDiskChecksum.length, newContext); } @@ -1109,7 +1199,7 @@ public class HFileBlock implements Cacheable { /** * Creates a block iterator over the given portion of the {@link HFile}. * The iterator returns blocks starting with offset such that offset <= - * startOffset < endOffset. + * startOffset < endOffset. Returned blocks are always unpacked. * * @param startOffset the offset of the block to start iteration with * @param endOffset the offset to end iteration at (exclusive) @@ -1119,6 +1209,12 @@ public class HFileBlock implements Cacheable { /** Closes the backing streams */ void closeStreams() throws IOException; + + /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */ + HFileBlockDecodingContext getBlockDecodingContext(); + + /** Get the default decoder for blocks from this file. */ + HFileBlockDecodingContext getDefaultBlockDecodingContext(); } /** @@ -1159,6 +1255,7 @@ public class HFileBlock implements Cacheable { @Override public BlockIterator blockRange(final long startOffset, final long endOffset) { + final FSReader owner = this; // handle for inner class return new BlockIterator() { private long offset = startOffset; @@ -1168,7 +1265,7 @@ public class HFileBlock implements Cacheable { return null; HFileBlock b = readBlockData(offset, -1, -1, false); offset += b.getOnDiskSizeWithHeader(); - return b; + return b.unpack(fileContext, owner); } @Override @@ -1274,7 +1371,8 @@ public class HFileBlock implements Cacheable { private HFileBlockDecodingContext encodedBlockDecodingCtx; - private HFileBlockDefaultDecodingContext defaultDecodingCtx; + /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ + private final HFileBlockDefaultDecodingContext defaultDecodingCtx; private ThreadLocal prefetchedHeaderForThread = new ThreadLocal() { @@ -1290,10 +1388,8 @@ public class HFileBlock implements Cacheable { this.streamWrapper = stream; // Older versions of HBase didn't support checksum. this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); - defaultDecodingCtx = - new HFileBlockDefaultDecodingContext(fileContext); - encodedBlockDecodingCtx = - new HFileBlockDefaultDecodingContext(fileContext); + defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext); + encodedBlockDecodingCtx = defaultDecodingCtx; } /** @@ -1434,9 +1530,8 @@ public class HFileBlock implements Cacheable { HFileBlock b = null; if (onDiskSizeWithHeader > 0) { - // We know the total on-disk size but not the uncompressed size. Read - // the entire block into memory, then parse the header and decompress - // from memory if using compression. This code path is used when + // We know the total on-disk size. 
Read the entire block into memory, + // then parse the header. This code path is used when // doing a random read operation relying on the block index, as well as // when the client knows the on-disk size from peeking into the next // block's header (e.g. this block's header) when reading the previous @@ -1444,7 +1539,8 @@ public class HFileBlock implements Cacheable { // Size that we have to skip in case we have already read the header. int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; - onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; + onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the + // next block's header nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); @@ -1457,11 +1553,10 @@ public class HFileBlock implements Cacheable { headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); } // We know the total on-disk size but not the uncompressed size. Read - // the entire block into memory, then parse the header and decompress - // from memory if using compression. Here we have already read the - // block's header + // the entire block into memory, then parse the header. Here we have + // already read the block's header try { - b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); + b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); } catch (IOException ex) { // Seen in load testing. Provide comprehensive debug info. throw new IOException("Failed to read compressed block at " @@ -1499,66 +1594,34 @@ public class HFileBlock implements Cacheable { readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false, offset, pread); } - b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); + b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; - System.arraycopy(headerBuf.array(), - headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); + System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader() - hdrSize, true, offset + hdrSize, pread); onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize; } - Algorithm compressAlgo = fileContext.getCompression(); - boolean isCompressed = - compressAlgo != null - && compressAlgo != Compression.Algorithm.NONE; - - Encryption.Context cryptoContext = fileContext.getEncryptionContext(); - boolean isEncrypted = cryptoContext != null - && cryptoContext != Encryption.Context.NONE; - - if (!isCompressed && !isEncrypted) { + if (!fileContext.isCompressedOrEncrypted()) { b.assumeUncompressed(); } - if (verifyChecksum && - !validateBlockChecksum(b, onDiskBlock, hdrSize)) { + if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) { return null; // checksum mismatch } - if (isCompressed || isEncrypted) { - // This will allocate a new buffer but keep header bytes. 
- b.allocateBuffer(nextBlockOnDiskSize > 0); - if (b.blockType == BlockType.ENCODED_DATA) { - encodedBlockDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), - b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, - hdrSize); - } else { - defaultDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), - b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, - hdrSize); - } - if (nextBlockOnDiskSize > 0) { - // Copy next block's header bytes into the new block if we have them. - System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(), - b.buf.arrayOffset() + hdrSize - + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), - hdrSize); - } - } else { - // The onDiskBlock will become the headerAndDataBuffer for this block. - // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already - // contains the header of next block, so no need to set next - // block's header in it. - b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, - onDiskSizeWithHeader), this.fileContext.isUseHBaseChecksum()); - } + // The onDiskBlock will become the headerAndDataBuffer for this block. + // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already + // contains the header of next block, so no need to set next + // block's header in it. + b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader), + this.fileContext.isUseHBaseChecksum()); b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; // Set prefetched header - if (b.nextBlockOnDiskSizeWithHeader > 0) { + if (b.hasNextBlockHeader()) { prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader(); System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize); @@ -1578,37 +1641,53 @@ public class HFileBlock implements Cacheable { encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext); } + @Override + public HFileBlockDecodingContext getBlockDecodingContext() { + return this.encodedBlockDecodingCtx; + } + + @Override + public HFileBlockDecodingContext getDefaultBlockDecodingContext() { + return this.defaultDecodingCtx; + } + /** * Generates the checksum for the header as well as the data and * then validates that it matches the value stored in the header. * If there is a checksum mismatch, then return false. Otherwise * return true. */ - protected boolean validateBlockChecksum(HFileBlock block, - byte[] data, int hdrSize) throws IOException { - return ChecksumUtil.validateBlockChecksum(path, block, - data, hdrSize); + protected boolean validateBlockChecksum(HFileBlock block, byte[] data, int hdrSize) + throws IOException { + return ChecksumUtil.validateBlockChecksum(path, block, data, hdrSize); } @Override public void closeStreams() throws IOException { streamWrapper.close(); } + + @Override + public String toString() { + return "FSReaderV2 [ hfs=" + hfs + " path=" + path + " fileContext=" + fileContext + " ]"; + } } @Override public int getSerializedLength() { if (buf != null) { - return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE; + // include extra bytes for the next header when it's available. + int extraSpace = hasNextBlockHeader() ? 
headerSize() : 0; + return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE; } return 0; } @Override public void serialize(ByteBuffer destination) { - ByteBuffer dupBuf = this.buf.duplicate(); - dupBuf.rewind(); - destination.put(dupBuf); + // assumes HeapByteBuffer + destination.put(this.buf.array(), this.buf.arrayOffset(), + getSerializedLength() - EXTRA_SERIALIZATION_SPACE); serializeExtraInfo(destination); } @@ -1656,13 +1735,9 @@ public class HFileBlock implements Cacheable { if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { return false; } - if (this.buf.compareTo(castedComparison.buf) != 0) { - return false; - } - if (this.buf.position() != castedComparison.buf.position()){ - return false; - } - if (this.buf.limit() != castedComparison.buf.limit()){ + if (Bytes.compareTo(this.buf.array(), this.buf.arrayOffset(), this.buf.limit(), + castedComparison.buf.array(), castedComparison.buf.arrayOffset(), + castedComparison.buf.limit()) != 0) { return false; } return true; @@ -1683,6 +1758,7 @@ public class HFileBlock implements Cacheable { return this.fileContext.getBytesPerChecksum(); } + /** @return the size of data on disk + header. Excludes checksum. */ int getOnDiskDataSizeWithHeader() { return this.onDiskDataSizeWithHeader; } @@ -1736,6 +1812,10 @@ public class HFileBlock implements Cacheable { return DUMMY_HEADER_NO_CHECKSUM; } + /** + * @return the HFileContext used to create this HFileBlock. Not necessary the + * fileContext for the file from which this block's data was originally read. + */ public HFileContext getHFileContext() { return this.fileContext; } @@ -1748,7 +1828,7 @@ public class HFileBlock implements Cacheable { static String toStringHeader(ByteBuffer buf) throws IOException { int offset = buf.arrayOffset(); byte[] b = buf.array(); - long magic = Bytes.toLong(b, offset); + long magic = Bytes.toLong(b, offset); BlockType bt = BlockType.read(buf); offset += Bytes.SIZEOF_LONG; int compressedBlockSizeNoHeader = Bytes.toInt(b, offset); @@ -1775,4 +1855,3 @@ public class HFileBlock implements Cacheable { " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader; } } - diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index f7b5b9d..1073e93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -772,7 +772,7 @@ public class HFileBlockIndex { * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The * initial value accounts for the root level, and will be increased to two * as soon as we find out there is a leaf-level in - * {@link #blockWritten(long, int)}. + * {@link #blockWritten(long, int, int)}. */ private int numLevels = 1; @@ -798,8 +798,8 @@ public class HFileBlockIndex { /** Whether we require this block index to always be single-level. */ private boolean singleLevelOnly; - /** Block cache, or null if cache-on-write is disabled */ - private BlockCache blockCache; + /** CacheConfig, or null if cache-on-write is disabled */ + private CacheConfig cacheConf; /** Name to use for computing cache keys */ private String nameForCaching; @@ -814,18 +814,17 @@ public class HFileBlockIndex { * Creates a multi-level block index writer. 
* * @param blockWriter the block writer to use to write index blocks - * @param blockCache if this is not null, index blocks will be cached - * on write into this block cache. + * @param cacheConf used to determine when and how a block should be cached-on-write. */ public BlockIndexWriter(HFileBlock.Writer blockWriter, - BlockCache blockCache, String nameForCaching) { - if ((blockCache == null) != (nameForCaching == null)) { + CacheConfig cacheConf, String nameForCaching) { + if ((cacheConf == null) != (nameForCaching == null)) { throw new IllegalArgumentException("Block cache and file name for " + "caching must be both specified or both null"); } this.blockWriter = blockWriter; - this.blockCache = blockCache; + this.cacheConf = cacheConf; this.nameForCaching = nameForCaching; this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE; } @@ -979,9 +978,9 @@ public class HFileBlockIndex { byte[] curFirstKey = curChunk.getBlockKey(0); blockWriter.writeHeaderAndData(out); - if (blockCache != null) { - HFileBlock blockForCaching = blockWriter.getBlockForCaching(); - blockCache.cacheBlock(new BlockCacheKey(nameForCaching, + if (cacheConf != null) { + HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf); + cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching, beginOffset), blockForCaching); } @@ -1090,8 +1089,7 @@ public class HFileBlockIndex { * entry referring to that block to the parent-level index. */ @Override - public void blockWritten(long offset, int onDiskSize, int uncompressedSize) - { + public void blockWritten(long offset, int onDiskSize, int uncompressedSize) { // Add leaf index block size totalBlockOnDiskSize += onDiskSize; totalBlockUncompressedSize += uncompressedSize; @@ -1156,7 +1154,7 @@ public class HFileBlockIndex { */ @Override public boolean getCacheOnWrite() { - return blockCache != null; + return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 1292319..6f67df3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -249,6 +249,10 @@ public class HFileReaderV2 extends AbstractHFileReader { return new ScannerV2(this, cacheBlocks, pread, isCompaction); } + /** + * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType} + * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary. 
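To make the reader-side hunks below easier to follow, here is the read-path sequence they implement, assembled into one sketch. This is not a verbatim excerpt; dataBlockOffset, onDiskBlockSize, pread and cacheKey stand for the locals already present in readBlock:

    // Disk -> cache -> caller: the reader decides separately which form goes
    // into the cache, but always returns the unpacked block to its caller.
    HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, pread);
    HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
    BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
    if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
      cacheConf.getBlockCache().cacheBlock(cacheKey,
          cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
          cacheConf.isInMemory(), cacheConf.isCacheDataInL1());
    }
    return unpacked; // callers and scanners never see a packed block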
+ */ private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException { @@ -258,6 +262,9 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics); if (cachedBlock != null) { + if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { + cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader); + } validateBlockType(cachedBlock, expectedBlockType); if (expectedDataBlockEncoding == null) { @@ -337,6 +344,7 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true, BlockType.META, null); if (cachedBlock != null) { + assert cachedBlock.isUnpacked() : "Packed block leak."; // Return a distinct 'shallow copy' of the block, // so pos does not get messed by the scanner return cachedBlock.getBufferWithoutHeader(); @@ -345,7 +353,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, - blockSize, -1, true); + blockSize, -1, true).unpack(hfileContext, fsBlockReader); // Cache the block if (cacheBlock) { @@ -359,7 +367,7 @@ public class HFileReaderV2 extends AbstractHFileReader { /** * Read in a file block of the given {@link BlockType} and - * {@link DataBlockEncoding}. + * {@link DataBlockEncoding}. Unpacks the block as necessary. * @param dataBlockOffset offset to read. * @param onDiskBlockSize size of the block * @param cacheBlock @@ -400,8 +408,7 @@ public class HFileReaderV2 extends AbstractHFileReader { // the other choice is to duplicate work (which the cache would prevent you // from doing). - BlockCacheKey cacheKey = - new BlockCacheKey(name, dataBlockOffset); + BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset); boolean useLock = false; IdLock.Entry lockEntry = null; @@ -419,7 +426,7 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); if (cachedBlock != null) { - validateBlockType(cachedBlock, expectedBlockType); + assert cachedBlock.isUnpacked() : "Packed block leak."; if (cachedBlock.getBlockType().isData()) { if (updateCacheMetrics) { HFile.dataBlockReadCnt.incrementAndGet(); @@ -448,18 +455,21 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, pread); validateBlockType(hfileBlock, expectedBlockType); + HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); + BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); // Cache the block if necessary - if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) { - cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), - this.cacheConf.isCacheDataInL1()); + if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + cacheConf.getBlockCache().cacheBlock(cacheKey, + cacheConf.shouldCacheCompressed(category) ? 
hfileBlock : unpacked, + cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); } if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { HFile.dataBlockReadCnt.incrementAndGet(); } - return hfileBlock; + return unpacked; } } finally { traceScope.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index e6201bf..1d71f3e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -118,7 +118,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, - cacheIndexesOnWrite ? cacheConf.getBlockCache(): null, + cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null); dataBlockIndexWriter.setMaxChunkSize( HFileBlockIndex.getMaxChunkSize(conf)); @@ -143,7 +143,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { newBlock(); } - /** Clean up the current block */ + /** Clean up the current data block */ private void finishBlock() throws IOException { if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return; @@ -191,7 +191,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { * the cache key. */ private void doCacheOnWrite(long offset) { - HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(); + HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf); cacheConf.getBlockCache().cacheBlock( new BlockCacheKey(name, offset), cacheFormatBlock); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 1fc2ecb..deeac13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -340,12 +341,35 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); long newSize = updateSizeMetrics(cb, false); map.put(cacheKey, cb); - elements.incrementAndGet(); + long val = elements.incrementAndGet(); + if (LOG.isTraceEnabled()) { + long size = map.size(); + assertCounterSanity(size, val); + } if (newSize > acceptableSize() && !evictionInProgress) { runEviction(); } } + /** + * Sanity-checking for parity between actual block cache content and metrics. + * Intended only for use with TRACE level logging and -ea JVM. + */ + private static void assertCounterSanity(long mapSize, long counterVal) { + if (counterVal < 0) { + LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal + + ", mapSize=" + mapSize); + return; + } + if (mapSize < Integer.MAX_VALUE) { + double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); + if (pct_diff > 0.05) { + LOG.trace("delta between reported and actual size > 5%. 
counterVal=" + counterVal + + ", mapSize=" + mapSize); + } + } + } + private int compare(Cacheable left, Cacheable right) { ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength()); left.serialize(l); @@ -459,7 +483,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { map.remove(block.getCacheKey()); updateSizeMetrics(block, true); - elements.decrementAndGet(); + long val = elements.decrementAndGet(); + if (LOG.isTraceEnabled()) { + long size = map.size(); + assertCounterSanity(size, val); + } stats.evicted(block.getCachedTime()); if (evictedByEvictionProcess && victimHandler != null) { boolean wait = getCurrentSize() < acceptableSize(); @@ -503,9 +531,12 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { if(bytesToFree <= 0) return; // Instantiate priority buckets - BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize()); - BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize()); - BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize()); + BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, + singleSize()); + BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, + multiSize()); + BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, + memorySize()); // Scan entire map putting into appropriate buckets for(LruCachedBlock cachedBlock : map.values()) { @@ -534,7 +565,15 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { // so the single and multi buckets will be emptied bytesFreed = bucketSingle.free(s); bytesFreed += bucketMulti.free(m); + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + + " from single and multi buckets"); + } bytesFreed += bucketMemory.free(bytesToFree - bytesFreed); + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + + " total from all three buckets "); + } } else { // this means no need to evict block in memory bucket, // and we try best to make the ratio between single-bucket and @@ -596,6 +635,23 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } } + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("blockCount", getBlockCount()) + .add("currentSize", getCurrentSize()) + .add("freeSize", getFreeSize()) + .add("maxSize", getMaxSize()) + .add("heapSize", heapSize()) + .add("minSize", minSize()) + .add("minFactor", minFactor) + .add("multiSize", multiSize()) + .add("multiFactor", multiFactor) + .add("singleSize", singleSize()) + .add("singleFactor", singleFactor) + .toString(); + } + /** * Used to group blocks into priority buckets. There will be a BlockBucket * for each priority (single, multi, memory). Once bucketed, the eviction @@ -603,11 +659,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * to configuration parameters and their relatives sizes. 
*/ private class BlockBucket implements Comparable { + + private final String name; private LruCachedBlockQueue queue; private long totalSize = 0; private long bucketSize; - public BlockBucket(long bytesToFree, long blockSize, long bucketSize) { + public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { + this.name = name; this.bucketSize = bucketSize; queue = new LruCachedBlockQueue(bytesToFree, blockSize); totalSize = 0; @@ -619,6 +678,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } public long free(long toFree) { + if (LOG.isTraceEnabled()) { + LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this); + } LruCachedBlock cb; long freedBytes = 0; while ((cb = queue.pollLast()) != null) { @@ -627,6 +689,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return freedBytes; } } + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this); + } return freedBytes; } @@ -653,8 +718,16 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { @Override public int hashCode() { - // Nothing distingushing about each instance unless I pass in a 'name' or something - return super.hashCode(); + return Objects.hashCode(name, bucketSize, queue, totalSize); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("name", name) + .add("totalSize", StringUtils.byteDesc(totalSize)) + .add("bucketSize", StringUtils.byteDesc(bucketSize)) + .toString(); } } @@ -769,6 +842,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", " + + "blockCount=" + getBlockCount() + ", " + "accesses=" + stats.getRequestCount() + ", " + "hits=" + stats.getHitCount() + ", " + "hitRatio=" + (stats.getHitCount() == 0 ? @@ -940,6 +1014,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } /** Clears the cache. Used in tests. */ + @VisibleForTesting public void clearCache() { this.map.clear(); this.elements.set(0); @@ -949,6 +1024,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * Used in testing. May be very inefficient. 
* @return the set of cached file names */ + @VisibleForTesting SortedSet getCachedFileNamesForTest() { SortedSet fileNames = new TreeSet(); for (BlockCacheKey cacheKey : map.keySet()) { @@ -969,6 +1045,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return counts; } + @VisibleForTesting public Map getEncodingCountsForTest() { Map counts = new EnumMap(DataBlockEncoding.class); @@ -986,6 +1063,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { victimHandler = handler; } + @VisibleForTesting + Map getMapForTests() { + return map; + } + BucketCache getVictimHandler() { return this.victimHandler; } @@ -994,4 +1076,4 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { public BlockCache[] getBlockCaches() { return null; } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java index 343caad..2aacc67 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java @@ -182,10 +182,9 @@ public class TestCacheConfig { if (sizing) { long originalSize = bc.getCurrentSize(); bc.cacheBlock(bck, c, cc.isInMemory(), cc.isCacheDataInL1()); - long size = bc.getCurrentSize(); assertTrue(bc.getCurrentSize() > originalSize); bc.evictBlock(bck); - size = bc.getCurrentSize(); + long size = bc.getCurrentSize(); assertEquals(originalSize, size); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index c7050c8..d8c8669 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -84,6 +85,7 @@ public class TestCacheOnWrite { private final Compression.Algorithm compress; private final BlockEncoderTestType encoderType; private final HFileDataBlockEncoder encoder; + private final boolean cacheCompressedData; private static final int DATA_BLOCK_SIZE = 2048; private static final int NUM_KV = 25000; @@ -154,14 +156,15 @@ public class TestCacheOnWrite { } } - public TestCacheOnWrite(CacheOnWriteType cowType, - Compression.Algorithm compress, BlockEncoderTestType encoderType) { + public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress, + BlockEncoderTestType encoderType, boolean cacheCompressedData) { this.cowType = cowType; this.compress = compress; this.encoderType = encoderType; this.encoder = encoderType.getEncoder(); - testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + - ", encoderType=" + encoderType + "]"; + this.cacheCompressedData = cacheCompressedData; + testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + + ", encoderType=" + encoderType + ", cacheCompressedData=" + cacheCompressedData + "]"; System.out.println(testDescription); } @@ -173,7 +176,9 @@ public class TestCacheOnWrite { HBaseTestingUtility.COMPRESSION_ALGORITHMS) { for (BlockEncoderTestType encoderType : BlockEncoderTestType.values()) 
{ - cowTypes.add(new Object[] { cowType, compress, encoderType }); + for (boolean cacheCompressedData : new boolean[] { false, true }) { + cowTypes.add(new Object[] { cowType, compress, encoderType, cacheCompressedData }); + } } } } @@ -189,11 +194,12 @@ public class TestCacheOnWrite { conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, - cowType.shouldBeCached(BlockType.DATA)); + cowType.shouldBeCached(BlockType.DATA)); conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.LEAF_INDEX)); conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.BLOOM_CHUNK)); + conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData); cowType.modifyConf(conf); fs = HFileSystem.get(conf); cacheConf = new CacheConfig(conf); @@ -225,6 +231,10 @@ public class TestCacheOnWrite { reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf); } LOG.info("HFile information: " + reader); + HFileContext meta = new HFileContextBuilder().withCompression(compress) + .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) + .withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding()) + .withIncludesTags(useTags).build(); final boolean cacheBlocks = false; final boolean pread = false; HFileScanner scanner = reader.getScanner(cacheBlocks, pread); @@ -248,16 +258,36 @@ public class TestCacheOnWrite { false, true, null, encodingInCache); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); - boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null; + HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); + boolean isCached = fromCache != null; boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); - if (shouldBeCached != isCached) { - throw new AssertionError( - "shouldBeCached: " + shouldBeCached+ "\n" + - "isCached: " + isCached + "\n" + - "Test description: " + testDescription + "\n" + - "block: " + block + "\n" + - "encodingInCache: " + encodingInCache + "\n" + - "blockCacheKey: " + blockCacheKey); + assertTrue("shouldBeCached: " + shouldBeCached+ "\n" + + "isCached: " + isCached + "\n" + + "Test description: " + testDescription + "\n" + + "block: " + block + "\n" + + "encodingInCache: " + encodingInCache + "\n" + + "blockCacheKey: " + blockCacheKey, + shouldBeCached == isCached); + if (isCached) { + if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) { + if (compress != Compression.Algorithm.NONE) { + assertFalse(fromCache.isUnpacked()); + } + fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader()); + } else { + assertTrue(fromCache.isUnpacked()); + } + // block we cached at write-time and block read from file should be identical + assertEquals(block.getChecksumType(), fromCache.getChecksumType()); + assertEquals(block.getBlockType(), fromCache.getBlockType()); + if (block.getBlockType() == BlockType.ENCODED_DATA) { + assertEquals(block.getDataBlockEncodingId(), fromCache.getDataBlockEncodingId()); + assertEquals(block.getDataBlockEncoding(), fromCache.getDataBlockEncoding()); + } + assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader()); + assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader()); + assertEquals( + block.getUncompressedSizeWithoutHeader(), 
fromCache.getUncompressedSizeWithoutHeader()); } prevBlock = block; offset += block.getOnDiskSizeWithHeader(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index 20a450c..8b29ea1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -124,7 +124,7 @@ public class TestChecksum { assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); // read data back from the hfile, exclude header and checksum - ByteBuffer bb = b.getBufferWithoutHeader(); // read back data + ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data DataInputStream in = new DataInputStream( new ByteArrayInputStream( bb.array(), bb.arrayOffset(), bb.limit())); @@ -163,6 +163,7 @@ public class TestChecksum { b = hbr.readBlockData(0, -1, -1, pread); is.close(); b.sanityCheck(); + b = b.unpack(meta, hbr); assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); @@ -272,12 +273,7 @@ public class TestChecksum { // validate data for (int i = 0; i < 1234; i++) { int val = in.readInt(); - if (val != i) { - String msg = "testChecksumCorruption: data mismatch at index " + - i + " expected " + i + " found " + val; - LOG.warn(msg); - assertEquals(i, val); - } + assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java index 7ed3959..35b4c61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java @@ -80,14 +80,15 @@ public class TestForceCacheImportantBlocks { @Parameters public static Collection parameters() { // HFile versions - return Arrays.asList(new Object[][] { - new Object[] { new Integer(2), false }, - new Object[] { new Integer(3), true } - }); + return Arrays.asList( + new Object[] { 2, true }, + new Object[] { 2, false }, + new Object[] { 3, true }, + new Object[] { 3, false } + ); } - public TestForceCacheImportantBlocks(int hfileVersion, - boolean cfCacheEnabled) { + public TestForceCacheImportantBlocks(int hfileVersion, boolean cfCacheEnabled) { this.hfileVersion = hfileVersion; this.cfCacheEnabled = cfCacheEnabled; TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, hfileVersion); @@ -110,9 +111,9 @@ public class TestForceCacheImportantBlocks { hcd.setBlocksize(BLOCK_SIZE); hcd.setBlockCacheEnabled(cfCacheEnabled); HRegion region = TEST_UTIL.createTestRegion(TABLE, hcd); + BlockCache cache = region.getStore(hcd.getName()).getCacheConfig().getBlockCache(); + CacheStats stats = cache.getStats(); writeTestData(region); - CacheStats stats = - region.getStores().get(hcd.getName()).getCacheConfig().getBlockCache().getStats(); assertEquals(0, stats.getHitCount()); assertEquals(0, HFile.dataBlockReadCnt.get()); // Do a single get, take count of caches. 
If we are NOT caching DATA blocks, the miss @@ -141,4 +142,4 @@ public class TestForceCacheImportantBlocks { } } } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index ef9a74f..11ac986 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -316,7 +316,8 @@ public class TestHFile extends HBaseTestCase { ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); ByteBuffer expected = ByteBuffer.wrap(("something to test" + i).getBytes()); - assertTrue("failed to match metadata", actual.compareTo(expected) == 0); + assertEquals("failed to match metadata", + Bytes.toStringBinary(expected), Bytes.toStringBinary(actual)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index bc09c88..96247c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -69,6 +67,7 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import org.mockito.Mockito; @Category(MediumTests.class) @RunWith(Parameterized.class) @@ -234,8 +233,14 @@ public class TestHFileBlock { @Test public void testNoCompression() throws IOException { - assertEquals(4000, createTestV2Block(NONE, includesMemstoreTS, false). - getBlockForCaching().getUncompressedSizeWithoutHeader()); + CacheConfig cacheConf = Mockito.mock(CacheConfig.class); + Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false); + + HFileBlock block = + createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf); + assertEquals(4000, block.getUncompressedSizeWithoutHeader()); + assertEquals(4004, block.getOnDiskSizeWithoutHeader()); + assertTrue(block.isUnpacked()); } @Test @@ -316,14 +321,14 @@ public class TestHFileBlock { assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 
2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); - String blockStr = b.toString(); + HFileBlock expected = b; if (algo == GZ) { is = fs.open(path); hbr = new HFileBlock.FSReaderV2(is, totalSize, meta); b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), -1, pread); - assertEquals(blockStr, b.toString()); + assertEquals(expected, b); int wrongCompressedSize = 2172; try { b = hbr.readBlockData(0, wrongCompressedSize @@ -409,20 +414,35 @@ public class TestHFileBlock { HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, meta); hbr.setDataBlockEncoder(dataBlockEncoder); hbr.setIncludesMemstoreTS(includesMemstoreTS); - HFileBlock b; + HFileBlock blockFromHFile, blockUnpacked; int pos = 0; for (int blockId = 0; blockId < numBlocks; ++blockId) { - b = hbr.readBlockData(pos, -1, -1, pread); + blockFromHFile = hbr.readBlockData(pos, -1, -1, pread); assertEquals(0, HFile.getChecksumFailuresCount()); - b.sanityCheck(); - pos += b.getOnDiskSizeWithHeader(); + blockFromHFile.sanityCheck(); + pos += blockFromHFile.getOnDiskSizeWithHeader(); assertEquals((int) encodedSizes.get(blockId), - b.getUncompressedSizeWithoutHeader()); - ByteBuffer actualBuffer = b.getBufferWithoutHeader(); + blockFromHFile.getUncompressedSizeWithoutHeader()); + assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked()); + long packedHeapsize = blockFromHFile.heapSize(); + blockUnpacked = blockFromHFile.unpack(meta, hbr); + assertTrue(blockUnpacked.isUnpacked()); + if (meta.isCompressedOrEncrypted()) { + LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeadsize=" + blockUnpacked + .heapSize()); + assertFalse(packedHeapsize == blockUnpacked.heapSize()); + assertTrue("Packed heapSize should be < unpacked heapSize", + packedHeapsize < blockUnpacked.heapSize()); + } + ByteBuffer actualBuffer = blockUnpacked.getBufferWithoutHeader(); if (encoding != DataBlockEncoding.NONE) { // We expect a two-byte big-endian encoding id. - assertEquals(0, actualBuffer.get(0)); - assertEquals(encoding.getId(), actualBuffer.get(1)); + assertEquals( + "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread), + Long.toHexString(0), Long.toHexString(actualBuffer.get(0))); + assertEquals( + "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread), + Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1))); actualBuffer.position(2); actualBuffer = actualBuffer.slice(); } @@ -432,6 +452,22 @@ public class TestHFileBlock { // test if content matches, produce nice message assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread); + + // test serialized blocks + for (boolean reuseBuffer : new boolean[] { false, true }) { + ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength()); + blockFromHFile.serialize(serialized); + HFileBlock deserialized = + (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer); + assertEquals( + "Serialization did not preserve block state. 
reuseBuffer=" + reuseBuffer, + blockFromHFile, deserialized); + // intentional reference comparison + if (blockFromHFile != blockUnpacked) { + assertEquals("Deserializaed block cannot be unpacked correctly.", + blockUnpacked, deserialized.unpack(meta, hbr)); + } + } } is.close(); } @@ -439,6 +475,11 @@ public class TestHFileBlock { } } + static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding, + boolean pread) { + return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread); + } + static void assertBuffersEqual(ByteBuffer expectedBuffer, ByteBuffer actualBuffer, Compression.Algorithm compression, DataBlockEncoding encoding, boolean pread) { @@ -451,9 +492,8 @@ public class TestHFileBlock { } fail(String.format( - "Content mismath for compression %s, encoding %s, " + - "pread %s, commonPrefix %d, expected %s, got %s", - compression, encoding, pread, prefix, + "Content mismatch for %s, commonPrefix %d, expected %s, got %s", + buildMessageDetails(compression, encoding, pread), prefix, nextBytesToStr(expectedBuffer, prefix), nextBytesToStr(actualBuffer, prefix))); } @@ -476,6 +516,7 @@ public class TestHFileBlock { } protected void testPreviousOffsetInternals() throws IOException { + // TODO: parameterize these nested loops. for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { for (boolean pread : BOOLEAN_VALUES) { for (boolean cacheOnWrite : BOOLEAN_VALUES) { @@ -545,8 +586,10 @@ public class TestHFileBlock { curOffset += b.getOnDiskSizeWithHeader(); if (cacheOnWrite) { - // In the cache-on-write mode we store uncompressed bytes so we - // can compare them to what was read by the block reader. + // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply + // verifies that the unpacked value read back off disk matches the unpacked value + // generated before writing to disk. 
+ b = b.unpack(meta, hbr); // b's buffer has header + data + checksum while // expectedContents have header + data only ByteBuffer bufRead = b.getBufferWithHeader(); @@ -565,11 +608,10 @@ public class TestHFileBlock { + algo + ", pread=" + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n"; wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(), - bufExpected.arrayOffset(), Math.min(32, - bufExpected.limit())) + bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n" + Bytes.toStringBinary(bufRead.array(), - bufRead.arrayOffset(), Math.min(32, bufRead.limit())); + bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit())); if (detailedLogging) { LOG.warn("expected header" + HFileBlock.toStringHeader(bufExpected) + @@ -758,6 +800,7 @@ public class TestHFileBlock { if (detailedLogging) { LOG.info("Written block #" + i + " of type " + bt + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader() + + ", packed size " + hbw.getOnDiskSizeWithoutHeader() + " at offset " + pos); } } @@ -806,7 +849,4 @@ public class TestHFileBlock { block.heapSize()); } } - - } - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java index 88fdb77..166e2cb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -205,7 +203,7 @@ public class TestHFileBlockCompatibility { assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); - String blockStr = b.toString(); + HFileBlock expected = b; if (algo == GZ) { is = fs.open(path); @@ -213,7 +211,7 @@ public class TestHFileBlockCompatibility { meta); b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + b.totalChecksumBytes(), -1, pread); - assertEquals(blockStr, b.toString()); + assertEquals(expected, b); int wrongCompressedSize = 2172; try { b = hbr.readBlockData(0, wrongCompressedSize @@ -301,6 +299,10 @@ public class TestHFileBlockCompatibility { for (int blockId = 0; blockId < numBlocks; ++blockId) { b = hbr.readBlockData(pos, -1, -1, pread); b.sanityCheck(); + if (meta.isCompressedOrEncrypted()) { + assertFalse(b.isUnpacked()); + b = b.unpack(meta, hbr); + } pos += b.getOnDiskSizeWithHeader(); assertEquals((int) encodedSizes.get(blockId), @@ -335,7 +337,7 @@ public class TestHFileBlockCompatibility { * in this class but the code in HFileBlock.Writer will continually * evolve. */ - public static final class Writer extends HFileBlock.Writer{ + public static final class Writer extends HFileBlock.Writer { // These constants are as they were in minorVersion 0. 
private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; @@ -416,10 +418,6 @@ public class TestHFileBlockCompatibility { private int unencodedDataSizeWritten; - /** - * @param compressionAlgorithm compression algorithm to use - * @param dataBlockEncoderAlgo data block encoding algorithm to use - */ public Writer(Compression.Algorithm compressionAlgorithm, HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) { this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java index 31546e2..6ec45a6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -53,6 +48,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.*; + @Category(SmallTests.class) public class TestHFileEncryption { private static final Log LOG = LogFactory.getLog(TestHFileEncryption.class); @@ -95,11 +92,13 @@ public class TestHFileEncryption { return hbw.getOnDiskSizeWithHeader(); } - private long readAndVerifyBlock(long pos, HFileBlock.FSReaderV2 hbr, int size) + private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderV2 hbr, int size) throws IOException { HFileBlock b = hbr.readBlockData(pos, -1, -1, false); assertEquals(0, HFile.getChecksumFailuresCount()); b.sanityCheck(); + assertFalse(b.isUnpacked()); + b = b.unpack(ctx, hbr); LOG.info("Read a block at " + pos + " with" + " onDiskSizeWithHeader=" + b.getOnDiskSizeWithHeader() + " uncompressedSizeWithoutHeader=" + b.getOnDiskSizeWithoutHeader() + @@ -142,7 +141,7 @@ public class TestHFileEncryption { HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, fileContext); long pos = 0; for (int i = 0; i < blocks; i++) { - pos += readAndVerifyBlock(pos, hbr, blockSizes[i]); + pos += readAndVerifyBlock(pos, fileContext, hbr, blockSizes[i]); } } finally { is.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java index 27e7051..b27f5b3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; @@ -170,8 +171,8 @@ public class TestHFileWriterV2 { // Meta index. 
metaBlockIndexReader.readRootIndex( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(), - trailer.getMetaIndexCount()); + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX) + .getByteStream(), trailer.getMetaIndexCount()); // File info FileInfo fileInfo = new FileInfo(); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); @@ -191,6 +192,10 @@ public class TestHFileWriterV2 { while (curBlockPos <= trailer.getLastDataBlockOffset()) { HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); assertEquals(BlockType.DATA, block.getBlockType()); + if (meta.isCompressedOrEncrypted()) { + assertFalse(block.isUnpacked()); + block = block.unpack(meta, blockReader); + } ByteBuffer buf = block.getBufferWithoutHeader(); while (buf.hasRemaining()) { int keyLen = buf.getInt(); @@ -232,7 +237,8 @@ public class TestHFileWriterV2 { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + .unpack(meta, blockReader); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); ByteBuffer buf = block.getBufferWithoutHeader(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java index 8b92c56..b19efff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java @@ -191,8 +191,7 @@ public class TestHFileWriterV3 { // Data index. We also read statistics about the block index written after // the root level. dataBlockIndexReader.readMultiLevelIndexRoot( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), - trailer.getDataIndexCount()); + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount()); if (findMidKey) { byte[] midkey = dataBlockIndexReader.midkey(); @@ -201,8 +200,8 @@ public class TestHFileWriterV3 { // Meta index. 
metaBlockIndexReader.readRootIndex( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(), - trailer.getMetaIndexCount()); + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX) + .getByteStream(), trailer.getMetaIndexCount()); // File info FileInfo fileInfo = new FileInfo(); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); @@ -220,7 +219,8 @@ public class TestHFileWriterV3 { fsdis.seek(0); long curBlockPos = 0; while (curBlockPos <= trailer.getLastDataBlockOffset()) { - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + .unpack(context, blockReader); assertEquals(BlockType.DATA, block.getBlockType()); ByteBuffer buf = block.getBufferWithoutHeader(); int keyLen = -1; @@ -278,7 +278,8 @@ public class TestHFileWriterV3 { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + .unpack(context, blockReader); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); ByteBuffer buf = block.getBufferWithoutHeader(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java new file mode 100644 index 0000000..e752dd2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + +import com.google.common.collect.Iterables; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.*; + +/** + * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig}, + * and {@link LruBlockCache}. + */ +@Category(SmallTests.class) +@RunWith(Parameterized.class) +public class TestLazyDataBlockDecompression { + private static final Log LOG = LogFactory.getLog(TestLazyDataBlockDecompression.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private FileSystem fs; + + @Parameterized.Parameter(0) + public boolean cacheOnWrite; + + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(new Object[][] { + { false }, + { true } + }); + } + + @Before + public void setUp() throws IOException { + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; + fs = FileSystem.get(TEST_UTIL.getConfiguration()); + } + + @After + public void tearDown() { + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; + fs = null; + } + + /** + * Write {@code entryCount} random keyvalues to a new HFile at {@code path}. Returns the row + * bytes of the KeyValues written, in the order they were written. + */ + private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path, + HFileContext cxt, int entryCount) throws IOException { + HFileWriterV2 writer = (HFileWriterV2) + new HFileWriterV2.WriterFactoryV2(conf, cc) + .withPath(fs, path) + .withFileContext(cxt) + .create(); + + // write a bunch of random kv's + Random rand = new Random(9713312); // some seed. + final byte[] family = Bytes.toBytes("f"); + final byte[] qualifier = Bytes.toBytes("q"); + + for (int i = 0; i < entryCount; i++) { + byte[] keyBytes = TestHFileWriterV2.randomOrderedKey(rand, i); + byte[] valueBytes = TestHFileWriterV2.randomValue(rand); + // make a real keyvalue so that hfile tool can examine it + writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes)); + } + writer.close(); + } + + /** + * Read all blocks from {@code path} to populate {@code blockCache}. 
+ */ + private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs, + Path path, HFileContext cxt) throws IOException { + FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path); + long fileSize = fs.getFileStatus(path).getLen(); + FixedFileTrailer trailer = + FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize); + HFileReaderV2 reader = new HFileReaderV2(path, trailer, fsdis, fileSize, cacheConfig, + fsdis.getHfs(), conf); + reader.loadFileInfo(); + long offset = trailer.getFirstDataBlockOffset(), + max = trailer.getLastDataBlockOffset(); + List blocks = new ArrayList(4); + HFileBlock block; + while (offset <= max) { + block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false, + /* isCompaction */ false, /* updateCacheMetrics */ true, null, null); + offset += block.getOnDiskSizeWithHeader(); + blocks.add(block); + } + LOG.info("read " + Iterables.toString(blocks)); + } + + @Test + public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception { + // enough room for 2 uncompressed block + int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1); + Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), + "testCompressionIncreasesEffectiveBlockcacheSize"); + HFileContext context = new HFileContextBuilder() + .withCompression(Compression.Algorithm.GZ) + .build(); + LOG.info("context=" + context); + + // setup cache with lazy-decompression disabled. + Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = + new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled); + CacheConfig cc = new CacheConfig(lazyCompressDisabled); + assertFalse(cc.shouldCacheDataCompressed()); + assertTrue(cc.getBlockCache() instanceof LruBlockCache); + LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache(); + LOG.info("disabledBlockCache=" + disabledBlockCache); + assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize()); + assertTrue("eviction thread spawned unintentionally.", + disabledBlockCache.getEvictionThread() == null); + assertEquals("freshly created blockcache contains blocks.", + 0, disabledBlockCache.getBlockCount()); + + // 2000 kv's is ~3.6 full unencoded data blocks. + // Requires a conf and CacheConfig but should not be specific to this instance's cache settings + writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000); + + // populate the cache + cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context); + long disabledBlockCount = disabledBlockCache.getBlockCount(); + assertTrue("blockcache should contain blocks. 
disabledBlockCount=" + disabledBlockCount, + disabledBlockCount > 0); + long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount(); + for (Map.Entry e : + disabledBlockCache.getMapForTests().entrySet()) { + HFileBlock block = (HFileBlock) e.getValue().getBuffer(); + assertTrue("found a packed block, block=" + block, block.isUnpacked()); + } + + // count blocks with lazy decompression + Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); + lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = + new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled); + cc = new CacheConfig(lazyCompressEnabled); + assertTrue("test improperly configured.", cc.shouldCacheDataCompressed()); + assertTrue(cc.getBlockCache() instanceof LruBlockCache); + LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache(); + LOG.info("enabledBlockCache=" + enabledBlockCache); + assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize()); + assertTrue("eviction thread spawned unintentionally.", + enabledBlockCache.getEvictionThread() == null); + assertEquals("freshly created blockcache contains blocks.", + 0, enabledBlockCache.getBlockCount()); + + cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context); + long enabledBlockCount = enabledBlockCache.getBlockCount(); + assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount, + enabledBlockCount > 0); + long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount(); + int candidatesFound = 0; + for (Map.Entry e : + enabledBlockCache.getMapForTests().entrySet()) { + candidatesFound++; + HFileBlock block = (HFileBlock) e.getValue().getBuffer(); + if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) { + assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" + + block.getBufferWithoutHeader().capacity(), block.isUnpacked()); + } + } + assertTrue("did not find any candidates for compressed caching. Invalid test.", + candidatesFound > 0); + + LOG.info("disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + + enabledBlockCount); + assertTrue("enabling compressed data blocks should increase the effective cache size. " + + "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + + enabledBlockCount, disabledBlockCount < enabledBlockCount); + + LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" + + enabledEvictedCount); + assertTrue("enabling compressed data blocks should reduce the number of evictions. " + + "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" + + enabledEvictedCount, enabledEvictedCount < disabledEvictedCount); + } +} -- 1.9.0