.../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 23 ++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 02ffcd5..d1c0e8a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -132,6 +132,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private final int maxDataLen; + // buffer's initial size + private final static int INITIAL_SIZE = 64 * 1024; + + private int bufSize = INITIAL_SIZE; + + // increment in steps of 1KB + private final int STEP = 1024; + private final ByteBufAllocator alloc; private static final class Callback { @@ -307,7 +315,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.summer = summer; this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum()); this.alloc = alloc; - this.buf = alloc.directBuffer(); + // allocate 64 K by default + this.buf = alloc.directBuffer(bufSize); this.state = State.STREAMING; setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); } @@ -472,7 +481,17 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } }); int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum(); - ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen); + if (dataLen > bufSize) { + // keep incrementing in steps + bufSize += STEP; + } else { + if (bufSize - STEP > dataLen) { + bufSize -= STEP; + // If the reduction takes the bufSize even below the dataLen just go with the existing + // bufSize + } + } + ByteBuf newBuf = alloc.directBuffer(bufSize).ensureWritable(trailingPartialChunkLen); if (trailingPartialChunkLen != 0) { buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen); }