Index: hbase/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (date 1558880860000) +++ hbase/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (date 1559143313000) @@ -342,6 +342,7 @@ private User user; private InetAddress remoteAddress; + private boolean saslWrapDone; private long responseCellSize = 0; private long responseBlockSize = 0; @@ -369,6 +370,7 @@ this.tinfo = tinfo; this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH this.remoteAddress = remoteAddress; + this.saslWrapDone = false; this.retryImmediatelySupported = connection == null? null: connection.retryImmediatelySupported; this.timeout = timeout; @@ -479,9 +481,11 @@ bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock); + /* to make sure that messages are sent in the same order than Sasl sequence number, we must wrap the message when we put it in the response queue if (connection.useWrap) { bc = wrapWithSasl(bc); } + */ } catch (IOException e) { LOG.warn("Exception while creating response " + e); } @@ -526,6 +530,13 @@ return b; } + public synchronized void wrapWithSasl() throws IOException { + // do it only once per call + if (saslWrapDone == true) return; + response = wrapWithSasl(response); + saslWrapDone = true; + } + private BufferChain wrapWithSasl(BufferChain bc) throws IOException { if (!this.connection.useSasl) return bc; @@ -533,11 +544,14 @@ // THIS IS A BIG UGLY COPY. byte [] responseBytes = bc.getBytes(); byte [] token; + + // No lock on saslServer needed anymore:we are called only by the responder thread + // synchronization may be needed since there can be multiple Handler // threads using saslServer to wrap responses. - synchronized (connection.saslServer) { + //synchronized (connection.saslServer) { token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); - } + //} if (LOG.isTraceEnabled()) { LOG.trace("Adding saslServer wrapped token of size " + token.length + " as call response."); @@ -1188,6 +1202,9 @@ private boolean processResponse(final Call call) throws IOException { boolean error = true; try { + if (call.connection.useWrap) { + call.wrapWithSasl(); + } // Send as much data as we can in the non-blocking fashion long numBytes = channelWrite(call.connection.channel, call.response); if (numBytes < 0) { @@ -1220,6 +1237,7 @@ */ private boolean processAllResponses(final Connection connection) throws IOException { // We want only one writer on the channel for a connection at a time. + boolean isEmpty = false; connection.responseWriteLock.lock(); try { for (int i = 0; i < 20; i++) { @@ -1233,11 +1251,13 @@ return false; } } + // Check that state within the lock to be consistent + isEmpty = connection.responseQueue.isEmpty(); } finally { connection.responseWriteLock.unlock(); } - return connection.responseQueue.isEmpty(); + return isEmpty; } // @@ -1248,8 +1268,9 @@ // If there is already a write in progress, we don't wait. This allows to free the handlers // immediately for other tasks. - if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) { - try { + // If we use Sasl, always insert in the queue to ensure Sasl sequence number respect the network send order + // TODO: Another solution could be to synchronize (processResponse,addFirst(call)) with addLast(call) to keep the sequence order, if this optimisation is needed + if (!call.connection.useWrap && call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) { try { if (call.connection.responseQueue.isEmpty()) { // If we're alone, we can try to do a direct call to the socket. It's // an optimisation to save on context switches and data transfer between cores..