.../hbase/io/ByteBufferListOutputStream.java | 4 ++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 63 +++++++++++++++++----- 2 files changed, 53 insertions(+), 14 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java index b4c00c6..c334a5a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java @@ -134,6 +134,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { throw new UnsupportedOperationException(); } + /** + * We can be assured that the buffers returned by this method are all flipped + * @return list of bytebuffers + */ public List getByteBuffers() { if (!this.lastBufFlipped) { this.lastBufFlipped = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index b026475..4a00da6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -460,8 +461,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { cellBlockBuilder.setLength(cellBlockSize); headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); } + ByteBuffer possiblePBBuf = + (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; Message header = headerBuilder.build(); - byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize); + ByteBuffer headerBuf = + createHeaderAndMessageBytes(result, header, cellBlockSize, possiblePBBuf); ByteBuffer[] responseBufs = null; int cellBlockBufferSize = 0; if (cellBlock != null) { @@ -470,7 +474,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } else { responseBufs = new ByteBuffer[1]; } - responseBufs[0] = ByteBuffer.wrap(b); + responseBufs[0] = headerBuf; if (cellBlock != null) { for (int i = 0; i < cellBlockBufferSize; i++) { responseBufs[i + 1] = cellBlock.get(i); @@ -514,8 +518,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setException(exceptionBuilder.build()); } - private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize) - throws IOException { + private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + int cellBlockSize, ByteBuffer possiblePBBuf) throws IOException { // Organize the response as a set of bytebuffers rather than collect it all together inside // one big byte array; save on allocations. int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, @@ -532,15 +536,39 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize) + cellBlockSize; - // The byte[] should also hold the totalSize of the header, message and the cellblock - byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize - + resultVintSize + Bytes.SIZEOF_INT]; - // The RpcClient expects the int to be in a format that code be decoded by - // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int) - // form of writing int. - Bytes.putInt(b, 0, totalSize); - CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT, - b.length - Bytes.SIZEOF_INT); + int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + + resultVintSize + Bytes.SIZEOF_INT; + if (possiblePBBuf != null) { + // Only if the last buffer has enough space for header use it. Else allocate + // a new buffer. Assume they are all flipped + if (possiblePBBuf.remaining() + totalPBSize <= possiblePBBuf.capacity()) { + // duplicate the buffer. This is where the header is going to be written + ByteBuffer pbBuf = possiblePBBuf.duplicate(); + // get the current limit + int limit = pbBuf.limit(); + // Position such that we write the header to the end of the buffer + pbBuf.position(limit); + // limit to the header size + pbBuf.limit(totalPBSize + limit); + // mark the current position + pbBuf.mark(); + writeToCOS(result, header, totalSize, pbBuf); + // reset the buffer back to old position + pbBuf.reset(); + return pbBuf; + } else { + return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); + } + } else { + return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); + } + } + + private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) + throws IOException { + ByteBufferUtils.putInt(pbBuf, totalSize); + // create COS that works on BB + CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf); if (header != null) { cos.writeMessageNoTag(header); } @@ -549,7 +577,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } cos.flush(); cos.checkNoSpaceLeft(); - return b; + } + + private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + int totalSize, int totalPBSize) throws IOException { + ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize); + writeToCOS(result, header, totalSize, pbBuf); + pbBuf.flip(); + return pbBuf; } private BufferChain wrapWithSasl(BufferChain bc)