diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
index d951595..f0d7343 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -126,4 +126,13 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferSup
   public int size() {
     return this.pos;
   }
+
+  /**
+   * Copies the valid bytes of the internal buffer into the given array.
+   * @param array destination array
+   * @param offset offset in the destination array at which copying starts
+   */
+  public void toByteArray(byte[] array, int offset) {
+    System.arraycopy(buf, 0, array, offset, size());
+  }
 }
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
index c7821e3..032fb74 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
@@ -129,16 +129,23 @@ public class HFileBlockDefaultEncodingContext implements

   @Override
   public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException {
-    compressAfterEncoding(uncompressedBytesWithHeader, dummyHeader);
+    compressAfterEncoding(uncompressedBytesWithHeader,
+      uncompressedBytesWithHeader.length, dummyHeader);
     return onDiskBytesWithHeader;
   }

+  @Override
+  public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader, int len) throws IOException {
+    compressAfterEncoding(uncompressedBytesWithHeader, len, dummyHeader);
+    return onDiskBytesWithHeader;
+  }
+
   /**
    * @param uncompressedBytesWithHeader
    * @param headerBytes
    * @throws IOException
    */
-  protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, byte[] headerBytes)
+  protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, int len, byte[] headerBytes)
       throws IOException {
     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
     if (cryptoContext != Encryption.Context.NONE) {
@@ -163,14 +170,14 @@ public class HFileBlockDefaultEncodingContext implements
         compressedByteStream.reset();
         compressionStream.resetState();
         compressionStream.write(uncompressedBytesWithHeader,
-          headerBytes.length, uncompressedBytesWithHeader.length - headerBytes.length);
+          headerBytes.length, len - headerBytes.length);
         compressionStream.flush();
         compressionStream.finish();
         byte[] plaintext = compressedByteStream.toByteArray();
         plaintextLength = plaintext.length;
         in = new ByteArrayInputStream(plaintext);
       } else {
-        plaintextLength = uncompressedBytesWithHeader.length - headerBytes.length;
+        plaintextLength = len - headerBytes.length;
         in = new ByteArrayInputStream(uncompressedBytesWithHeader,
           headerBytes.length, plaintextLength);
       }
@@ -213,12 +220,13 @@ public class HFileBlockDefaultEncodingContext implements
         compressedByteStream.write(headerBytes);
         compressionStream.resetState();
         compressionStream.write(uncompressedBytesWithHeader,
-          headerBytes.length, uncompressedBytesWithHeader.length
+          headerBytes.length, len
             - headerBytes.length);
         compressionStream.flush();
         compressionStream.finish();
         onDiskBytesWithHeader = compressedByteStream.toByteArray();
       } else {
+        // TODO
         onDiskBytesWithHeader = uncompressedBytesWithHeader;
       }
     }
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
index 9dc14a4..dc1f0f7 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
@@ -78,4 +78,12 @@ public interface HFileBlockEncodingContext {
    * encrypted bytes applying the set compression algorithm and encryption.
    */
   byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException;
+
+  /**
+   * @param uncompressedBytesWithHeader encoded bytes with header
+   * @param len number of valid bytes in {@code uncompressedBytesWithHeader}
+   * @return Bytes with header which are ready to write out to disk. This is compressed and
+   *         encrypted bytes applying the set compression algorithm and encryption.
+   */
+  byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader, int len) throws IOException;
 }
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
index 909391a..81f0b87 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
@@ -57,6 +57,13 @@ public class HFileContext implements HeapSize, Cloneable {
   private Encryption.Context cryptoContext = Encryption.Context.NONE;
   private long fileCreateTime;
   private String hfileName;
+
+  private boolean cacheDataOnWrite = true;
+  private boolean cacheIndexOnWrite = true;
+  private boolean cacheBloomOnWrite = true;
+  private boolean writeBufferReuse = false;
+  private int writeBufferReuseMaxSize;
+
   //Empty constructor.  Go with setters
   public HFileContext() {
@@ -101,6 +108,46 @@ public class HFileContext implements HeapSize, Cloneable {
     this.hfileName = hfileName;
   }

+  public boolean isCacheDataOnWrite() {
+    return cacheDataOnWrite;
+  }
+
+  public void setCacheDataOnWrite(boolean cacheDataOnWrite) {
+    this.cacheDataOnWrite = cacheDataOnWrite;
+  }
+
+  public boolean isCacheIndexOnWrite() {
+    return cacheIndexOnWrite;
+  }
+
+  public void setCacheIndexOnWrite(boolean cacheIndexOnWrite) {
+    this.cacheIndexOnWrite = cacheIndexOnWrite;
+  }
+
+  public boolean isCacheBloomOnWrite() {
+    return cacheBloomOnWrite;
+  }
+
+  public void setCacheBloomOnWrite(boolean cacheBloomOnWrite) {
+    this.cacheBloomOnWrite = cacheBloomOnWrite;
+  }
+
+  public boolean isWriteBufferReuse() {
+    return writeBufferReuse;
+  }
+
+  public void setWriteBufferReuse(boolean writeBufferReuse) {
+    this.writeBufferReuse = writeBufferReuse;
+  }
+
+  public int getWriteBufferReuseMaxSize() {
+    return writeBufferReuseMaxSize;
+  }
+
+  public void setWriteBufferReuseMaxSize(int writeBufferReuseMaxSize) {
+    this.writeBufferReuseMaxSize = writeBufferReuseMaxSize;
+  }
+
   /**
    * @return true when on-disk blocks from this file are compressed, and/or encrypted;
    * false otherwise.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 8582dbe..6d70298 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -136,6 +136,13 @@ public class HFile {
   // LOG is being used in HFileBlock and CheckSumUtil
   static final Log LOG = LogFactory.getLog(HFile.class);

+  public final static String HFILE_WRITE_BUFFER_REUSE_ENABLED_KEY = "hbase.hfile.write.buffer-reuse.enabled";
+  public final static boolean DEFAULT_HFILE_WRITE_BUFFER_REUSE_ENABLED = true;
+
+  public final static String HFILE_WRITE_BUFFER_REUSE_MAX_SIZE_KEY = "hbase.hfile.write.buffer-reuse.max.size";
+  // Max size for a buffer in the pool - 2MB (default)
+  public final static int DEFAULT_HFILE_WRITE_BUFFER_REUSE_MAX_SIZE = 2 * 1024 * 1024;
+
   /**
    * Maximum length of key in HFile.
    */
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index f3402da..875713a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -842,7 +842,7 @@ public class HFileBlock implements Cacheable {
    * if compression is turned on. It also includes the checksum data that
    * immediately follows the block data. (header + data + checksums)
    */
-  private byte[] onDiskBlockBytesWithHeader;
+  private byte[] onDiskBytesWithHeader;

   /**
    * The size of the checksum data on disk. It is used only if data is
@@ -859,9 +859,19 @@ public class HFileBlock implements Cacheable {
    * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
    * Does not store checksums.
    */
-  private byte[] uncompressedBlockBytesWithHeader;
+  private byte[] uncompressedBytesWithHeader;

   /**
+   * Size of {@link #uncompressedBytesWithHeader}
+   */
+  private int uncompressedBytesWithHeaderSize;
+
+  /**
+   * Size of {@link #onDiskBytesWithHeader}
+   */
+  private int onDiskBytesWithHeaderSize;
+
+  /**
    * Current block's start offset in the {@link HFile}. Set in
    * {@link #writeHeaderAndData(FSDataOutputStream)}.
    */
@@ -877,6 +887,15 @@ public class HFileBlock implements Cacheable {
   private long prevOffset;
   /** Meta data that holds information about the hfileblock**/
   private HFileContext fileContext;
+
+  /**
+   * Thread-local buffer used to hold a block while it is compressed and encoded
+   * before it is written out to the output stream.
+   * It can be used only for blocks that are not going to be cached on write
+   * (cache-on-write needs a private copy).
+   * The default maximum buffer size is 2MB.
+   */
+  private static ThreadLocal<byte[]> writeBufferTLS = new ThreadLocal<byte[]>();

   /**
    * @param dataBlockEncoder data block encoding algorithm to use
@@ -974,7 +993,8 @@ public class HFileBlock implements Cacheable {
       // This will set state to BLOCK_READY.
       finishBlock();
     }
-
+
+
   /**
    * Finish up writing of the block.
    * Flushes the compressing stream (if using compression), fills out the header,
@@ -984,47 +1004,89 @@ public class HFileBlock implements Cacheable {
   private void finishBlock() throws IOException {
     if (blockType == BlockType.DATA) {
       this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
-        baosInMemory.getBuffer(), blockType);
+          baosInMemory.getBuffer(), blockType);
       blockType = dataBlockEncodingCtx.getBlockType();
     }
     userDataStream.flush();
-    // This does an array copy, so it is safe to cache this byte array when cache-on-write.
-    // Header is still the empty, 'dummy' header that is yet to be filled out.
-    uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
+    // This does an array copy (only when we cannot reuse the TLS buffer),
+    // so it is safe to cache this byte array.
+    // Header is still the empty, 'dummy' header that is yet to be filled out.
+    if (canReuseBufferForCurrentBlock()) {
+      byte[] currentBuffer = writeBufferTLS.get();
+      if (currentBuffer == null || currentBuffer.length < baosInMemory.size()) {
+        currentBuffer = new byte[baosInMemory.size()];
+        writeBufferTLS.set(currentBuffer);
+      }
+      baosInMemory.toByteArray(currentBuffer, 0);
+      uncompressedBytesWithHeader = currentBuffer;
+
+    } else {
+      uncompressedBytesWithHeader = baosInMemory.toByteArray();
+    }
+    uncompressedBytesWithHeaderSize = baosInMemory.size();
     prevOffset = prevOffsetByType[blockType.getId()];

     // We need to set state before we can package the block up for cache-on-write. In a way, the
     // block is ready, but not yet encoded or compressed.
     state = State.BLOCK_READY;
     if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
-      onDiskBlockBytesWithHeader = dataBlockEncodingCtx.
-          compressAndEncrypt(uncompressedBlockBytesWithHeader);
+      onDiskBytesWithHeader =
+          dataBlockEncodingCtx.compressAndEncrypt(uncompressedBytesWithHeader,
+            uncompressedBytesWithHeaderSize);
     } else {
-      onDiskBlockBytesWithHeader = defaultBlockEncodingCtx.
-          compressAndEncrypt(uncompressedBlockBytesWithHeader);
+      onDiskBytesWithHeader =
+          defaultBlockEncodingCtx.compressAndEncrypt(uncompressedBytesWithHeader,
+            uncompressedBytesWithHeaderSize);
     }
+    boolean shared = onDiskBytesWithHeader == uncompressedBytesWithHeader;
+    onDiskBytesWithHeaderSize =
+        shared ? uncompressedBytesWithHeaderSize : onDiskBytesWithHeader.length;
     // Calculate how many bytes we need for checksum on the tail of the block.
-    int numBytes = (int) ChecksumUtil.numBytes(
-        onDiskBlockBytesWithHeader.length,
-        fileContext.getBytesPerChecksum());
+    int numBytes =
+        (int) ChecksumUtil.numBytes(onDiskBytesWithHeaderSize, fileContext.getBytesPerChecksum());

     // Put the header for the on disk bytes; header currently is unfilled-out
-    putHeader(onDiskBlockBytesWithHeader, 0,
-        onDiskBlockBytesWithHeader.length + numBytes,
-        uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length);
+    putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeaderSize + numBytes,
+        uncompressedBytesWithHeaderSize, onDiskBytesWithHeaderSize);

     // Set the header for the uncompressed bytes (for cache-on-write) -- IFF different from
     // onDiskBlockBytesWithHeader array.
-    if (onDiskBlockBytesWithHeader != uncompressedBlockBytesWithHeader) {
-      putHeader(uncompressedBlockBytesWithHeader, 0,
-          onDiskBlockBytesWithHeader.length + numBytes,
-          uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length);
+    if (onDiskBytesWithHeader != uncompressedBytesWithHeader) {
+      putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeaderSize + numBytes,
+          uncompressedBytesWithHeaderSize, onDiskBytesWithHeaderSize);
     }
     onDiskChecksum = new byte[numBytes];
-    ChecksumUtil.generateChecksums(
-        onDiskBlockBytesWithHeader, 0, onDiskBlockBytesWithHeader.length,
-        onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
+    ChecksumUtil.generateChecksums(onDiskBytesWithHeader, 0, onDiskBytesWithHeaderSize,
+        onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
+  }
+
+  private boolean canReuseBufferForCurrentBlock()
+  {
+    boolean cacheOnWrite = getCacheOnWrite();
+    boolean sizeBelowMax = baosInMemory.size() < fileContext.getWriteBufferReuseMaxSize();
+    return !cacheOnWrite && fileContext.isWriteBufferReuse() && sizeBelowMax;
+  }
+
+
+  private boolean getCacheOnWrite() {
+    switch (blockType) {
+      case DATA:
+      case ENCODED_DATA:
+        return fileContext.isCacheDataOnWrite();
+      case BLOOM_CHUNK:
+      case DELETE_FAMILY_BLOOM_META:
+      case GENERAL_BLOOM_META:
+        return fileContext.isCacheBloomOnWrite();
+      case INTERMEDIATE_INDEX:
+      case LEAF_INDEX:
+      case ROOT_INDEX:
+        return fileContext.isCacheIndexOnWrite();
+      default: return true;
+    }
   }
-
+
+
   /**
    * Put the header into the given byte array at the given offset.
    * @param onDiskSize size of the block on disk header + data + checksum
@@ -1076,7 +1138,7 @@ public class HFileBlock implements Cacheable {
   protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
     throws IOException {
     ensureBlockReady();
-    out.write(onDiskBlockBytesWithHeader);
+    out.write(onDiskBytesWithHeader, 0, onDiskBytesWithHeaderSize);
     out.write(onDiskChecksum);
   }

@@ -1095,12 +1157,12 @@ public class HFileBlock implements Cacheable {
     // This is not very optimal, because we are doing an extra copy.
     // But this method is used only by unit tests.
     byte[] output =
-        new byte[onDiskBlockBytesWithHeader.length
+        new byte[onDiskBytesWithHeaderSize
             + onDiskChecksum.length];
-    System.arraycopy(onDiskBlockBytesWithHeader, 0, output, 0,
-        onDiskBlockBytesWithHeader.length);
+    System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
+        onDiskBytesWithHeaderSize);
     System.arraycopy(onDiskChecksum, 0, output,
-        onDiskBlockBytesWithHeader.length, onDiskChecksum.length);
+        onDiskBytesWithHeaderSize, onDiskChecksum.length);
     return output;
   }

@@ -1128,7 +1190,7 @@ public class HFileBlock implements Cacheable {
    */
   int getOnDiskSizeWithoutHeader() {
     expectState(State.BLOCK_READY);
-    return onDiskBlockBytesWithHeader.length +
+    return onDiskBytesWithHeaderSize +
         onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
   }

@@ -1141,7 +1203,7 @@ public class HFileBlock implements Cacheable {
    */
   int getOnDiskSizeWithHeader() {
     expectState(State.BLOCK_READY);
-    return onDiskBlockBytesWithHeader.length + onDiskChecksum.length;
+    return onDiskBytesWithHeaderSize + onDiskChecksum.length;
   }

   /**
@@ -1149,7 +1211,7 @@ public class HFileBlock implements Cacheable {
    */
   int getUncompressedSizeWithoutHeader() {
     expectState(State.BLOCK_READY);
-    return uncompressedBlockBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
+    return uncompressedBytesWithHeaderSize - HConstants.HFILEBLOCK_HEADER_SIZE;
   }

   /**
@@ -1157,7 +1219,7 @@ public class HFileBlock implements Cacheable {
    */
   int getUncompressedSizeWithHeader() {
     expectState(State.BLOCK_READY);
-    return uncompressedBlockBytesWithHeader.length;
+    return uncompressedBytesWithHeaderSize;
   }

   /** @return true if a block is being written */
@@ -1187,7 +1249,7 @@ public class HFileBlock implements Cacheable {
    */
   ByteBuffer getUncompressedBufferWithHeader() {
     expectState(State.BLOCK_READY);
-    return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
+    return ByteBuffer.wrap(uncompressedBytesWithHeader);
   }

   /**
@@ -1200,7 +1262,7 @@ public class HFileBlock implements Cacheable {
    */
   ByteBuffer getOnDiskBufferWithHeader() {
     expectState(State.BLOCK_READY);
-    return ByteBuffer.wrap(onDiskBlockBytesWithHeader);
+    return ByteBuffer.wrap(onDiskBytesWithHeader);
   }

   private void expectState(State expectedState) {
@@ -1255,7 +1317,7 @@ public class HFileBlock implements Cacheable {
         getOnDiskBufferWithHeader() :
           getUncompressedBufferWithHeader(),
       FILL_HEADER, startOffset, UNSET,
-      onDiskBlockBytesWithHeader.length + onDiskChecksum.length, newContext);
+      onDiskBytesWithHeaderSize + onDiskChecksum.length, newContext);
   }
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 9ab46cf..725a081 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -55,8 +55,8 @@ import org.apache.hadoop.io.Writable;
  */
 @InterfaceAudience.Private
 public class HFileWriterImpl implements HFile.Writer {
-  private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class);
-
+  private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class);
+
   private static final long UNSET = -1;

   /** The Cell previously appended. Becomes the last cell in the file.*/
@@ -160,6 +160,17 @@ public class HFileWriterImpl implements HFile.Writer {
     this.path = path;
     this.name = path != null ?
       path.getName() : outputStream.toString();
     this.hFileContext = fileContext;
+    this.hFileContext.setCacheDataOnWrite(cacheConf.shouldCacheDataOnWrite());
+    this.hFileContext.setCacheIndexOnWrite(cacheConf.shouldCacheIndexesOnWrite());
+    this.hFileContext.setCacheBloomOnWrite(cacheConf.shouldCacheBloomsOnWrite());
+    this.hFileContext.setWriteBufferReuse(
+        conf.getBoolean(HFile.HFILE_WRITE_BUFFER_REUSE_ENABLED_KEY,
+            HFile.DEFAULT_HFILE_WRITE_BUFFER_REUSE_ENABLED));
+    this.hFileContext.setWriteBufferReuseMaxSize(
+        conf.getInt(HFile.HFILE_WRITE_BUFFER_REUSE_MAX_SIZE_KEY,
+            HFile.DEFAULT_HFILE_WRITE_BUFFER_REUSE_MAX_SIZE));
+
+
     DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
     if (encoding != DataBlockEncoding.NONE) {
       this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
@@ -179,6 +190,7 @@ public class HFileWriterImpl implements HFile.Writer {
     }
   }

+
   /**
    * Add to the file info. All added key/value pairs can be obtained using
    * {@link HFile.Reader#loadFileInfo()}.
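
For readers who want the buffer-reuse idea without the HFileBlock internals, the following is a minimal, self-contained sketch of the pattern the patch applies in finishBlock(): a per-thread byte[] is recycled for blocks that will not be cached, and an exact-size private copy is made otherwise. The class and method names here (WriteBufferReuseSketch, ExposedByteArrayOutputStream, finishBlock) are illustrative only and are not part of the patch.

import java.io.ByteArrayOutputStream;

public class WriteBufferReuseSketch {

  /** Stand-in for HBase's own ByteArrayOutputStream#toByteArray(byte[], int) added above. */
  static class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
    /** Copies the valid prefix of the internal buffer into dest at destOffset. */
    void toByteArray(byte[] dest, int destOffset) {
      // buf and count are protected fields of java.io.ByteArrayOutputStream.
      System.arraycopy(buf, 0, dest, destOffset, count);
    }
  }

  // Mirrors hbase.hfile.write.buffer-reuse.max.size; the patch defaults this to 2 MB.
  private static final int MAX_REUSE_SIZE = 2 * 1024 * 1024;

  // One buffer per writer thread; it is never handed to a block cache, so reuse is safe.
  private static final ThreadLocal<byte[]> WRITE_BUFFER = new ThreadLocal<byte[]>();

  /**
   * Returns the block bytes. Reuses the thread-local array when the block will not be
   * cached on write and is below the size cap; otherwise falls back to a private copy.
   * Only the first baos.size() bytes of the returned array are valid, so the caller must
   * track that length separately (the patch does this in uncompressedBytesWithHeaderSize).
   */
  static byte[] finishBlock(ExposedByteArrayOutputStream baos, boolean cacheOnWrite) {
    int size = baos.size();
    if (cacheOnWrite || size >= MAX_REUSE_SIZE) {
      return baos.toByteArray();          // exact-size copy, safe to cache
    }
    byte[] buffer = WRITE_BUFFER.get();
    if (buffer == null || buffer.length < size) {
      buffer = new byte[size];            // grow lazily, then keep for later blocks
      WRITE_BUFFER.set(buffer);
    }
    baos.toByteArray(buffer, 0);          // copy into the recycled array, no new allocation
    return buffer;
  }
}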
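The patch reads its two new settings from the ordinary Hadoop Configuration in the HFileWriterImpl constructor. A hypothetical usage sketch, assuming the patched HFile constants are on the classpath; the class name and the 4 MB value are examples only, not defaults:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class WriteBufferReuseConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Turn buffer reuse on or off (the patch enables it by default).
    conf.setBoolean(HFile.HFILE_WRITE_BUFFER_REUSE_ENABLED_KEY, true);
    // Raise the per-thread buffer cap from the 2 MB default to 4 MB.
    conf.setInt(HFile.HFILE_WRITE_BUFFER_REUSE_MAX_SIZE_KEY, 4 * 1024 * 1024);
  }
}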