.../hbase/io/ByteBufferListOutputStream.java | 18 +++++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 76 +++++++++++++++++----- 2 files changed, 79 insertions(+), 15 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java index b4c00c6..ef1a79a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java @@ -170,4 +170,22 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { } } } + + /** + * Sees if the currentBuffer can accommodate the given length. If so return the current buffer, if + * not null. + * @param length to be written + * @return the curBuf if the currentBuffer can accommodate the length if not return null; + */ + public ByteBuffer canWriteLenInCurrentBuffer(int length) { + if (curBuf != null) { + // in case getByteBuffers was not called. In the current usage we don't have this case + // of calling this method before getByteBuffers + int remain = lastBufFlipped ? curBuf.remaining() : curBuf.position(); + if (remain + length <= this.pool.getBufferSize()) { + return curBuf; + } + } + return null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index b026475..48053ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -441,8 +442,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec, this.connection.compressionCodec, cells, reservoir); if (this.cellBlockStream != null) { - cellBlock = this.cellBlockStream.getByteBuffers(); - cellBlockSize = this.cellBlockStream.size(); + cellBlock = cellBlockStream.getByteBuffers(); + cellBlockSize = cellBlockStream.size(); } } else { ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec, @@ -461,7 +462,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); } Message header = headerBuilder.build(); - byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize); + ByteBuffer headerBuf = + createHeaderAndMessageBytes(result, header, cellBlockSize); ByteBuffer[] responseBufs = null; int cellBlockBufferSize = 0; if (cellBlock != null) { @@ -470,7 +472,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } else { responseBufs = new ByteBuffer[1]; } - responseBufs[0] = ByteBuffer.wrap(b); + responseBufs[0] = headerBuf; if (cellBlock != null) { for (int i = 0; i < cellBlockBufferSize; i++) { responseBufs[i + 1] = cellBlock.get(i); @@ -514,8 +516,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setException(exceptionBuilder.build()); } - private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize) - throws IOException { + private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + int cellBlockSize) throws IOException { // Organize the response as a set of bytebuffers rather than collect it all together inside // one big byte array; save on allocations. int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, @@ -532,15 +534,58 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize) + cellBlockSize; - // The byte[] should also hold the totalSize of the header, message and the cellblock - byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize - + resultVintSize + Bytes.SIZEOF_INT]; - // The RpcClient expects the int to be in a format that code be decoded by - // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int) - // form of writing int. - Bytes.putInt(b, 0, totalSize); - CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT, - b.length - Bytes.SIZEOF_INT); + int headerSize = headerSerializedSize + headerVintSize + resultSerializedSize + + resultVintSize + Bytes.SIZEOF_INT; + if (cellBlockSize > 0 && cellBlockStream != null) { + // Only if the current buffer has enough space for header use it. Else allocate + // a new buffer + ByteBuffer headerBuf = cellBlockStream.canWriteLenInCurrentBuffer(headerSize); + int limit = -1; + if (headerBuf == null) { + // allocate new one because the pool will have to allocate a new curBuf of 64K + // to accommodate this header + return createHeaderAndMessageWithOnheapBuffer(result, header, totalSize, headerSize); + } else { + headerBuf.mark(); + // the current limit + limit = headerBuf.limit(); + // Position such that we write the header to the end of the curBuf + headerBuf.position(limit); + // limit to the header size + headerBuf.limit(headerSize + limit); + ByteBufferUtils.putInt(headerBuf, totalSize); + // create COS that works on BB + CodedOutputStream cos = CodedOutputStream.newInstance(headerBuf); + if (header != null) { + cos.writeMessageNoTag(header); + } + if (result != null) { + cos.writeMessageNoTag(result); + } + cos.flush(); + cos.checkNoSpaceLeft(); + // move back to the starting position + headerBuf.position(limit); + // one object creation can't be avoided!!!! + // this duplicate() ensures that only the header portion is used as an individual BB + ByteBuffer res = headerBuf.duplicate(); + headerBuf.reset(); + // limit the headerBuf to the original limit such that the header is not read + headerBuf.limit(limit); + return res; + } + } else { + // TODO : Use the pool for non PB cases also + // The byte[] should also hold the totalSize of the header, message and the cellblock + return createHeaderAndMessageWithOnheapBuffer(result, header, totalSize, headerSize); + } + } + + private ByteBuffer createHeaderAndMessageWithOnheapBuffer(Message result, Message header, + int totalSize, int headerSize) throws IOException { + ByteBuffer b = ByteBuffer.allocate(headerSize); + ByteBufferUtils.putInt(b, totalSize); + CodedOutputStream cos = CodedOutputStream.newInstance(b); if (header != null) { cos.writeMessageNoTag(header); } @@ -549,6 +594,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } cos.flush(); cos.checkNoSpaceLeft(); + b.flip(); return b; }