From 78b52744a623a1310178d5c8d926ebc351f311a4 Mon Sep 17 00:00:00 2001 From: Sean Busbey Date: Sat, 25 Aug 2018 21:28:03 -0500 Subject: [PATCH] HBASE-21061 Fix inconsistent synchronization in RpcServer move variables that we don't need synchronized access to out of the critical block. --- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 153 +++++++++++---------- 1 file changed, 82 insertions(+), 71 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 7d7dd9dc17..0a1fb44ce8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1640,7 +1640,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @throws IOException * @throws InterruptedException */ - public synchronized int readAndProcess() throws IOException, InterruptedException { + public int readAndProcess() throws IOException, InterruptedException { // If we have not read the connection setup preamble, look to see if that is on the wire. if (!connectionPreambleRead) { int count = readPreamble(); @@ -1668,85 +1668,96 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } - // We have read a length and we have read the preamble. It is either the connection header - // or it is a request. - if (data == null) { - dataLengthBuffer.flip(); - int dataLength = dataLengthBuffer.getInt(); - if (dataLength == RpcClient.PING_CALL_ID) { - if (!useWrap) { //covers the !useSasl too - dataLengthBuffer.clear(); - return 0; //ping message + final boolean useWrap = this.useWrap; + final BlockingService service = this.service; + final boolean headerAndPreambleRead = connectionHeaderRead && connectionPreambleRead; + final boolean canUseRequestTooBig = headerAndPreambleRead ? + VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(), + RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION) + : false; + + // we're guarding against data being modified concurrently + // while trying to keep other instance members out of the block + synchronized(this) { + // We have read a length and we have read the preamble. It is either the connection header + // or it is a request. + if (data == null) { + dataLengthBuffer.flip(); + int dataLength = dataLengthBuffer.getInt(); + if (dataLength == RpcClient.PING_CALL_ID) { + if (!useWrap) { //covers the !useSasl too + dataLengthBuffer.clear(); + return 0; //ping message + } + } + if (dataLength < 0) { // A data length of zero is legal. + throw new DoNotRetryIOException("Unexpected data length " + + dataLength + "!! from " + getHostAddress()); } - } - if (dataLength < 0) { // A data length of zero is legal. - throw new DoNotRetryIOException("Unexpected data length " - + dataLength + "!! from " + getHostAddress()); - } - - if (dataLength > maxRequestSize) { - String msg = "RPC data length of " + dataLength + " received from " - + getHostAddress() + " is greater than max allowed " - + maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE - + "\" on server to override this limit (not recommended)"; - LOG.warn(msg); - if (connectionHeaderRead && connectionPreambleRead) { - incRpcCount(); - // Construct InputStream for the non-blocking SocketChannel - // We need the InputStream because we want to read only the request header - // instead of the whole rpc. - final ByteBuffer buf = ByteBuffer.allocate(1); - InputStream is = new InputStream() { - @Override - public int read() throws IOException { - channelRead(channel, buf); - buf.flip(); - int x = buf.get(); - buf.flip(); - return x; + if (dataLength > maxRequestSize) { + String msg = "RPC data length of " + dataLength + " received from " + + getHostAddress() + " is greater than max allowed " + + maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE + + "\" on server to override this limit (not recommended)"; + LOG.warn(msg); + + if (headerAndPreambleRead) { + incRpcCount(); + // Construct InputStream for the non-blocking SocketChannel + // We need the InputStream because we want to read only the request header + // instead of the whole rpc. + final ByteBuffer buf = ByteBuffer.allocate(1); + InputStream is = new InputStream() { + @Override + public int read() throws IOException { + channelRead(channel, buf); + buf.flip(); + int x = buf.get(); + buf.flip(); + return x; + } + }; + CodedInputStream cis = CodedInputStream.newInstance(is); + int headerSize = cis.readRawVarint32(); + Message.Builder builder = RequestHeader.newBuilder(); + ProtobufUtil.mergeFrom(builder, cis, headerSize); + RequestHeader header = (RequestHeader) builder.build(); + + // Notify the client about the offending request + Call reqTooBig = new Call(header.getCallId(), service, null, null, null, + null, this, responder, 0, null, this.addr,0); + metrics.exception(REQUEST_TOO_BIG_EXCEPTION); + // Make sure the client recognizes the underlying exception + // Otherwise, throw a DoNotRetryIOException. + if (canUseRequestTooBig) { + setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg); + } else { + setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg); } - }; - CodedInputStream cis = CodedInputStream.newInstance(is); - int headerSize = cis.readRawVarint32(); - Message.Builder builder = RequestHeader.newBuilder(); - ProtobufUtil.mergeFrom(builder, cis, headerSize); - RequestHeader header = (RequestHeader) builder.build(); - - // Notify the client about the offending request - Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null, - null, this, responder, 0, null, this.addr,0); - metrics.exception(REQUEST_TOO_BIG_EXCEPTION); - // Make sure the client recognizes the underlying exception - // Otherwise, throw a DoNotRetryIOException. - if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(), - RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) { - setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg); - } else { - setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg); + // We are going to close the connection, make sure we process the response + // before that. In rare case when this fails, we still close the connection. + responseWriteLock.lock(); + responder.processResponse(reqTooBig); + responseWriteLock.unlock(); } - // We are going to close the connection, make sure we process the response - // before that. In rare case when this fails, we still close the connection. - responseWriteLock.lock(); - responder.processResponse(reqTooBig); - responseWriteLock.unlock(); + // Close the connection + return -1; } - // Close the connection - return -1; - } - data = ByteBuffer.allocate(dataLength); + data = ByteBuffer.allocate(dataLength); - // Increment the rpc count. This counter will be decreased when we write - // the response. If we want the connection to be detected as idle properly, we - // need to keep the inc / dec correct. - incRpcCount(); - } + // Increment the rpc count. This counter will be decreased when we write + // the response. If we want the connection to be detected as idle properly, we + // need to keep the inc / dec correct. + incRpcCount(); + } - count = channelRead(channel, data); + count = channelRead(channel, data); - if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0 - process(); + if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0 + process(); + } } return count; -- 2.16.1