diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 8cead2a..fc79548 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1210,6 +1210,7 @@ public class RpcServer implements RpcServerInterface { protected SocketChannel channel; private ByteBuffer data; private ByteBuffer dataLengthBuffer; + private boolean isAllocateData = false; protected final ConcurrentLinkedDeque responseQueue = new ConcurrentLinkedDeque(); private final Lock responseWriteLock = new ReentrantLock(); private Counter rpcCount = new Counter(); // number of outstanding rpcs @@ -1341,17 +1342,18 @@ public class RpcServer implements RpcServerInterface { } } - private void saslReadAndProcess(byte[] saslToken) throws IOException, + private void saslReadAndProcess(ByteBuffer saslToken) throws IOException, InterruptedException { if (saslContextEstablished) { if (LOG.isTraceEnabled()) - LOG.trace("Have read input token of size " + saslToken.length + LOG.trace("Have read input token of size " + saslToken.limit() + " for processing by saslServer.unwrap()"); if (!useWrap) { processOneRpc(saslToken); } else { - byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length); + byte[] plaintextData = + saslServer.unwrap(saslToken.array(), saslToken.position(), saslToken.limit()); processUnwrappedData(plaintextData); } } else { @@ -1400,10 +1402,18 @@ public class RpcServer implements RpcServerInterface { } } if (LOG.isDebugEnabled()) { - LOG.debug("Have read input token of size " + saslToken.length + LOG.debug("Have read input token of size " + saslToken.limit() + " for processing by saslServer.evaluateResponse()"); } - replyToken = saslServer.evaluateResponse(saslToken); + + // TODO + // SASL API evaluateResponse(byte[] response) is so hard. + // Because saslToken is a reused buffer, so we can not direct use saslToken.arrray(). + // We have to copy it.Copy will break reuse of optimization. + // Luckily, evaluateResponse only occurs several times. + // We can write well if SASL API will be improved. + replyToken = saslServer.evaluateResponse( + new Bytes(saslToken.array(), saslToken.position(), saslToken.limit()).copyBytes()); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1579,9 +1589,7 @@ public class RpcServer implements RpcServerInterface { } } - // We have read a length and we have read the preamble. It is either the connection header - // or it is a request. - if (data == null) { + if (!isAllocateData) { dataLengthBuffer.flip(); int dataLength = dataLengthBuffer.getInt(); if (dataLength == RpcClient.PING_CALL_ID) { @@ -1594,12 +1602,22 @@ public class RpcServer implements RpcServerInterface { throw new IllegalArgumentException("Unexpected data length " + dataLength + "!! from " + getHostAddress()); } - data = ByteBuffer.allocate(dataLength); - // Increment the rpc count. This counter will be decreased when we write - // the response. If we want the connection to be detected as idle properly, we - // need to keep the inc / dec correct. - incRpcCount(); + // We have read a length and we have read the preamble. It is either the connection header + // or it is a request. + // We will reuse buffer of data unless data.capacity is not enough. + if (data == null || (data != null && data.capacity() < dataLength)) { + data = ByteBuffer.allocate(dataLength); + + // Increment the rpc count. This counter will be decreased when we write + // the response. If we want the connection to be detected as idle properly, we + // need to keep the inc / dec correct. + incRpcCount(); + } else { + data.limit(dataLength); + } + + isAllocateData = true; } count = channelRead(channel, data); @@ -1623,14 +1641,18 @@ public class RpcServer implements RpcServerInterface { } if (useSasl) { - saslReadAndProcess(data.array()); + saslReadAndProcess(data); } else { - processOneRpc(data.array()); + processOneRpc(data); } } finally { dataLengthBuffer.clear(); // Clean for the next call - data = null; // For the GC + isAllocateData = false; + + // If buffer is larger we free it to save memory otherwise try to reuse it. + if (data.capacity() >= NIO_BUFFER_LIMIT / 2) data = null; + else data.clear(); } } @@ -1654,8 +1676,10 @@ public class RpcServer implements RpcServerInterface { } // Reads the connection header following version - private void processConnectionHeader(byte[] buf) throws IOException { - this.connectionHeader = ConnectionHeader.parseFrom(buf); + private void processConnectionHeader(ByteBuffer buf) throws IOException { + ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); + ProtobufUtil.mergeFrom(builder, buf.array(), buf.position(), buf.limit()); + this.connectionHeader = builder.build(); String serviceName = connectionHeader.getServiceName(); if (serviceName == null) throw new EmptyServiceNameException(); this.service = getService(services, serviceName); @@ -1758,13 +1782,13 @@ public class RpcServer implements RpcServerInterface { if (unwrappedData.remaining() == 0) { unwrappedDataLengthBuffer.clear(); unwrappedData.flip(); - processOneRpc(unwrappedData.array()); + processOneRpc(unwrappedData); unwrappedData = null; } } } - private void processOneRpc(byte[] buf) throws IOException, InterruptedException { + private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); } else { @@ -1785,16 +1809,16 @@ public class RpcServer implements RpcServerInterface { * @throws IOException * @throws InterruptedException */ - protected void processRequest(byte[] buf) throws IOException, InterruptedException { - long totalRequestSize = buf.length; + protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + long totalRequestSize = buf.limit(); int offset = 0; // Here we read in the header. We avoid having pb // do its default 4k allocation for CodedInputStream. We force it to use backing array. - CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length); + CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); int headerSize = cis.readRawVarint32(); offset = cis.getTotalBytesRead(); Message.Builder builder = RequestHeader.newBuilder(); - ProtobufUtil.mergeFrom(builder, buf, offset, headerSize); + ProtobufUtil.mergeFrom(builder, buf.array(), offset, headerSize); RequestHeader header = (RequestHeader) builder.build(); offset += headerSize; int id = header.getCallId(); @@ -1825,18 +1849,18 @@ public class RpcServer implements RpcServerInterface { if (md == null) throw new UnsupportedOperationException(header.getMethodName()); builder = this.service.getRequestPrototype(md).newBuilderForType(); // To read the varint, I need an inputstream; might as well be a CIS. - cis = CodedInputStream.newInstance(buf, offset, buf.length); + cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); int paramSize = cis.readRawVarint32(); offset += cis.getTotalBytesRead(); if (builder != null) { - ProtobufUtil.mergeFrom(builder, buf, offset, paramSize); + ProtobufUtil.mergeFrom(builder, buf.array(), offset, paramSize); param = builder.build(); } offset += paramSize; } if (header.hasCellBlockMeta()) { cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, - buf, offset, buf.length); + buf.array(), offset, buf.limit()); } } catch (Throwable t) { String msg = getListenerAddress() + " is unable to read call parameter from client " +