diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
index deddb51..1472d52 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
@@ -54,7 +54,7 @@ public class BoundedByteBufferPool {
   private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<ByteBuffer>();
 
   @VisibleForTesting
-  int getQueueSize() {
+  public int getQueueSize() {
     return buffers.size();
   }
 
@@ -99,6 +99,7 @@ public class BoundedByteBufferPool {
   // For reporting, only used in the log
   private final AtomicLong allocationsRef = new AtomicLong();
 
+  private boolean isDirect = true;
   /**
    * @param maxByteBufferSizeToCache
    * @param initialByteBufferSize
@@ -111,6 +112,14 @@ public class BoundedByteBufferPool {
     this.maxToCache = maxToCache;
   }
 
+  public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
+      final int maxToCache, boolean isDirect) {
+    this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
+    this.runningAverageRef = new AtomicInteger(initialByteBufferSize);
+    this.maxToCache = maxToCache;
+    this.isDirect = isDirect;
+  }
+
   public ByteBuffer getBuffer() {
     ByteBuffer bb = buffers.poll();
     if (bb != null) {
@@ -134,7 +143,7 @@ public class BoundedByteBufferPool {
     }
 
     int runningAverage = runningAverageRef.get();
-    bb = ByteBuffer.allocateDirect(runningAverage);
+    bb = isDirect? ByteBuffer.allocateDirect(runningAverage): ByteBuffer.allocate(runningAverage);
 
     if (LOG.isTraceEnabled()) {
       long allocations = allocationsRef.incrementAndGet();
@@ -143,10 +152,10 @@ public class BoundedByteBufferPool {
     return bb;
   }
 
-  public void putBuffer(ByteBuffer bb) {
+  public boolean putBuffer(ByteBuffer bb) {
     // If buffer is larger than we want to keep around, just let it go.
     if (bb.capacity() > maxByteBufferSizeToCache) {
-      return;
+      return false;
     }
 
     int countOfBuffers;
@@ -156,9 +165,9 @@ public class BoundedByteBufferPool {
       countOfBuffers = toCountOfBuffers(prevState);
       if (countOfBuffers >= maxToCache) {
         if (LOG.isWarnEnabled()) {
-          LOG.warn("At capacity: " + countOfBuffers);
+          // LOG.warn("At capacity: " + countOfBuffers);
         }
-        return;
+        return false;
       }
       countOfBuffers++;
       assert 0 < countOfBuffers && countOfBuffers <= maxToCache;
@@ -168,7 +177,7 @@ public class BoundedByteBufferPool {
         if (LOG.isWarnEnabled()) {
          LOG.warn("Overflowed total capacity.");
         }
-        return;
+        return false;
       }
 
       long state = toState(countOfBuffers, totalCapacity);
@@ -190,5 +199,6 @@ public class BoundedByteBufferPool {
         break;
       }
     }
+    return true;
   }
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index f3402da..2fd5f2b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -28,13 +28,16 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.ByteBuffInputStream;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.regionserver.CompactSplitThread;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -191,6 +195,44 @@ public class HFileBlock implements Cacheable {
   public static final boolean FILL_HEADER = true;
   public static final boolean DONT_FILL_HEADER = false;
 
+  /**
+   * Byte buffer pool which is used to unpack HFileBlock
+   * during compaction (saves one large memory allocation per every HFile block read)
+   * HBASE-15544
+   */
+  static BoundedByteBufferPool bufferPool;
+
+  /**
+   * This pool keeps released byte buffers for some time,
+   * before they can be promoted to buffer's pool. Works as FIFO queue
+   * Released buffers can't be reused immediately due to some race conditions
+   * in compaction threads.
+   */
+  static BoundedByteBufferPool releasedBufferPool;
+
+  static{
+    int defaulBlockSize = HConstants.DEFAULT_BLOCKSIZE + 100;
+    int maxBlockSize = 4 * defaulBlockSize;
+    Configuration conf = HBaseConfiguration.create();
+    int smallCompactionThreads = conf.getInt(
+        CompactSplitThread.SMALL_COMPACTION_THREADS,
+        CompactSplitThread.SMALL_COMPACTION_THREADS_DEFAULT);
+    int largeCompactionThreads = conf.getInt(
+        CompactSplitThread.LARGE_COMPACTION_THREADS,
+        CompactSplitThread.LARGE_COMPACTION_THREADS_DEFAULT);
+
+    bufferPool = new BoundedByteBufferPool(maxBlockSize, defaulBlockSize,
+        4*(smallCompactionThreads + largeCompactionThreads), false);
+
+    releasedBufferPool = new BoundedByteBufferPool(maxBlockSize, defaulBlockSize,
+        4*(smallCompactionThreads + largeCompactionThreads), false);
+  }
+
+  /**
+   * Current buffer from pool
+   */
+  private ByteBuffer fromBufferPool;
+
   // How to get the estimate correctly? if it is a singleBB?
   public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = (int)ClassSize.estimateBase(MultiByteBuff.class, false);
@@ -588,12 +630,13 @@ public class HFileBlock implements Cacheable {
       .append("]");
     return sb.toString();
   }
-
+
+
   /**
    * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
    * encoded structure. Internal structures are shared between instances where applicable.
    */
-  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
+  HFileBlock unpack(HFileContext fileContext, FSReader reader, boolean isCompaction) throws IOException {
     if (!fileContext.isCompressedOrEncrypted()) {
       // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
       // which is used for block serialization to L2 cache, does not preserve encoding and
@@ -602,7 +645,10 @@ public class HFileBlock implements Cacheable {
     }
 
     HFileBlock unpacked = new HFileBlock(this);
-    unpacked.allocateBuffer(); // allocates space for the decompressed block
+    boolean reuse = isCompaction &&
+        (unpacked.getBlockType() == BlockType.DATA ||
+         unpacked.getBlockType() == BlockType.ENCODED_DATA);
+    unpacked.allocateBuffer(reuse); // allocates space for the decompressed block
 
     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
@@ -613,22 +659,56 @@ public class HFileBlock implements Cacheable {
     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
+    if(reuse && unpacked.getBlockType() != BlockType.DATA) {
+      /*DEBUG*/System.out.println("WARN "+ unpacked.getBlockType());
+    }
     return unpacked;
   }
 
+  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
+    return unpack(fileContext, reader, false);
+  }
+
+  /**
+   * Release byte buffer back to pool
+   */
+  public void release(){
+    if(fromBufferPool != null){
+      boolean result = releasedBufferPool.putBuffer(fromBufferPool);
+      if(!result){
+        // get next from released and promote it to main pool
+        ByteBuffer buf = releasedBufferPool.getBuffer();
+        bufferPool.putBuffer(buf);
+        // Try again, dump if fails
+        releasedBufferPool.putBuffer(fromBufferPool);
+      }
+      fromBufferPool = null;
+    }
+  }
+
+
   /**
    * Always allocates a new buffer of the correct size. Copies header bytes
    * from the existing buffer. Does not change header fields.
    * Reserve room to keep checksum bytes too.
    */
-  private void allocateBuffer() {
+  private void allocateBuffer(boolean reuseBuffer) {
     int cksumBytes = totalChecksumBytes();
     int headerSize = headerSize();
     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
 
     // TODO we need consider allocating offheap here?
-    ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
-
+    ByteBuffer newBuf = null;
+    if(reuseBuffer){
+      newBuf = bufferPool.getBuffer();
+      if(newBuf == null || newBuf.capacity() < capacityNeeded){
+        newBuf = ByteBuffer.allocate(capacityNeeded);
+      }
+      fromBufferPool = newBuf;
+      newBuf.clear();
+    } else{
+      newBuf = ByteBuffer.allocate(capacityNeeded);
+    }
     // Copy header bytes into newBuf.
     // newBuf is HBB so no issue in calling array()
     buf.position(0);
@@ -1426,7 +1506,7 @@ public class HFileBlock implements Cacheable {
         return null;
       HFileBlock b = readBlockData(offset, -1, false);
       offset += b.getOnDiskSizeWithHeader();
-      return b.unpack(fileContext, owner);
+      return b.unpack(fileContext, owner, false);
     }
 
     @Override
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 0b1ac83..77137c8 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -519,6 +519,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       if (this.curBlock != null && this.curBlock.usesSharedMemory()) {
         prevBlocks.add(this.curBlock);
       }
+      if(this.curBlock != null) {
+        this.curBlock.release();
+      }
       this.curBlock = block;
     }
 
@@ -527,6 +530,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       if (this.curBlock != null && this.curBlock.usesSharedMemory()) {
         this.prevBlocks.add(this.curBlock);
       }
+      if(this.curBlock != null){
+        this.curBlock.release();
+      }
       this.curBlock = null;
     }
 
@@ -578,6 +584,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     @Override
     public void close() {
       this.returnBlocks(true);
+      if(this.curBlock != null) {
+        this.curBlock.release();
+      }
     }
 
     protected int getCurCellSize() {
@@ -1530,7 +1539,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread);
       validateBlockType(hfileBlock, expectedBlockType);
 
-      HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
+      HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader, isCompaction);
       BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
 
       // Cache the block if necessary
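
The recycling scheme the patch wires into HFileBlock can be read in isolation as a two-stage pool: a bounded primary pool serves unpack allocations during compaction, and a second bounded FIFO holds just-released buffers for a cooling-off period before they are promoted back to the primary pool (mirroring release() and allocateBuffer(true) above). The standalone sketch below is illustrative only and not part of the patch: MiniPool and TwoPoolRecycleSketch are stand-ins for BoundedByteBufferPool and the compaction read path, and the buffer sizes and counts are arbitrary.

import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Illustrative stand-in for BoundedByteBufferPool (heap buffers, bounded count). */
final class MiniPool {
  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<ByteBuffer>();
  private final int maxToCache;
  private final int bufferSize;

  MiniPool(int bufferSize, int maxToCache) {
    this.bufferSize = bufferSize;
    this.maxToCache = maxToCache;
  }

  /** Returns a pooled buffer, or null when the pool is empty (caller then allocates). */
  ByteBuffer getBuffer() {
    return buffers.poll();
  }

  /** Mirrors the patched putBuffer(): false means the pool declined the buffer. */
  boolean putBuffer(ByteBuffer bb) {
    if (bb == null || bb.capacity() > bufferSize || buffers.size() >= maxToCache) {
      return false;
    }
    bb.clear();
    return buffers.offer(bb);
  }
}

public class TwoPoolRecycleSketch {
  // Primary pool used for new "unpack" allocations.
  static final MiniPool bufferPool = new MiniPool(64 * 1024, 8);
  // Cooling-off FIFO for buffers that may still be referenced briefly after release.
  static final MiniPool releasedBufferPool = new MiniPool(64 * 1024, 8);

  /** Mirrors HFileBlock.release(): park the buffer, promoting an older one if the FIFO is full. */
  static void release(ByteBuffer used) {
    if (used == null) {
      return;
    }
    if (!releasedBufferPool.putBuffer(used)) {
      // FIFO full: promote its oldest buffer to the primary pool, then retry (drop on failure).
      ByteBuffer oldest = releasedBufferPool.getBuffer();
      bufferPool.putBuffer(oldest);
      releasedBufferPool.putBuffer(used);
    }
  }

  public static void main(String[] args) {
    // Simulate a compaction-style read loop: get -> use -> release.
    for (int i = 0; i < 20; i++) {
      ByteBuffer bb = bufferPool.getBuffer();
      if (bb == null) {
        bb = ByteBuffer.allocate(64 * 1024); // pool miss: allocate, as allocateBuffer(true) does
      }
      bb.putInt(i);                          // stand-in for decompressing a block into the buffer
      release(bb);                           // park it; it becomes reusable on a later iteration
    }
    System.out.println("done");
  }
}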