diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
index d951595..f0d7343 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -126,4 +126,13 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferSup
   public int size() {
     return this.pos;
   }
+
+  /**
+   * Copy the internal array into the given destination array.
+   * @param array destination array
+   * @param offset offset in the destination array to start copying at
+   */
+  public void toByteArray(byte[] array, int offset) {
+    System.arraycopy(buf, 0, array, offset, size());
+  }
 }
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
index c7821e3..032fb74 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
@@ -129,16 +129,23 @@ public class HFileBlockDefaultEncodingContext implements
   @Override
   public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException {
-    compressAfterEncoding(uncompressedBytesWithHeader, dummyHeader);
+    compressAfterEncoding(uncompressedBytesWithHeader,
+        uncompressedBytesWithHeader.length, dummyHeader);
     return onDiskBytesWithHeader;
   }
 
+  @Override
+  public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader, int len) throws IOException {
+    compressAfterEncoding(uncompressedBytesWithHeader, len, dummyHeader);
+    return onDiskBytesWithHeader;
+  }
+
   /**
    * @param uncompressedBytesWithHeader
    * @param headerBytes
    * @throws IOException
    */
-  protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, byte[] headerBytes)
+  protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, int len, byte[] headerBytes)
       throws IOException {
     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
     if (cryptoContext != Encryption.Context.NONE) {
@@ -163,14 +170,14 @@
         compressedByteStream.reset();
         compressionStream.resetState();
         compressionStream.write(uncompressedBytesWithHeader,
-            headerBytes.length, uncompressedBytesWithHeader.length - headerBytes.length);
+            headerBytes.length, len - headerBytes.length);
         compressionStream.flush();
         compressionStream.finish();
         byte[] plaintext = compressedByteStream.toByteArray();
         plaintextLength = plaintext.length;
         in = new ByteArrayInputStream(plaintext);
       } else {
-        plaintextLength = uncompressedBytesWithHeader.length - headerBytes.length;
+        plaintextLength = len - headerBytes.length;
         in = new ByteArrayInputStream(uncompressedBytesWithHeader,
             headerBytes.length, plaintextLength);
       }
@@ -213,12 +220,13 @@
       compressedByteStream.write(headerBytes);
       compressionStream.resetState();
       compressionStream.write(uncompressedBytesWithHeader,
-          headerBytes.length, uncompressedBytesWithHeader.length
+          headerBytes.length, len
           - headerBytes.length);
       compressionStream.flush();
       compressionStream.finish();
       onDiskBytesWithHeader = compressedByteStream.toByteArray();
     } else {
+      // TODO
       onDiskBytesWithHeader = uncompressedBytesWithHeader;
     }
   }
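Note on the new overload: the len parameter exists because the array handed to compressAndEncrypt may be a reused scratch buffer that is larger than the block it currently holds. A minimal sketch of that buffer-plus-valid-length convention, using hypothetical helper names rather than the real encoding context:

```java
import java.io.IOException;
import java.util.Arrays;

// Sketch only: illustrates the buffer-plus-valid-length convention introduced
// by compressAndEncrypt(byte[], int). The Encoder interface below is a
// stand-in, not the HBase API.
public class BufferLengthConventionSketch {
  interface Encoder {
    byte[] compressAndEncrypt(byte[] bytesWithHeader, int len) throws IOException;
  }

  static byte[] encode(Encoder encoder, byte[] reusedBuffer, int validLen) throws IOException {
    // Only the first validLen bytes are meaningful; the tail of reusedBuffer
    // may contain stale data from a previous block.
    return encoder.compressAndEncrypt(reusedBuffer, validLen);
  }

  public static void main(String[] args) throws IOException {
    Encoder identity = (bytes, len) -> Arrays.copyOf(bytes, len); // no-op "encoder"
    byte[] scratch = new byte[1024];
    scratch[0] = 7;
    System.out.println(encode(identity, scratch, 1).length); // 1, not 1024
  }
}
```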
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
index 9dc14a4..dc1f0f7 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
@@ -78,4 +78,12 @@ public interface HFileBlockEncodingContext {
    * encrypted bytes applying the set compression algorithm and encryption.
    */
   byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException;
+
+  /**
+   * @param uncompressedBytesWithHeader encoded bytes with header
+   * @param len number of valid bytes in the array, including the header
+   * @return Bytes with header which are ready to write out to disk. This is compressed and
+   *         encrypted bytes applying the set compression algorithm and encryption.
+   */
+  byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader, int len) throws IOException;
 }
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
index 909391a..81f0b87 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
@@ -57,6 +57,13 @@ public class HFileContext implements HeapSize, Cloneable {
   private Encryption.Context cryptoContext = Encryption.Context.NONE;
   private long fileCreateTime;
   private String hfileName;
+
+  private boolean cacheDataOnWrite = true;
+  private boolean cacheIndexOnWrite = true;
+  private boolean cacheBloomOnWrite = true;
+  private boolean writeBufferReuse = false;
+  private int writeBufferReuseMaxSize;
+
   //Empty constructor.  Go with setters
   public HFileContext() {
@@ -101,6 +108,46 @@
     this.hfileName = hfileName;
   }
 
+  public boolean isCacheDataOnWrite() {
+    return cacheDataOnWrite;
+  }
+
+  public void setCacheDataOnWrite(boolean cacheDataOnWrite) {
+    this.cacheDataOnWrite = cacheDataOnWrite;
+  }
+
+  public boolean isCacheIndexOnWrite() {
+    return cacheIndexOnWrite;
+  }
+
+  public void setCacheIndexOnWrite(boolean cacheIndexOnWrite) {
+    this.cacheIndexOnWrite = cacheIndexOnWrite;
+  }
+
+  public boolean isCacheBloomOnWrite() {
+    return cacheBloomOnWrite;
+  }
+
+  public void setCacheBloomOnWrite(boolean cacheBloomOnWrite) {
+    this.cacheBloomOnWrite = cacheBloomOnWrite;
+  }
+
+  public boolean isWriteBufferReuse() {
+    return writeBufferReuse;
+  }
+
+  public void setWriteBufferReuse(boolean writeBufferReuse) {
+    this.writeBufferReuse = writeBufferReuse;
+  }
+
+  public int getWriteBufferReuseMaxSize() {
+    return writeBufferReuseMaxSize;
+  }
+
+  public void setWriteBufferReuseMaxSize(int writeBufferReuseMaxSize) {
+    this.writeBufferReuseMaxSize = writeBufferReuseMaxSize;
+  }
+
   /**
    * @return true when on-disk blocks from this file are compressed, and/or encrypted;
    *         false otherwise.
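The fields added to HFileContext are write-path hints only; the block writer reads them to decide whether a block may borrow the shared scratch buffer. A short sketch of populating them with the setters added above (values are illustrative, not prescriptive):

```java
import org.apache.hadoop.hbase.io.hfile.HFileContext;

// Sketch: populate the write-path hints added by this patch. A data block is
// eligible for buffer reuse only when it will not be cached on write.
public class WriteContextHintsSketch {
  static HFileContext newWriteContext() {
    HFileContext ctx = new HFileContext();
    ctx.setCacheDataOnWrite(false);                  // data blocks not cached -> reuse allowed
    ctx.setCacheIndexOnWrite(true);                  // index blocks cached -> never reused
    ctx.setCacheBloomOnWrite(true);                  // bloom blocks cached -> never reused
    ctx.setWriteBufferReuse(true);                   // master switch for the thread-local buffer
    ctx.setWriteBufferReuseMaxSize(2 * 1024 * 1024); // cap matches the default in HFile.java
    return ctx;
  }
}
```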
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 8582dbe..6d70298 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -136,6 +136,13 @@ public class HFile {
   // LOG is being used in HFileBlock and CheckSumUtil
   static final Log LOG = LogFactory.getLog(HFile.class);
 
+  public final static String HFILE_WRITE_BUFFER_REUSE_ENABLED_KEY = "hbase.hfile.write.buffer-reuse.enabled";
+  public final static boolean DEFAULT_HFILE_WRITE_BUFFER_REUSE_ENABLED = true;
+
+  public final static String HFILE_WRITE_BUFFER_REUSE_MAX_SIZE_KEY = "hbase.hfile.write.buffer-reuse.max.size";
+  // Max size for the reusable (thread-local) write buffer - 2MB (default)
+  public final static int DEFAULT_HFILE_WRITE_BUFFER_REUSE_MAX_SIZE = 2 * 1024 * 1024;
+
   /**
    * Maximum length of key in HFile.
    */
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 6268f2e..68b5b15 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -99,10 +99,10 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class HFileBlock implements Cacheable {
   private static final Log LOG = LogFactory.getLog(HFileBlock.class);
-  
+
   /**
    * On a checksum failure, do these many succeeding read requests using hdfs checksums before
-   * auto-reenabling hbase checksum verification.
+   * auto-re-enabling hbase checksum verification.
    */
   static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
@@ -887,6 +887,16 @@
     private byte[] uncompressedBytesWithHeader;
 
     /**
+     * Size of {@link #uncompressedBytesWithHeader}
+     */
+    private int uncompressedBytesWithHeaderSize;
+
+    /**
+     * Size of {@link #onDiskBytesWithHeader}
+     */
+    private int onDiskBytesWithHeaderSize;
+
+    /**
      * Current block's start offset in the {@link HFile}. Set in
      * {@link #writeHeaderAndData(FSDataOutputStream)}.
      */
@@ -902,6 +912,15 @@
     private long prevOffset;
     /** Meta data that holds information about the hfileblock**/
     private HFileContext fileContext;
+
+    /**
+     * Thread local buffer which is used to compress and encode a
+     * file block before writing it out to the output stream.
+     * It can be used only for blocks that are not going to be cached.
+     * Default maximum buffer size is 2MB.
+     *
+     */
+    private static ThreadLocal<byte[]> writeBufferTLS = new ThreadLocal<byte[]>();
 
     /**
      * @param dataBlockEncoder data block encoding algorithm to use
@@ -998,7 +1017,8 @@
       // This will set state to BLOCK_READY.
       finishBlock();
     }
-
+
+
     /**
      * An internal method that flushes the compressing stream (if using
      * compression), serializes the header, and takes care of the separate
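The writeBufferTLS field declared above is the core of the change: one scratch array per writer thread, grown on demand and never handed out of the write path, so steady-state block writes stop allocating a fresh byte[] each time. A standalone sketch of that pattern (plain Java, not HBase code):

```java
// Standalone sketch of the thread-local, grow-on-demand scratch buffer that
// writeBufferTLS implements. The buffer is never shrunk, so after warm-up a
// writer thread allocates nothing on the block-write path.
public class ThreadLocalScratchSketch {
  private static final ThreadLocal<byte[]> SCRATCH = new ThreadLocal<byte[]>();

  static byte[] scratch(int atLeast) {
    byte[] buf = SCRATCH.get();
    if (buf == null || buf.length < atLeast) {
      buf = new byte[atLeast];   // grow to the largest block seen so far
      SCRATCH.set(buf);
    }
    return buf;                  // caller must track how many bytes are valid
  }

  public static void main(String[] args) {
    byte[] a = scratch(100);
    byte[] b = scratch(50);
    System.out.println(a == b);  // true: the same array is handed back
  }
}
```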
@@ -1012,9 +1032,23 @@
         blockType = dataBlockEncodingCtx.getBlockType();
       }
       userDataStream.flush();
-      // This does an array copy, so it is safe to cache this byte array.
+      // This does an array copy (only when we can't reuse the TLS buffer),
+      // so it is safe to cache this byte array.
       // Header is still the empty, 'dummy' header that is yet to be filled out.
-      uncompressedBytesWithHeader = baosInMemory.toByteArray();
+      if (canReuseBufferForCurrentBlock()) {
+        byte[] currentBuffer = writeBufferTLS.get();
+        if (currentBuffer == null || currentBuffer.length < baosInMemory.size()) {
+          currentBuffer = new byte[baosInMemory.size()];
+          writeBufferTLS.set(currentBuffer);
+        }
+        baosInMemory.toByteArray(currentBuffer, 0);
+        uncompressedBytesWithHeader = currentBuffer;
+
+      } else {
+        uncompressedBytesWithHeader = baosInMemory.toByteArray();
+      }
+      uncompressedBytesWithHeaderSize = baosInMemory.size();
+
       prevOffset = prevOffsetByType[blockType.getId()];
 
       // We need to set state before we can package the block up for
@@ -1023,33 +1057,61 @@
       state = State.BLOCK_READY;
       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
         onDiskBytesWithHeader = dataBlockEncodingCtx
-            .compressAndEncrypt(uncompressedBytesWithHeader);
+            .compressAndEncrypt(uncompressedBytesWithHeader, uncompressedBytesWithHeaderSize);
       } else {
         onDiskBytesWithHeader = this.defaultBlockEncodingCtx.
-            compressAndEncrypt(uncompressedBytesWithHeader);
+            compressAndEncrypt(uncompressedBytesWithHeader, uncompressedBytesWithHeaderSize);
       }
       // Calculate how many bytes we need for checksum on the tail of the block.
       int numBytes = (int) ChecksumUtil.numBytes(
           onDiskBytesWithHeader.length, fileContext.getBytesPerChecksum());
-
+      boolean shared = onDiskBytesWithHeader == uncompressedBytesWithHeader;
+      onDiskBytesWithHeaderSize =
+          shared ? uncompressedBytesWithHeaderSize : onDiskBytesWithHeader.length;
       // Put the header for the on disk bytes; header currently is unfilled-out
       putHeader(onDiskBytesWithHeader, 0,
-          onDiskBytesWithHeader.length + numBytes,
-          uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
+          onDiskBytesWithHeaderSize + numBytes,
+          uncompressedBytesWithHeaderSize, onDiskBytesWithHeaderSize);
       // Set the header for the uncompressed bytes (for cache-on-write) -- IFF different from
       // onDiskBytesWithHeader array.
       if (onDiskBytesWithHeader != uncompressedBytesWithHeader) {
         putHeader(uncompressedBytesWithHeader, 0,
-            onDiskBytesWithHeader.length + numBytes,
-            uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
+            onDiskBytesWithHeaderSize + numBytes,
+            uncompressedBytesWithHeaderSize, onDiskBytesWithHeaderSize);
       }
       onDiskChecksum = new byte[numBytes];
       ChecksumUtil.generateChecksums(
-          onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
+          onDiskBytesWithHeader, 0, onDiskBytesWithHeaderSize,
           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
     }
 
+    private boolean canReuseBufferForCurrentBlock() {
+      boolean cacheOnWrite = getCacheOnWrite();
+      boolean sizeBelowMax = baosInMemory.size() < fileContext.getWriteBufferReuseMaxSize();
+      return !cacheOnWrite && fileContext.isWriteBufferReuse() && sizeBelowMax;
+    }
+
+    private boolean getCacheOnWrite() {
+      switch (blockType) {
+        case DATA:
+        case ENCODED_DATA:
+          return fileContext.isCacheDataOnWrite();
+        case BLOOM_CHUNK:
+        case DELETE_FAMILY_BLOOM_META:
+        case GENERAL_BLOOM_META:
+          return fileContext.isCacheBloomOnWrite();
+        case INTERMEDIATE_INDEX:
+        case LEAF_INDEX:
+        case ROOT_INDEX:
+          return fileContext.isCacheIndexOnWrite();
+        default: return true;
+      }
+    }
+
     /**
      * Put the header into the given byte array at the given offset.
      * @param onDiskSize size of the block on disk header + data + checksum
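Why cache-on-write disqualifies a block from buffer reuse: a cached block keeps a reference to its backing array, and the thread-local buffer is overwritten by the next block the same thread writes. A condensed, standalone restatement of the check performed by canReuseBufferForCurrentBlock():

```java
// Condensed restatement of canReuseBufferForCurrentBlock(): the shared
// thread-local buffer may back a block only if (1) the block will not be
// handed to the block cache, (2) reuse is enabled, and (3) it fits the cap.
public class ReuseDecisionSketch {
  static boolean canReuse(boolean cacheOnWrite, boolean reuseEnabled,
      int blockSize, int maxReusableSize) {
    return !cacheOnWrite && reuseEnabled && blockSize < maxReusableSize;
  }

  public static void main(String[] args) {
    // A 64 KB data block, cache-on-write disabled, 2 MB cap: eligible.
    System.out.println(canReuse(false, true, 64 * 1024, 2 * 1024 * 1024)); // true
    // Same block but destined for the block cache: must own its array.
    System.out.println(canReuse(true, true, 64 * 1024, 2 * 1024 * 1024));  // false
  }
}
```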
@@ -1101,7 +1163,7 @@
     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
       throws IOException {
       ensureBlockReady();
-      out.write(onDiskBytesWithHeader);
+      out.write(onDiskBytesWithHeader, 0, onDiskBytesWithHeaderSize);
       out.write(onDiskChecksum);
     }
 
@@ -1120,12 +1182,12 @@
       // This is not very optimal, because we are doing an extra copy.
       // But this method is used only by unit tests.
       byte[] output =
-          new byte[onDiskBytesWithHeader.length
+          new byte[onDiskBytesWithHeaderSize
               + onDiskChecksum.length];
       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
-          onDiskBytesWithHeader.length);
+          onDiskBytesWithHeaderSize);
       System.arraycopy(onDiskChecksum, 0, output,
-          onDiskBytesWithHeader.length, onDiskChecksum.length);
+          onDiskBytesWithHeaderSize, onDiskChecksum.length);
       return output;
     }
 
@@ -1153,7 +1215,7 @@
      */
     int getOnDiskSizeWithoutHeader() {
       expectState(State.BLOCK_READY);
-      return onDiskBytesWithHeader.length +
+      return onDiskBytesWithHeaderSize +
           onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
     }
 
@@ -1166,7 +1228,7 @@
      */
     int getOnDiskSizeWithHeader() {
       expectState(State.BLOCK_READY);
-      return onDiskBytesWithHeader.length + onDiskChecksum.length;
+      return onDiskBytesWithHeaderSize + onDiskChecksum.length;
     }
 
     /**
@@ -1174,7 +1236,7 @@
      */
     int getUncompressedSizeWithoutHeader() {
       expectState(State.BLOCK_READY);
-      return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
+      return uncompressedBytesWithHeaderSize - HConstants.HFILEBLOCK_HEADER_SIZE;
     }
 
     /**
@@ -1182,7 +1244,7 @@
      */
     int getUncompressedSizeWithHeader() {
       expectState(State.BLOCK_READY);
-      return uncompressedBytesWithHeader.length;
+      return uncompressedBytesWithHeaderSize;
     }
 
     /** @return true if a block is being written */
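All of the length-to-size replacements above follow one rule: once the backing array can be oversized, every byte count must come from the tracked size, never from array.length. A small standalone sketch of the failure mode being avoided:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the rule behind the length -> size replacements: with a reused,
// possibly oversized buffer, writing array.length bytes would leak stale
// bytes from a previous block into the file.
public class ValidPrefixWriteSketch {
  static void writeBlock(DataOutputStream out, byte[] reused, int validLen) throws IOException {
    out.write(reused, 0, validLen);   // correct: only the current block's bytes
    // out.write(reused);             // wrong: would also write the leftover tail
  }

  public static void main(String[] args) throws IOException {
    byte[] reused = new byte[1024];   // oversized thread-local scratch buffer
    reused[0] = 42;                   // only 1 byte is valid for this "block"
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    writeBlock(new DataOutputStream(sink), reused, 1);
    System.out.println(sink.size()); // 1, not 1024
  }
}
```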
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index d310d13..58ec29a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -55,8 +55,8 @@
  */
 @InterfaceAudience.Private
 public class HFileWriterImpl implements HFile.Writer {
-  private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class); 
-  
+  private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class);
+
   private static final long UNSET = -1;
 
   /** The Cell previously appended. Becomes the last cell in the file.*/
@@ -160,6 +160,17 @@
     this.path = path;
     this.name = path != null ? path.getName() : outputStream.toString();
     this.hFileContext = fileContext;
+    this.hFileContext.setCacheDataOnWrite(cacheConf.shouldCacheDataOnWrite());
+    this.hFileContext.setCacheIndexOnWrite(cacheConf.shouldCacheIndexesOnWrite());
+    this.hFileContext.setCacheBloomOnWrite(cacheConf.shouldCacheBloomsOnWrite());
+    this.hFileContext.setWriteBufferReuse(
+        conf.getBoolean(HFile.HFILE_WRITE_BUFFER_REUSE_ENABLED_KEY,
+            HFile.DEFAULT_HFILE_WRITE_BUFFER_REUSE_ENABLED));
+    this.hFileContext.setWriteBufferReuseMaxSize(
+        conf.getInt(HFile.HFILE_WRITE_BUFFER_REUSE_MAX_SIZE_KEY,
+            HFile.DEFAULT_HFILE_WRITE_BUFFER_REUSE_MAX_SIZE));
+
+
     DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
     if (encoding != DataBlockEncoding.NONE) {
       this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
@@ -179,6 +190,7 @@
     }
   }
 
+
   /**
    * Add to the file info. All added key/value pairs can be obtained using
    * {@link HFile.Reader#loadFileInfo()}.
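For completeness, the two knobs read in the HFileWriterImpl constructor are ordinary configuration properties; the sketch below simply restates the defaults defined in HFile.java:

```java
import org.apache.hadoop.conf.Configuration;

// Sketch: the configuration keys consumed by the HFileWriterImpl constructor
// in this patch. Values shown are the defaults defined in HFile.java; raise or
// lower max.size to control which blocks are eligible for buffer reuse.
public class WriteBufferReuseConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean("hbase.hfile.write.buffer-reuse.enabled", true);
    conf.setInt("hbase.hfile.write.buffer-reuse.max.size", 2 * 1024 * 1024);
    System.out.println(conf.getBoolean("hbase.hfile.write.buffer-reuse.enabled", false));
  }
}
```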