From 6c2a23504b44e0961b37d3eb8746e0c260f51bc2 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Thu, 24 Jan 2013 11:12:48 -0800 Subject: [PATCH] Make HFileBlock backwards compatible This involves turning off HBase checksums by default, which means the HFiles written remain in the 2.0 format. Turning on hbase.regionserver.checksum.verify means HFiles are written in a format that older versions cannot read, so that change cannot be rolled back. --- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 150 ++++++++++++++------ .../hadoop/hbase/io/hfile/HFileWriterV2.java | 12 +- src/main/resources/hbase-default.xml | 8 ++ .../hadoop/hbase/io/hfile/CacheTestUtils.java | 2 +- .../apache/hadoop/hbase/io/hfile/TestChecksum.java | 21 +-- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 28 ++-- .../io/hfile/TestHFileBlockCompatibility.java | 16 +-- .../hadoop/hbase/io/hfile/TestHFileBlockIndex.java | 4 +- .../hbase/io/hfile/TestHFileDataBlockEncoder.java | 8 +- .../hadoop/hbase/io/hfile/TestHFileWriterV2.java | 21 ++- 10 files changed, 173 insertions(+), 97 deletions(-) diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index edc633f..6bf7437 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -112,18 +112,18 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * There is a 1 byte checksum type, followed by a 4 byte bytesPerChecksum * followed by another 4 byte value to store sizeofDataOnDisk. */ - static final int HEADER_SIZE = HEADER_SIZE_NO_CHECKSUM + Bytes.SIZEOF_BYTE + + static final int HEADER_SIZE_WITH_CHECKSUMS = HEADER_SIZE_NO_CHECKSUM + Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT; /** * The size of block header when blockType is {@link BlockType#ENCODED_DATA}. * This extends normal header by adding the id of encoder. */ - public static final int ENCODED_HEADER_SIZE = HEADER_SIZE + public static final int ENCODED_HEADER_SIZE = HEADER_SIZE_WITH_CHECKSUMS + DataBlockEncoding.ID_SIZE; /** Just an array of bytes of the right size. */ - static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE]; + static final byte[] DUMMY_HEADER_WITH_CHECKSUM = new byte[HEADER_SIZE_WITH_CHECKSUMS]; static final byte[] DUMMY_HEADER_NO_CHECKSUM = new byte[HEADER_SIZE_NO_CHECKSUM]; @@ -194,7 +194,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { /** * The on-disk size of the next block, including the header, obtained by - * peeking into the first {@link HEADER_SIZE} bytes of the next block's + * peeking into the first {@link HFileBlock#headerSize(int)} bytes of the next block's * header, or -1 if unknown. */ private int nextBlockOnDiskSizeWithHeader = -1; @@ -212,9 +212,9 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * compression is disabled. * @param prevBlockOffset the offset of the previous block in the * {@link HFile} - * @param buf block header ({@link #HEADER_SIZE} bytes) followed by + * @param buf block header ({@link HFileBlock#headerSize(int)} bytes) followed by * uncompressed data.
This - * @param fillHeader true to fill in the first {@link #HEADER_SIZE} bytes of + * @param fillHeader true to fill in the first {@link HFileBlock#headerSize(int)} bytes of * the buffer based on the header fields provided * @param offset the file offset the block was read from * @param minorVersion the minor version of this block @@ -322,7 +322,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { } /** - * Writes header fields into the first {@link HEADER_SIZE} bytes of the + * Writes header fields into the first {@link ©HEADER_SIZE_WITH_CHECKSUMS} bytes of the * buffer. Resets the buffer position to the end of header as side effect. */ private void overwriteHeader() { @@ -395,7 +395,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { /** * Checks if the block is internally consistent, i.e. the first - * {@link #HEADER_SIZE} bytes of the buffer contain a valid header consistent + * {@link HFileBlock#headerSize(int)} bytes of the buffer contain a valid header consistent * with the fields. This function is primary for testing and debugging, and * is not thread-safe, because it alters the internal buffer pointer. */ @@ -433,7 +433,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { + ", got " + buf.limit()); } - // We might optionally allocate HEADER_SIZE more bytes to read the next + // We might optionally allocate HEADER_SIZE_WITH_CHECKSUMS more bytes to read the next // block's, header, so there are two sensible values for buffer capacity. int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes; if (buf.capacity() != size && @@ -645,7 +645,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { /** * The stream we use to accumulate data in uncompressed format for each * block. We reset this stream at the end of each block and reuse it. The - * header is written as the first {@link #HEADER_SIZE} bytes into this + * header is written as the first {@link HFileBlock#headerSize(int)} bytes into this * stream. */ private ByteArrayOutputStream baosInMemory; @@ -696,7 +696,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { /** * Valid in the READY state. Contains the header and the uncompressed (but * potentially encoded, if this is a data block) bytes, so the length is - * {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#HEADER_SIZE}. + * {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#headerSize(int)}. * Does not store checksums. */ private byte[] uncompressedBytesWithHeader; @@ -723,6 +723,8 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { private ChecksumType checksumType; private int bytesPerChecksum; + private final int minorVersion; + /** * @param compressionAlgorithm compression algorithm to use * @param dataBlockEncoderAlgo data block encoding algorithm to use @@ -731,7 +733,9 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { */ public Writer(Compression.Algorithm compressionAlgorithm, HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, + int minorVersion, ChecksumType checksumType, int bytesPerChecksum) { + this.minorVersion = minorVersion; compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm; this.dataBlockEncoder = dataBlockEncoder != null ? 
dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; @@ -749,9 +753,10 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { "for algorithm " + compressionAlgorithm, e); } } - if (bytesPerChecksum < HEADER_SIZE) { + if (minorVersion > MINOR_VERSION_NO_CHECKSUM + && bytesPerChecksum < HEADER_SIZE_WITH_CHECKSUMS) { throw new RuntimeException("Unsupported value of bytesPerChecksum. " + - " Minimum is " + HEADER_SIZE + " but the configured value is " + + " Minimum is " + HEADER_SIZE_WITH_CHECKSUMS + " but the configured value is " + bytesPerChecksum); } @@ -782,7 +787,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { blockType = newBlockType; baosInMemory.reset(); - baosInMemory.write(DUMMY_HEADER); + baosInMemory.write(getDummyHeaderForVersion(this.minorVersion)); state = State.WRITING; @@ -849,15 +854,62 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * outputbyte stream 'baos'. */ private void doCompressionAndChecksumming() throws IOException { + if ( minorVersion <= MINOR_VERSION_NO_CHECKSUM) { + version20compression(); + } else { + version21ChecksumAndCompression(); + } + } + + private void version20compression() throws IOException { + onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; + + if (compressAlgo != NONE) { + compressedByteStream.reset(); + compressedByteStream.write(DUMMY_HEADER_NO_CHECKSUM); + + compressionStream.resetState(); + + compressionStream.write(uncompressedBytesWithHeader, headerSize(this.minorVersion), + uncompressedBytesWithHeader.length - headerSize(this.minorVersion)); + + + compressionStream.flush(); + compressionStream.finish(); + onDiskDataSizeWithHeader = compressedByteStream.size(); // data size + onDiskBytesWithHeader = compressedByteStream.toByteArray(); + + put20Header(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, + uncompressedBytesWithHeader.length); + + + //set the header for the uncompressed bytes (for cache-on-write) + put20Header(uncompressedBytesWithHeader, 0, + onDiskBytesWithHeader.length + onDiskChecksum.length, + uncompressedBytesWithHeader.length); + + } else { + onDiskBytesWithHeader = uncompressedBytesWithHeader; + + onDiskDataSizeWithHeader = onDiskBytesWithHeader.length; + + //set the header for the uncompressed bytes + put20Header(uncompressedBytesWithHeader, 0, + onDiskBytesWithHeader.length, + uncompressedBytesWithHeader.length); + } + } + + private void version21ChecksumAndCompression() throws IOException { // do the compression if (compressAlgo != NONE) { compressedByteStream.reset(); - compressedByteStream.write(DUMMY_HEADER); + compressedByteStream.write(DUMMY_HEADER_WITH_CHECKSUM); compressionStream.resetState(); - compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE, - uncompressedBytesWithHeader.length - HEADER_SIZE); + compressionStream.write(uncompressedBytesWithHeader, headerSize(this.minorVersion), + uncompressedBytesWithHeader.length - headerSize(this.minorVersion)); compressionStream.flush(); compressionStream.finish(); @@ -871,7 +923,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { onDiskBytesWithHeader = compressedByteStream.toByteArray(); - putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, + put21Header(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); // generate checksums for header and data. 
The checksums are @@ -885,9 +937,9 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; //set the header for the uncompressed bytes (for cache-on-write) - putHeader(uncompressedBytesWithHeader, 0, - onDiskBytesWithHeader.length + onDiskChecksum.length, - uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); + put21Header(uncompressedBytesWithHeader, 0, + onDiskBytesWithHeader.length + onDiskChecksum.length, + uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); } else { // If we are not using any compression, then the @@ -901,9 +953,9 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { onDiskChecksum = new byte[numBytes]; //set the header for the uncompressed bytes - putHeader(uncompressedBytesWithHeader, 0, - onDiskBytesWithHeader.length + onDiskChecksum.length, - uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); + put21Header(uncompressedBytesWithHeader, 0, + onDiskBytesWithHeader.length + onDiskChecksum.length, + uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); ChecksumUtil.generateChecksums( uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length, @@ -923,11 +975,11 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { // do data block encoding, if data block encoder is set ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader, - HEADER_SIZE, uncompressedBytesWithHeader.length - - HEADER_SIZE).slice(); + headerSize(this.minorVersion), uncompressedBytesWithHeader.length - + headerSize(this.minorVersion)).slice(); Pair encodingResult = dataBlockEncoder.beforeWriteToDisk(rawKeyValues, - includesMemstoreTS, DUMMY_HEADER); + includesMemstoreTS, getDummyHeaderForVersion(this.minorVersion)); BlockType encodedBlockType = encodingResult.getSecond(); if (encodedBlockType == BlockType.ENCODED_DATA) { @@ -940,10 +992,10 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { "block encoder: " + encodedBlockType); } if (userDataStream.size() != - uncompressedBytesWithHeader.length - HEADER_SIZE) { + uncompressedBytesWithHeader.length - headerSize(this.minorVersion)) { throw new IOException("Uncompressed size mismatch: " + userDataStream.size() + " vs. 
" - + (uncompressedBytesWithHeader.length - HEADER_SIZE)); + + (uncompressedBytesWithHeader.length - headerSize(this.minorVersion))); } } } @@ -956,17 +1008,25 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * @param onDiskDataSize size of the block on disk with header * and data but not including the checksums */ - private void putHeader(byte[] dest, int offset, int onDiskSize, - int uncompressedSize, int onDiskDataSize) { + private void put21Header(byte[] dest, int offset, int onDiskSize, + int uncompressedSize, int onDiskDataSize) { offset = blockType.put(dest, offset); - offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE); - offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE); + offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE_WITH_CHECKSUMS); + offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE_WITH_CHECKSUMS); offset = Bytes.putLong(dest, offset, prevOffset); offset = Bytes.putByte(dest, offset, checksumType.getCode()); offset = Bytes.putInt(dest, offset, bytesPerChecksum); offset = Bytes.putInt(dest, offset, onDiskDataSizeWithHeader); } + + private void put20Header(byte[] dest, int offset, int onDiskSize, + int uncompressedSize) { + offset = blockType.put(dest, offset); + offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE_NO_CHECKSUM); + offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE_NO_CHECKSUM); + Bytes.putLong(dest, offset, prevOffset); + } /** * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records * the offset of this block so that it can be referenced in the next block @@ -999,7 +1059,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { private void writeHeaderAndData(DataOutputStream out) throws IOException { ensureBlockReady(); out.write(onDiskBytesWithHeader); - if (compressAlgo == NONE) { + if (compressAlgo == NONE && minorVersion > MINOR_VERSION_NO_CHECKSUM) { if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) { throw new IOException("A " + blockType + " without compression should have checksums " @@ -1062,7 +1122,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { */ int getOnDiskSizeWithoutHeader() { expectState(State.BLOCK_READY); - return onDiskBytesWithHeader.length + onDiskChecksum.length - HEADER_SIZE; + return onDiskBytesWithHeader.length + onDiskChecksum.length - headerSize(this.minorVersion); } /** @@ -1082,7 +1142,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { */ int getUncompressedSizeWithoutHeader() { expectState(State.BLOCK_READY); - return uncompressedBytesWithHeader.length - HEADER_SIZE; + return uncompressedBytesWithHeader.length - headerSize(this.minorVersion); } /** @@ -1158,7 +1218,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset, - includesMemstoreTS, MINOR_VERSION_WITH_CHECKSUM, + includesMemstoreTS, this.minorVersion, 0, ChecksumType.NULL.getCode(), // no checksums in cached data onDiskBytesWithHeader.length + onDiskChecksum.length); } @@ -1458,7 +1518,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * coming to end of the compressed section. * * The block returned is still a version 2 block, and in particular, its - * first {@link #HEADER_SIZE} bytes contain a valid version 2 header. 
+ * first {@link #HEADER_SIZE_WITH_CHECKSUMS} bytes contain a valid version 2 header. * * @param offset the offset of the block to read in the file * @param onDiskSizeWithMagic the on-disk size of the version 1 block, @@ -1540,8 +1600,8 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { */ private static class PrefetchedHeader { long offset = -1; - byte[] header = new byte[HEADER_SIZE]; - ByteBuffer buf = ByteBuffer.wrap(header, 0, HEADER_SIZE); + byte[] header = new byte[HEADER_SIZE_WITH_CHECKSUMS]; + ByteBuffer buf = ByteBuffer.wrap(header, 0, HEADER_SIZE_WITH_CHECKSUMS); } /** Reads version 2 blocks from the filesystem. */ @@ -1607,7 +1667,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { */ FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo, long fileSize) throws IOException { - this(istream, istream, compressAlgo, fileSize, + this(istream, istream, compressAlgo, fileSize, HFileReaderV2.MAX_MINOR_VERSION, null, null); } @@ -2074,24 +2134,24 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { if (minorVersion < MINOR_VERSION_WITH_CHECKSUM) { return HEADER_SIZE_NO_CHECKSUM; } - return HEADER_SIZE; + return HEADER_SIZE_WITH_CHECKSUMS; } /** - * Return the appropriate DUMMY_HEADER for the minor version + * Return the appropriate DUMMY_HEADER_WITH_CHECKSUM for the minor version */ public byte[] getDummyHeaderForVersion() { return getDummyHeaderForVersion(minorVersion); } /** - * Return the appropriate DUMMY_HEADER for the minor version + * Return the appropriate DUMMY_HEADER_WITH_CHECKSUM for the minor version */ static private byte[] getDummyHeaderForVersion(int minorVersion) { if (minorVersion < MINOR_VERSION_WITH_CHECKSUM) { return DUMMY_HEADER_NO_CHECKSUM; } - return DUMMY_HEADER; + return DUMMY_HEADER_WITH_CHECKSUM; } /** diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index ae9513b..a299b3c 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -32,8 +32,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KeyComparator; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.HFile.Writer; import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; @@ -87,6 +89,8 @@ public class HFileWriterV2 extends AbstractHFileWriter { private final boolean includeMemstoreTS = true; private long maxMemstoreTS = 0; + private int minorVersion = HFileReaderV2.MAX_MINOR_VERSION; + static class WriterFactoryV2 extends HFile.WriterFactory { WriterFactoryV2(Configuration conf, CacheConfig cacheConf) { super(conf, cacheConf); @@ -115,6 +119,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { SchemaMetrics.configureGlobally(conf); this.checksumType = checksumType; this.bytesPerChecksum = bytesPerChecksum; + if (!conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, false)) { + this.minorVersion = 0; + } finishInit(conf); } @@ -125,7 +132,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { // HFile filesystem-level (non-caching) block writer fsBlockWriter = new 
HFileBlock.Writer(compressAlgo, blockEncoder, - includeMemstoreTS, checksumType, bytesPerChecksum); + includeMemstoreTS, minorVersion, checksumType, bytesPerChecksum); // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); @@ -364,8 +371,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { finishBlock(); writeInlineBlocks(true); - FixedFileTrailer trailer = new FixedFileTrailer(2, - HFileReaderV2.MAX_MINOR_VERSION); + FixedFileTrailer trailer = new FixedFileTrailer(2, minorVersion); // Write out the metadata blocks if any. if (!metaNames.isEmpty()) { diff --git src/main/resources/hbase-default.xml src/main/resources/hbase-default.xml index 999ae5f..31a7b80 100644 --- src/main/resources/hbase-default.xml +++ src/main/resources/hbase-default.xml @@ -486,6 +486,14 @@ + hbase.regionserver.checksum.verify + false + + Allow hbase to do checksums rather than using hdfs checksums. This is a backwards + incompatible change. + + + hfile.index.block.max.size 131072 diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index 609c69c..88b8708 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -326,7 +326,7 @@ public class CacheTestUtils { prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER, blockSize, includesMemstoreTS, HFileBlock.MINOR_VERSION_NO_CHECKSUM, 0, ChecksumType.NULL.getCode(), - onDiskSizeWithoutHeader + HFileBlock.HEADER_SIZE); + onDiskSizeWithoutHeader + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS); String strKey; /* No conflicting keys */ diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index 4179725..168fc67 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -21,20 +21,11 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.*; -import java.io.ByteArrayOutputStream; import java.io.ByteArrayInputStream; import java.io.DataOutputStream; import java.io.DataInputStream; import java.io.IOException; -import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.zip.Checksum; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,13 +35,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.compress.Compressor; import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*; import org.junit.Before; @@ -96,7 +83,7 @@ public class TestChecksum { + algo); FSDataOutputStream os = fs.create(path); HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null, - true, HFile.DEFAULT_CHECKSUM_TYPE, + true, 1, HFile.DEFAULT_CHECKSUM_TYPE, HFile.DEFAULT_BYTES_PER_CHECKSUM); long totalSize = 0; for (int blockId = 
0; blockId < 2; ++blockId) { @@ -189,7 +176,7 @@ public class TestChecksum { algo + bytesPerChecksum); FSDataOutputStream os = fs.create(path); HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null, - true, HFile.DEFAULT_CHECKSUM_TYPE, bytesPerChecksum); + true, 1,HFile.DEFAULT_CHECKSUM_TYPE, bytesPerChecksum); // write one block. The block has data // that is at least 6 times more than the checksum chunk size @@ -206,7 +193,7 @@ public class TestChecksum { os.close(); long expectedChunks = ChecksumUtil.numChunks( - dataSize + HFileBlock.HEADER_SIZE, + dataSize + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS, bytesPerChecksum); LOG.info("testChecksumChunks: pread=" + pread + ", bytesPerChecksum=" + bytesPerChecksum + @@ -228,7 +215,7 @@ public class TestChecksum { assertEquals(dataSize, b.getUncompressedSizeWithoutHeader()); // verify that we have the expected number of checksum chunks - assertEquals(totalSize, HFileBlock.HEADER_SIZE + dataSize + + assertEquals(totalSize, HFileBlock.HEADER_SIZE_WITH_CHECKSUMS + dataSize + expectedChunks * HFileBlock.CHECKSUM_SIZE); // assert that we did not encounter hbase checksum verification failures diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index bf6f064..489a1d2 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*; @@ -196,7 +197,8 @@ public class TestHFileBlock { boolean includesMemstoreTS) throws IOException { final BlockType blockType = BlockType.DATA; HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null, - includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE, + includesMemstoreTS, HFileReaderV2.MAX_MINOR_VERSION, + HFile.DEFAULT_CHECKSUM_TYPE, HFile.DEFAULT_BYTES_PER_CHECKSUM); DataOutputStream dos = hbw.startWriting(blockType); writeTestBlockContents(dos); @@ -210,7 +212,7 @@ public class TestHFileBlock { int correctLength) throws IOException { HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS); byte[] testV2Block = hbw.getHeaderAndDataForTest(); - int osOffset = HFileBlock.HEADER_SIZE + 9; + int osOffset = HFileBlock.HEADER_SIZE_WITH_CHECKSUMS + 9; if (testV2Block.length == correctLength) { // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid // variations across operating systems. 
@@ -297,7 +299,9 @@ public class TestHFileBlock { + algo); FSDataOutputStream os = fs.create(path); HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null, - includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE, + includesMemstoreTS, + HFileReaderV2.MAX_MINOR_VERSION, + HFile.DEFAULT_CHECKSUM_TYPE, HFile.DEFAULT_BYTES_PER_CHECKSUM); long totalSize = 0; for (int blockId = 0; blockId < 2; ++blockId) { @@ -325,13 +329,13 @@ public class TestHFileBlock { if (algo == GZ) { is = fs.open(path); hbr = new HFileBlock.FSReaderV2(is, algo, totalSize); - b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE + + b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS + b.totalChecksumBytes(), -1, pread); assertEquals(blockStr, b.toString()); int wrongCompressedSize = 2172; try { b = hbr.readBlockData(0, wrongCompressedSize - + HFileBlock.HEADER_SIZE, -1, pread); + + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS, -1, pread); fail("Exception expected"); } catch (IOException ex) { String expectedPrefix = "On-disk size without header provided is " @@ -363,7 +367,9 @@ public class TestHFileBlock { HFileDataBlockEncoder dataBlockEncoder = new HFileDataBlockEncoderImpl(encoding); HFileBlock.Writer hbw = new HFileBlock.Writer(algo, dataBlockEncoder, - includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE, + includesMemstoreTS, + HFileReaderV2.MAX_MINOR_VERSION, + HFile.DEFAULT_CHECKSUM_TYPE, HFile.DEFAULT_BYTES_PER_CHECKSUM); long totalSize = 0; final List encodedSizes = new ArrayList(); @@ -505,7 +511,7 @@ public class TestHFileBlock { for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { if (!pread) { assertEquals(is.getPos(), curOffset + (i == 0 ? 0 : - HFileBlock.HEADER_SIZE)); + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS)); } assertEquals(expectedOffsets.get(i).longValue(), curOffset); @@ -706,7 +712,9 @@ public class TestHFileBlock { boolean cacheOnWrite = expectedContents != null; FSDataOutputStream os = fs.create(path); HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo, null, - includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE, + includesMemstoreTS, + HFileReaderV2.MAX_MINOR_VERSION, + HFile.DEFAULT_CHECKSUM_TYPE, HFile.DEFAULT_BYTES_PER_CHECKSUM); Map prevOffsetByType = new HashMap(); long totalSize = 0; @@ -764,7 +772,7 @@ public class TestHFileBlock { } for (int size : new int[] { 100, 256, 12345 }) { - byte[] byteArr = new byte[HFileBlock.HEADER_SIZE + size]; + byte[] byteArr = new byte[HFileBlock.HEADER_SIZE_WITH_CHECKSUMS + size]; ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER, -1, includesMemstoreTS, @@ -772,7 +780,7 @@ public class TestHFileBlock { 0); long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(buf.getClass(), true) - + HFileBlock.HEADER_SIZE + size); + + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS + size); long hfileBlockExpectedSize = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true)); long expected = hfileBlockExpectedSize + byteBufferExpectedSize; diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java index 4d9b158..d7b44f4 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java @@ -28,32 +28,18 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; 
-import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.DoubleOutputStream; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; @@ -417,7 +403,7 @@ public class TestHFileBlockCompatibility { /** * Valid in the READY state. Contains the header and the uncompressed (but * potentially encoded, if this is a data block) bytes, so the length is - * {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#HEADER_SIZE}. + * {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#HEADER_SIZE_WITH_CHECKSUMS}. */ private byte[] uncompressedBytesWithHeader; diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java index 8cbbc23..6280c21 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -216,7 +216,9 @@ public class TestHFileBlockIndex { private void writeWholeIndex() throws IOException { assertEquals(0, keys.size()); HFileBlock.Writer hbw = new HFileBlock.Writer(compr, null, - includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE, + includesMemstoreTS, + 1, + HFile.DEFAULT_CHECKSUM_TYPE, HFile.DEFAULT_BYTES_PER_CHECKSUM); FSDataOutputStream outputStream = fs.create(path); HFileBlockIndex.BlockIndexWriter biw = diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java index 4eea2ff..0c076f2 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java @@ -124,9 +124,9 @@ public class TestHFileDataBlockEncoder { HFileBlock block = getSampleHFileBlock(); Pair result = blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(), - includesMemstoreTS, HFileBlock.DUMMY_HEADER); + includesMemstoreTS, HFileBlock.DUMMY_HEADER_WITH_CHECKSUM); - int size = result.getFirst().limit() - HFileBlock.HEADER_SIZE; + int size = result.getFirst().limit() - HFileBlock.HEADER_SIZE_WITH_CHECKSUMS; HFileBlock blockOnDisk = new HFileBlock(result.getSecond(), size, size, -1, result.getFirst(), HFileBlock.FILL_HEADER, 0, includesMemstoreTS, block.getMinorVersion(), @@ -156,8 +156,8 @@ public class TestHFileDataBlockEncoder { ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer( generator.generateTestKeyValues(60), includesMemstoreTS); int size = keyValues.limit(); 
- ByteBuffer buf = ByteBuffer.allocate(size + HFileBlock.HEADER_SIZE); - buf.position(HFileBlock.HEADER_SIZE); + ByteBuffer buf = ByteBuffer.allocate(size + HFileBlock.HEADER_SIZE_WITH_CHECKSUMS); + buf.position(HFileBlock.HEADER_SIZE_WITH_CHECKSUMS); keyValues.rewind(); buf.put(keyValues); HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf, diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java index cd0852b..1ddafa6 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java @@ -27,6 +27,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Random; @@ -46,14 +47,24 @@ import org.apache.hadoop.io.WritableUtils; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Testing writing a version 2 {@link HFile}. This is a low-level test written * during the development of {@link HFileWriterV2}. */ @Category(SmallTests.class) +@RunWith(Parameterized.class) public class TestHFileWriterV2 { + private final boolean useChecksums; + + @Parameterized.Parameters + public static Collection parameters() { + return HBaseTestingUtility.BOOLEAN_PARAMETERIZED; + } + private static final Log LOG = LogFactory.getLog(TestHFileWriterV2.class); private static final HBaseTestingUtility TEST_UTIL = @@ -62,9 +73,14 @@ public class TestHFileWriterV2 { private Configuration conf; private FileSystem fs; + public TestHFileWriterV2(boolean useChecksums) { + this.useChecksums = useChecksums; + } + @Before public void setUp() throws IOException { conf = TEST_UTIL.getConfiguration(); + conf.setBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, useChecksums); fs = FileSystem.get(conf); } @@ -139,10 +155,13 @@ public class TestHFileWriterV2 { FixedFileTrailer.readFromStream(fsdis, fileSize); assertEquals(2, trailer.getMajorVersion()); + assertEquals(useChecksums?1:0, trailer.getMinorVersion()); assertEquals(entryCount, trailer.getEntryCount()); HFileBlock.FSReader blockReader = - new HFileBlock.FSReaderV2(fsdis, compressAlgo, fileSize); + new HFileBlock.FSReaderV2(fsdis,fsdis, compressAlgo, fileSize, + this.useChecksums?HFileReaderV2.MAX_MINOR_VERSION:HFileReaderV2.MIN_MINOR_VERSION, + null, null); // Comparator class name is stored in the trailer in version 2. RawComparator comparator = trailer.createComparator(); HFileBlockIndex.BlockIndexReader dataBlockIndexReader = -- 1.7.10.2 (Apple Git-33)
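For reference, the two on-disk header layouts that this patch distinguishes (written by put20Header() and put21Header() above) can be sketched as follows. This is a minimal standalone illustration, not the HBase classes themselves; the 8-byte block-type magic and the resulting 24- and 33-byte header sizes are assumptions derived from HEADER_SIZE_NO_CHECKSUM and the Bytes.SIZEOF_* arithmetic in the patch.

import java.nio.ByteBuffer;

/** Standalone sketch of the two header layouts handled by this patch. */
public class HeaderLayoutSketch {
  // Assumed sizes: 8-byte block magic + 4 + 4 + 8 = 24 for the 2.0 header,
  // plus 1-byte checksum type + 4-byte bytesPerChecksum + 4-byte onDiskDataSizeWithHeader = 33.
  static final int HEADER_SIZE_NO_CHECKSUM = 24;
  static final int HEADER_SIZE_WITH_CHECKSUMS = HEADER_SIZE_NO_CHECKSUM + 1 + 2 * 4;

  /** Mirrors put20Header(): the pre-checksum (minor version 0) layout. */
  static byte[] v20Header(byte[] magic, int onDiskSize, int uncompressedSize, long prevOffset) {
    ByteBuffer b = ByteBuffer.allocate(HEADER_SIZE_NO_CHECKSUM);
    b.put(magic);                                          // 8-byte block type magic, e.g. "DATABLK*"
    b.putInt(onDiskSize - HEADER_SIZE_NO_CHECKSUM);        // on-disk size without header
    b.putInt(uncompressedSize - HEADER_SIZE_NO_CHECKSUM);  // uncompressed size without header
    b.putLong(prevOffset);                                 // offset of the previous block
    return b.array();
  }

  /** Mirrors put21Header(): the checksum-aware (minor version 1) layout. */
  static byte[] v21Header(byte[] magic, int onDiskSize, int uncompressedSize, long prevOffset,
      byte checksumType, int bytesPerChecksum, int onDiskDataSizeWithHeader) {
    ByteBuffer b = ByteBuffer.allocate(HEADER_SIZE_WITH_CHECKSUMS);
    b.put(magic);
    b.putInt(onDiskSize - HEADER_SIZE_WITH_CHECKSUMS);
    b.putInt(uncompressedSize - HEADER_SIZE_WITH_CHECKSUMS);
    b.putLong(prevOffset);
    b.put(checksumType);                // checksum algorithm code
    b.putInt(bytesPerChecksum);         // checksum chunk size
    b.putInt(onDiskDataSizeWithHeader); // header + data, excluding trailing checksums
    return b.array();
  }
}

The only difference is the three trailing checksum fields, which is why a minor-version-0 writer keeps producing blocks that a pre-checksum reader understands.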
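The writer-side gating added to HFileWriterV2 amounts to choosing a minor version from the new configuration key. A rough sketch of that decision, assuming hbase.regionserver.checksum.verify is the value behind HConstants.HBASE_CHECKSUM_VERIFICATION and that HFileReaderV2.MAX_MINOR_VERSION is the checksum-bearing version (1 at the time of this patch):

import org.apache.hadoop.conf.Configuration;

/** Sketch of the minor-version selection this patch adds to HFileWriterV2. */
public class MinorVersionSelectionSketch {
  // Assumed to correspond to the constants referenced in the patch.
  static final String HBASE_CHECKSUM_VERIFICATION = "hbase.regionserver.checksum.verify";
  static final int MINOR_VERSION_NO_CHECKSUM = 0;   // blocks readable by pre-checksum HBase
  static final int MAX_MINOR_VERSION = 1;           // header carries checksum metadata

  static int chooseMinorVersion(Configuration conf) {
    // Default is false, so freshly written HFiles keep the 2.0 block format
    // and a rollback to an older release can still read them.
    return conf.getBoolean(HBASE_CHECKSUM_VERIFICATION, false)
        ? MAX_MINOR_VERSION
        : MINOR_VERSION_NO_CHECKSUM;
  }
}

Because the default is false, a cluster that never flips the flag keeps writing HFiles that an older release can read, which is the backwards-compatibility guarantee described in the commit message.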