diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 8cead2a..219f781 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1210,6 +1210,7 @@ public class RpcServer implements RpcServerInterface { protected SocketChannel channel; private ByteBuffer data; private ByteBuffer dataLengthBuffer; + private boolean isPrepareData = false; protected final ConcurrentLinkedDeque responseQueue = new ConcurrentLinkedDeque(); private final Lock responseWriteLock = new ReentrantLock(); private Counter rpcCount = new Counter(); // number of outstanding rpcs @@ -1341,17 +1342,18 @@ public class RpcServer implements RpcServerInterface { } } - private void saslReadAndProcess(byte[] saslToken) throws IOException, + private void saslReadAndProcess(ByteBuffer saslToken) throws IOException, InterruptedException { if (saslContextEstablished) { if (LOG.isTraceEnabled()) - LOG.trace("Have read input token of size " + saslToken.length + LOG.trace("Have read input token of size " + saslToken.remaining() + " for processing by saslServer.unwrap()"); if (!useWrap) { processOneRpc(saslToken); } else { - byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length); + byte[] plaintextData = + saslServer.unwrap(saslToken.array(), saslToken.position(), saslToken.remaining()); processUnwrappedData(plaintextData); } } else { @@ -1400,10 +1402,18 @@ public class RpcServer implements RpcServerInterface { } } if (LOG.isDebugEnabled()) { - LOG.debug("Have read input token of size " + saslToken.length + LOG.debug("Have read input token of size " + saslToken.remaining() + " for processing by saslServer.evaluateResponse()"); } - replyToken = saslServer.evaluateResponse(saslToken); + + // TODO + // SASL API evaluateResponse(byte[] response) is so hard. + // Because saslToken is a reused buffer, so we can not direct use saslToken.arrray(). + // We have to copy it. Copy will break optimization about reusing buffer. + // Luckily, evaluateResponse just occurs several times. + // We can write well if SASL API will be improved. + replyToken = saslServer.evaluateResponse( + new Bytes(saslToken.array(), saslToken.position(), saslToken.remaining()).copyBytes()); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1579,9 +1589,7 @@ public class RpcServer implements RpcServerInterface { } } - // We have read a length and we have read the preamble. It is either the connection header - // or it is a request. - if (data == null) { + if (!isPrepareData) { dataLengthBuffer.flip(); int dataLength = dataLengthBuffer.getInt(); if (dataLength == RpcClient.PING_CALL_ID) { @@ -1594,12 +1602,22 @@ public class RpcServer implements RpcServerInterface { throw new IllegalArgumentException("Unexpected data length " + dataLength + "!! from " + getHostAddress()); } - data = ByteBuffer.allocate(dataLength); - // Increment the rpc count. This counter will be decreased when we write - // the response. If we want the connection to be detected as idle properly, we - // need to keep the inc / dec correct. - incRpcCount(); + // We have read a length and we have read the preamble. It is either the connection header + // or it is a request. + // We will reuse buffer of data unless data.capacity is not enough. + if (data == null || (data != null && data.capacity() < dataLength)) { + data = ByteBuffer.allocate(dataLength); + + // Increment the rpc count. This counter will be decreased when we write + // the response. If we want the connection to be detected as idle properly, we + // need to keep the inc / dec correct. + incRpcCount(); + } else { + data.limit(dataLength); + } + + isPrepareData = true; } count = channelRead(channel, data); @@ -1623,14 +1641,18 @@ public class RpcServer implements RpcServerInterface { } if (useSasl) { - saslReadAndProcess(data.array()); + saslReadAndProcess(data); } else { - processOneRpc(data.array()); + processOneRpc(data); } } finally { dataLengthBuffer.clear(); // Clean for the next call - data = null; // For the GC + isPrepareData = false; + + // If buffer is larger we free it to save memory otherwise try to reuse it. + if (data.capacity() >= NIO_BUFFER_LIMIT / 2) data = null; + else data.clear(); } } @@ -1654,8 +1676,10 @@ public class RpcServer implements RpcServerInterface { } // Reads the connection header following version - private void processConnectionHeader(byte[] buf) throws IOException { - this.connectionHeader = ConnectionHeader.parseFrom(buf); + private void processConnectionHeader(ByteBuffer buf) throws IOException { + ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); + ProtobufUtil.mergeFrom(builder, buf.array(), buf.position(), buf.remaining()); + this.connectionHeader = builder.build(); String serviceName = connectionHeader.getServiceName(); if (serviceName == null) throw new EmptyServiceNameException(); this.service = getService(services, serviceName); @@ -1729,6 +1753,8 @@ public class RpcServer implements RpcServerInterface { private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException { ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf)); + // Reuse data buffer. + unwrappedData = data; // Read all RPCs contained in the inBuf, even partial ones while (true) { int count; @@ -1738,33 +1764,40 @@ public class RpcServer implements RpcServerInterface { return; } - if (unwrappedData == null) { - unwrappedDataLengthBuffer.flip(); - int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); + unwrappedDataLengthBuffer.flip(); + int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); - if (unwrappedDataLength == RpcClient.PING_CALL_ID) { - if (LOG.isDebugEnabled()) - LOG.debug("Received ping message"); - unwrappedDataLengthBuffer.clear(); - continue; // ping message - } + if (unwrappedDataLength == RpcClient.PING_CALL_ID) { + if (LOG.isDebugEnabled()) + LOG.debug("Received ping message"); + unwrappedDataLengthBuffer.clear(); + continue; // ping message + } + + // It's seem impossible but for strict code. + if (unwrappedData.capacity() < unwrappedDataLength) { unwrappedData = ByteBuffer.allocate(unwrappedDataLength); + } else { + unwrappedData.clear(); + unwrappedData.limit(unwrappedDataLength); } - + count = channelRead(ch, unwrappedData); if (count <= 0 || unwrappedData.remaining() > 0) return; - if (unwrappedData.remaining() == 0) { - unwrappedDataLengthBuffer.clear(); + if (unwrappedData.remaining() == 0) { unwrappedData.flip(); - processOneRpc(unwrappedData.array()); - unwrappedData = null; + try { + processOneRpc(unwrappedData); + } finally { + unwrappedDataLengthBuffer.clear(); + } } } } - private void processOneRpc(byte[] buf) throws IOException, InterruptedException { + private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); } else { @@ -1785,16 +1818,16 @@ public class RpcServer implements RpcServerInterface { * @throws IOException * @throws InterruptedException */ - protected void processRequest(byte[] buf) throws IOException, InterruptedException { - long totalRequestSize = buf.length; + protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + long totalRequestSize = buf.remaining(); int offset = 0; // Here we read in the header. We avoid having pb // do its default 4k allocation for CodedInputStream. We force it to use backing array. - CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length); + CodedInputStream cis = CodedInputStream.newInstance(buf.array(), buf.position(), buf.remaining()); int headerSize = cis.readRawVarint32(); offset = cis.getTotalBytesRead(); Message.Builder builder = RequestHeader.newBuilder(); - ProtobufUtil.mergeFrom(builder, buf, offset, headerSize); + ProtobufUtil.mergeFrom(builder, buf.array(), offset, headerSize); RequestHeader header = (RequestHeader) builder.build(); offset += headerSize; int id = header.getCallId(); @@ -1825,18 +1858,18 @@ public class RpcServer implements RpcServerInterface { if (md == null) throw new UnsupportedOperationException(header.getMethodName()); builder = this.service.getRequestPrototype(md).newBuilderForType(); // To read the varint, I need an inputstream; might as well be a CIS. - cis = CodedInputStream.newInstance(buf, offset, buf.length); + cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit() - offset); int paramSize = cis.readRawVarint32(); offset += cis.getTotalBytesRead(); if (builder != null) { - ProtobufUtil.mergeFrom(builder, buf, offset, paramSize); + ProtobufUtil.mergeFrom(builder, buf.array(), offset, paramSize); param = builder.build(); } offset += paramSize; } if (header.hasCellBlockMeta()) { cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, - buf, offset, buf.length); + buf.array(), offset, buf.limit() - offset); } } catch (Throwable t) { String msg = getListenerAddress() + " is unable to read call parameter from client " +