From 4d2da0d13b8438cb0dcb580cd7aa7966bef3c9b6 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Fri, 21 Jun 2019 16:29:08 -0400 Subject: [PATCH] =?UTF-8?q?HBASE-22492=20Wrap=20RPC=20responses=20with=20S?= =?UTF-8?q?ASL=20after=20queueing=20for=20response=20(S=C3=A9bastien=20Bar?= =?UTF-8?q?noud)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Amending-Author: Josh Elser --- .../apache/hadoop/hbase/ipc/RpcServer.java | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index a32040c295..e9daf16bff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -342,6 +342,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private User user; private InetAddress remoteAddress; + private boolean saslWrapDone; private long responseCellSize = 0; private long responseBlockSize = 0; @@ -369,6 +370,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.tinfo = tinfo; this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH this.remoteAddress = remoteAddress; + this.saslWrapDone = false; this.retryImmediatelySupported = connection == null? null: connection.retryImmediatelySupported; this.timeout = timeout; @@ -478,10 +480,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { byte[] b = createHeaderAndMessageBytes(result, header); bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock); - - if (connection.useWrap) { - bc = wrapWithSasl(bc); - } } catch (IOException e) { LOG.warn("Exception while creating response " + e); } @@ -526,6 +524,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return b; } + private synchronized void wrapWithSasl() throws IOException { + // do it only once per call + if (saslWrapDone) { + return; + } + response = wrapWithSasl(response); + saslWrapDone = true; + } + + /** + * Do not call directly, invoke via {@link #wrapWithSasl()}. + */ private BufferChain wrapWithSasl(BufferChain bc) throws IOException { if (!this.connection.useSasl) return bc; @@ -533,11 +543,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // THIS IS A BIG UGLY COPY. byte [] responseBytes = bc.getBytes(); byte [] token; - // synchronization may be needed since there can be multiple Handler - // threads using saslServer to wrap responses. - synchronized (connection.saslServer) { - token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); - } + + // Previously, synchronization was needed since there could be multiple Handler + // threads using saslServer to wrap responses. However, now we wrap the response + // inside of the Responder thread to avoid sending back mis-ordered SASL messages. + token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); if (LOG.isTraceEnabled()) { LOG.trace("Adding saslServer wrapped token of size " + token.length + " as call response."); @@ -1188,6 +1198,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private boolean processResponse(final Call call) throws IOException { boolean error = true; try { + // Wrap the message "late" in SASL to ensure that the sequence number matches the order of responses we write out. + if (call.connection.useWrap) { + call.wrapWithSasl(); + } // Send as much data as we can in the non-blocking fashion long numBytes = channelWrite(call.connection.channel, call.response); if (numBytes < 0) { @@ -1220,6 +1234,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { */ private boolean processAllResponses(final Connection connection) throws IOException { // We want only one writer on the channel for a connection at a time. + boolean isEmpty = false; connection.responseWriteLock.lock(); try { for (int i = 0; i < 20; i++) { @@ -1233,11 +1248,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return false; } } + // Check that state within the lock to be consistent + isEmpty = connection.responseQueue.isEmpty(); } finally { connection.responseWriteLock.unlock(); } - return connection.responseQueue.isEmpty(); + return isEmpty; } // -- 2.18.0