.../org/apache/hadoop/hbase/io/ByteBufferPool.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 42 +++++++++++++++++----- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java index e528f02..971c42c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java @@ -140,7 +140,7 @@ public class ByteBufferPool { buffers.offer(buf); } - int getBufferSize() { + public int getBufferSize() { return this.bufferSize; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index dd9bb01..0b437bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -323,6 +324,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected TraceInfo tinfo; private ByteBufferListOutputStream cellBlockStream = null; + private ByteBuffer headerMsgBuf = null; + private User user; private InetAddress remoteAddress; private RpcCallback callback; @@ -364,6 +367,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", justification="Presume the lock on processing request held by caller is protection enough") void done() { + if (headerMsgBuf != null) { + if (reservoir != null) { + // Return back the buffer if it was got from the pool + // TODO : Will it be better to manager all these buffers including + // cellBlockStream buffers using a manager? + reservoir.putbackBuffer(headerMsgBuf); + } + headerMsgBuf = null; + } if (this.cellBlockStream != null) { this.cellBlockStream.releaseResources();// This will return back the BBs which we // got from pool. @@ -460,7 +472,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); } Message header = headerBuilder.build(); - byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize); + ByteBuffer b = createHeaderAndMessageBytes(result, header, cellBlockSize); ByteBuffer[] responseBufs = null; int cellBlockBufferSize = 0; if (cellBlock != null) { @@ -469,7 +481,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } else { responseBufs = new ByteBuffer[1]; } - responseBufs[0] = ByteBuffer.wrap(b); + responseBufs[0] = b; if (cellBlock != null) { for (int i = 0; i < cellBlockBufferSize; i++) { responseBufs[i + 1] = cellBlock.get(i); @@ -513,7 +525,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setException(exceptionBuilder.build()); } - private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize) + private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize) throws IOException { // Organize the response as a set of bytebuffers rather than collect it all together inside // one big byte array; save on allocations. @@ -532,14 +544,23 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { + (resultSerializedSize + resultVintSize) + cellBlockSize; // The byte[] should also hold the totalSize of the header, message and the cellblock - byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize - + resultVintSize + Bytes.SIZEOF_INT]; + int bufSize = headerSerializedSize + headerVintSize + resultSerializedSize + + resultVintSize + Bytes.SIZEOF_INT; + ByteBuffer buf = null; + if (reservoir != null && reservoir.getBufferSize() >= bufSize) { + buf = reservoir.getBuffer(); + } + if (buf == null) { + // allocate on heap if reservoir does not return any buffer + buf = ByteBuffer.allocate(bufSize); + } else { + buf.limit(bufSize); + } // The RpcClient expects the int to be in a format that code be decoded by // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int) // form of writing int. - Bytes.putInt(b, 0, totalSize); - CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT, - b.length - Bytes.SIZEOF_INT); + ByteBufferUtils.putInt(buf, totalSize); + CodedOutputStream cos = CodedOutputStream.newInstance(buf); if (header != null) { cos.writeMessageNoTag(header); } @@ -547,8 +568,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { cos.writeMessageNoTag(result); } cos.flush(); + System.out.println("Am i coming here"); cos.checkNoSpaceLeft(); - return b; + System.out.println("Am i coming here so space is left over"); + buf.flip(); + return buf; } private BufferChain wrapWithSasl(BufferChain bc)