From 8a7189f4e5da400e31af07af4f4a8f9d40c9db60 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Fri, 1 Aug 2014 17:00:56 -0700 Subject: [PATCH] HBASE-11331 [blockcache] lazy block decompression --- .../apache/hadoop/hbase/io/hfile/HFileContext.java | 17 + .../hbase/tmpl/regionserver/BlockCacheTmpl.jamon | 6 +- .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 52 +-- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 350 ++++++++++++--------- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 26 +- .../hadoop/hbase/io/hfile/HFileReaderV2.java | 26 +- .../hadoop/hbase/io/hfile/HFileWriterV2.java | 8 +- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 60 +++- .../apache/hadoop/hbase/io/hfile/TestChecksum.java | 10 +- .../io/hfile/TestForceCacheImportantBlocks.java | 19 +- .../apache/hadoop/hbase/io/hfile/TestHFile.java | 3 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 52 ++- .../io/hfile/TestHFileBlockCompatibility.java | 14 +- .../hadoop/hbase/io/hfile/TestHFileEncryption.java | 13 +- .../hadoop/hbase/io/hfile/TestHFileWriterV2.java | 12 +- .../hadoop/hbase/io/hfile/TestHFileWriterV3.java | 13 +- 16 files changed, 401 insertions(+), 280 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java index 3299e41..ca5f25a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java @@ -96,6 +96,23 @@ public class HFileContext implements HeapSize, Cloneable { this.cryptoContext = cryptoContext; } + /** + * @return true when on-disk blocks from this file are compressed, and/or encrypted; + * false otherwise. + */ + public boolean isCompressedOrEncrypted() { + Compression.Algorithm compressAlgo = getCompression(); + boolean isCompressed = + compressAlgo != null + && compressAlgo != Compression.Algorithm.NONE; + + Encryption.Context cryptoContext = getEncryptionContext(); + boolean isEncrypted = cryptoContext != null + && cryptoContext != Encryption.Context.NONE; + + return isCompressed || isEncrypted; + } + public Compression.Algorithm getCompression() { return compressAlgo; } diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon index 702122a..5a19a43 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon @@ -157,9 +157,9 @@ org.apache.hadoop.util.StringUtils; reader is closed - Compress blocks - <% cacheConfig.shouldCacheCompressed() %> - True if blocks are compressed in cache + Compress data blocks + <% cacheConfig.shouldCacheDataCompressed() %> + True if data blocks are compressed in cache Prefetch on Open diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 25f9727..6dc8480 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -61,11 +61,10 @@ public class CacheConfig { "hfile.block.bloom.cacheonwrite"; /** - * TODO: Implement this (jgray) - * Configuration key to cache data blocks in compressed format. 
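The new HFileContext#isCompressedOrEncrypted() introduced above is the predicate the rest of the patch keys lazy decompression off of. A minimal illustrative sketch of how the flag behaves, built with the existing HFileContextBuilder (GZ chosen purely for illustration, not part of this patch):

    // compression or encryption on disk => the block is "packed" and may need unpack() later
    HFileContext gzCtx = new HFileContextBuilder()
        .withCompression(Compression.Algorithm.GZ)
        .build();
    assert gzCtx.isCompressedOrEncrypted();

    // neither transform configured => on-disk bytes already equal the uncompressed form
    HFileContext plainCtx = new HFileContextBuilder()
        .withCompression(Compression.Algorithm.NONE)
        .build();
    assert !plainCtx.isCompressedOrEncrypted();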
+ * Configuration key to cache data blocks in compressed and/or encrypted format. */ public static final String CACHE_DATA_BLOCKS_COMPRESSED_KEY = - "hbase.rs.blockcache.cachedatacompressed"; + "hbase.block.data.cachecompressed"; /** * Configuration key to evict all blocks of a given file from the block cache @@ -123,7 +122,6 @@ public class CacheConfig { public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true; public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3; public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64; - public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f; /** * Configuration key to prefetch all blocks of a given file into the block cache @@ -140,7 +138,7 @@ public class CacheConfig { public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false; public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false; public static final boolean DEFAULT_EVICT_ON_CLOSE = false; - public static final boolean DEFAULT_COMPRESSED_CACHE = false; + public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false; public static final boolean DEFAULT_PREFETCH_ON_OPEN = false; /** Local reference to the block cache, null if completely disabled */ @@ -169,8 +167,8 @@ public class CacheConfig { /** Whether blocks of a file should be evicted when the file is closed */ private boolean evictOnClose; - /** Whether data blocks should be stored in compressed form in the cache */ - private final boolean cacheCompressed; + /** Whether data blocks should be stored in compressed and/or encrypted form in the cache */ + private final boolean cacheDataCompressed; /** Whether data blocks should be prefetched into the cache */ private final boolean prefetchOnOpen; @@ -202,7 +200,7 @@ public class CacheConfig { DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE), + conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED), conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen(), conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1, @@ -221,13 +219,10 @@ public class CacheConfig { DEFAULT_IN_MEMORY, // This is a family-level setting so can't be set // strictly from conf conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), - conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_INDEXES_ON_WRITE), - conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_BLOOMS_ON_WRITE), + conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_INDEXES_ON_WRITE), + conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, - DEFAULT_COMPRESSED_CACHE), + conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED), conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN), conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1, HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) @@ -245,7 +240,7 @@ public class CacheConfig { * @param cacheIndexesOnWrite whether index blocks should be cached on write * @param cacheBloomsOnWrite whether blooms should be cached on write * @param evictOnClose whether blocks should be evicted when HFile is closed - * @param cacheCompressed whether to store blocks as 
compressed in the cache + * @param cacheDataCompressed whether to store blocks as compressed in the cache * @param prefetchOnOpen whether to prefetch blocks upon open * @param cacheDataInL1 If more than one cache tier deployed, if true, cache this column families * data blocks up in the L1 tier. @@ -254,7 +249,7 @@ public class CacheConfig { final boolean cacheDataOnRead, final boolean inMemory, final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite, final boolean cacheBloomsOnWrite, final boolean evictOnClose, - final boolean cacheCompressed, final boolean prefetchOnOpen, + final boolean cacheDataCompressed, final boolean prefetchOnOpen, final boolean cacheDataInL1) { this.blockCache = blockCache; this.cacheDataOnRead = cacheDataOnRead; @@ -263,7 +258,7 @@ public class CacheConfig { this.cacheIndexesOnWrite = cacheIndexesOnWrite; this.cacheBloomsOnWrite = cacheBloomsOnWrite; this.evictOnClose = evictOnClose; - this.cacheCompressed = cacheCompressed; + this.cacheDataCompressed = cacheDataCompressed; this.prefetchOnOpen = prefetchOnOpen; this.cacheDataInL1 = cacheDataInL1; LOG.info(this); @@ -277,7 +272,7 @@ public class CacheConfig { this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory, cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite, cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose, - cacheConf.cacheCompressed, cacheConf.prefetchOnOpen, + cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen, cacheConf.cacheDataInL1); } @@ -397,10 +392,23 @@ public class CacheConfig { } /** - * @return true if blocks should be compressed in the cache, false if not + * @return true if data blocks should be compressed in the cache, false if not + */ + public boolean shouldCacheDataCompressed() { + return isBlockCacheEnabled() && this.cacheDataCompressed; + } + + /** + * @return true if this {@link BlockCategory} should be compressed in ths cache, false otherwise. 
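With the key renamed to "hbase.block.data.cachecompressed", enabling the feature is a one-line configuration change, and only the DATA category honours it, as shouldCacheCompressed(BlockCategory) spells out just below. A hedged usage sketch (assumes the block cache itself is enabled, which is the default):

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);

    CacheConfig cacheConf = new CacheConfig(conf);
    assert cacheConf.shouldCacheDataCompressed();
    assert cacheConf.shouldCacheCompressed(BlockType.BlockCategory.DATA);
    // index and bloom blocks are always cached in their unpacked form
    assert !cacheConf.shouldCacheCompressed(BlockType.BlockCategory.INDEX);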
*/ - public boolean shouldCacheCompressed() { - return isBlockCacheEnabled() && this.cacheCompressed; + public boolean shouldCacheCompressed(BlockCategory category) { + if (!isBlockCacheEnabled()) return false; + switch (category) { + case DATA: + return this.cacheDataCompressed; + default: + return false; + } } /** @@ -421,7 +429,7 @@ public class CacheConfig { ", cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + ", cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + ", cacheEvictOnClose=" + shouldEvictOnClose() + - ", cacheCompressed=" + shouldCacheCompressed() + + ", cacheDataCompressed=" + shouldCacheDataCompressed() + ", prefetchOnOpen=" + shouldPrefetchOnOpen(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 3e26107..c1e16b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -36,9 +36,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; -import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; @@ -64,25 +61,26 @@ import com.google.common.base.Preconditions; * information from the block index are required to read a block. *
  • In version 2 a block is structured as follows: * * - * The version 2 block representation in the block cache is the same as above, - * except that the data section is always uncompressed in the cache. */ @InterfaceAudience.Private public class HFileBlock implements Cacheable { @@ -111,7 +109,7 @@ public class HFileBlock implements Cacheable { ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false); // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader - public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG; /** @@ -155,23 +153,25 @@ public class HFileBlock implements Cacheable { .registerDeserializer(blockDeserializer); } + /** Type of block. Header field 0. */ private BlockType blockType; - /** Size on disk without the header. It includes checksum data too. */ + /** Size on disk excluding header, including checksum. Header field 1. */ private int onDiskSizeWithoutHeader; - /** Size of pure data. Does not include header or checksums */ + /** Size of pure data. Does not include header or checksums. Header field 2. */ private final int uncompressedSizeWithoutHeader; - /** The offset of the previous block on disk */ + /** The offset of the previous block on disk. Header field 3. */ private final long prevBlockOffset; - /** Size on disk of header and data. Does not include checksum data */ + /** Size on disk of header + data. Excludes checksum. Header field 6. */ private final int onDiskDataSizeWithHeader; /** The in-memory representation of the hfile block */ private ByteBuffer buf; - /** Meta data that holds meta information on the hfileblock**/ + + /** Meta data that holds meta information on the hfileblock */ private HFileContext fileContext; /** @@ -193,27 +193,18 @@ public class HFileBlock implements Cacheable { * and is sitting in a byte buffer. * * @param blockType the type of this block, see {@link BlockType} - * @param onDiskSizeWithoutHeader compressed size of the block if compression - * is used, otherwise uncompressed size, header size not included - * @param uncompressedSizeWithoutHeader uncompressed size of the block, - * header size not included. Equals onDiskSizeWithoutHeader if - * compression is disabled. - * @param prevBlockOffset the offset of the previous block in the - * {@link HFile} + * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader} + * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader} + * @param prevBlockOffset see {@link #prevBlockOffset} * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by * uncompressed data. This - * @param fillHeader true to fill in the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of - * the buffer based on the header fields provided + * @param fillHeader when true, parse {@code buf} and override the first 4 header fields. 
* @param offset the file offset the block was read from - * @param bytesPerChecksum the number of bytes per checksum chunk - * @param checksumType the checksum algorithm to use - * @param onDiskDataSizeWithHeader size of header and data on disk not - * including checksum data + * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader} * @param fileContext HFile meta data */ - HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, - int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf, - boolean fillHeader, long offset, + HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, + long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset, int onDiskDataSizeWithHeader, HFileContext fileContext) { this.blockType = blockType; this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; @@ -228,6 +219,21 @@ public class HFileBlock implements Cacheable { } /** + * Copy constructor. Creates a shallow copy of {@code that}'s buffer. + */ + HFileBlock(HFileBlock that) { + this.blockType = that.blockType; + this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader; + this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader; + this.prevBlockOffset = that.prevBlockOffset; + this.buf = that.buf.duplicate(); + this.offset = that.offset; + this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader; + this.fileContext = that.fileContext; + this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader; + } + + /** * Creates a block from an existing buffer starting with a header. Rewinds * and takes ownership of the buffer. By definition of rewind, ignores the * buffer position, but if you slice the buffer beforehand, it will rewind @@ -272,28 +278,21 @@ public class HFileBlock implements Cacheable { } /** - * @return the on-disk size of the block with header size included. This - * includes the header, the data and the checksum data. + * @return the on-disk size of header + data part + checksum. */ public int getOnDiskSizeWithHeader() { return onDiskSizeWithoutHeader + headerSize(); } /** - * Returns the size of the compressed part of the block in case compression - * is used, or the uncompressed size of the data part otherwise. Header size - * and checksum data size is not included. - * - * @return the on-disk size of the data part of the block, header and - * checksum not included. + * @return the on-disk size of the data part + checksum (header excluded). */ public int getOnDiskSizeWithoutHeader() { return onDiskSizeWithoutHeader; } /** - * @return the uncompressed size of the data part of the block, header not - * included + * @return the uncompressed size of data part (header and checksum excluded). */ public int getUncompressedSizeWithoutHeader() { return uncompressedSizeWithoutHeader; @@ -308,8 +307,8 @@ public class HFileBlock implements Cacheable { } /** - * Writes header fields into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the - * buffer. Resets the buffer position to the end of header as side effect. + * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position + * is modified as side-effect. */ private void overwriteHeader() { buf.rewind(); @@ -320,11 +319,9 @@ public class HFileBlock implements Cacheable { } /** - * Returns a buffer that does not include the header. The array offset points - * to the start of the block data right after the header. The underlying data - * array is not copied. Checksum data is not included in the returned buffer. 
+ * Returns a buffer that does not include the header or checksum. * - * @return the buffer with header skipped + * @return the buffer with header skipped and checksum omitted. */ public ByteBuffer getBufferWithoutHeader() { return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(), @@ -336,7 +333,7 @@ public class HFileBlock implements Cacheable { * modify the buffer object. This method has to be public because it is * used in {@link CompoundBloomFilter} to avoid object creation on every * Bloom filter lookup, but has to be used with caution. Checksum data - * is not included in the returned buffer. + * is not included in the returned buffer but header data is. * * @return the buffer of this block for read-only operations */ @@ -350,17 +347,17 @@ public class HFileBlock implements Cacheable { * not modify the buffer object. This method has to be public because it is * used in {@link BucketCache} to avoid buffer copy. * - * @return the byte buffer with header included for read-only operations + * @return the buffer with header and checksum included for read-only operations */ public ByteBuffer getBufferReadOnlyWithHeader() { return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice(); } /** - * Returns a byte buffer of this block, including header data, positioned at + * Returns a byte buffer of this block, including header data and checksum, positioned at * the beginning of header. The underlying data array is not copied. * - * @return the byte buffer with header included + * @return the byte buffer with header and checksum included */ ByteBuffer getBufferWithHeader() { ByteBuffer dupBuf = buf.duplicate(); @@ -376,22 +373,25 @@ public class HFileBlock implements Cacheable { } } + private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) + throws IOException { + if (valueFromBuf != valueFromField) { + throw new IOException("Block type stored in the buffer: " + + valueFromBuf + ", block type field: " + valueFromField); + } + } + /** * Checks if the block is internally consistent, i.e. the first - * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent - * with the fields. This function is primary for testing and debugging, and - * is not thread-safe, because it alters the internal buffer pointer. + * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a + * valid header consistent with the fields. Assumes a packed block structure. + * This function is primary for testing and debugging, and is not + * thread-safe, because it alters the internal buffer pointer. 
*/ void sanityCheck() throws IOException { buf.rewind(); - { - BlockType blockTypeFromBuf = BlockType.read(buf); - if (blockTypeFromBuf != blockType) { - throw new IOException("Block type stored in the buffer: " + - blockTypeFromBuf + ", block type field: " + blockType); - } - } + sanityCheckAssertion(BlockType.read(buf), blockType); sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); @@ -403,26 +403,23 @@ public class HFileBlock implements Cacheable { if (this.fileContext.isUseHBaseChecksum()) { sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum"); - sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, - "onDiskDataSizeWithHeader"); + sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); } int cksumBytes = totalChecksumBytes(); - int hdrSize = headerSize(); - int expectedBufLimit = uncompressedSizeWithoutHeader + headerSize() + - cksumBytes; + int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes; if (buf.limit() != expectedBufLimit) { throw new AssertionError("Expected buffer limit " + expectedBufLimit + ", got " + buf.limit()); } // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next - // block's, header, so there are two sensible values for buffer capacity. - int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes; - if (buf.capacity() != size && - buf.capacity() != size + hdrSize) { + // block's header, so there are two sensible values for buffer capacity. + int hdrSize = headerSize(); + if (buf.capacity() != expectedBufLimit && + buf.capacity() != expectedBufLimit + hdrSize) { throw new AssertionError("Invalid buffer capacity: " + buf.capacity() + - ", expected " + size + " or " + (size + hdrSize)); + ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); } } @@ -457,30 +454,71 @@ public class HFileBlock implements Cacheable { } /** + * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its + * encoded structure. Internal structures are shared between instances where applicable. + */ + HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { + if (!fileContext.isCompressedOrEncrypted()) { + // cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean) does not + // preserve encoding and encryption details. + return this; + } + + HFileBlock unpacked = new HFileBlock(this); + unpacked.allocateBuffer(); // allocates space for the decompressed block + + HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? + reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); + ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), + unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), + this.getBufferReadOnlyWithHeader().array(), this.headerSize()); + + if (unpacked.nextBlockOnDiskSizeWithHeader > 0) { + // Preserve the next block's header bytes in the new block if we have them. + System.arraycopy(this.buf.array(), this.buf.arrayOffset() + this.onDiskDataSizeWithHeader, + unpacked.buf.array(), unpacked.buf.arrayOffset() + unpacked.headerSize() + + unpacked.uncompressedSizeWithoutHeader + unpacked.totalChecksumBytes(), + unpacked.headerSize()); + } + return unpacked; + } + + /** * Always allocates a new buffer of the correct size. Copies header bytes * from the existing buffer. 
Does not change header fields. * Reserve room to keep checksum bytes too. - * - * @param extraBytes whether to reserve room in the buffer to read the next - * block's header */ - private void allocateBuffer(boolean extraBytes) { + private void allocateBuffer() { int cksumBytes = totalChecksumBytes(); - int capacityNeeded = headerSize() + uncompressedSizeWithoutHeader + - cksumBytes + - (extraBytes ? headerSize() : 0); + int headerSize = headerSize(); + boolean extraBytes = nextBlockOnDiskSizeWithHeader > 0; + int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + + cksumBytes + (extraBytes ? headerSize : 0); ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded); // Copy header bytes. System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(), - newBuf.arrayOffset(), headerSize()); + newBuf.arrayOffset(), headerSize); buf = newBuf; - buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes); + // set limit to exclude next block's header + buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes); + } + + /** + * Return true when this block's buffer has been unpacked, false otherwise. Note this is a + * calculated heuristic, not tracked attribute of the block. + */ + public boolean isUnpacked() { + final int cksumBytes = totalChecksumBytes(); + final int headerSize = headerSize(); + final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes; + final int bufCapacity = buf.capacity(); + return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; } - /** An additional sanity-check in case no compression is being used. */ + /** An additional sanity-check in case no compression or encryption is being used. */ public void assumeUncompressed() throws IOException { if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { @@ -512,7 +550,7 @@ public class HFileBlock implements Cacheable { } /** - * @return a byte stream reading the data section of this block + * @return a byte stream reading the data + checksum of this block */ public DataInputStream getByteStream() { return new DataInputStream(new ByteArrayInputStream(buf.array(), @@ -588,7 +626,6 @@ public class HFileBlock implements Cacheable { return nextBlockOnDiskSizeWithHeader; } - /** * Unified version 2 {@link HFile} block writer. The intended usage pattern * is as follows: @@ -631,7 +668,7 @@ public class HFileBlock implements Cacheable { /** * Current block type. Set in {@link #startWriting(BlockType)}. Could be - * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} + * changed in {@link #finishBlock()} from {@link BlockType#DATA} * to {@link BlockType#ENCODED_DATA}. */ private BlockType blockType; @@ -648,8 +685,7 @@ public class HFileBlock implements Cacheable { /** * Bytes to be written to the file system, including the header. Compressed - * if compression is turned on. It also includes the checksum data that - * immediately follows the block data. (header + data + checksums) + * if compression is turned on. Does not include checksum bytes. (header + data) */ private byte[] onDiskBytesWithHeader; @@ -1008,6 +1044,19 @@ public class HFileBlock implements Cacheable { return ByteBuffer.wrap(uncompressedBytesWithHeader); } + /** + * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is + * needed for storing packed blocks in the block cache. Expects calling semantics identical to + * {@link #getUncompressedBufferWithHeader()}. 
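unpack() and isUnpacked() above carry the core of the lazy scheme: blocks travel and are cached in their on-disk ("packed") shape and are only inflated when a caller needs the payload. A rough caller-side sketch, assuming fsBlockReader is this file's FSReader and meta its HFileContext (it mirrors what HFileReaderV2 does further down in this patch):

    HFileBlock b = fsBlockReader.readBlockData(offset, onDiskSizeWithHeader, -1, pread);
    if (!b.isUnpacked()) {
      // decompresses/decrypts into a fresh buffer; ENCODED_DATA blocks keep their encoding
      b = b.unpack(meta, fsBlockReader);
    }
    ByteBuffer payload = b.getBufferWithoutHeader();   // header and checksum stripped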
Returns only the header and data, + * Does not include checksum data. + * + * @return packed block bytes for caching on write + */ + ByteBuffer getOnDiskBufferWithHeader() { + expectState(State.BLOCK_READY); + return ByteBuffer.wrap(onDiskBytesWithHeader); + } + private void expectState(State expectedState) { if (state != expectedState) { throw new IllegalStateException("Expected state: " + expectedState + @@ -1038,7 +1087,7 @@ public class HFileBlock implements Cacheable { * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a * 0 value in bytesPerChecksum. */ - public HFileBlock getBlockForCaching() { + public HFileBlock getBlockForCaching(CacheConfig cacheConf) { HFileContext newContext = new HFileContextBuilder() .withBlockSize(fileContext.getBlocksize()) .withBytesPerCheckSum(0) @@ -1051,7 +1100,10 @@ public class HFileBlock implements Cacheable { .withIncludesTags(fileContext.isIncludesTags()) .build(); return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), - getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedBufferWithHeader(), + getUncompressedSizeWithoutHeader(), prevOffset, + cacheConf.shouldCacheCompressed(blockType.getCategory()) ? + getOnDiskBufferWithHeader() : + getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset, onDiskBytesWithHeader.length + onDiskChecksum.length, newContext); } @@ -1119,6 +1171,12 @@ public class HFileBlock implements Cacheable { /** Closes the backing streams */ void closeStreams() throws IOException; + + /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */ + HFileBlockDecodingContext getBlockDecodingContext(); + + /** Get the default decoder for blocks from this file. */ + HFileBlockDecodingContext getDefaultBlockDecodingContext(); } /** @@ -1159,6 +1217,7 @@ public class HFileBlock implements Cacheable { @Override public BlockIterator blockRange(final long startOffset, final long endOffset) { + final FSReader owner = this; // handle for inner class return new BlockIterator() { private long offset = startOffset; @@ -1168,7 +1227,7 @@ public class HFileBlock implements Cacheable { return null; HFileBlock b = readBlockData(offset, -1, -1, false); offset += b.getOnDiskSizeWithHeader(); - return b; + return b.unpack(fileContext, owner); } @Override @@ -1274,7 +1333,8 @@ public class HFileBlock implements Cacheable { private HFileBlockDecodingContext encodedBlockDecodingCtx; - private HFileBlockDefaultDecodingContext defaultDecodingCtx; + /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ + private final HFileBlockDefaultDecodingContext defaultDecodingCtx; private ThreadLocal prefetchedHeaderForThread = new ThreadLocal() { @@ -1290,10 +1350,8 @@ public class HFileBlock implements Cacheable { this.streamWrapper = stream; // Older versions of HBase didn't support checksum. this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); - defaultDecodingCtx = - new HFileBlockDefaultDecodingContext(fileContext); - encodedBlockDecodingCtx = - new HFileBlockDefaultDecodingContext(fileContext); + defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext); + encodedBlockDecodingCtx = defaultDecodingCtx; } /** @@ -1434,9 +1492,8 @@ public class HFileBlock implements Cacheable { HFileBlock b = null; if (onDiskSizeWithHeader > 0) { - // We know the total on-disk size but not the uncompressed size. Read - // the entire block into memory, then parse the header and decompress - // from memory if using compression. 
This code path is used when + // We know the total on-disk size. Read the entire block into memory, + // then parse the header. This code path is used when // doing a random read operation relying on the block index, as well as // when the client knows the on-disk size from peeking into the next // block's header (e.g. this block's header) when reading the previous @@ -1444,7 +1501,8 @@ public class HFileBlock implements Cacheable { // Size that we have to skip in case we have already read the header. int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; - onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; + onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the + // next block's header nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); @@ -1457,11 +1515,10 @@ public class HFileBlock implements Cacheable { headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); } // We know the total on-disk size but not the uncompressed size. Read - // the entire block into memory, then parse the header and decompress - // from memory if using compression. Here we have already read the - // block's header + // the entire block into memory, then parse the header. Here we have + // already read the block's header try { - b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); + b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); } catch (IOException ex) { // Seen in load testing. Provide comprehensive debug info. throw new IOException("Failed to read compressed block at " @@ -1499,61 +1556,29 @@ public class HFileBlock implements Cacheable { readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false, offset, pread); } - b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); + b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; - System.arraycopy(headerBuf.array(), - headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); + System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader() - hdrSize, true, offset + hdrSize, pread); onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize; } - Algorithm compressAlgo = fileContext.getCompression(); - boolean isCompressed = - compressAlgo != null - && compressAlgo != Compression.Algorithm.NONE; - - Encryption.Context cryptoContext = fileContext.getEncryptionContext(); - boolean isEncrypted = cryptoContext != null - && cryptoContext != Encryption.Context.NONE; - - if (!isCompressed && !isEncrypted) { + if (!fileContext.isCompressedOrEncrypted()) { b.assumeUncompressed(); } - if (verifyChecksum && - !validateBlockChecksum(b, onDiskBlock, hdrSize)) { + if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) { return null; // checksum mismatch } - if (isCompressed || isEncrypted) { - // This will allocate a new buffer but keep header bytes. 
- b.allocateBuffer(nextBlockOnDiskSize > 0); - if (b.blockType == BlockType.ENCODED_DATA) { - encodedBlockDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), - b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, - hdrSize); - } else { - defaultDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), - b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, - hdrSize); - } - if (nextBlockOnDiskSize > 0) { - // Copy next block's header bytes into the new block if we have them. - System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(), - b.buf.arrayOffset() + hdrSize - + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), - hdrSize); - } - } else { - // The onDiskBlock will become the headerAndDataBuffer for this block. - // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already - // contains the header of next block, so no need to set next - // block's header in it. - b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, - onDiskSizeWithHeader), this.fileContext.isUseHBaseChecksum()); - } + // The onDiskBlock will become the headerAndDataBuffer for this block. + // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already + // contains the header of next block, so no need to set next + // block's header in it. + b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader), + this.fileContext.isUseHBaseChecksum()); b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; @@ -1578,16 +1603,25 @@ public class HFileBlock implements Cacheable { encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext); } + @Override + public HFileBlockDecodingContext getBlockDecodingContext() { + return this.encodedBlockDecodingCtx; + } + + @Override + public HFileBlockDecodingContext getDefaultBlockDecodingContext() { + return this.defaultDecodingCtx; + } + /** * Generates the checksum for the header as well as the data and * then validates that it matches the value stored in the header. * If there is a checksum mismatch, then return false. Otherwise * return true. */ - protected boolean validateBlockChecksum(HFileBlock block, - byte[] data, int hdrSize) throws IOException { - return ChecksumUtil.validateBlockChecksum(path, block, - data, hdrSize); + protected boolean validateBlockChecksum(HFileBlock block, byte[] data, int hdrSize) + throws IOException { + return ChecksumUtil.validateBlockChecksum(path, block, data, hdrSize); } @Override @@ -1683,6 +1717,7 @@ public class HFileBlock implements Cacheable { return this.fileContext.getBytesPerChecksum(); } + /** @return the size of data on disk + header. Excludes checksum. */ int getOnDiskDataSizeWithHeader() { return this.onDiskDataSizeWithHeader; } @@ -1736,6 +1771,10 @@ public class HFileBlock implements Cacheable { return DUMMY_HEADER_NO_CHECKSUM; } + /** + * @return the HFileContext used to create this HFileBlock. Not necessary the + * fileContext for the file from which this block's data was originally read. 
+ */ public HFileContext getHFileContext() { return this.fileContext; } @@ -1748,7 +1787,7 @@ public class HFileBlock implements Cacheable { static String toStringHeader(ByteBuffer buf) throws IOException { int offset = buf.arrayOffset(); byte[] b = buf.array(); - long magic = Bytes.toLong(b, offset); + long magic = Bytes.toLong(b, offset); BlockType bt = BlockType.read(buf); offset += Bytes.SIZEOF_LONG; int compressedBlockSizeNoHeader = Bytes.toInt(b, offset); @@ -1775,4 +1814,3 @@ public class HFileBlock implements Cacheable { " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader; } } - diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index f7b5b9d..1073e93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -772,7 +772,7 @@ public class HFileBlockIndex { * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The * initial value accounts for the root level, and will be increased to two * as soon as we find out there is a leaf-level in - * {@link #blockWritten(long, int)}. + * {@link #blockWritten(long, int, int)}. */ private int numLevels = 1; @@ -798,8 +798,8 @@ public class HFileBlockIndex { /** Whether we require this block index to always be single-level. */ private boolean singleLevelOnly; - /** Block cache, or null if cache-on-write is disabled */ - private BlockCache blockCache; + /** CacheConfig, or null if cache-on-write is disabled */ + private CacheConfig cacheConf; /** Name to use for computing cache keys */ private String nameForCaching; @@ -814,18 +814,17 @@ public class HFileBlockIndex { * Creates a multi-level block index writer. * * @param blockWriter the block writer to use to write index blocks - * @param blockCache if this is not null, index blocks will be cached - * on write into this block cache. + * @param cacheConf used to determine when and how a block should be cached-on-write. */ public BlockIndexWriter(HFileBlock.Writer blockWriter, - BlockCache blockCache, String nameForCaching) { - if ((blockCache == null) != (nameForCaching == null)) { + CacheConfig cacheConf, String nameForCaching) { + if ((cacheConf == null) != (nameForCaching == null)) { throw new IllegalArgumentException("Block cache and file name for " + "caching must be both specified or both null"); } this.blockWriter = blockWriter; - this.blockCache = blockCache; + this.cacheConf = cacheConf; this.nameForCaching = nameForCaching; this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE; } @@ -979,9 +978,9 @@ public class HFileBlockIndex { byte[] curFirstKey = curChunk.getBlockKey(0); blockWriter.writeHeaderAndData(out); - if (blockCache != null) { - HFileBlock blockForCaching = blockWriter.getBlockForCaching(); - blockCache.cacheBlock(new BlockCacheKey(nameForCaching, + if (cacheConf != null) { + HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf); + cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching, beginOffset), blockForCaching); } @@ -1090,8 +1089,7 @@ public class HFileBlockIndex { * entry referring to that block to the parent-level index. 
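Passing the index writer a CacheConfig instead of a bare BlockCache lets cache-on-write decide per block which serialized form is stored. A condensed sketch of the wiring, using names from the surrounding patch (blockWriter, cacheConf, name, beginOffset) and not meant as a drop-in excerpt:

    HFileBlockIndex.BlockIndexWriter indexWriter =
        new HFileBlockIndex.BlockIndexWriter(blockWriter,
            cacheConf.shouldCacheIndexesOnWrite() ? cacheConf : null,
            cacheConf.shouldCacheIndexesOnWrite() ? name : null);

    // inside the writer, after writeHeaderAndData(out) for a finished index block:
    HFileBlock forCaching = blockWriter.getBlockForCaching(cacheConf);
    // index blocks come back unpacked; only the DATA category honours compressed caching
    cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(name, beginOffset), forCaching);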
*/ @Override - public void blockWritten(long offset, int onDiskSize, int uncompressedSize) - { + public void blockWritten(long offset, int onDiskSize, int uncompressedSize) { // Add leaf index block size totalBlockOnDiskSize += onDiskSize; totalBlockUncompressedSize += uncompressedSize; @@ -1156,7 +1154,7 @@ public class HFileBlockIndex { */ @Override public boolean getCacheOnWrite() { - return blockCache != null; + return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 1292319..7875716 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -249,6 +249,10 @@ public class HFileReaderV2 extends AbstractHFileReader { return new ScannerV2(this, cacheBlocks, pread, isCompaction); } + /** + * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType} + * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary. + */ private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException { @@ -258,6 +262,9 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics); if (cachedBlock != null) { + if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { + cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader); + } validateBlockType(cachedBlock, expectedBlockType); if (expectedDataBlockEncoding == null) { @@ -337,6 +344,7 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true, BlockType.META, null); if (cachedBlock != null) { + assert cachedBlock.isUnpacked() : "Packed block leak."; // Return a distinct 'shallow copy' of the block, // so pos does not get messed by the scanner return cachedBlock.getBufferWithoutHeader(); @@ -345,7 +353,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, - blockSize, -1, true); + blockSize, -1, true).unpack(hfileContext, fsBlockReader); // Cache the block if (cacheBlock) { @@ -359,7 +367,7 @@ public class HFileReaderV2 extends AbstractHFileReader { /** * Read in a file block of the given {@link BlockType} and - * {@link DataBlockEncoding}. + * {@link DataBlockEncoding}. Unpacks the block as necessary. * @param dataBlockOffset offset to read. * @param onDiskBlockSize size of the block * @param cacheBlock @@ -400,8 +408,7 @@ public class HFileReaderV2 extends AbstractHFileReader { // the other choice is to duplicate work (which the cache would prevent you // from doing). 
- BlockCacheKey cacheKey = - new BlockCacheKey(name, dataBlockOffset); + BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset); boolean useLock = false; IdLock.Entry lockEntry = null; @@ -419,7 +426,7 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); if (cachedBlock != null) { - validateBlockType(cachedBlock, expectedBlockType); + assert cachedBlock.isUnpacked() : "Packed block leak."; if (cachedBlock.getBlockType().isData()) { if (updateCacheMetrics) { HFile.dataBlockReadCnt.incrementAndGet(); @@ -448,18 +455,21 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, pread); validateBlockType(hfileBlock, expectedBlockType); + HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); // Cache the block if necessary if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) { - cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), - this.cacheConf.isCacheDataInL1()); + cacheConf.getBlockCache().cacheBlock(cacheKey, + cacheConf.shouldCacheCompressed(hfileBlock.getBlockType().getCategory()) ? + hfileBlock : unpacked, + cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); } if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { HFile.dataBlockReadCnt.incrementAndGet(); } - return hfileBlock; + return unpacked; } } finally { traceScope.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index e6201bf..fec118d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -118,7 +118,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, - cacheIndexesOnWrite ? cacheConf.getBlockCache(): null, + cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null); dataBlockIndexWriter.setMaxChunkSize( HFileBlockIndex.getMaxChunkSize(conf)); @@ -143,7 +143,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { newBlock(); } - /** Clean up the current block */ + /** Clean up the current data block */ private void finishBlock() throws IOException { if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return; @@ -191,9 +191,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { * the cache key. 
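The readBlock() change above is where the laziness pays off on the read path: the block is unpacked exactly once, the cache receives either the packed or the unpacked copy depending on configuration, and callers always get the unpacked one. A condensed restatement of that decision (same names as the patched HFileReaderV2 code, not a drop-in excerpt):

    HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, pread);
    HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);

    if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) {
      boolean cachePacked =
          cacheConf.shouldCacheCompressed(hfileBlock.getBlockType().getCategory());
      cacheConf.getBlockCache().cacheBlock(cacheKey,
          cachePacked ? hfileBlock : unpacked,
          cacheConf.isInMemory(), cacheConf.isCacheDataInL1());
    }
    return unpacked;   // scanners never see a packed block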
*/ private void doCacheOnWrite(long offset) { - HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(); + HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf); cacheConf.getBlockCache().cacheBlock( - new BlockCacheKey(name, offset), cacheFormatBlock); + new BlockCacheKey(name, offset), cacheFormatBlock); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index 50a9b9f5..1dd16bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -84,6 +85,7 @@ public class TestCacheOnWrite { private final Compression.Algorithm compress; private final BlockEncoderTestType encoderType; private final HFileDataBlockEncoder encoder; + private final boolean cacheCompressedData; private static final int DATA_BLOCK_SIZE = 2048; private static final int NUM_KV = 25000; @@ -154,14 +156,15 @@ public class TestCacheOnWrite { } } - public TestCacheOnWrite(CacheOnWriteType cowType, - Compression.Algorithm compress, BlockEncoderTestType encoderType) { + public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress, + BlockEncoderTestType encoderType, boolean cacheCompressedData) { this.cowType = cowType; this.compress = compress; this.encoderType = encoderType; this.encoder = encoderType.getEncoder(); - testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + - ", encoderType=" + encoderType + "]"; + this.cacheCompressedData = cacheCompressedData; + testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + + ", encoderType=" + encoderType + ", cacheCompressedData=" + cacheCompressedData + "]"; System.out.println(testDescription); } @@ -173,7 +176,9 @@ public class TestCacheOnWrite { HBaseTestingUtility.COMPRESSION_ALGORITHMS) { for (BlockEncoderTestType encoderType : BlockEncoderTestType.values()) { - cowTypes.add(new Object[] { cowType, compress, encoderType }); + for (boolean cacheCompressedData : new boolean[] { false, true }) { + cowTypes.add(new Object[] { cowType, compress, encoderType, cacheCompressedData }); + } } } } @@ -189,11 +194,12 @@ public class TestCacheOnWrite { conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, - cowType.shouldBeCached(BlockType.DATA)); + cowType.shouldBeCached(BlockType.DATA)); conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.LEAF_INDEX)); conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.BLOOM_CHUNK)); + conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData); cowType.modifyConf(conf); fs = HFileSystem.get(conf); cacheConf = new CacheConfig(conf); @@ -225,6 +231,10 @@ public class TestCacheOnWrite { reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf); } LOG.info("HFile information: " + reader); + HFileContext meta = new HFileContextBuilder().withCompression(compress) + .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) + 
.withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding()) + .withIncludesTags(useTags).build(); final boolean cacheBlocks = false; final boolean pread = false; HFileScanner scanner = reader.getScanner(cacheBlocks, pread); @@ -248,16 +258,36 @@ public class TestCacheOnWrite { false, true, null, encodingInCache); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); - boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null; + HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); + boolean isCached = fromCache != null; boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); - if (shouldBeCached != isCached) { - throw new AssertionError( - "shouldBeCached: " + shouldBeCached+ "\n" + - "isCached: " + isCached + "\n" + - "Test description: " + testDescription + "\n" + - "block: " + block + "\n" + - "encodingInCache: " + encodingInCache + "\n" + - "blockCacheKey: " + blockCacheKey); + assertTrue("shouldBeCached: " + shouldBeCached+ "\n" + + "isCached: " + isCached + "\n" + + "Test description: " + testDescription + "\n" + + "block: " + block + "\n" + + "encodingInCache: " + encodingInCache + "\n" + + "blockCacheKey: " + blockCacheKey, + shouldBeCached == isCached); + if (isCached) { + if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) { + if (compress != Compression.Algorithm.NONE) { + assertFalse(fromCache.isUnpacked()); + } + fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader()); + } else { + assertTrue(fromCache.isUnpacked()); + } + // block we cached at write-time and block read from file should be identical + assertEquals(block.getChecksumType(), fromCache.getChecksumType()); + assertEquals(block.getBlockType(), fromCache.getBlockType()); + if (block.getBlockType() == BlockType.ENCODED_DATA) { + assertEquals(block.getDataBlockEncodingId(), fromCache.getDataBlockEncodingId()); + assertEquals(block.getDataBlockEncoding(), fromCache.getDataBlockEncoding()); + } + assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskDataSizeWithHeader()); + assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader()); + assertEquals( + block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader()); } prevBlock = block; offset += block.getOnDiskSizeWithHeader(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index 020a293..f70976f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -125,7 +125,7 @@ public class TestChecksum { assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); // read data back from the hfile, exclude header and checksum - ByteBuffer bb = b.getBufferWithoutHeader(); // read back data + ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data DataInputStream in = new DataInputStream( new ByteArrayInputStream( bb.array(), bb.arrayOffset(), bb.limit())); @@ -164,6 +164,7 @@ public class TestChecksum { b = hbr.readBlockData(0, -1, -1, pread); is.close(); b.sanityCheck(); + b = b.unpack(meta, hbr); assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 
2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); @@ -274,12 +275,7 @@ public class TestChecksum { // validate data for (int i = 0; i < 1234; i++) { int val = in.readInt(); - if (val != i) { - String msg = "testChecksumCorruption: data mismatch at index " + - i + " expected " + i + " found " + val; - LOG.warn(msg); - assertEquals(i, val); - } + assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java index 7ed3959..35b4c61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java @@ -80,14 +80,15 @@ public class TestForceCacheImportantBlocks { @Parameters public static Collection parameters() { // HFile versions - return Arrays.asList(new Object[][] { - new Object[] { new Integer(2), false }, - new Object[] { new Integer(3), true } - }); + return Arrays.asList( + new Object[] { 2, true }, + new Object[] { 2, false }, + new Object[] { 3, true }, + new Object[] { 3, false } + ); } - public TestForceCacheImportantBlocks(int hfileVersion, - boolean cfCacheEnabled) { + public TestForceCacheImportantBlocks(int hfileVersion, boolean cfCacheEnabled) { this.hfileVersion = hfileVersion; this.cfCacheEnabled = cfCacheEnabled; TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, hfileVersion); @@ -110,9 +111,9 @@ public class TestForceCacheImportantBlocks { hcd.setBlocksize(BLOCK_SIZE); hcd.setBlockCacheEnabled(cfCacheEnabled); HRegion region = TEST_UTIL.createTestRegion(TABLE, hcd); + BlockCache cache = region.getStore(hcd.getName()).getCacheConfig().getBlockCache(); + CacheStats stats = cache.getStats(); writeTestData(region); - CacheStats stats = - region.getStores().get(hcd.getName()).getCacheConfig().getBlockCache().getStats(); assertEquals(0, stats.getHitCount()); assertEquals(0, HFile.dataBlockReadCnt.get()); // Do a single get, take count of caches. 
If we are NOT caching DATA blocks, the miss @@ -141,4 +142,4 @@ public class TestForceCacheImportantBlocks { } } } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index ef9a74f..11ac986 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -316,7 +316,8 @@ public class TestHFile extends HBaseTestCase { ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); ByteBuffer expected = ByteBuffer.wrap(("something to test" + i).getBytes()); - assertTrue("failed to match metadata", actual.compareTo(expected) == 0); + assertEquals("failed to match metadata", + Bytes.toStringBinary(expected), Bytes.toStringBinary(actual)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 09561cb..5db7ff6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -69,6 +67,7 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import org.mockito.Mockito; @Category(MediumTests.class) @RunWith(Parameterized.class) @@ -235,8 +234,14 @@ public class TestHFileBlock { @Test public void testNoCompression() throws IOException { - assertEquals(4000, createTestV2Block(NONE, includesMemstoreTS, false). - getBlockForCaching().getUncompressedSizeWithoutHeader()); + CacheConfig cacheConf = Mockito.mock(CacheConfig.class); + Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false); + + HFileBlock block = + createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf); + assertEquals(4000, block.getUncompressedSizeWithoutHeader()); + assertEquals(4004, block.getOnDiskSizeWithoutHeader()); + assertTrue(block.isUnpacked()); } @Test @@ -419,13 +424,19 @@ public class TestHFileBlock { assertEquals(0, HFile.getChecksumFailuresCount()); b.sanityCheck(); pos += b.getOnDiskSizeWithHeader(); - assertEquals((int) encodedSizes.get(blockId), - b.getUncompressedSizeWithoutHeader()); + assertEquals((int) encodedSizes.get(blockId), b.getUncompressedSizeWithoutHeader()); + assertEquals(meta.isCompressedOrEncrypted(), !b.isUnpacked()); + b = b.unpack(meta, hbr); + assertTrue(b.isUnpacked()); ByteBuffer actualBuffer = b.getBufferWithoutHeader(); if (encoding != DataBlockEncoding.NONE) { // We expect a two-byte big-endian encoding id. 
- assertEquals(0, actualBuffer.get(0)); - assertEquals(encoding.getId(), actualBuffer.get(1)); + assertEquals( + "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread), + Long.toHexString(0), Long.toHexString(actualBuffer.get(0))); + assertEquals( + "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread), + Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1))); actualBuffer.position(2); actualBuffer = actualBuffer.slice(); } @@ -442,6 +453,11 @@ public class TestHFileBlock { } } + static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding, + boolean pread) { + return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread); + } + static void assertBuffersEqual(ByteBuffer expectedBuffer, ByteBuffer actualBuffer, Compression.Algorithm compression, DataBlockEncoding encoding, boolean pread) { @@ -454,9 +470,8 @@ public class TestHFileBlock { } fail(String.format( - "Content mismath for compression %s, encoding %s, " + - "pread %s, commonPrefix %d, expected %s, got %s", - compression, encoding, pread, prefix, + "Content mismatch for %s, commonPrefix %d, expected %s, got %s", + buildMessageDetails(compression, encoding, pread), prefix, nextBytesToStr(expectedBuffer, prefix), nextBytesToStr(actualBuffer, prefix))); } @@ -479,6 +494,7 @@ public class TestHFileBlock { } protected void testPreviousOffsetInternals() throws IOException { + // TODO: parameterize these nested loops. for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { for (boolean pread : BOOLEAN_VALUES) { for (boolean cacheOnWrite : BOOLEAN_VALUES) { @@ -548,8 +564,10 @@ public class TestHFileBlock { curOffset += b.getOnDiskSizeWithHeader(); if (cacheOnWrite) { - // In the cache-on-write mode we store uncompressed bytes so we - // can compare them to what was read by the block reader. + // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply + // verifies that the unpacked value read back off disk matches the unpacked value + // generated before writing to disk. 
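
Condensed, the round trip the NOTE above describes is: write a block, read it back in
its packed on-disk form, unpack it, and expect the same header + data bytes that were
captured at write time. A rough sketch using this test's own hbr, meta, curOffset,
pread, bufRead and bufExpected names; the checksum tail on the read-back buffer is
deliberately left out of the comparison:

    HFileBlock readBack = hbr.readBlockData(curOffset, -1, -1, pread).unpack(meta, hbr);
    ByteBuffer bufRead = readBack.getBufferWithHeader();   // header + data + checksum
    // bufExpected is the header + data buffer kept in expectedContents at write time
    assertTrue("cache-on-write round-trip mismatch",
        Bytes.equals(bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit(),
            bufRead.array(), bufRead.arrayOffset(), bufExpected.limit()));
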
+ b = b.unpack(meta, hbr); // b's buffer has header + data + checksum while // expectedContents have header + data only ByteBuffer bufRead = b.getBufferWithHeader(); @@ -568,11 +586,10 @@ public class TestHFileBlock { + algo + ", pread=" + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n"; wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(), - bufExpected.arrayOffset(), Math.min(32, - bufExpected.limit())) + bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n" + Bytes.toStringBinary(bufRead.array(), - bufRead.arrayOffset(), Math.min(32, bufRead.limit())); + bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit())); if (detailedLogging) { LOG.warn("expected header" + HFileBlock.toStringHeader(bufExpected) + @@ -762,6 +779,7 @@ public class TestHFileBlock { if (detailedLogging) { LOG.info("Written block #" + i + " of type " + bt + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader() + + ", packed size " + hbw.getOnDiskSizeWithoutHeader() + " at offset " + pos); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java index 88fdb77..7928409 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -301,6 +299,10 @@ public class TestHFileBlockCompatibility { for (int blockId = 0; blockId < numBlocks; ++blockId) { b = hbr.readBlockData(pos, -1, -1, pread); b.sanityCheck(); + if (meta.isCompressedOrEncrypted()) { + assertFalse(b.isUnpacked()); + b = b.unpack(meta, hbr); + } pos += b.getOnDiskSizeWithHeader(); assertEquals((int) encodedSizes.get(blockId), @@ -335,7 +337,7 @@ public class TestHFileBlockCompatibility { * in this class but the code in HFileBlock.Writer will continually * evolve. */ - public static final class Writer extends HFileBlock.Writer{ + public static final class Writer extends HFileBlock.Writer { // These constants are as they were in minorVersion 0. 
private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; @@ -416,10 +418,6 @@ public class TestHFileBlockCompatibility { private int unencodedDataSizeWritten; - /** - * @param compressionAlgorithm compression algorithm to use - * @param dataBlockEncoderAlgo data block encoding algorithm to use - */ public Writer(Compression.Algorithm compressionAlgorithm, HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) { this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java index 31546e2..6ec45a6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -53,6 +48,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.*; + @Category(SmallTests.class) public class TestHFileEncryption { private static final Log LOG = LogFactory.getLog(TestHFileEncryption.class); @@ -95,11 +92,13 @@ public class TestHFileEncryption { return hbw.getOnDiskSizeWithHeader(); } - private long readAndVerifyBlock(long pos, HFileBlock.FSReaderV2 hbr, int size) + private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderV2 hbr, int size) throws IOException { HFileBlock b = hbr.readBlockData(pos, -1, -1, false); assertEquals(0, HFile.getChecksumFailuresCount()); b.sanityCheck(); + assertFalse(b.isUnpacked()); + b = b.unpack(ctx, hbr); LOG.info("Read a block at " + pos + " with" + " onDiskSizeWithHeader=" + b.getOnDiskSizeWithHeader() + " uncompressedSizeWithoutHeader=" + b.getOnDiskSizeWithoutHeader() + @@ -142,7 +141,7 @@ public class TestHFileEncryption { HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, fileContext); long pos = 0; for (int i = 0; i < blocks; i++) { - pos += readAndVerifyBlock(pos, hbr, blockSizes[i]); + pos += readAndVerifyBlock(pos, fileContext, hbr, blockSizes[i]); } } finally { is.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java index 27e7051..b27f5b3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; @@ -170,8 +171,8 @@ public class TestHFileWriterV2 { // Meta index. 
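
For orientation, both writer tests walk the load-on-open section in the same fixed
order: the root data index, then the meta index, then FileInfo, each pulled from the
block iterator. A condensed sketch of that sequence, reusing the reader objects these
tests already construct:

    dataBlockIndexReader.readMultiLevelIndexRoot(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());
    metaBlockIndexReader.readRootIndex(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(),
        trailer.getMetaIndexCount());
    FileInfo fileInfo = new FileInfo();
    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
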
metaBlockIndexReader.readRootIndex( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(), - trailer.getMetaIndexCount()); + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX) + .getByteStream(), trailer.getMetaIndexCount()); // File info FileInfo fileInfo = new FileInfo(); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); @@ -191,6 +192,10 @@ public class TestHFileWriterV2 { while (curBlockPos <= trailer.getLastDataBlockOffset()) { HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); assertEquals(BlockType.DATA, block.getBlockType()); + if (meta.isCompressedOrEncrypted()) { + assertFalse(block.isUnpacked()); + block = block.unpack(meta, blockReader); + } ByteBuffer buf = block.getBufferWithoutHeader(); while (buf.hasRemaining()) { int keyLen = buf.getInt(); @@ -232,7 +237,8 @@ public class TestHFileWriterV2 { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + .unpack(meta, blockReader); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); ByteBuffer buf = block.getBufferWithoutHeader(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java index 8b92c56..b19efff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java @@ -191,8 +191,7 @@ public class TestHFileWriterV3 { // Data index. We also read statistics about the block index written after // the root level. dataBlockIndexReader.readMultiLevelIndexRoot( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), - trailer.getDataIndexCount()); + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount()); if (findMidKey) { byte[] midkey = dataBlockIndexReader.midkey(); @@ -201,8 +200,8 @@ public class TestHFileWriterV3 { // Meta index. 
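
After the load-on-open reads (the meta-index read continues just below), the v3 test
scans data blocks up to trailer.getLastDataBlockOffset() and then meta blocks up to
trailer.getLoadOnOpenDataOffset(), unpacking every block before its payload is touched.
Chaining unpack() straight onto readBlockData(), as these hunks do, is harmless when a
block already arrives unpacked (see the isUnpacked() assertions earlier in this patch).
A condensed sketch of the data-block pass, with this test's blockReader, context and
trailer names:

    long curBlockPos = 0;
    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
          .unpack(context, blockReader);
      assertEquals(BlockType.DATA, block.getBlockType());
      curBlockPos += block.getOnDiskSizeWithHeader();
    }
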
metaBlockIndexReader.readRootIndex( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(), - trailer.getMetaIndexCount()); + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX) + .getByteStream(), trailer.getMetaIndexCount()); // File info FileInfo fileInfo = new FileInfo(); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); @@ -220,7 +219,8 @@ public class TestHFileWriterV3 { fsdis.seek(0); long curBlockPos = 0; while (curBlockPos <= trailer.getLastDataBlockOffset()) { - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + .unpack(context, blockReader); assertEquals(BlockType.DATA, block.getBlockType()); ByteBuffer buf = block.getBufferWithoutHeader(); int keyLen = -1; @@ -278,7 +278,8 @@ public class TestHFileWriterV3 { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + .unpack(context, blockReader); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); ByteBuffer buf = block.getBufferWithoutHeader(); -- 1.9.0
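
One test-support idiom introduced above is worth calling out: getBlockForCaching(...)
now takes a CacheConfig, and testNoCompression satisfies it with a Mockito stub instead
of a fully wired cache. A self-contained sketch of that idiom follows; the helper class
and method names are illustrative only and not part of the patch:

    package org.apache.hadoop.hbase.io.hfile;

    import org.mockito.Mockito;

    /**
     * Illustrative test helper: builds a CacheConfig stub with the block cache
     * disabled, sufficient for code paths that only consult CacheConfig flags,
     * such as getBlockForCaching(CacheConfig).
     */
    public final class CacheConfigStubs {

      private CacheConfigStubs() {
      }

      public static CacheConfig disabledBlockCache() {
        CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
        Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false);
        return cacheConf;
      }
    }

A test in the same package can then pass CacheConfigStubs.disabledBlockCache() to
getBlockForCaching(...), just as testNoCompression does with its inline mock, without
standing up a BlockCache.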