From 52c972fbc17202e38c7da0c7210d8d1aa5c50de1 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 23 Mar 2017 00:00:56 +0800 Subject: [PATCH] HBASE-17623 Reuse the bytes array when building the hfile block --- .../encoding/HFileBlockDefaultEncodingContext.java | 44 +++--- .../io/encoding/HFileBlockEncodingContext.java | 14 +- .../java/org/apache/hadoop/hbase/util/Bytes.java | 149 ++++++++++++++++++++- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 131 +++++++++--------- .../hbase/io/encoding/TestDataBlockEncoders.java | 11 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 4 +- .../hbase/io/hfile/TestHFileDataBlockEncoder.java | 7 +- 7 files changed, 244 insertions(+), 116 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java index c7821e3..948f770 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java @@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.io.encoding; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.security.SecureRandom; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; @@ -37,6 +37,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; /** * A default implementation of {@link HFileBlockEncodingContext}. 
It will @@ -48,7 +49,6 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Private public class HFileBlockDefaultEncodingContext implements HFileBlockEncodingContext { - private byte[] onDiskBytesWithHeader; private BlockType blockType; private final DataBlockEncoding encodingAlgo; @@ -128,17 +128,12 @@ public class HFileBlockDefaultEncodingContext implements } @Override - public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException { - compressAfterEncoding(uncompressedBytesWithHeader, dummyHeader); - return onDiskBytesWithHeader; + public Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException { + return compressAfterEncoding(data, offset, length, dummyHeader); } - /** - * @param uncompressedBytesWithHeader - * @param headerBytes - * @throws IOException - */ - protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, byte[] headerBytes) + private Bytes compressAfterEncoding(byte[] uncompressedBytesWithHeaderBuffer, + int uncompressedBytesWithHeaderOffset, int uncompressedBytesWithHeaderLength, byte[] headerBytes) throws IOException { Encryption.Context cryptoContext = fileContext.getEncryptionContext(); if (cryptoContext != Encryption.Context.NONE) { @@ -162,17 +157,17 @@ public class HFileBlockDefaultEncodingContext implements if (fileContext.getCompression() != Compression.Algorithm.NONE) { compressedByteStream.reset(); compressionStream.resetState(); - compressionStream.write(uncompressedBytesWithHeader, - headerBytes.length, uncompressedBytesWithHeader.length - headerBytes.length); + compressionStream.write(uncompressedBytesWithHeaderBuffer, + headerBytes.length + uncompressedBytesWithHeaderOffset, uncompressedBytesWithHeaderLength - headerBytes.length); compressionStream.flush(); compressionStream.finish(); byte[] plaintext = compressedByteStream.toByteArray(); plaintextLength = plaintext.length; in = new ByteArrayInputStream(plaintext); } else { - plaintextLength = uncompressedBytesWithHeader.length - headerBytes.length; - in = new ByteArrayInputStream(uncompressedBytesWithHeader, - headerBytes.length, plaintextLength); + plaintextLength = uncompressedBytesWithHeaderLength - headerBytes.length; + in = new ByteArrayInputStream(uncompressedBytesWithHeaderBuffer, + headerBytes.length + uncompressedBytesWithHeaderOffset, plaintextLength); } if (plaintextLength > 0) { @@ -194,16 +189,13 @@ public class HFileBlockDefaultEncodingContext implements // Encrypt the data Encryption.encrypt(cryptoByteStream, in, encryptor); - onDiskBytesWithHeader = cryptoByteStream.toByteArray(); - // Increment the IV given the final block size - Encryption.incrementIv(iv, 1 + (onDiskBytesWithHeader.length / encryptor.getBlockSize())); - + Encryption.incrementIv(iv, 1 + (cryptoByteStream.size() / encryptor.getBlockSize())); + return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size()); } else { cryptoByteStream.write(0); - onDiskBytesWithHeader = cryptoByteStream.toByteArray(); - + return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size()); } } else { @@ -212,14 +204,14 @@ public class HFileBlockDefaultEncodingContext implements compressedByteStream.reset(); compressedByteStream.write(headerBytes); compressionStream.resetState(); - compressionStream.write(uncompressedBytesWithHeader, - headerBytes.length, uncompressedBytesWithHeader.length + compressionStream.write(uncompressedBytesWithHeaderBuffer, + headerBytes.length + uncompressedBytesWithHeaderOffset, uncompressedBytesWithHeaderLength - 
headerBytes.length); compressionStream.flush(); compressionStream.finish(); - onDiskBytesWithHeader = compressedByteStream.toByteArray(); + return new Bytes(compressedByteStream.getBuffer(), 0, compressedByteStream.size()); } else { - onDiskBytesWithHeader = uncompressedBytesWithHeader; + return null; } } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java index 9dc14a4..30c2a16 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.util.Bytes; /** * An encoding context that is created by a writer's encoder, and is shared @@ -73,9 +74,14 @@ public interface HFileBlockEncodingContext { EncodingState getEncodingState(); /** - * @param uncompressedBytesWithHeader encoded bytes with header - * @return Bytes with header which are ready to write out to disk. This is compressed and - * encrypted bytes applying the set compression algorithm and encryption. + * @param data encoded bytes with header + * @param offset the offset in encoded data to start at + * @param length the number of encoded bytes + * @return Bytes with header which are ready to write out to disk. + * This is compressed and encrypted bytes applying the set compression + * algorithm and encryption. The bytes may be changed. + * If need a Bytes reference for later use, clone the bytes and use that. + * Null if the data doesn't need to be compressed and encrypted. */ - byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException; + Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java index 822da6a..ed98305 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -61,7 +61,7 @@ import org.apache.hadoop.hbase.util.Bytes.LexicographicalComparerHolder.UnsafeCo @SuppressWarnings("restriction") @InterfaceAudience.Public @InterfaceStability.Stable -public class Bytes { +public class Bytes implements Comparable { //HConstants.UTF8_ENCODING should be updated if this changed /** When we encode strings, we always specify UTF8 encoding */ private static final String UTF8_ENCODING = "UTF-8"; @@ -141,6 +141,153 @@ public class Bytes { return b == null ? 0 : b.length; } + private byte[] bytes; + private int offset; + private int length; + + /** + * Create a zero-size sequence. + */ + public Bytes() { + super(); + } + + /** + * Create a Bytes using the byte array as the initial value. + * @param bytes This array becomes the backing storage for the object. + */ + public Bytes(byte[] bytes) { + this(bytes, 0, bytes.length); + } + + /** + * Set the new Bytes to the contents of the passed + * ibw. + * @param ibw the value to set this Bytes to. 
+ */ + public Bytes(final Bytes ibw) { + this(ibw.get(), ibw.getOffset(), ibw.getLength()); + } + + /** + * Set the value to a given byte range + * @param bytes the new byte range to set to + * @param offset the offset in newData to start at + * @param length the number of bytes in the range + */ + public Bytes(final byte[] bytes, final int offset, + final int length) { + this.bytes = bytes; + this.offset = offset; + this.length = length; + } + + /** + * Get the data from the Bytes. + * @return The data is only valid between offset and offset+length. + */ + public byte[] get() { + if (this.bytes == null) { + throw new IllegalStateException("Uninitialiized. Null constructor " + + "called w/o accompaying readFields invocation"); + } + return this.bytes; + } + + /** + * @param b Use passed bytes as backing array for this instance. + */ + public void set(final byte[] b) { + set(b, 0, b.length); + } + + /** + * @param b Use passed bytes as backing array for this instance. + * @param offset + * @param length + */ + public void set(final byte[] b, final int offset, final int length) { + this.bytes = b; + this.offset = offset; + this.length = length; + } + + /** + * @return the number of valid bytes in the buffer + */ + public int getLength() { + if (this.bytes == null) { + throw new IllegalStateException("Uninitialiized. Null constructor " + + "called w/o accompaying readFields invocation"); + } + return this.length; + } + + /** + * @return offset + */ + public int getOffset(){ + return this.offset; + } + @Override + public int hashCode() { + return Bytes.hashCode(bytes, offset, length); + } + + /** + * Define the sort order of the Bytes. + * @param that The other bytes writable + * @return Positive if left is bigger than right, 0 if they are equal, and + * negative if left is smaller than right. + */ + @Override + public int compareTo(Bytes that) { + return BYTES_RAWCOMPARATOR.compare( + this.bytes, this.offset, this.length, + that.bytes, that.offset, that.length); + } + + /** + * Compares the bytes in this object to the specified byte array + * @param that + * @return Positive if left is bigger than right, 0 if they are equal, and + * negative if left is smaller than right. + */ + public int compareTo(final byte[] that) { + return BYTES_RAWCOMPARATOR.compare( + this.bytes, this.offset, this.length, + that, 0, that.length); + } + + /** + * @see Object#equals(Object) + */ + @Override + public boolean equals(Object that) { + if (that == this) { + return true; + } + if (that instanceof Bytes) { + return compareTo((Bytes)that) == 0; + } + return false; + } + + /** + * @see Object#toString() + */ + @Override + public String toString() { + return Bytes.toString(bytes, offset, length); + } + + /** + * Returns a copy of the bytes referred to by this writable + */ + public byte[] copyBytes() { + return Arrays.copyOfRange(bytes, offset, offset+length); + } + /** * Byte array comparator class. 
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 78cabcb..b8629da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; @@ -37,6 +36,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; @@ -301,7 +301,7 @@ public class HFileBlock implements Cacheable { /** * Creates a new {@link HFile} block from the given fields. This constructor - * is used when the block data has already been read and uncompressed, + * is used only while writing blocks and caching, * and is sitting in a byte buffer and we want to stuff the block into cache. * See {@link Writer#getBlockForCaching(CacheConfig)}. * @@ -313,8 +313,7 @@ public class HFileBlock implements Cacheable { * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader} * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader} * @param prevBlockOffset see {@link #prevBlockOffset} - * @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by - * uncompressed data. + * @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) * @param fillHeader when true, write the first 4 header fields into passed buffer. * @param offset the file offset the block was read from * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader} @@ -850,7 +849,7 @@ public class HFileBlock implements Cacheable { * if compression is turned on. It also includes the checksum data that * immediately follows the block data. (header + data + checksums) */ - private byte[] onDiskBlockBytesWithHeader; + private ByteArrayOutputStream onDiskBlockBytesWithHeader; /** * The size of the checksum data on disk. It is used only if data is @@ -861,15 +860,6 @@ public class HFileBlock implements Cacheable { private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; /** - * Valid in the READY state. Contains the header and the uncompressed (but - * potentially encoded, if this is a data block) bytes, so the length is - * {@link #uncompressedSizeWithoutHeader} + - * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}. - * Does not store checksums. - */ - private byte[] uncompressedBlockBytesWithHeader; - - /** * Current block's start offset in the {@link HFile}. Set in * {@link #writeHeaderAndData(FSDataOutputStream)}. 
*/ @@ -991,66 +981,57 @@ public class HFileBlock implements Cacheable { */ private void finishBlock() throws IOException { if (blockType == BlockType.DATA) { - BufferGrabbingByteArrayOutputStream baosInMemoryCopy = - new BufferGrabbingByteArrayOutputStream(); - baosInMemory.writeTo(baosInMemoryCopy); this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, - baosInMemoryCopy.buf, blockType); + baosInMemory.getBuffer(), blockType); blockType = dataBlockEncodingCtx.getBlockType(); } userDataStream.flush(); - // This does an array copy, so it is safe to cache this byte array when cache-on-write. - // Header is still the empty, 'dummy' header that is yet to be filled out. - uncompressedBlockBytesWithHeader = baosInMemory.toByteArray(); prevOffset = prevOffsetByType[blockType.getId()]; // We need to set state before we can package the block up for cache-on-write. In a way, the // block is ready, but not yet encoded or compressed. state = State.BLOCK_READY; + Bytes compressAndEncryptDat; if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { - onDiskBlockBytesWithHeader = dataBlockEncodingCtx. - compressAndEncrypt(uncompressedBlockBytesWithHeader); + compressAndEncryptDat = dataBlockEncodingCtx. + compressAndEncrypt(baosInMemory.getBuffer(), + 0, baosInMemory.size()); } else { - onDiskBlockBytesWithHeader = defaultBlockEncodingCtx. - compressAndEncrypt(uncompressedBlockBytesWithHeader); + compressAndEncryptDat = defaultBlockEncodingCtx. + compressAndEncrypt(baosInMemory.getBuffer(), + 0, baosInMemory.size()); + } + if (compressAndEncryptDat == null) { + compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), + 0, baosInMemory.size()); } + if (onDiskBlockBytesWithHeader == null) { + onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); + } + onDiskBlockBytesWithHeader.reset(); + onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), + compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); // Calculate how many bytes we need for checksum on the tail of the block. int numBytes = (int) ChecksumUtil.numBytes( - onDiskBlockBytesWithHeader.length, + onDiskBlockBytesWithHeader.size(), fileContext.getBytesPerChecksum()); // Put the header for the on disk bytes; header currently is unfilled-out - putHeader(onDiskBlockBytesWithHeader, 0, - onDiskBlockBytesWithHeader.length + numBytes, - uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length); - // Set the header for the uncompressed bytes (for cache-on-write) -- IFF different from - // onDiskBlockBytesWithHeader array. 
- if (onDiskBlockBytesWithHeader != uncompressedBlockBytesWithHeader) { - putHeader(uncompressedBlockBytesWithHeader, 0, - onDiskBlockBytesWithHeader.length + numBytes, - uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length); - } + putHeader(onDiskBlockBytesWithHeader, + onDiskBlockBytesWithHeader.size() + numBytes, + baosInMemory.size(), onDiskBlockBytesWithHeader.size()); + // Set the header for the uncompressed bytes (for cache-on-write) if (onDiskChecksum.length != numBytes) { onDiskChecksum = new byte[numBytes]; } ChecksumUtil.generateChecksums( - onDiskBlockBytesWithHeader, 0, onDiskBlockBytesWithHeader.length, + onDiskBlockBytesWithHeader.getBuffer(), 0,onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum()); } - - public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream { - private byte[] buf; - - @Override - public void write(byte[] b, int off, int len) { - this.buf = b; - } - - public byte[] getBuffer() { - return this.buf; - } + private void putHeader(ByteArrayOutputStream dest, int onDiskSize, + int uncompressedSize, int onDiskDataSize) { + putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize); } - /** * Put the header into the given byte array at the given offset. * @param onDiskSize size of the block on disk header + data + checksum @@ -1102,7 +1083,7 @@ public class HFileBlock implements Cacheable { protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException { ensureBlockReady(); - out.write(onDiskBlockBytesWithHeader); + out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); out.write(onDiskChecksum); } @@ -1121,12 +1102,12 @@ public class HFileBlock implements Cacheable { // This is not very optimal, because we are doing an extra copy. // But this method is used only by unit tests. 
byte[] output = - new byte[onDiskBlockBytesWithHeader.length + new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length]; - System.arraycopy(onDiskBlockBytesWithHeader, 0, output, 0, - onDiskBlockBytesWithHeader.length); + System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, + onDiskBlockBytesWithHeader.size()); System.arraycopy(onDiskChecksum, 0, output, - onDiskBlockBytesWithHeader.length, onDiskChecksum.length); + onDiskBlockBytesWithHeader.size(), onDiskChecksum.length); return output; } @@ -1154,7 +1135,7 @@ public class HFileBlock implements Cacheable { */ int getOnDiskSizeWithoutHeader() { expectState(State.BLOCK_READY); - return onDiskBlockBytesWithHeader.length + + return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE; } @@ -1167,7 +1148,7 @@ public class HFileBlock implements Cacheable { */ int getOnDiskSizeWithHeader() { expectState(State.BLOCK_READY); - return onDiskBlockBytesWithHeader.length + onDiskChecksum.length; + return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; } /** @@ -1175,7 +1156,7 @@ public class HFileBlock implements Cacheable { */ int getUncompressedSizeWithoutHeader() { expectState(State.BLOCK_READY); - return uncompressedBlockBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE; + return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; } /** @@ -1183,7 +1164,7 @@ public class HFileBlock implements Cacheable { */ int getUncompressedSizeWithHeader() { expectState(State.BLOCK_READY); - return uncompressedBlockBytesWithHeader.length; + return baosInMemory.size(); } /** @return true if a block is being written */ @@ -1204,29 +1185,37 @@ public class HFileBlock implements Cacheable { } /** - * Returns the header followed by the uncompressed data, even if using + * Clones the header followed by the uncompressed data, even if using * compression. This is needed for storing uncompressed blocks in the block * cache. Can be called in the "writing" state or the "block ready" state. * Returns only the header and data, does not include checksum data. * - * @return uncompressed block bytes for caching on write + * @return Returns a copy of uncompressed block bytes for caching on write */ - ByteBuffer getUncompressedBufferWithHeader() { + @VisibleForTesting + ByteBuffer cloneUncompressedBufferWithHeader() { expectState(State.BLOCK_READY); + byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray(); + int numBytes = (int) ChecksumUtil.numBytes( + onDiskBlockBytesWithHeader.size(), + fileContext.getBytesPerChecksum()); + putHeader(uncompressedBlockBytesWithHeader, 0, + onDiskBlockBytesWithHeader.size() + numBytes, + uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.size()); return ByteBuffer.wrap(uncompressedBlockBytesWithHeader); } /** - * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is + * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is * needed for storing packed blocks in the block cache. Expects calling semantics identical to * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data, * Does not include checksum data. 
* - * @return packed block bytes for caching on write + * @return Returns a copy of block bytes for caching on write */ - ByteBuffer getOnDiskBufferWithHeader() { + private ByteBuffer cloneOnDiskBufferWithHeader() { expectState(State.BLOCK_READY); - return ByteBuffer.wrap(onDiskBlockBytesWithHeader); + return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray()); } private void expectState(State expectedState) { @@ -1257,7 +1246,9 @@ public class HFileBlock implements Cacheable { * the byte buffer passed into the constructor of this newly created * block does not have checksum data even though the header minor * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a - * 0 value in bytesPerChecksum. + * 0 value in bytesPerChecksum. This method copies the on-disk or + * uncompressed data to build the HFileBlock which is used only + * while writing blocks and caching. * *

TODO: Should there be an option where a cache can ask that hbase preserve block * checksums for checking after a block comes out of the cache? Otehrwise, cache is responsible @@ -1278,10 +1269,10 @@ public class HFileBlock implements Cacheable { return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), getUncompressedSizeWithoutHeader(), prevOffset, cacheConf.shouldCacheCompressed(blockType.getCategory())? - getOnDiskBufferWithHeader() : - getUncompressedBufferWithHeader(), + cloneOnDiskBufferWithHeader() : + cloneUncompressedBufferWithHeader(), FILL_HEADER, startOffset, UNSET, - onDiskBlockBytesWithHeader.length + onDiskChecksum.length, newContext); + onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java index 5c11818..c8263ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -39,8 +38,8 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -242,9 +241,7 @@ public class TestDataBlockEncoders { for (KeyValue kv : kvs) { encoder.encode(kv, encodingContext, dos); } - BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream(); - baos.writeTo(stream); - encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer()); + encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer()); byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET]; System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length); return ByteBuffer.wrap(encodedData); @@ -386,9 +383,7 @@ public class TestDataBlockEncoders { for (KeyValue kv : kvList) { encoder.encode(kv, encodingContext, dos); } - BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream(); - baos.writeTo(stream); - encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer()); + encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer()); byte[] encodedData = baos.toByteArray(); testAlgorithm(encodedData, unencodedDataBuf, encoder); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 80f9989..36a2f05 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -384,7 +384,7 @@ public class TestHFileBlock { writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag); 
hbw.writeHeaderAndData(os); int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE; - byte[] encodedResultWithHeader = hbw.getUncompressedBufferWithHeader().array(); + byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array(); final int encodedSize = encodedResultWithHeader.length - headerLen; if (encoding != DataBlockEncoding.NONE) { // We need to account for the two-byte encoding algorithm ID that @@ -792,7 +792,7 @@ public class TestHFileBlock { totalSize += hbw.getOnDiskSizeWithHeader(); if (cacheOnWrite) - expectedContents.add(hbw.getUncompressedBufferWithHeader()); + expectedContents.add(hbw.cloneUncompressedBufferWithHeader()); if (detailedLogging) { LOG.info("Written block #" + i + " of type " + bt diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java index 7f8ca3b..66baa2a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -29,13 +28,13 @@ import java.util.List; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; -import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.test.RedundantKVGenerator; import org.junit.Test; @@ -190,9 +189,7 @@ public class TestHFileDataBlockEncoder { for (KeyValue kv : kvs) { blockEncoder.encode(kv, context, dos); } - BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream(); - baos.writeTo(stream); - blockEncoder.endBlockEncoding(context, dos, stream.getBuffer(), BlockType.DATA); + blockEncoder.endBlockEncoding(context, dos, baos.getBuffer(), BlockType.DATA); byte[] encodedBytes = baos.toByteArray(); size = encodedBytes.length - block.getDummyHeaderForVersion().length; return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes), -- 2.9.3
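
Usage sketch for the (array, offset, length) state this patch adds to org.apache.hadoop.hbase.util.Bytes (not part of the patch): it shows how a caller wraps a slice of an existing buffer without copying, and why the compressAndEncrypt() javadoc above tells callers to clone the bytes if they need them after the backing buffer is reused. The class name BytesSliceExample and the sample string are invented for the illustration; the Bytes methods used (get, getOffset, getLength, copyBytes) are the ones introduced by this patch.

    import org.apache.hadoop.hbase.util.Bytes;

    public class BytesSliceExample {
      public static void main(String[] args) {
        // Wrap a slice of an existing buffer without copying it.
        byte[] buffer = Bytes.toBytes("headerPAYLOADtrailer");
        Bytes slice = new Bytes(buffer, 6, 7);   // refers to "PAYLOAD"

        // The wrapper only points at the backing array ...
        System.out.println(slice.getLength());   // 7
        System.out.println(Bytes.toString(slice.get(), slice.getOffset(), slice.getLength()));

        // ... so take a defensive copy if the backing buffer may be reused later,
        // as the compressAndEncrypt() javadoc in this patch advises.
        byte[] stable = slice.copyBytes();
        System.out.println(Bytes.toString(stable));
      }
    }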
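
Note on the new compressAndEncrypt() contract (illustration only): the encoding context now returns a Bytes view over its internal compression/crypto stream buffer, or null when the block needs neither compression nor encryption, in which case the writer reuses the uncompressed buffer as-is instead of copying it. A minimal caller-side sketch of the pattern used in HFileBlock.Writer#finishBlock(); the helper name toOnDiskBytes is made up for the example, everything else is the API this patch introduces.

    import java.io.IOException;

    import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
    import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CompressAndEncryptCaller {
      /**
       * Mirrors the caller-side pattern from HFileBlock.Writer#finishBlock():
       * a null return means "nothing to compress or encrypt", so the
       * uncompressed buffer is reused directly instead of being copied.
       */
      static Bytes toOnDiskBytes(HFileBlockEncodingContext ctx,
          ByteArrayOutputStream uncompressedWithHeader) throws IOException {
        Bytes onDisk = ctx.compressAndEncrypt(uncompressedWithHeader.getBuffer(),
            0, uncompressedWithHeader.size());
        if (onDisk == null) {
          // NONE compression and no encryption: wrap the existing buffer.
          onDisk = new Bytes(uncompressedWithHeader.getBuffer(), 0,
              uncompressedWithHeader.size());
        }
        return onDisk;
      }
    }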
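
Why the writer switches from java.io.ByteArrayOutputStream to org.apache.hadoop.hbase.io.ByteArrayOutputStream (background sketch, not part of the patch): the HBase stream exposes getBuffer() and size(), so checksumming and writing can operate on the internal array, whereas toByteArray() allocates and fills a fresh right-sized copy on every call. The sketch below assumes only the methods already used in this patch (getBuffer(), size(), toByteArray(), the int-capacity constructor); the class name GetBufferVsToByteArray is made up.

    import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
    import org.apache.hadoop.hbase.util.Bytes;

    public class GetBufferVsToByteArray {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream(32);
        out.write(Bytes.toBytes("block payload"));

        // toByteArray() allocates and copies a right-sized array every call.
        byte[] copy = out.toByteArray();

        // getBuffer()/size() expose the internal array directly; only the
        // first size() bytes are valid, and the array may be larger than that.
        byte[] backing = out.getBuffer();
        int valid = out.size();

        System.out.println(copy.length + " copied bytes vs " + valid
            + " valid bytes in a backing array of length " + backing.length);
      }
    }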