.../hbase/io/ByteBufferListOutputStream.java | 4 ++ .../org/apache/hadoop/hbase/io/ByteBufferPool.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 75 +++++++++++++++++----- 3 files changed, 65 insertions(+), 16 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java index b4c00c6..c334a5a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java @@ -134,6 +134,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { throw new UnsupportedOperationException(); } + /** + * We can be assured that the buffers returned by this method are all flipped + * @return list of bytebuffers + */ public List getByteBuffers() { if (!this.lastBufFlipped) { this.lastBufFlipped = true; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java index e528f02..971c42c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java @@ -140,7 +140,7 @@ public class ByteBufferPool { buffers.offer(buf); } - int getBufferSize() { + public int getBufferSize() { return this.bufferSize; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index b026475..3c97b09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -441,8 +442,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec, this.connection.compressionCodec, cells, reservoir); if (this.cellBlockStream != null) { - cellBlock = this.cellBlockStream.getByteBuffers(); - cellBlockSize = this.cellBlockStream.size(); + cellBlock = cellBlockStream.getByteBuffers(); + cellBlockSize = cellBlockStream.size(); } } else { ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec, @@ -461,7 +462,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); } Message header = headerBuilder.build(); - byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize); + ByteBuffer headerBuf = + createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock); ByteBuffer[] responseBufs = null; int cellBlockBufferSize = 0; if (cellBlock != null) { @@ -470,7 +472,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } else { responseBufs = new ByteBuffer[1]; } - responseBufs[0] = ByteBuffer.wrap(b); + responseBufs[0] = headerBuf; if (cellBlock != null) { for (int i = 0; i < cellBlockBufferSize; i++) { responseBufs[i + 1] = cellBlock.get(i); @@ -514,8 +516,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setException(exceptionBuilder.build()); } - private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize) - throws IOException { + private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + int cellBlockSize, List cellBlock) throws IOException { // Organize the response as a set of bytebuffers rather than collect it all together inside // one big byte array; save on allocations. int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, @@ -532,15 +534,57 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize) + cellBlockSize; - // The byte[] should also hold the totalSize of the header, message and the cellblock - byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize - + resultVintSize + Bytes.SIZEOF_INT]; - // The RpcClient expects the int to be in a format that code be decoded by - // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int) - // form of writing int. - Bytes.putInt(b, 0, totalSize); - CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT, - b.length - Bytes.SIZEOF_INT); + int headerSize = headerSerializedSize + headerVintSize + resultSerializedSize + + resultVintSize + Bytes.SIZEOF_INT; + if (cellBlockSize > 0 && cellBlockStream != null && cellBlock != null) { + // Only if the last buffer has enough space for header use it. Else allocate + // a new buffer. Assume they are all flipped + ByteBuffer headerBuf = cellBlock.get(cellBlock.size() - 1); + if (headerBuf.remaining() + headerSize <= reservoir.getBufferSize()) { + headerBuf.mark(); + // the current limit + int limit = headerBuf.limit(); + // Position such that we write the header to the end of the curBuf + headerBuf.position(limit); + // limit to the header size + headerBuf.limit(headerSize + limit); + ByteBufferUtils.putInt(headerBuf, totalSize); + // create COS that works on BB + CodedOutputStream cos = CodedOutputStream.newInstance(headerBuf); + if (header != null) { + cos.writeMessageNoTag(header); + } + if (result != null) { + cos.writeMessageNoTag(result); + } + cos.flush(); + cos.checkNoSpaceLeft(); + // move back to the starting position + headerBuf.position(limit); + // one object creation can't be avoided!!!! + // this duplicate() ensures that only the header portion is used as an individual BB + ByteBuffer res = headerBuf.duplicate(); + headerBuf.reset(); + // limit the headerBuf to the original limit such that the header is not read + headerBuf.limit(limit); + return res; + } else { + // allocate new one because the pool will have to allocate a new curBuf of 64K + // to accommodate this header + return createHeaderAndMessageWithOnheapBuffer(result, header, totalSize, headerSize); + } + } else { + // TODO : Use the pool for non PB cases also + // The byte[] should also hold the totalSize of the header, message and the cellblock + return createHeaderAndMessageWithOnheapBuffer(result, header, totalSize, headerSize); + } + } + + private ByteBuffer createHeaderAndMessageWithOnheapBuffer(Message result, Message header, + int totalSize, int headerSize) throws IOException { + ByteBuffer b = ByteBuffer.allocate(headerSize); + ByteBufferUtils.putInt(b, totalSize); + CodedOutputStream cos = CodedOutputStream.newInstance(b); if (header != null) { cos.writeMessageNoTag(header); } @@ -549,6 +593,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } cos.flush(); cos.checkNoSpaceLeft(); + b.flip(); return b; }