From 6d4f97e7d95bc80c1caa4ff72b961c3a894642c0 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 28 Oct 2016 15:53:27 +0800 Subject: [PATCH] HBASE-16891 Try copying to the Netty ByteBuf directly from the WALEdit --- .../regionserver/wal/AsyncProtobufLogWriter.java | 74 +++++++++++++--------- 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index e2080ff..e534cc6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; @@ -45,8 +44,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; * AsyncWriter for protobuf-based WAL. */ @InterfaceAudience.Private -public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements - AsyncFSWALProvider.AsyncWriter { +public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter + implements AsyncFSWALProvider.AsyncWriter { private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class); @@ -98,7 +97,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements private AsyncFSOutput output; - private ByteArrayOutputStream buf; + private OutputStream asyncOutputWrapper; public AsyncProtobufLogWriter(EventLoop eventLoop) { this.eventLoop = eventLoop; @@ -106,26 +105,22 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements @Override public void append(Entry entry) { - buf.reset(); + int buffered = output.buffered(); entry.setCompressionContext(compressionContext); try { entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build() - .writeDelimitedTo(buf); + .writeDelimitedTo(asyncOutputWrapper); } catch (IOException e) { throw new AssertionError("should not happen", e); } - length.addAndGet(buf.size()); - output.write(buf.getBuffer(), 0, buf.size()); try { for (Cell cell : entry.getEdit().getCells()) { - buf.reset(); cellEncoder.write(cell); - length.addAndGet(buf.size()); - output.write(buf.getBuffer(), 0, buf.size()); } } catch (IOException e) { throw new AssertionError("should not happen", e); } + length.addAndGet(output.buffered() - buffered); } @Override @@ -157,22 +152,40 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements short replication, long blockSize) throws IOException { this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, blockSize, eventLoop); - this.buf = new ByteArrayOutputStream(); + this.asyncOutputWrapper = new OutputStream() { + + private byte[] oneByteBuf = new byte[1]; + + @Override + public void write(int b) throws IOException { + oneByteBuf[0] = (byte) b; + write(oneByteBuf); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + output.write(b, off, len); + } + + @Override + public void close() throws IOException { + output.close(); + } + }; } @Override protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { - buf.reset(); - header.writeDelimitedTo(buf); final BlockingCompletionHandler handler = new BlockingCompletionHandler(); - eventLoop.execute(new Runnable() { - - @Override - public void run() { - output.write(ProtobufLogReader.PB_WAL_MAGIC); - output.write(buf.getBuffer(), 0, buf.size()); - output.flush(null, handler, false); + eventLoop.execute(() -> { + output.write(magic); + try { + header.writeDelimitedTo(asyncOutputWrapper); + } catch (IOException e) { + // should not happen + throw new AssertionError(e); } + output.flush(null, handler, false); }); return handler.get(); } @@ -180,22 +193,23 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements @Override protected long writeWALTrailerAndMagic(WALTrailer trailer, final byte[] magic) throws IOException { - buf.reset(); - trailer.writeTo(buf); final BlockingCompletionHandler handler = new BlockingCompletionHandler(); - eventLoop.execute(new Runnable() { - public void run() { - output.write(buf.getBuffer(), 0, buf.size()); - output.write(Ints.toByteArray(buf.size())); - output.write(magic); - output.flush(null, handler, false); + eventLoop.execute(() -> { + try { + trailer.writeTo(asyncOutputWrapper); + } catch (IOException e) { + // should not happen + throw new AssertionError(e); } + output.write(Ints.toByteArray(trailer.getSerializedSize())); + output.write(magic); + output.flush(null, handler, false); }); return handler.get(); } @Override protected OutputStream getOutputStreamForCellEncoder() { - return buf; + return asyncOutputWrapper; } } -- 2.7.4