diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 8cead2a..93bd040 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1210,6 +1210,7 @@ public class RpcServer implements RpcServerInterface { protected SocketChannel channel; private ByteBuffer data; private ByteBuffer dataLengthBuffer; + private boolean isAllocateData = false; protected final ConcurrentLinkedDeque responseQueue = new ConcurrentLinkedDeque(); private final Lock responseWriteLock = new ReentrantLock(); private Counter rpcCount = new Counter(); // number of outstanding rpcs @@ -1253,7 +1254,7 @@ public class RpcServer implements RpcServerInterface { new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null); public UserGroupInformation attemptingUser = null; // user name before auth - + public Connection(SocketChannel channel, long lastContact) { this.channel = channel; this.lastContact = lastContact; @@ -1578,10 +1579,8 @@ public class RpcServer implements RpcServerInterface { return count; } } - - // We have read a length and we have read the preamble. It is either the connection header - // or it is a request. - if (data == null) { + + if (!isAllocateData) { dataLengthBuffer.flip(); int dataLength = dataLengthBuffer.getInt(); if (dataLength == RpcClient.PING_CALL_ID) { @@ -1594,12 +1593,21 @@ public class RpcServer implements RpcServerInterface { throw new IllegalArgumentException("Unexpected data length " + dataLength + "!! from " + getHostAddress()); } - data = ByteBuffer.allocate(dataLength); - - // Increment the rpc count. This counter will be decreased when we write - // the response. If we want the connection to be detected as idle properly, we - // need to keep the inc / dec correct. - incRpcCount(); + + // We have read a length and we have read the preamble. It is either the connection header + // or it is a request. + // We will reuse buffer of data unless data.capacity is not enough. + if (data == null || (data != null && data.capacity() < dataLength)) { + data = ByteBuffer.allocate(dataLength); + + // Increment the rpc count. This counter will be decreased when we write + // the response. If we want the connection to be detected as idle properly, we + // need to keep the inc / dec correct. + incRpcCount(); + } else { + data.limit(dataLength); + } + isAllocateData = true; } count = channelRead(channel, data); @@ -1630,7 +1638,11 @@ public class RpcServer implements RpcServerInterface { } finally { dataLengthBuffer.clear(); // Clean for the next call - data = null; // For the GC + isAllocateData = false; + + // If buffer is larger we free it to save memory otherwise try to reuse it. + if (data.capacity() >= NIO_BUFFER_LIMIT / 2) data = null; + else data.clear(); } }