.../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 56 +++++++++++++++++++++- .../asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 29 +++++++++++ 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 02ffcd5..c80e7c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -71,6 +71,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.util.DataChecksum; +import com.google.common.annotations.VisibleForTesting; + /** * An asynchronous HDFS output stream implementation which fans out data to datanode and only * supports writing file with only one block. @@ -165,6 +167,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private ByteBuf buf; + private MaxByteBufAllocator byteBufAllocator; + private enum State { STREAMING, CLOSING, BROKEN, CLOSED } @@ -307,7 +311,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.summer = summer; this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum()); this.alloc = alloc; - this.buf = alloc.directBuffer(); + this.byteBufAllocator = new MaxByteBufAllocator(); + this.buf = this.byteBufAllocator.allocate(alloc); this.state = State.STREAMING; setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); } @@ -315,6 +320,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private void writeInt0(int i) { buf.ensureWritable(4); buf.writeInt(i); + this.byteBufAllocator.bytesWritten(4); } @Override @@ -329,6 +335,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private void write0(ByteBuffer bb) { buf.ensureWritable(bb.remaining()); buf.writeBytes(bb); + this.byteBufAllocator.bytesWritten(bb.remaining()); } @Override @@ -348,6 +355,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private void write0(byte[] b, int off, int len) { buf.ensureWritable(len); buf.writeBytes(b, off, len); + this.byteBufAllocator.bytesWritten(len); } @Override @@ -472,7 +480,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } }); int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum(); - ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen); + ByteBuf newBuf = this.byteBufAllocator.allocate(alloc).ensureWritable(trailingPartialChunkLen); + // after allocating reset the bytesWritten + this.byteBufAllocator.reset(); if (trailingPartialChunkLen != 0) { buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen); } @@ -543,4 +553,46 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId); } + + @VisibleForTesting + public static class MaxByteBufAllocator { + private int capacity; + private int bytesWritten; + private final int LIMIT = 128 * 1024 * 1024; + + MaxByteBufAllocator(int initialCapacity) { + this.capacity = initialCapacity; + } + + MaxByteBufAllocator() { + // default is 4KB + this(4 * 1024); + } + + ByteBuf allocate(ByteBufAllocator allocator) { + return allocator.directBuffer(guess()); + } + + void bytesWritten(int bytesWritten) { + this.bytesWritten += bytesWritten; + } + + void reset() { + this.bytesWritten = 0; + } + + private int guess() { + if (this.bytesWritten > this.capacity) { + if ((this.capacity << 1) <= LIMIT) { + // increase the capacity in the range of power of 2 + this.capacity = this.capacity << 1; + } + } else { + if ((this.capacity >> 1) >= this.bytesWritten) { + this.capacity = this.capacity >> 1; + } + } + return this.capacity; + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 7897472..05dd3b5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -23,6 +23,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -40,6 +42,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput.MaxByteBufAllocator; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -128,6 +131,32 @@ public class TestFanOutOneBlockAsyncDFSOutput { } @Test + public void testMaxByteBufAllocator() throws Exception { + MaxByteBufAllocator allocator = new MaxByteBufAllocator(); + PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT; + allocator.bytesWritten(5 * 1024); + ByteBuf buf = allocator.allocate(alloc); + assertEquals(8 * 1024, buf.capacity()); + // try allocating again + buf = allocator.allocate(alloc); + // should be of same size + assertEquals(8 * 1024, buf.capacity()); + allocator.reset(); + // write more + allocator.bytesWritten(10 * 1024); + buf = allocator.allocate(alloc); + assertEquals(16 * 1024, buf.capacity()); + allocator.reset(); + // again lower the size + allocator.bytesWritten(4 * 1024); + buf = allocator.allocate(alloc); + // it wont reduce directly to 4KB + assertEquals(8 * 1024, buf.capacity()); + buf = allocator.allocate(alloc); + assertEquals(4 * 1024, buf.capacity()); + } + + @Test public void testRecover() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next();