diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 8cead2a..c28f120 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -24,6 +24,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.lang.ref.SoftReference; import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -180,6 +181,8 @@ public class RpcServer implements RpcServerInterface { private static final int DEFAULT_WARN_DELAYED_CALLS = 1000; + private static final int READ_REQ_REUSED_BUF_SIZE = 2 * 1024; + private final int warnDelayedCalls; private AtomicInteger delayedCalls; @@ -1210,6 +1213,7 @@ public class RpcServer implements RpcServerInterface { protected SocketChannel channel; private ByteBuffer data; private ByteBuffer dataLengthBuffer; + private SoftReference reuseReadBufRef; protected final ConcurrentLinkedDeque responseQueue = new ConcurrentLinkedDeque(); private final Lock responseWriteLock = new ReentrantLock(); private Counter rpcCount = new Counter(); // number of outstanding rpcs @@ -1234,9 +1238,6 @@ public class RpcServer implements RpcServerInterface { private AuthMethod authMethod; private boolean saslContextEstablished; private boolean skipInitialSaslHandshake; - private ByteBuffer unwrappedData; - // When is this set? FindBugs wants to know! Says NP - private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); boolean useSasl; SaslServer saslServer; private boolean useWrap = false; @@ -1259,6 +1260,8 @@ public class RpcServer implements RpcServerInterface { this.lastContact = lastContact; this.data = null; this.dataLengthBuffer = ByteBuffer.allocate(4); + this.reuseReadBufRef = + new SoftReference(ByteBuffer.allocate(READ_REQ_REUSED_BUF_SIZE)); this.socket = channel.socket(); this.addr = socket.getInetAddress(); if (addr == null) { @@ -1341,17 +1344,18 @@ public class RpcServer implements RpcServerInterface { } } - private void saslReadAndProcess(byte[] saslToken) throws IOException, + private void saslReadAndProcess(ByteBuffer saslToken) throws IOException, InterruptedException { if (saslContextEstablished) { if (LOG.isTraceEnabled()) - LOG.trace("Have read input token of size " + saslToken.length + LOG.trace("Have read input token of size " + saslToken.remaining() + " for processing by saslServer.unwrap()"); if (!useWrap) { processOneRpc(saslToken); } else { - byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length); + byte[] plaintextData = + saslServer.unwrap(saslToken.array(), saslToken.position(), saslToken.remaining()); processUnwrappedData(plaintextData); } } else { @@ -1400,10 +1404,12 @@ public class RpcServer implements RpcServerInterface { } } if (LOG.isDebugEnabled()) { - LOG.debug("Have read input token of size " + saslToken.length + LOG.debug("Have read input token of size " + saslToken.remaining() + " for processing by saslServer.evaluateResponse()"); } - replyToken = saslServer.evaluateResponse(saslToken); + + replyToken = saslServer.evaluateResponse( + new Bytes(saslToken.array(), saslToken.position(), saslToken.remaining()).copyBytes()); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1579,8 +1585,6 @@ public class RpcServer implements RpcServerInterface { } } - // We have read a length and we have read the preamble. It is either the connection header - // or it is a request. if (data == null) { dataLengthBuffer.flip(); int dataLength = dataLengthBuffer.getInt(); @@ -1594,8 +1598,23 @@ public class RpcServer implements RpcServerInterface { throw new IllegalArgumentException("Unexpected data length " + dataLength + "!! from " + getHostAddress()); } - data = ByteBuffer.allocate(dataLength); + // We have read a length and we have read the preamble. It is either the connection header + // or it is a request. + // We will reuse buffer unless request is too large. + if (READ_REQ_REUSED_BUF_SIZE < dataLength) { + data = ByteBuffer.allocate(dataLength); + } else { + data = reuseReadBufRef.get(); + if (data == null) { + // Collected by GC because memory was not enough in that time. + // So we allocate a new reused buffer for request. + data = ByteBuffer.allocate(READ_REQ_REUSED_BUF_SIZE); + reuseReadBufRef = new SoftReference(data); + } + data.clear(); + data.limit(dataLength); + } // Increment the rpc count. This counter will be decreased when we write // the response. If we want the connection to be detected as idle properly, we // need to keep the inc / dec correct. @@ -1623,14 +1642,16 @@ public class RpcServer implements RpcServerInterface { } if (useSasl) { - saslReadAndProcess(data.array()); + saslReadAndProcess(data); } else { - processOneRpc(data.array()); + processOneRpc(data); } } finally { dataLengthBuffer.clear(); // Clean for the next call - data = null; // For the GC + // Freeing the ByteBuffer. + // If ByteBuffer came from reuseReadBufRef.get(), it's not be freed in fact. + data = null; } } @@ -1654,8 +1675,10 @@ public class RpcServer implements RpcServerInterface { } // Reads the connection header following version - private void processConnectionHeader(byte[] buf) throws IOException { - this.connectionHeader = ConnectionHeader.parseFrom(buf); + private void processConnectionHeader(ByteBuffer buf) throws IOException { + ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); + ProtobufUtil.mergeFrom(builder, buf.array(), buf.position(), buf.remaining()); + this.connectionHeader = builder.build(); String serviceName = connectionHeader.getServiceName(); if (serviceName == null) throw new EmptyServiceNameException(); this.service = getService(services, serviceName); @@ -1729,42 +1752,47 @@ public class RpcServer implements RpcServerInterface { private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException { ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf)); + dataLengthBuffer.clear(); // Read all RPCs contained in the inBuf, even partial ones while (true) { int count; - if (unwrappedDataLengthBuffer.remaining() > 0) { - count = channelRead(ch, unwrappedDataLengthBuffer); - if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) - return; + + count = read4Bytes(); + if (count <= 0 || dataLengthBuffer.remaining() > 0) { + return; } - if (unwrappedData == null) { - unwrappedDataLengthBuffer.flip(); - int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); + dataLengthBuffer.flip(); + int dataLength = dataLengthBuffer.getInt(); - if (unwrappedDataLength == RpcClient.PING_CALL_ID) { - if (LOG.isDebugEnabled()) - LOG.debug("Received ping message"); - unwrappedDataLengthBuffer.clear(); - continue; // ping message - } - unwrappedData = ByteBuffer.allocate(unwrappedDataLength); + if (dataLength == RpcClient.PING_CALL_ID) { + if (LOG.isDebugEnabled()) + LOG.debug("Received ping message"); + dataLengthBuffer.clear(); + continue; // ping message } - count = channelRead(ch, unwrappedData); - if (count <= 0 || unwrappedData.remaining() > 0) + if (data.capacity() < dataLength) { + data = ByteBuffer.allocate(dataLength); + } + + data.clear(); + data.limit(dataLength); + + count = channelRead(ch, data); + if (count <= 0 || data.remaining() > 0) { return; + } - if (unwrappedData.remaining() == 0) { - unwrappedDataLengthBuffer.clear(); - unwrappedData.flip(); - processOneRpc(unwrappedData.array()); - unwrappedData = null; + if (data.remaining() == 0) { + data.flip(); + dataLengthBuffer.clear(); + processOneRpc(data); } } } - private void processOneRpc(byte[] buf) throws IOException, InterruptedException { + private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); } else { @@ -1785,18 +1813,20 @@ public class RpcServer implements RpcServerInterface { * @throws IOException * @throws InterruptedException */ - protected void processRequest(byte[] buf) throws IOException, InterruptedException { - long totalRequestSize = buf.length; - int offset = 0; + protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + int totalRequestSize = buf.remaining(); + int initPos = buf.position(); // Here we read in the header. We avoid having pb // do its default 4k allocation for CodedInputStream. We force it to use backing array. - CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length); + CodedInputStream cis = + CodedInputStream.newInstance(buf.array(), buf.position(), buf.remaining()); int headerSize = cis.readRawVarint32(); - offset = cis.getTotalBytesRead(); + buf.position(initPos + cis.getTotalBytesRead()); Message.Builder builder = RequestHeader.newBuilder(); - ProtobufUtil.mergeFrom(builder, buf, offset, headerSize); + builder.mergeFrom(buf.array(), buf.position(), headerSize); RequestHeader header = (RequestHeader) builder.build(); - offset += headerSize; + cis.skipRawBytes(headerSize); + buf.position(initPos + cis.getTotalBytesRead()); int id = header.getCallId(); if (LOG.isTraceEnabled()) { LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) + @@ -1824,19 +1854,17 @@ public class RpcServer implements RpcServerInterface { md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); if (md == null) throw new UnsupportedOperationException(header.getMethodName()); builder = this.service.getRequestPrototype(md).newBuilderForType(); - // To read the varint, I need an inputstream; might as well be a CIS. - cis = CodedInputStream.newInstance(buf, offset, buf.length); int paramSize = cis.readRawVarint32(); - offset += cis.getTotalBytesRead(); + buf.position(initPos + cis.getTotalBytesRead()); if (builder != null) { - ProtobufUtil.mergeFrom(builder, buf, offset, paramSize); + builder.mergeFrom(buf.array(), buf.position(), paramSize); param = builder.build(); } - offset += paramSize; + buf.position(buf.position() + paramSize); } if (header.hasCellBlockMeta()) { cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, - buf, offset, buf.length); + buf.array(), buf.position(), buf.remaining()); } } catch (Throwable t) { String msg = getListenerAddress() + " is unable to read call parameter from client " + @@ -1863,7 +1891,6 @@ public class RpcServer implements RpcServerInterface { responder.doRespond(readParamsFailedCall); return; } - TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) : null;