Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java (revision a9f555aaff40051bb8d27d5ff681abca0f6b7ec5) +++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java (revision 4eb4c8d91b64bf8bd3c9acfb4b26c457f9b83f65) @@ -200,7 +200,10 @@ private static final TcpDiscoveryAbstractMessage WAKEUP = new TcpDiscoveryDummyWakeupMessage(); /** When this interval pass connection check will be performed. */ - private static final int CON_CHECK_INTERVAL = 500; + public static int CON_CHECK_INTERVAL = 500; + + /** Disable for test only. */ + public static boolean ENABLED_CONN_CHECK_THRESHOLD = true; /** */ private IgniteThreadPoolExecutor utilityPool; @@ -218,6 +221,12 @@ /** TCP server for discovery SPI. */ private TcpServer tcpSrvr; + /** */ + private volatile long simulatedDelay; + + /** */ + private volatile long prevNodeConnectionCheckDelay; + /** Message worker. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private RingMessageWorker msgWorker; @@ -456,11 +465,11 @@ * @throws IgniteSpiException If failed. 
*/ private void spiStop0(boolean disconnect) throws IgniteSpiException { - if (log.isDebugEnabled()) { + if (debugMode || log.isDebugEnabled()) { if (disconnect) - log.debug("Disconnecting SPI."); + log.error("Disconnecting SPI."); else - log.debug("Preparing to start local node stop procedure."); + log.error("Preparing to start local node stop procedure."); } if (disconnect) { @@ -492,8 +501,8 @@ } if (spiState == LEFT) { - if (log.isDebugEnabled()) - log.debug("Verification for local node leave has been received from coordinator" + + if (debugMode || log.isDebugEnabled()) + log.error("Verification for local node leave has been received from coordinator" + " (continuing stop procedure)."); } else if (log.isInfoEnabled()) { @@ -748,8 +757,8 @@ return res; } catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e); // continue; @@ -773,6 +782,9 @@ @Nullable UUID clientNodeId) throws IgniteCheckedException { assert addr != null; + if(debugMode) + log.error("Ping node: " + nodeId + ", addr: " + addr); + UUID locNodeId = getLocalNodeId(); IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, @@ -828,19 +840,30 @@ fut.sock = sock; + if (debugMode) + log.error("sock = spi.openSocket(sock, addr, timeoutHelper)"); + sock = spi.openSocket(sock, addr, timeoutHelper); openedSock = true; + if (debugMode) + log.error("spi.writeToSocket(TcpDiscoveryPingRequest), timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())" + + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + if (debugMode) + 
log.error("spi.readMessage, read response, timeout: timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()): " + + timeoutHelper.nextTimeoutChunk(spi.getAckTimeout())); + TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( spi.getAckTimeout())); if (locNodeId.equals(res.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Ping response from local node: " + res); + if (debugMode || log.isDebugEnabled()) + log.error("Ping response from local node: " + res); break; } @@ -855,8 +878,8 @@ } catch (IOException | IgniteCheckedException e) { if (nodeId != null && !nodeAlive(nodeId)) { - if (log.isDebugEnabled()) - log.debug("Failed to ping the node (has left or leaving topology): [nodeId=" + nodeId + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to ping the node (has left or leaving topology): [nodeId=" + nodeId + ']'); fut.onDone((IgniteBiTuple)null); @@ -880,8 +903,8 @@ break; if (spi.isNodeStopping0()) { - if (log.isDebugEnabled()) - log.debug("Stop pinging node, because node is stopping: [rmtNodeId=" + nodeId + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Stop pinging node, because node is stopping: [rmtNodeId=" + nodeId + ']'); break; } @@ -1030,8 +1053,8 @@ while (true) { if (!sendJoinRequestMessage(discoveryData)) { - if (log.isDebugEnabled()) - log.debug("Join request message has not been sent (local node is the first in the topology)."); + if (debugMode || log.isDebugEnabled()) + log.error("Join request message has not been sent (local node is the first in the topology)."); if (!auth && spi.nodeAuth != null) localAuthentication(locCred); @@ -1085,8 +1108,8 @@ break; } - if (log.isDebugEnabled()) - log.debug("Join request message has been sent (waiting for coordinator response)."); + if (debugMode || log.isDebugEnabled()) + log.error("Join request message has been sent (waiting for coordinator response)."); synchronized (mux) { long timeout = spi.netTimeout; @@ -1150,8 +1173,8 @@ assert 
locNode.order() != 0; assert locNode.internalOrder() != 0; - if (log.isDebugEnabled()) - log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder()); + if (debugMode || log.isDebugEnabled()) + log.error("Discovery SPI has been connected to topology with order: " + locNode.internalOrder()); } /** @@ -1240,8 +1263,8 @@ break; case RES_OK: - if (log.isDebugEnabled()) - log.debug("Join request message has been sent to address [addr=" + addr + + if (debugMode || log.isDebugEnabled()) + log.error("Join request message has been sent to address [addr=" + addr + ", req=" + joinReq + ']'); // Join request sending succeeded, wait for response from topology. @@ -1259,8 +1282,8 @@ retry = true; } else { - if (log.isDebugEnabled()) - log.debug("Unexpected response to join request: " + res); + if (debugMode || log.isDebugEnabled()) + log.error("Unexpected response to join request: " + res); retry = true; } @@ -1271,11 +1294,11 @@ catch (IgniteSpiException e) { errs.add(e); - if (log.isDebugEnabled()) { + if (debugMode || log.isDebugEnabled()) { IOException ioe = X.cause(e, IOException.class); - log.debug("Failed to send join request message [addr=" + addr + - ", msg=" + (ioe != null ? ioe.getMessage() : e.getMessage()) + ']'); + log.error("Failed to send join request message [addr=" + addr + + ", msg=" + (ioe != null ? ioe.getMessage() : e.getMessage()) + ']'); onException("Failed to send join request message [addr=" + addr + ", msg=" + (ioe != null ? ioe.getMessage() : e.getMessage()) + ']', ioe); @@ -1293,8 +1316,8 @@ } if (retry) { - if (log.isDebugEnabled()) - log.debug("Concurrent discovery SPI start has been detected (local node should wait)."); + if (debugMode || log.isDebugEnabled()) + log.error("Concurrent discovery SPI start has been detected (local node should wait)."); try { U.sleep(spi.getReconnectDelay()); @@ -1359,6 +1382,9 @@ assert msg != null; assert addr != null; + if(debugMode) + log.error("SendMessageDirectly. 
Message: " + msg + ", addr: " + addr); + Collection errs = null; long ackTimeout0 = spi.getAckTimeout(); @@ -1385,15 +1411,25 @@ try { long tsNanos = System.nanoTime(); + if (debugMode) + log.error("spi.openSocket(addr, timeoutHelper)"); + sock = spi.openSocket(addr, timeoutHelper); openSock = true; TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId); + if (debugMode) + log.error("spi.writeToSocket(TcpDiscoveryHandshakeRequest, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())), timeout: " + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + // Handshake. spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + if (debugMode) + log.error("TcpDiscoveryHandshakeResponse = spi.readMessage(timeoutHelper.nextTimeoutChunk(ackTimeout0), timeout: " + + timeoutHelper.nextTimeoutChunk(ackTimeout0)); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( ackTimeout0)); @@ -1408,8 +1444,8 @@ synchronized (mux) { for (TcpDiscoveryNode failedNode : failedNodes.keySet()) { if (failedNode.id().equals(res.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Ignore response from node from failed list: " + res); + if (debugMode || log.isDebugEnabled()) + log.error("Ignore response from node from failed list: " + res); ignore = true; @@ -1423,8 +1459,8 @@ } if (locNodeId.equals(res.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Handshake response from local node: " + res); + if (debugMode || log.isDebugEnabled()) + log.error("Handshake response from local node: " + res); break; } @@ -1434,6 +1470,10 @@ // Send message. 
tsNanos = System.nanoTime(); + if (debugMode) + log.error("spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())), timeout: " + + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); long tsNanos0 = System.nanoTime(); @@ -1442,8 +1482,8 @@ debugLog(msg, "Message has been sent directly to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + res.creatorNodeId() + ']'); - if (log.isDebugEnabled()) - log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + + if (debugMode || log.isDebugEnabled()) + log.error("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + res.creatorNodeId() + ']'); // Connection has been established, but @@ -1451,8 +1491,14 @@ // E.g. due to class not found issue. joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; + if (debugMode) + log.error("int receipt = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), timeout: " + + timeoutHelper.nextTimeoutChunk(ackTimeout0)); + int receipt = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + spi.onMessageSent(msg); + spi.stats.onMessageSent(msg, U.nanosToMillis(tsNanos0 - tsNanos)); return receipt; @@ -1460,7 +1506,7 @@ catch (ClassCastException e) { // This issue is rarely reproducible on AmazonEC2, but never // on dedicated machines. 
- if (log.isDebugEnabled()) + if (debugMode || log.isDebugEnabled()) U.error(log, "Class cast exception on direct send: " + addr, e); onException("Class cast exception on direct send: " + addr, e); @@ -1471,7 +1517,7 @@ errs.add(e); } catch (IOException | IgniteCheckedException e) { - if (log.isDebugEnabled()) + if (debugMode || log.isDebugEnabled()) log.error("Exception on direct send: " + e.getMessage(), e); onException("Exception on direct send: " + e.getMessage(), e); @@ -1497,8 +1543,8 @@ continue; } - if (log.isDebugEnabled()) - log.debug("Connect failed with StreamCorruptedException, skip address: " + addr); + if (debugMode || log.isDebugEnabled()) + log.error("Connect failed with StreamCorruptedException, skip address: " + addr); break; } @@ -1534,8 +1580,8 @@ } if (joinReqSent) { - if (log.isDebugEnabled()) - log.debug("Join request has been sent, but receipt has not been read (returning RES_WAIT)."); + if (debugMode || log.isDebugEnabled()) + log.error("Join request has been sent, but receipt has not been read (returning RES_WAIT)."); // Topology will not include this node, // however, warning on timed out join will be output. @@ -1608,8 +1654,8 @@ DebugLogger log = type == EVT_NODE_METRICS_UPDATED ? 
traceLog : debugLog; if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) { - if (log.isDebugEnabled()) - log.debug("Discovery notification [node=" + node + ", spiState=" + spiState + + if (debugMode || log.isDebugEnabled()) + log.error("Discovery notification [node=" + node + ", spiState=" + spiState + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); Collection top = upcast(ring.visibleNodes()); @@ -1619,8 +1665,8 @@ lsnr.onDiscovery(type, topVer, node, top, hist, null); } else { - if (log.isDebugEnabled()) - log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + + if (debugMode || log.isDebugEnabled()) + log.error("Skipped discovery notification [node=" + node + ", spiState=" + spiState + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); } } @@ -1642,8 +1688,8 @@ while (topHist.size() > spi.topHistSize) topHist.remove(topHist.firstKey()); - if (log.isDebugEnabled()) - log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size()); + if (debugMode || log.isDebugEnabled()) + log.error("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size()); return new TreeMap<>(topHist); } @@ -1821,6 +1867,39 @@ msgWorker.addMessage(msg); } + /** Sets the artificial delay, in milliseconds, that {@link #simulateDelay()} sleeps for; a non-positive value disables the sleep. */ + @Override public void simulateDelay(long delay) { + if (debugMode) + log.error("Started delay simulating: " + delay); + + this.simulatedDelay = delay; + } + + /** Stores {@code delay} in {@code prevNodeConnectionCheckDelay}; NOTE(review): presumably read by the previous-node connection check, which is not visible in this hunk - confirm. */ + @Override void simulateCheckPrevNodeConnectionFailed(long delay) { + if (debugMode) + log.error("Started delay simulating prev. node connection check delay: " + delay); + + this.prevNodeConnectionCheckDelay = delay; + } + + /** Sleeps the calling thread for the configured simulated delay when it is positive; if interrupted, ends the sleep early and restores the interrupt flag. */ + private void simulateDelay() { + long simulatedDelay = this.simulatedDelay; + + if (simulatedDelay > 0) { + if (debugMode) + log.error("Simulating delay: " + simulatedDelay); + + try { + Thread.sleep(simulatedDelay); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Restore the interrupt status instead of swallowing it, so the calling thread can still observe it.
+ } + } + } + /** {@inheritDoc} */ @Override void simulateNodeFailure() { U.warn(log, "Simulating node failure: " + getLocalNodeId()); @@ -2108,15 +2187,15 @@ /** {@inheritDoc} */ @SuppressWarnings("BusyWait") @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("IP finder cleaner has been started."); + if (debugMode || log.isDebugEnabled()) + log.error("IP finder cleaner has been started."); while (!isInterrupted()) { Thread.sleep(spi.ipFinderCleanFreq); if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping IP finder cleaner (SPI is not connected to topology)."); + if (debugMode || log.isDebugEnabled()) + log.error("Stopping IP finder cleaner (SPI is not connected to topology)."); return; } @@ -2163,8 +2242,8 @@ res = pingNode(addr, null, null) != null; } catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to ping node [addr=" + addr + ", err=" + e.getMessage() + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Failed to ping node [addr=" + addr + ", err=" + e.getMessage() + ']'); res = false; } @@ -2194,8 +2273,8 @@ if (rmvAddrs != null) { spi.ipFinder.unregisterAddresses(rmvAddrs); - if (log.isDebugEnabled()) - log.debug("Unregistered addresses from IP finder: " + rmvAddrs); + if (debugMode || log.isDebugEnabled()) + log.error("Unregistered addresses from IP finder: " + rmvAddrs); } // Addresses that were removed by mistake (e.g. on segmentation).
@@ -2208,8 +2287,8 @@ if (!missingAddrs.isEmpty()) { spi.ipFinder.registerAddresses(missingAddrs); - if (log.isDebugEnabled()) - log.debug("Registered missing addresses in IP finder: " + missingAddrs); + if (debugMode || log.isDebugEnabled()) + log.error("Registered missing addresses in IP finder: " + missingAddrs); } } catch (IgniteSpiException e) { @@ -2231,8 +2310,8 @@ msg.addFailedNode(n.id()); } - if (log.isDebugEnabled()) - log.debug("Message failed nodes were replaced with failed nodes observed by local node: " + msg.failedNodes()); + if (debugMode || log.isDebugEnabled()) + log.error("Message failed nodes were replaced with failed nodes observed by local node: " + msg.failedNodes()); } /** @@ -2248,8 +2327,8 @@ if (sndId != null) { if (ring.node(sndId) == null) { - if (log.isDebugEnabled()) { - log.debug("Ignore message failed nodes, sender node is not alive [nodeId=" + sndId + + if (debugMode || log.isDebugEnabled()) { + log.error("Ignore message failed nodes, sender node is not alive [nodeId=" + sndId + ", failedNodes=" + msgFailedNodes + ']'); } @@ -2261,8 +2340,8 @@ synchronized (mux) { for (TcpDiscoveryNode failedNode : failedNodes.keySet()) { if (failedNode.id().equals(sndId)) { - if (log.isDebugEnabled()) { - log.debug("Ignore message failed nodes, sender node is in fail list [nodeId=" + sndId + + if (debugMode || log.isDebugEnabled()) { + log.error("Ignore message failed nodes, sender node is in fail list [nodeId=" + sndId + ", failedNodes=" + msgFailedNodes + ']'); } @@ -2289,8 +2368,8 @@ } } - if (added && log.isDebugEnabled()) - log.debug("Added node to failed nodes list [node=" + failedNode + ", msg=" + msg + ']'); + if (added && (debugMode || log.isDebugEnabled())) + log.error("Added node to failed nodes list [node=" + failedNode + ", msg=" + msg + ']'); } } } @@ -2441,11 +2520,11 @@ } } - if (log.isDebugEnabled()) { + if (debugMode || log.isDebugEnabled()) { if (res == null) - log.debug("Failed to find node added message [node=" + node + ']'); 
+ log.error("Failed to find node added message [node=" + node + ']'); else - log.debug("Found add added message [node=" + node + ", hist=" + res + ']'); + log.error("Found add added message [node=" + node + ", hist=" + res + ']'); } return res; @@ -2475,11 +2554,11 @@ cp = !skip ? cp : null; - if (log.isDebugEnabled()) { + if (debugMode || log.isDebugEnabled()) { if (cp == null) - log.debug("Failed to find messages history [node=" + node + ", lastMsgId=" + lastMsgId + ']'); + log.error("Failed to find messages history [node=" + node + ", lastMsgId=" + lastMsgId + ']'); else - log.debug("Found messages history [node=" + node + ", hist=" + cp + ']'); + log.error("Found messages history [node=" + node + ", hist=" + cp + ']'); } return cp; @@ -2897,8 +2976,8 @@ msg instanceof TcpDiscoveryCustomEventMessage || msg instanceof TcpDiscoveryClientReconnectMessage) && queue.contains(msg)) { - if (log.isDebugEnabled()) - log.debug("Ignoring duplicate message: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Ignoring duplicate message: " + msg); return; } @@ -2909,8 +2988,8 @@ if (metricsMsgFilter.addMessage((TcpDiscoveryMetricsUpdateMessage)msg)) addToQueue(msg, addFirst); else { - if (log.isDebugEnabled()) - log.debug("Metric update message has been replaced in the worker's queue: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Metric update message has been replaced in the worker's queue: " + msg); } } else @@ -2927,14 +3006,14 @@ if (addFirst) { queue.addFirst(msg); - if (log.isDebugEnabled()) - log.debug("Message has been added to a head of a worker's queue: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Message has been added to a head of a worker's queue: " + msg); } else { queue.add(msg); - if (log.isDebugEnabled()) - log.debug("Message has been added to a worker's queue: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Message has been added to a worker's queue: " + msg); } } @@ -3030,14 +3109,17 @@ if 
(msg == WAKEUP) return; + if(debugMode) + log.error("Process message: " + msg + " on node " + spi.locNode.order()); + spi.startMessageProcess(msg); sendMetricsUpdateMessage(); synchronized (mux) { if (spiState == RING_FAILED) { - if (log.isDebugEnabled()) - log.debug("Discovery detected ring connectivity issues and will stop local node, " + + if (debugMode || log.isDebugEnabled()) + log.error("Discovery detected ring connectivity issues and will stop local node, " + "ignoring message [msg=" + msg + ", locNode=" + locNode + ']'); return; @@ -3046,8 +3128,8 @@ DebugLogger log = messageLogger(msg); - if (log.isDebugEnabled()) - log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); if (debugMode) debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); @@ -3064,8 +3146,8 @@ proc = ((TcpDiscoveryNodeAddedMessage)msg).node().equals(locNode); if (!proc) { - if (log.isDebugEnabled()) { - log.debug("Ignore message, local node order is not initialized [msg=" + msg + + if (debugMode || log.isDebugEnabled()) { + log.error("Ignore message, local node order is not initialized [msg=" + msg + ", locNode=" + locNode + ']'); } @@ -3230,6 +3312,7 @@ */ @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"}) private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { + //if(msg instanceof TcpDiscoveryNodeFailedMessage) assert msg != null; assert ring.hasRemoteNodes(); @@ -3264,8 +3347,8 @@ TcpDiscoveryNode newNext = ring.nextNode(failedNodes); if (newNext == null) { - if (log.isDebugEnabled()) - log.debug("No next node in topology."); + if (debugMode || log.isDebugEnabled()) + log.error("No next node in topology."); if (debugMode) debugLog(msg, "No next node in topology."); @@ -3281,8 +3364,8 @@ } if 
(!newNext.equals(next)) { - if (log.isDebugEnabled()) - log.debug("New next node [newNext=" + newNext + ", formerNext=" + next + + if (debugMode || log.isDebugEnabled()) + log.error("New next node [newNext=" + newNext + ", formerNext=" + next + ", ring=" + ring + ", failedNodes=" + failedNodes + ']'); else if (log.isInfoEnabled()) log.info("New next node [newNext=" + newNext + ']'); @@ -3311,8 +3394,8 @@ long ackTimeout0 = spi.getAckTimeout(); if (locNodeAddrs.contains(addr)){ - if (log.isDebugEnabled()) - log.debug("Skip to send message to the local node (probably remote node has the same " + + if (debugMode || log.isDebugEnabled()) + log.error("Skip to send message to the local node (probably remote node has the same " + "loopback address that local node): " + addr); continue; @@ -3323,6 +3406,8 @@ IgniteSpiOperationTimeoutHelper timeoutHelper = null; while (true) { + simulateDelay(); + if (sock == null) { if (timeoutHelper == null) timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); @@ -3335,6 +3420,11 @@ try { long tsNanos = System.nanoTime(); + if(log.isDebugEnabled()) + log.error("Opening new socket to " + next.order() + " on " + + spi.locNode.order() + " / " + System.identityHashCode(spi) + + ", sending " + TcpDiscoveryHandshakeRequest.class.getSimpleName()); + sock = spi.openSocket(addr, timeoutHelper); out = spi.socketStream(sock); @@ -3350,38 +3440,67 @@ if (changeTop) hndMsg.changeTopology(ring.previousNodeOf(next).id()); - if (log.isDebugEnabled()) - log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState + ']'); + + if(debugMode) + log.error("spi.writeToSocket(TcpDiscoveryHandshakeRequest, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()), timeout: " + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.writeToSocket(sock, out, hndMsg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + if(debugMode) + log.error("spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(ackTimeout0)), timeout: " + + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(ackTimeout0)); - if (log.isDebugEnabled()) - log.debug("Handshake response: " + res); - + if(debugMode) + log.error("Handshake response: " + res + " on " + spi.locNode.order() + " from " + next.order()); + // We should take previousNodeAlive flag into account only if we received the response from the correct node. if (res.creatorNodeId().equals(next.id()) && res.previousNodeAlive() && sndState != null) { + if(debugMode) + log.error("Delay since handshake response 1."); + // Remote node checked connection to it's previous and got success. boolean previousNode = sndState.markLastFailedNodeAlive(); + if(debugMode) + log.error("Delay since handshake response 2."); + if (previousNode) failedNodes.remove(failedNodes.size() - 1); else { newNextNode = false; + if(debugMode) + log.error("Delay since handshake response 2.1"); + next = ring.nextNode(failedNodes); + + if(debugMode) + log.error("Delay since handshake response 3."); } + + if(debugMode) log.error("Delay since handshake response 4."); U.closeQuiet(sock); + if(debugMode) log.error("Delay since handshake response 5."); + sock = null; if (sndState.isFailed()) { + if(debugMode) + log.error("Delay since handshake response 6."); + segmentLocalNodeOnSendFail(failedNodes); + if(debugMode) + log.error("Delay since handshake response 7."); + return; // Nothing to do here. 
} @@ -3393,8 +3512,8 @@ } if (locNodeId.equals(res.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Handshake response from local node: " + res); + if (debugMode || log.isDebugEnabled()) + log.error("Handshake response from local node: " + res); U.closeQuiet(sock); @@ -3411,8 +3530,8 @@ if (!next.id().equals(nextId)) { // Node with different ID has bounded to the same port. - if (log.isDebugEnabled()) - log.debug("Failed to restore ring because next node ID received is not as " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to restore ring because next node ID received is not as " + "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']'); if (debugMode) @@ -3432,8 +3551,8 @@ nextNew = hasPendingAddMessage(nextId); if (!nextNew) { - if (log.isDebugEnabled()) - log.debug("Failed to restore ring because next node order received " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to restore ring because next node order received " + "is not as expected [expected=" + next.internalOrder() + ", rcvd=" + nextOrder + ", id=" + next.id() + ']'); @@ -3446,8 +3565,8 @@ } } - if (log.isDebugEnabled()) - log.debug("Initialized connection with next node: " + next.id()); + if (debugMode || log.isDebugEnabled()) + log.error("Initialized connection with next node: " + next.id()); if (debugMode) debugLog(msg, "Initialized connection with next node: " + next.id()); @@ -3460,12 +3579,14 @@ } } catch (IOException | IgniteCheckedException e) { + log.error("Failed message across ring: " + msg + " on " + spi.locNode.order() + ". 
Error: " + e.getMessage() + ", " + e.getClass().getSimpleName()); + if (errs == null) errs = new ArrayList<>(); errs.add(e); - if (log.isDebugEnabled()) + if (debugMode || log.isDebugEnabled()) U.error(log, "Failed to connect to next node [msg=" + msg + ", err=" + e.getMessage() + ']', e); @@ -3477,8 +3598,10 @@ if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount()) break; - if (timeoutHelper.checkFailureTimeoutReached(e)) + if (timeoutHelper.checkFailureTimeoutReached(e)) { + log.error("Send message across ring: " + msg.getClass().getSimpleName() + " from node " + spi.locNode.order() + ": checkFailureTimeoutReached"); break; + } else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; @@ -3491,8 +3614,8 @@ } finally { if (!success) { - if (log.isDebugEnabled()) - log.debug("Closing socket to next: " + next); + if (debugMode || log.isDebugEnabled()) + log.error("Closing socket to next: " + next); U.closeQuiet(sock); @@ -3516,8 +3639,8 @@ assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage; if (failure || forceSndPending || newNextNode) { - if (log.isDebugEnabled()) - log.debug("Pending messages will be sent [failure=" + failure + + if (debugMode || log.isDebugEnabled()) + log.error("Pending messages will be sent [failure=" + failure + ", newNextNode=" + newNextNode + ", forceSndPending=" + forceSndPending + ", failedNodes=" + failedNodes + ']'); @@ -3539,6 +3662,9 @@ if (timeoutHelper == null) timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); + log.error("spi.writeToSocket(pendingMsg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())), timeout: " + + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + try { spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( spi.getSocketTimeout())); @@ -3549,12 +3675,18 @@ long tsNanos0 = System.nanoTime(); + if (debugMode) + log.error("int 
res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), timeout: " + + timeoutHelper.nextTimeoutChunk(ackTimeout0)); + int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + spi.onMessageSent(pendingMsg); + spi.stats.onMessageSent(pendingMsg, U.nanosToMillis(tsNanos0 - tsNanos)); - if (log.isDebugEnabled()) - log.debug("Pending message has been sent to next node [msgId=" + msg.id() + + if (debugMode || log.isDebugEnabled()) + log.error("Pending message has been sent to next node [msgId=" + msg.id() + ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() + ", res=" + res + ']'); @@ -3588,6 +3720,12 @@ if (latencyCheck && log.isInfoEnabled()) log.info("Latency check message has been written to socket: " + msg.id()); + if (debugMode) { + log.error("Send message across ring: " + msg + " from node " + spi.locNode.order() + " to next " + (newNextNode ? newNext : next).order()); + log.error("spi.writeToSocket(newNextNode ? newNext : next, sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()). Actual timeout: " + + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + } + spi.writeToSocket(newNextNode ? newNext : next, sock, out, @@ -3596,8 +3734,16 @@ long tsNanos0 = System.nanoTime(); + if(debugMode) + log.error("int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), timeout: " + timeoutHelper.nextTimeoutChunk(ackTimeout0)); + int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + if (debugMode || (log.isDebugEnabled() && msg.getClass() != TcpDiscoveryConnectionCheckMessage.class)) + log.error("Got receipt: " + res + " on node " + spi.locNode.order() + " from next " + (newNextNode ? 
newNext : next).order() + " on message: " + msg); + + spi.onMessageSent(msg); + if (latencyCheck && log.isInfoEnabled()) log.info("Latency check message has been acked: " + msg.id()); @@ -3607,8 +3753,8 @@ DebugLogger debugLog = messageLogger(msg); - if (debugLog.isDebugEnabled()) { - debugLog.debug("Message has been sent to next node [msg=" + msg + + if (debugMode || debugLog.isDebugEnabled()) { + debugLog.error("Message has been sent to next node [msg=" + msg + ", next=" + next.id() + ", res=" + res + ']'); } @@ -3637,7 +3783,7 @@ errs.add(e); - if (log.isDebugEnabled()) + if (debugMode || log.isDebugEnabled()) U.error(log, "Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ", err=" + e + ']', e); @@ -3663,15 +3809,15 @@ forceSndPending = false; if (!sent) { - if (log.isDebugEnabled()) - log.debug("Closing socket to next (not sent): " + next); + if (debugMode || log.isDebugEnabled()) + log.error("Closing socket to next (not sent): " + next); U.closeQuiet(sock); sock = null; - if (log.isDebugEnabled()) - log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg + + if (debugMode || log.isDebugEnabled()) + log.error("Message has not been sent [next=" + next.id() + ", msg=" + msg + (!spi.failureDetectionTimeoutEnabled() ? ", i=" + reconCnt : "") + ']'); } } @@ -3679,6 +3825,8 @@ } // Iterating node's addresses. if (!sent) { + log.error("Message not sent: " + msg + ", failureDetectionTimeout: " + spi.failureDetectionTimeout()); + if (sndState == null && spi.getEffectiveConnectionRecoveryTimeout() > 0) sndState = new CrossRingMessageSendState(); @@ -3734,12 +3882,12 @@ if (!failedNodes.isEmpty()) { if (state == CONNECTED) { - if (!sent && log.isDebugEnabled()) + if (!sent && (debugMode || log.isDebugEnabled())) // Message has not been sent due to some problems. 
- log.debug("Message has not been sent: " + msg); + log.error("Message has not been sent: " + msg); - if (log.isDebugEnabled()) - log.debug("Detected failed nodes: " + failedNodes); + if (debugMode || log.isDebugEnabled()) + log.error("Detected failed nodes: " + failedNodes); } synchronized (mux) { @@ -3758,8 +3906,8 @@ if (!sent) { assert next == null : next; - if (log.isDebugEnabled()) - log.debug("Pending messages will be resent to local node"); + if (debugMode || log.isDebugEnabled()) + log.error("Pending messages will be resent to local node"); if (debugMode) debugLog(msg, "Pending messages will be resent to local node"); @@ -3793,8 +3941,8 @@ msgWorker.addMessage(pendingMsg); - if (log.isDebugEnabled()) - log.debug("Pending message has been sent to local node [msg=" + curMsg.id() + + if (debugMode || log.isDebugEnabled()) + log.error("Pending message has been sent to local node [msg=" + curMsg.id() + ", pendingMsg=" + pendingMsg + ']'); if (debugMode) { @@ -3875,8 +4023,8 @@ spi.stats.onPendingMessageRegistered(); - if (log.isDebugEnabled()) - log.debug("Pending message has been registered: " + msg.id()); + if (debugMode || log.isDebugEnabled()) + log.error("Pending message has been registered: " + msg.id()); } } @@ -3916,8 +4064,8 @@ final UUID locNodeId = getLocalNodeId(); if (locNodeId.equals(node.id())) { - if (log.isDebugEnabled()) - log.debug("Received join request for local node, dropping: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Received join request for local node, dropping: " + msg); return; } @@ -3943,16 +4091,16 @@ LT.warn(log, errMsg); // Always output in debug. 
- if (log.isDebugEnabled()) - log.debug(errMsg); + if (debugMode || log.isDebugEnabled()) + log.error(errMsg); try { trySendMessageDirectly(node, new TcpDiscoveryLoopbackProblemMessage( locNodeId, locNode.addresses(), locNode.hostNames())); } catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send loopback problem message to node " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to send loopback problem message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); onException("Failed to send loopback problem message to node " + @@ -3984,8 +4132,8 @@ trySendMessageDirectly(node, createTcpDiscoveryDuplicateIdMessage(locNodeId, existingNode)); } catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send duplicate ID message to node " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to send duplicate ID message to node " + "[node=" + node + ", existingNode=" + existingNode + ", err=" + e.getMessage() + ']'); @@ -4016,8 +4164,8 @@ reconMsg.success(true); } - if (log.isDebugEnabled()) - log.debug("Send reconnect message to already joined client " + + if (debugMode || log.isDebugEnabled()) + log.error("Send reconnect message to already joined client " + "[clientNode=" + existingNode + ", msg=" + reconMsg + ']'); if (getLocalNodeId().equals(node.clientRouterNodeId())) { @@ -4025,8 +4173,8 @@ if (wrk != null) wrk.addMessage(reconMsg); - else if (log.isDebugEnabled()) - log.debug("Failed to find client message worker " + + else if (debugMode || log.isDebugEnabled()) + log.error("Failed to find client message worker " + "[clientNode=" + existingNode + ", msg=" + reconMsg + ']'); } else { @@ -4034,8 +4182,8 @@ sendMessageAcrossRing(reconMsg); } } - else if (log.isDebugEnabled()) - log.debug("Ignoring join request message since node is already in topology: " + msg); + else if (debugMode || log.isDebugEnabled()) + log.error("Ignoring join request message since node is already in 
topology: " + msg); return; } @@ -4046,8 +4194,8 @@ trySendMessageDirectly(node, createTcpDiscoveryDuplicateIdMessage(locNodeId, node)); } catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send duplicate ID message to node " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to send duplicate ID message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); @@ -4072,8 +4220,8 @@ ", addrs=" + U.addressesAsString(node) + ']'); // Always output in debug. - if (log.isDebugEnabled()) - log.debug("Authentication failed [nodeId=" + node.id() + ", addrs=" + + if (debugMode || log.isDebugEnabled()) + log.error("Authentication failed [nodeId=" + node.id() + ", addrs=" + U.addressesAsString(node)); try { @@ -4083,8 +4231,8 @@ ); } catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send unauthenticated message to node " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to send unauthenticated message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); onException("Failed to send unauthenticated message to node " + @@ -4110,8 +4258,8 @@ if (authFailedMsg != null) { // Always output in debug. 
- if (log.isDebugEnabled()) - log.debug(authFailedMsg + " [nodeId=" + node.id() + + if (debugMode || log.isDebugEnabled()) + log.error(authFailedMsg + " [nodeId=" + node.id() + ", addrs=" + U.addressesAsString(node)); try { @@ -4121,8 +4269,8 @@ ); } catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send unauthenticated message to node " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to send unauthenticated message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); } @@ -4142,8 +4290,8 @@ LT.error(log, e, "Authentication failed [nodeId=" + node.id() + ", addrs=" + U.addressesAsString(node) + ']'); - if (log.isDebugEnabled()) - log.debug("Failed to authenticate node (will ignore join request) [node=" + node + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to authenticate node (will ignore join request) [node=" + node + ", err=" + e + ']'); onException("Failed to authenticate node (will ignore join request) [node=" + node + @@ -4177,8 +4325,8 @@ if (err != null) { final IgniteNodeValidationResult err0 = err; - if (log.isDebugEnabled()) - log.debug("Node validation failed [res=" + err + ", node=" + node + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Node validation failed [res=" + err + ", node=" + node + ']'); utilityPool.execute( new Runnable() { @@ -4186,8 +4334,8 @@ boolean ping = node.id().equals(err0.nodeId()) ? pingNode(node) : pingNode(err0.nodeId()); if (!ping) { - if (log.isDebugEnabled()) - log.debug("Conflicting node has already left, need to wait for event. " + + if (debugMode || log.isDebugEnabled()) + log.error("Conflicting node has already left, need to wait for event. " + "Will ignore join request for now since it will be recent [req=" + msg + ", err=" + err0.message() + ']'); @@ -4198,16 +4346,16 @@ LT.warn(log, err0.message()); // Always output in debug. 
- if (log.isDebugEnabled()) - log.debug(err0.message()); + if (debugMode || log.isDebugEnabled()) + log.error(err0.message()); try { trySendMessageDirectly(node, new TcpDiscoveryCheckFailedMessage(err0.nodeId(), err0.sendMessage())); } catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send hash ID resolver validation failed message to node " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to send hash ID resolver validation failed message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); onException("Failed to send hash ID resolver validation failed message to node " + @@ -4239,8 +4387,8 @@ LT.warn(log, errMsg); // Always output in debug. - if (log.isDebugEnabled()) - log.debug(errMsg); + if (debugMode || log.isDebugEnabled()) + log.error(errMsg); try { String sndMsg = "Local node's marshaller differs from remote node's marshaller " + @@ -4255,8 +4403,8 @@ new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg)); } catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send marshaller check failed message to node " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to send marshaller check failed message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); onException("Failed to send marshaller check failed message to node " + @@ -4468,8 +4616,8 @@ // Handle join. node.internalOrder(ring.nextNodeOrder()); - if (log.isDebugEnabled()) - log.debug("Internal order has been assigned to node: " + node); + if (debugMode || log.isDebugEnabled()) + log.error("Internal order has been assigned to node: " + node); DiscoveryDataPacket data = msg.gridDiscoveryData(); @@ -4507,15 +4655,15 @@ LT.warn(log, errMsg); // Always output in debug. 
- if (log.isDebugEnabled()) - log.debug(errMsg); + if (debugMode || log.isDebugEnabled()) + log.error(errMsg); try { trySendMessageDirectly(node, new TcpDiscoveryCheckFailedMessage(locNode.id(), sndMsg)); } catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send marshaller check failed message to node " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to send marshaller check failed message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); onException("Failed to send marshaller check failed message to node " + @@ -4566,6 +4714,8 @@ ) { TcpDiscoveryNode node = ring.node(nodeId); + log.error("Direct message: " + msg + " to node " + nodeId.toString()); + if (node == null) { if (!F.isEmpty(addrs)) trySendMessageDirectlyToAddrs(addrs, null, msg); @@ -4654,8 +4804,8 @@ } else { // TODO IGNITE-11272 - if (log.isDebugEnabled()) - log.debug("Received node added message with node order smaller than local node order " + + if (debugMode || log.isDebugEnabled()) + log.error("Received node added message with node order smaller than local node order " + "(will appy) [node=" + node + ", ring=" + ring + ", msg=" + msg + ']'); } } @@ -4690,8 +4840,8 @@ if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); - if (log.isDebugEnabled()) { - log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + + if (debugMode || log.isDebugEnabled()) { + log.error("Local node already has node being added. 
Passing TcpDiscoveryNodeAddedMessage to " + "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode=" + locNode + ", msg=" + msg + ']'); } @@ -4714,8 +4864,8 @@ } if (node.internalOrder() <= ring.maxInternalOrder()) { - if (log.isDebugEnabled()) - log.debug("Discarding node added message since new node's order is less than " + + if (debugMode || log.isDebugEnabled()) + log.error("Discarding node added message since new node's order is less than " + "max order in ring [ring=" + ring + ", node=" + node + ", locNode=" + locNode + ", msg=" + msg + ']'); @@ -4738,8 +4888,8 @@ SecurityCredentials cred = unmarshalCredentials(node); if (cred == null) { - if (log.isDebugEnabled()) - log.debug( + if (debugMode || log.isDebugEnabled()) + log.error( "Skipping global authentication for node (security credentials not found, " + "probably, due to coordinator has older version) " + "[nodeId=" + node.id() + @@ -4761,8 +4911,8 @@ ", addrs=" + U.addressesAsString(node) + ']'); // Always output in debug. 
- if (log.isDebugEnabled()) - log.debug("Authentication failed [nodeId=" + node.id() + ", addrs=" + + if (debugMode || log.isDebugEnabled()) + log.error("Authentication failed [nodeId=" + node.id() + ", addrs=" + U.addressesAsString(node)); } else @@ -4782,8 +4932,8 @@ ); } catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send unauthenticated message to node " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to send unauthenticated message to node " + "[node=" + node + ", err=" + e.getMessage() + ']'); onException("Failed to send unauthenticated message to node " + @@ -4827,8 +4977,8 @@ processMessageFailedNodes(msg); } - if (log.isDebugEnabled()) - log.debug("Added node to local ring [added=" + topChanged + ", node=" + node + + if (debugMode || log.isDebugEnabled()) + log.error("Added node to local ring [added=" + topChanged + ", node=" + node + ", ring=" + ring + ']'); } @@ -4910,8 +5060,8 @@ // Restore topology with all nodes visible. ring.restoreTopology(top, node.internalOrder()); - if (log.isDebugEnabled()) - log.debug("Restored topology from node added message: " + ring); + if (debugMode || log.isDebugEnabled()) + log.error("Restored topology from node added message: " + ring); gridDiscoveryData = msg.gridDiscoveryData(); @@ -4930,15 +5080,15 @@ msg.clearDiscoveryData(); } else { - if (log.isDebugEnabled()) - log.debug("Discarding node added message with empty topology: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Discarding node added message with empty topology: " + msg); return; } } else { - if (log.isDebugEnabled()) - log.debug("Discarding node added message (this message has already been processed) " + + if (debugMode || log.isDebugEnabled()) + log.error("Discarding node added message (this message has already been processed) " + "[spiState=" + spiState + ", msg=" + msg + ", locNode=" + locNode + ']'); @@ -4980,15 +5130,15 @@ TcpDiscoveryNode node = ring.node(nodeId); if (node == null) { 
- if (log.isDebugEnabled()) - log.debug("Discarding node add finished message since node is not found " + + if (debugMode || log.isDebugEnabled()) + log.error("Discarding node add finished message since node is not found " + "[msg=" + msg + ']'); return; } - if (log.isDebugEnabled()) - log.debug("Node to finish add: " + node); + if (debugMode || log.isDebugEnabled()) + log.error("Node to finish add: " + node); //we will need to recalculate this value since the topology changed if (nodeCompactRepresentationSupported) { @@ -5010,8 +5160,8 @@ } if (node.visible() && node.order() != 0) { - if (log.isDebugEnabled()) - log.debug("Discarding node add finished message since node has already been added " + + if (debugMode || log.isDebugEnabled()) + log.error("Discarding node add finished message since node has already been added " + "[node=" + node + ", msg=" + msg + ']'); return; @@ -5062,8 +5212,8 @@ assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg + ", lastMsg=" + lastMsg + ", spiState=" + state + ']'; - if (log.isDebugEnabled()) - log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']'); lastMsg = msg; } @@ -5080,8 +5230,8 @@ spi.ipFinder.registerAddresses(node.socketAddresses()); } catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to register new node address [node=" + node + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to register new node address [node=" + node + ", err=" + e.getMessage() + ']'); onException("Failed to register new node address [node=" + node + @@ -5166,8 +5316,8 @@ if (locNodeId.equals(leavingNodeId)) { if (msg.senderNodeId() == null) { synchronized (mux) { - if (log.isDebugEnabled()) - log.debug("Starting local node stop procedure."); + if (debugMode || log.isDebugEnabled()) + log.error("Starting local node stop 
procedure."); spiState = STOPPING; @@ -5203,8 +5353,8 @@ } if (ring.node(msg.senderNodeId()) == null) { - if (log.isDebugEnabled()) - log.debug("Discarding node left message since sender node is not in topology: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Discarding node left message since sender node is not in topology: " + msg); return; } @@ -5217,8 +5367,8 @@ } } else { - if (log.isDebugEnabled()) - log.debug("Discarding node left message since node was not found: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Discarding node left message since node was not found: " + msg); return; } @@ -5250,8 +5400,8 @@ assert leftNode != null : msg; - if (log.isDebugEnabled()) - log.debug("Removed node from topology: " + leftNode); + if (debugMode || log.isDebugEnabled()) + log.error("Removed node from topology: " + leftNode); long topVer; @@ -5270,8 +5420,8 @@ assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg + ", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']'; - if (log.isDebugEnabled()) - log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']'); lastMsg = msg; } @@ -5284,15 +5434,20 @@ } else if (leftNode.equals(next) && sock != null) { try { + if(debugMode) { + log.error("spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()). Timeout: " + + (spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout())); + } + spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? 
spi.failureDetectionTimeout() : spi.getSocketTimeout()); - if (log.isDebugEnabled()) - log.debug("Sent verified node left message to leaving node: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Sent verified node left message to leaving node: " + msg); } catch (IgniteCheckedException | IOException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send verified node left message to leaving node [msg=" + msg + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to send verified node left message to leaving node [msg=" + msg + ", err=" + e.getMessage() + ']'); onException("Failed to send verified node left message to leaving node [msg=" + msg + @@ -5335,8 +5490,8 @@ else { forceSndPending = false; - if (log.isDebugEnabled()) - log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Unable to send message across the ring (topology has no remote nodes): " + msg); U.closeQuiet(sock); @@ -5373,8 +5528,8 @@ TcpDiscoveryNode sndNode = ring.node(sndId); if (sndNode == null) { - if (log.isDebugEnabled()) - log.debug("Discarding node failed message sent from unknown node: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Discarding node failed message sent from unknown node: " + msg); return; } @@ -5390,8 +5545,8 @@ } if (contains) { - if (log.isDebugEnabled()) - log.debug("Discarding node failed message sent from node which is about to fail: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Discarding node failed message sent from node which is about to fail: " + msg); return; } @@ -5403,8 +5558,8 @@ TcpDiscoveryNode failedNode = ring.node(failedNodeId); if (failedNode != null && failedNode.internalOrder() != msg.internalOrder()) { - if (log.isDebugEnabled()) - log.debug("Ignoring node failed message since node internal order does not match " + + if (debugMode || log.isDebugEnabled()) + log.error("Ignoring node failed message 
since node internal order does not match " + "[msg=" + msg + ", node=" + failedNode + ']'); return; @@ -5423,8 +5578,8 @@ } } else { - if (log.isDebugEnabled()) - log.debug("Discarding node failed message since node was not found: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Discarding node failed message since node was not found: " + msg); return; } @@ -5480,8 +5635,8 @@ assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg + ", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']'; - if (log.isDebugEnabled()) - log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']'); lastMsg = msg; } @@ -5521,8 +5676,8 @@ if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); else { - if (log.isDebugEnabled()) - log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Unable to send message across the ring (topology has no remote nodes): " + msg); U.closeQuiet(sock); @@ -5530,6 +5685,8 @@ } checkPendingCustomMessages(); + + log.error("Node failed processed."); } /** @@ -5544,22 +5701,22 @@ if (msg.failedNodeId() != null) { if (locNodeId.equals(msg.failedNodeId())) { - if (log.isDebugEnabled()) - log.debug("Status check message discarded (suspect node is local node)."); + if (debugMode || log.isDebugEnabled()) + log.error("Status check message discarded (suspect node is local node)."); return; } if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() != null) { - if (log.isDebugEnabled()) - log.debug("Status check message discarded (local node is the sender of the status message)."); + if (debugMode || log.isDebugEnabled()) + log.error("Status check message discarded (local node is the sender of the status message)."); return; } if (isLocalNodeCoordinator() && 
ring.node(msg.creatorNodeId()) == null) { - if (log.isDebugEnabled()) - log.debug("Status check message discarded (creator node is not in topology)."); + if (debugMode || log.isDebugEnabled()) + log.error("Status check message discarded (creator node is not in topology)."); return; } @@ -5581,8 +5738,8 @@ @Override public void run() { synchronized (mux) { if (spiState == DISCONNECTED) { - if (log.isDebugEnabled()) - log.debug("Ignoring status check request, SPI is already disconnected: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Ignoring status check request, SPI is already disconnected: " + msg); return; } @@ -5597,7 +5754,7 @@ msg.failedNodeId()); if (msg0 == null) { - log.debug("Status check message discarded (creator node is not in topology)."); + log.error("Status check message discarded (creator node is not in topology)."); return; } @@ -5613,14 +5770,14 @@ try { trySendMessageDirectly(msg0.creatorNodeAddrs(), msg0.creatorNodeId(), msg0); - if (log.isDebugEnabled()) - log.debug("Responded to status check message " + + if (debugMode || log.isDebugEnabled()) + log.error("Responded to status check message " + "[recipient=" + msg0.creatorNodeId() + ", status=" + msg0.status() + ']'); } catch (IgniteSpiException e) { if (e.hasCause(SocketException.class)) { - if (log.isDebugEnabled()) - log.debug("Failed to respond to status check message (connection " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to respond to status check message (connection " + "refused) [recipient=" + msg0.creatorNodeId() + ", status=" + msg0.status() + ']'); @@ -5633,8 +5790,8 @@ U.error(log, "Failed to respond to status check message [recipient=" + msg0.creatorNodeId() + ", status=" + msg0.status() + ']', e); } - else if (log.isDebugEnabled()) { - log.debug("Failed to respond to status check message (did the node stop?)" + + else if (debugMode || log.isDebugEnabled()) { + log.error("Failed to respond to status check message (did the node stop?)" + 
"[recipient=" + msg0.creatorNodeId() + ", status=" + msg0.status() + ']'); } @@ -5649,16 +5806,16 @@ if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null && U.millisSinceNanos(locNode.lastUpdateTimeNanos()) < spi.metricsUpdateFreq) { - if (log.isDebugEnabled()) - log.debug("Status check message discarded (local node receives updates)."); + if (debugMode || log.isDebugEnabled()) + log.error("Status check message discarded (local node receives updates)."); return; } if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null && spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Status check message discarded (local node is not connected to topology)."); + if (debugMode || log.isDebugEnabled()) + log.error("Status check message discarded (local node is not connected to topology)."); return; } @@ -5668,8 +5825,8 @@ return; if (msg.status() == STATUS_OK) { - if (log.isDebugEnabled()) - log.debug("Received OK status response from coordinator: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Received OK status response from coordinator: " + msg); } else if (msg.status() == STATUS_RECON) { U.warn(log, "Node is out of topology (probably, due to short-time network problems)."); @@ -5678,8 +5835,8 @@ return; } - else if (log.isDebugEnabled()) - log.debug("Status value was not updated in status response: " + msg); + else if (debugMode || log.isDebugEnabled()) + log.error("Status value was not updated in status response: " + msg); // Discard the message. 
return; @@ -5708,23 +5865,23 @@ UUID locNodeId = getLocalNodeId(); if (ring.node(msg.creatorNodeId()) == null) { - if (log.isDebugEnabled()) - log.debug("Discarding metrics update message issued by unknown node [msg=" + msg + + if (debugMode || log.isDebugEnabled()) + log.error("Discarding metrics update message issued by unknown node [msg=" + msg + ", ring=" + ring + ']'); return; } if (isLocalNodeCoordinator() && !locNodeId.equals(msg.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Discarding metrics update message issued by non-coordinator node: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Discarding metrics update message issued by non-coordinator node: " + msg); return; } if (!isLocalNodeCoordinator() && locNodeId.equals(msg.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Discarding metrics update message issued by local node (node is no more coordinator): " + + if (debugMode || log.isDebugEnabled()) + log.error("Discarding metrics update message issued by local node (node is no more coordinator): " + msg); return; @@ -5848,8 +6005,8 @@ notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node); } - else if (log.isDebugEnabled()) - log.debug("Received metrics from unknown node: " + nodeId); + else if (debugMode || log.isDebugEnabled()) + log.error("Received metrics from unknown node: " + nodeId); } /** @@ -5888,8 +6045,8 @@ @Override public void run() { synchronized (mux) { if (spiState == DISCONNECTED) { - if (log.isDebugEnabled()) - log.debug("Ignoring ping request, SPI is already disconnected: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Ignoring ping request, SPI is already disconnected: " + msg); return; } @@ -5898,8 +6055,8 @@ final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId()); if (worker == null) { - if (log.isDebugEnabled()) - log.debug("Ping request from dead client node, will be skipped: " + msg.creatorNodeId()); + if (debugMode || 
log.isDebugEnabled()) + log.error("Ping request from dead client node, will be skipped: " + msg.creatorNodeId()); } else { boolean res; @@ -5942,9 +6099,9 @@ delayMsg = msg.topologyVersion() == 0L && !joiningEmpty; if (delayMsg) { - if (log.isDebugEnabled()) { + if (debugMode || log.isDebugEnabled()) { synchronized (mux) { - log.debug("Delay custom message processing, there are joining nodes [msg=" + msg + + log.error("Delay custom message processing, there are joining nodes [msg=" + msg + ", joiningNodes=" + joiningNodes + ']'); } } @@ -6015,8 +6172,8 @@ } if (msg.verified() && msg.topologyVersion() != ring.topologyVersion()) { - if (log.isDebugEnabled()) - log.debug("Discarding custom event message [msg=" + msg + ", ring=" + ring + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Discarding custom event message [msg=" + msg + ", ring=" + ring + ']'); return; } @@ -6217,7 +6374,7 @@ private void checkConnection() { Boolean hasRemoteSrvNodes = null; - if (spi.failureDetectionTimeoutEnabled() && !failureThresholdReached && + if (ENABLED_CONN_CHECK_THRESHOLD && spi.failureDetectionTimeoutEnabled() && !failureThresholdReached && U.millisSinceNanos(locNode.lastExchangeTimeNanos()) >= connCheckThreshold && spiStateCopy() == CONNECTED && (hasRemoteSrvNodes = ring.hasRemoteServerNodes())) { @@ -6322,8 +6479,8 @@ return; } catch (IOException e) { - if (log.isDebugEnabled()) - log.debug("Failed to bind to local port (will try next port within range) " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to bind to local port (will try next port within range) " + "[port=" + port + ", localHost=" + spi.locHost + ']'); onException("Failed to bind to local port. 
" + @@ -6380,7 +6537,7 @@ } } catch (IOException e) { - if (log.isDebugEnabled()) + if (debugMode || log.isDebugEnabled()) U.error(log, "Failed to accept TCP connection.", e); onException("Failed to accept TCP connection.", e); @@ -6474,6 +6631,8 @@ int timeout = sock.getSoTimeout(); + log.error("sock.setSoTimeout((int)spi.netTimeout): " + spi.netTimeout); + sock.setSoTimeout((int)spi.netTimeout); for (IgniteInClosure connLsnr : spi.incomeConnLsnrs) @@ -6492,8 +6651,8 @@ if (r >= 0) read += r; else { - if (log.isDebugEnabled()) - log.debug("Failed to read magic header (too few bytes received) " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to read magic header (too few bytes received) " + "[rmtAddr=" + rmtAddr + ", locAddr=" + sock.getLocalSocketAddress() + ']'); @@ -6505,8 +6664,8 @@ } if (!Arrays.equals(buf, U.IGNITE_HEADER)) { - if (log.isDebugEnabled()) - log.debug("Unknown connection detected (is some other software connecting to " + + if (debugMode || log.isDebugEnabled()) + log.error("Unknown connection detected (is some other software connecting to " + "this Ignite port?" + (!spi.isSslEnabled() ? " missed SSL configuration?" : "" ) + ") " + @@ -6522,10 +6681,16 @@ } // Restore timeout. + log.error("Restore timeout on socket, sock.setSoTimeout(timeout): " + timeout); sock.setSoTimeout(timeout); + log.error("spi.readMessage(sock, in, spi.netTimeout), timeout:" + spi.netTimeout); TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout); + simulateDelay(); + + log.error("SocketReader | Read " + msg); + // Ping. if (msg instanceof TcpDiscoveryPingRequest) { if (!spi.isNodeStopping0()) { @@ -6548,6 +6713,9 @@ res.clientExists(clientWorker.ping(timeoutHelper)); } + log.error("spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())). 
Timeout: " + + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); if (!(sock instanceof SSLSocket)) @@ -6557,8 +6725,8 @@ log.info("Finished writing ping response " + "[rmtNodeId=" + msg.creatorNodeId() + ", rmtAddr=" + rmtAddr + ", rmtPort=" + sock.getPort() + "]"); } - else if (log.isDebugEnabled()) - log.debug("Ignore ping request, node is stopping."); + else if (debugMode || log.isDebugEnabled()) + log.error("Ignore ping request, node is stopping."); return; } @@ -6583,6 +6751,8 @@ if (req.client()) res.clientAck(true); else if (req.changeTopology()) { + log.error("SocketReader | TcpDiscoveryHandshakeResponse is with changeTopology() == true."); + // Node cannot connect to it's next (for local node it's previous). // Need to check connectivity to it. long rcvdTime = lastRingMsgReceivedTime; @@ -6592,15 +6762,22 @@ boolean ok = rcvdTime + CON_CHECK_INTERVAL * 2 >= now; TcpDiscoveryNode previous = null; + log.error("SocketReader | boolean ok = rcvdTime + CON_CHECK_INTERVAL * 2 >= now : " + ok); + if (ok) { // Check case when previous node suddenly died. This will speed up // node failing. Set failed; + log.error("SocketReader | Check case when previous node suddenly died. 
This will speed up node failing."); + + //Looks like we can wait CONN_CHECK_INTERVAL_HERE synchronized (mux) { failed = failedNodes.keySet(); } + log.error("SocketReader | mux passed."); + previous = ring.previousNode(failed); InetSocketAddress liveAddr = null; @@ -6620,8 +6797,8 @@ } } - if (log.isInfoEnabled()) - log.info("Connection check done [liveAddr=" + liveAddr + //if (log.isInfoEnabled()) + log.error("Connection check done [liveAddr=" + liveAddr + ", previousNode=" + previous + ", addressesToCheck=" + nodeAddrs + ", connectingNodeId=" + nodeId + ']'); } @@ -6633,14 +6810,18 @@ res.previousNodeAlive(ok); - if (log.isInfoEnabled()) { - log.info("Previous node alive status [alive=" + ok + + //if (log.isInfoEnabled()) { + log.error("Previous node alive status [alive=" + ok + ", checkPreviousNodeId=" + req.checkPreviousNodeId() + ", actualPreviousNode=" + previous + ", lastMessageReceivedTime=" + rcvdTime + ", now=" + now + ", connCheckInterval=" + CON_CHECK_INTERVAL + ']'); - } + //} } + + log.error("TcpDiscoveryHandshakeResponse: " + res + + ", spi.writeToSocket(sock, res, spi.getEffectiveSocketTimeout(srvSock)), timeout: " + + spi.getEffectiveSocketTimeout(srvSock)); spi.writeToSocket(sock, res, spi.getEffectiveSocketTimeout(srvSock)); @@ -6649,8 +6830,8 @@ if (locNodeId.equals(nodeId)) { assert !req.client(); - if (log.isDebugEnabled()) - log.debug("Handshake request from local node: " + req); + if (debugMode || log.isDebugEnabled()) + log.error("Handshake request from local node: " + req); return; } @@ -6678,8 +6859,8 @@ if (old == null) break; - if (log.isDebugEnabled()) - log.debug("Already have client message worker, closing connection " + + if (debugMode || log.isDebugEnabled()) + log.error("Already have client message worker, closing connection " + "[locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ", workerSock=" + old.sock + @@ -6688,8 +6869,8 @@ return; } - if (log.isDebugEnabled()) - log.debug("Created client message worker [locNodeId=" + 
locNodeId + + if (debugMode || log.isDebugEnabled()) + log.error("Created client message worker [locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ", sock=" + sock + ']'); assert clientMsgWrk0 == clientMsgWorkers.get(nodeId); @@ -6705,7 +6886,7 @@ } } catch (IOException e) { - if (log.isDebugEnabled()) + if (debugMode || log.isDebugEnabled()) U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e); if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0()) @@ -6732,7 +6913,7 @@ return; } catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) + if (debugMode || log.isDebugEnabled()) U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e); onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e); @@ -6765,21 +6946,28 @@ TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration())); + simulateDelay(); + msg.senderNodeId(nodeId); DebugLogger debugLog = messageLogger(msg); - if (debugLog.isDebugEnabled()) - debugLog.debug("Message has been received: " + msg); + if (debugMode || log.isDebugEnabled()) + debugLog.error("Message has been received: " + msg); spi.stats.onMessageReceived(msg); - if (debugMode && recordable(msg)) - debugLog(msg, "Message has been received: " + msg); + spi.onMessageReceived(msg); + +// if (debugMode && recordable(msg)) +// debugLog(msg, "Message has been received: " + msg); if (msg instanceof TcpDiscoveryConnectionCheckMessage) { ringMessageReceived(); + if(debugMode) + log.error("spi.writeToSocket(msg, sock, RES_OK, sockTimeout), timeout (spi.getEffectiveSocketTimeout(srvSock)): " + sockTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); continue; @@ -6801,6 +6989,9 @@ TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { + if(debugMode) + log.error("spi.writeToSocket(msg, sock, RES_OK, sockTimeout), timeout 
(spi.getEffectiveSocketTimeout(srvSock)): " + sockTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); if (clientMsgWrk != null && clientMsgWrk.runner() == null && !clientMsgWrk.isDone()) @@ -6815,6 +7006,9 @@ // If message is received from previous node and node is connecting forward to next node. if (!getLocalNodeId().equals(msg0.routerNodeId()) && state == CONNECTING) { + if(debugMode) + log.error("spi.writeToSocket(msg, sock, RES_OK, sockTimeout), timeout (spi.getEffectiveSocketTimeout(srvSock)): " + sockTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); msgWorker.addMessage(msg); @@ -6822,12 +7016,16 @@ continue; } + log.error("spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout), timeout (spi.getEffectiveSocketTimeout(srvSock)): " + sockTimeout); + spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout); break; } } else if (msg instanceof TcpDiscoveryDuplicateIdMessage) { + log.error("spi.writeToSocket(msg, sock, RES_OK, sockTimeout), timeout (spi.getEffectiveSocketTimeout(srvSock)): " + sockTimeout); + // Send receipt back. spi.writeToSocket(msg, sock, RES_OK, sockTimeout); @@ -6850,13 +7048,15 @@ } } - if (ignored && log.isDebugEnabled()) - log.debug("Duplicate ID message has been ignored [msg=" + msg + + if (ignored && (debugMode || log.isDebugEnabled())) + log.error("Duplicate ID message has been ignored [msg=" + msg + ", spiState=" + state + ']'); continue; } else if (msg instanceof TcpDiscoveryAuthFailedMessage) { + log.error("spi.writeToSocket(msg, sock, RES_OK, sockTimeout), timeout (spi.getEffectiveSocketTimeout(srvSock)): " + sockTimeout); + // Send receipt back. 
spi.writeToSocket(msg, sock, RES_OK, sockTimeout); @@ -6872,8 +7072,8 @@ UUID targetNode = ((TcpDiscoveryAuthFailedMessage)msg).getTargetNodeId(); if (targetNode == null || targetNode.equals(locNodeId)) { - if (log.isDebugEnabled()) - log.debug("Auth failed message has been ignored [msg=" + msg + + if (debugMode || log.isDebugEnabled()) + log.error("Auth failed message has been ignored [msg=" + msg + ", spiState=" + spiState + ']'); } else @@ -6885,6 +7085,8 @@ continue; } else if (msg instanceof TcpDiscoveryCheckFailedMessage) { + log.error("spi.writeToSocket(msg, sock, RES_OK, sockTimeout), timeout (spi.getEffectiveSocketTimeout(srvSock)): " + sockTimeout); + // Send receipt back. spi.writeToSocket(msg, sock, RES_OK, sockTimeout); @@ -6911,8 +7113,8 @@ worker.addMessage(msg); } else { - if (log.isDebugEnabled()) { - log.debug("Failed to find client message worker " + + if (debugMode || log.isDebugEnabled()) { + log.error("Failed to find client message worker " + "[clientNode=" + msg.creatorNodeId() + ']'); } } @@ -6921,13 +7123,15 @@ } } - if (ignored && log.isDebugEnabled()) - log.debug("Check failed message has been ignored [msg=" + msg + + if (ignored && (debugMode || log.isDebugEnabled())) + log.error("Check failed message has been ignored [msg=" + msg + ", spiState=" + state + ']'); continue; } else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) { + log.error("spi.writeToSocket(msg, sock, RES_OK, sockTimeout), timeout (spi.getEffectiveSocketTimeout(srvSock)): " + sockTimeout); + // Send receipt back. 
spi.writeToSocket(msg, sock, RES_OK, sockTimeout); @@ -6950,8 +7154,8 @@ } } - if (ignored && log.isDebugEnabled()) - log.debug("Loopback problem message has been ignored [msg=" + msg + + if (ignored && (debugMode || log.isDebugEnabled())) + log.error("Loopback problem message has been ignored [msg=" + msg + ", spiState=" + state + ']'); continue; @@ -6993,14 +7197,17 @@ clientMsgWrk.addMessage(ack); } - else + else { + log.error("spi.writeToSocket(msg, sock, RES_OK, sockTimeout), timeout (spi.getEffectiveSocketTimeout(srvSock)): " + sockTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); + } if (metricsUpdateMsg != null) processClientMetricsUpdateMessage(metricsUpdateMsg); } catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) + if (debugMode || log.isDebugEnabled()) U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e); @@ -7027,7 +7234,7 @@ return; } catch (IOException e) { - if (log.isDebugEnabled()) + if (debugMode || log.isDebugEnabled()) U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e); @@ -7054,8 +7261,8 @@ } finally { if (clientMsgWrk != null) { - if (log.isDebugEnabled()) - log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId + + if (debugMode || log.isDebugEnabled()) + log.error("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']'); clientMsgWorkers.remove(nodeId, clientMsgWrk); @@ -7079,6 +7286,9 @@ */ private void ringMessageReceived() { lastRingMsgReceivedTime = U.currentTimeMillis(); + + if( debugMode ) + log.error("Updated lastRingMsgReceivedTime"); } /** @@ -7086,16 +7296,38 @@ * @return {@code True} if got connection refused on connect try. 
*/ private boolean isConnectionRefused(SocketAddress addr) { + log.error("Checking if connection refused: " + addr + " with hardcodded timeout " + 100); + + long prevNodeConnectionCheckDelay = ServerImpl.this.prevNodeConnectionCheckDelay; + + if (prevNodeConnectionCheckDelay != 0) { + log.error("Simulating check connection delay delay: " + Math.abs(prevNodeConnectionCheckDelay)); + + try { + Thread.sleep(Math.abs(prevNodeConnectionCheckDelay)); + } + catch (InterruptedException e) { + // No-op. + } + + //Negative value indicates indicates + return prevNodeConnectionCheckDelay > 0; + } + try (Socket sock = new Socket()) { sock.connect(addr, 100); } catch (ConnectException e) { + log.error("Got ConnectException " + e.getMessage() + ". Connection refused."); return true; } catch (IOException e) { + log.error("Got IOException " + e.getMessage() + ". Connection not refused."); return false; } + log.error("Got no any exception. Connection not refused."); + return false; } @@ -7130,21 +7362,21 @@ msg.pendingMessages(pending); msg.success(true); - if (log.isDebugEnabled()) - log.debug("Accept client reconnect, restored pending messages " + + if (debugMode || log.isDebugEnabled()) + log.error("Accept client reconnect, restored pending messages " + "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); } else if (!isLocalNodeCoordinator()) { - if (log.isDebugEnabled()) - log.debug("Failed to restore pending messages for reconnecting client. " + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to restore pending messages for reconnecting client. 
" + "Forwarding reconnection message to coordinator " + "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); } else { msg.verify(locNodeId); - if (log.isDebugEnabled()) - log.debug("Failing reconnecting client node because failed to restore pending " + + if (debugMode || log.isDebugEnabled()) + log.error("Failing reconnecting client node because failed to restore pending " + "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId, @@ -7156,8 +7388,8 @@ else { msg.verify(locNodeId); - if (log.isDebugEnabled()) - log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Reconnecting client node is already failed [nodeId=" + nodeId + ']'); } if (msg.verified() && isLocNodeRouter) { @@ -7165,8 +7397,8 @@ if (wrk != null) wrk.addMessage(msg); - else if (log.isDebugEnabled()) - log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + + else if (debugMode || log.isDebugEnabled()) + log.error("Failed to reconnect client node (disconnected during the process) [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); } else @@ -7184,8 +7416,8 @@ if (wrk != null) wrk.addMessage(msg); - else if (log.isDebugEnabled()) - log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + + else if (debugMode || log.isDebugEnabled()) + log.error("Failed to reconnect client node (disconnected during the process) [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); } else if (ring.hasRemoteNodes() && !isLocalNodeCoordinator()) @@ -7205,8 +7437,8 @@ if (wrk != null) wrk.metrics(msg.metrics()); - else if (log.isDebugEnabled()) - log.debug("Received client metrics update message from unknown client node: " + msg); + else if (debugMode || log.isDebugEnabled()) + log.error("Received client metrics update message from unknown 
client node: " + msg); } /** @@ -7232,16 +7464,20 @@ // Check that joining node can accept incoming connections. if (node.clientRouterNodeId() == null) { if (!pingJoiningNode(node)) { + log.error("spi.writeToSocket(msg, sock, RES_JOIN_IMPOSSIBLE, sockTimeout), timeout (spi.failureDetectionTimeout() || spi.getSocketTimeout()): " + sockTimeout); + spi.writeToSocket(msg, sock, RES_JOIN_IMPOSSIBLE, sockTimeout); return false; } } + log.error("spi.writeToSocket(msg, sock, RES_OK, sockTimeout)), timeout (spi.failureDetectionTimeout() || spi.getSocketTimeout()): " + sockTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); - if (log.isDebugEnabled()) - log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']'); msg.responded(true); @@ -7275,10 +7511,12 @@ // Local node is stopping. Remote node should try next one. res = RES_CONTINUE_JOIN; + log.error("spi.writeToSocket(msg, sock, RES_OK, sockTimeout), timeout (spi.failureDetectionTimeout() || spi.getSocketTimeout()): " + sockTimeout); + spi.writeToSocket(msg, sock, res, sockTimeout); - if (log.isDebugEnabled()) - log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Responded to join request message [msg=" + msg + ", res=" + res + ']'); fromAddrs.addAll(msg.node().socketAddresses()); @@ -7304,8 +7542,8 @@ } } catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to ping joining node, closing connection. [node=" + node + + if (debugMode || log.isDebugEnabled()) + log.error("Failed to ping joining node, closing connection. 
[node=" + node + ", err=" + e.getMessage() + ']'); } } @@ -7361,8 +7599,8 @@ /** {@inheritDoc} */ @SuppressWarnings({"BusyWait"}) @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Statistics printer has been started."); + if (debugMode || log.isDebugEnabled()) + log.error("Statistics printer has been started."); while (!isInterrupted()) { Thread.sleep(spi.statsPrintFreq); @@ -7452,8 +7690,8 @@ DebugLogger log = messageLogger(msg); - if (log.isDebugEnabled()) - log.debug("Message has been added to client queue: " + msg); + if (debugMode || log.isDebugEnabled()) + log.error("Message has been added to client queue: " + msg); } /** {@inheritDoc} */ @@ -7478,28 +7716,36 @@ if (node != null) clientVer = IgniteUtils.productVersion(node); - else if (msgLog.isDebugEnabled()) - msgLog.debug("Skip sending message ack to client, fail to get client node " + + else if (true || msgLog.isDebugEnabled()) + msgLog.error("Skip sending message ack to client, fail to get client node " + "[sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); } if (clientVer != null) { - if (msgLog.isDebugEnabled()) - msgLog.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + if (true || msgLog.isDebugEnabled()) + msgLog.error("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); + log.error("spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?\n" + + " spi.clientFailureDetectionTimeout() : spi.getSocketTimeout(), timeout: " + + (spi.failureDetectionTimeoutEnabled() ? + spi.clientFailureDetectionTimeout() : spi.getSocketTimeout())); + spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ? 
spi.clientFailureDetectionTimeout() : spi.getSocketTimeout()); } } else { - if (msgLog.isDebugEnabled()) - msgLog.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" + if (true || msgLog.isDebugEnabled()) + msgLog.error("Redirecting message to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); assert topologyInitialized(msg) : msg; + log.error("spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false)), timeout: " + + spi.getEffectiveSocketTimeout(false)); + spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false)); } @@ -7511,7 +7757,7 @@ success = !clientFailed; } catch (IgniteCheckedException | IOException e) { - if (log.isDebugEnabled()) + if (debugMode || log.isDebugEnabled()) U.error(log, "Client connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e); @@ -7750,8 +7996,8 @@ /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']'); + if (debugMode || log.isDebugEnabled()) + log.error("Message worker started [locNodeId=" + getConfiguredNodeId() + ']'); while (!isCancelled()) { if (beforeEachPoll != null) Index: modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/NodeFailureTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/NodeFailureTest.java (revision 4eb4c8d91b64bf8bd3c9acfb4b26c457f9b83f65) +++ modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/NodeFailureTest.java (revision 4eb4c8d91b64bf8bd3c9acfb4b26c457f9b83f65) @@ -0,0 +1,296 @@ +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import 
java.io.OutputStream; +import java.net.Socket; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy; +import org.junit.Test; + +/** + * Ensures delay of node failure detection is within {@code maxNodeFailureDetectionDelay()}. + */ +public class NodeFailureTest extends GridCommonAbstractTest { + /** + * Total grids to start. The more the longer the tests are. The test check crash of every node from 1 to GRID_CNT - 1. + */ + private static final int GRID_CNT = 7; + + /** + * Cycles number to run test with one settings set. Test measures maximal and average values. + * The more cycles the longer ttests are but issues more accurate values. + */ + private static final int CYCLES_PER_SETTINGS = 7; + + /** + * If {@code false}, each parameter set will be applied chashing every node starting from second in every cycle up + * to {@code #CYCLES_PER_SETTINGS}. + * If {@code true}, each parameter set will be applied only for one randomly choosen node in every cycle up to + * {@code #CYCLES_PER_SETTINGS}. + * + * If {@code false}, the tests take more time but gives more accuracy. If {@code true}, test run faster with cost of + * lesse accuracy. 
+ */ + private static final boolean RANDOM_NODE = true; + + /** Total delays in the code. */ + private static final int ACCEPTABLE_CODE_DELAYS = 20; + + /** {@code True} to activate cache puts while testing. */ + private static final boolean WITH_LOAD = false; + + /** How long to wait after start before kill a node. */ + private static final long BREATHE_DELAY = 1000; + + /** Not a param. */ + private long failureDetectionTimeout; + + /** Not a param. */ + private boolean putterActive; + + /** */ + @Test + public void test() throws Exception { + for (long failureDetectionTimeout = 100; failureDetectionTimeout <= 500; failureDetectionTimeout += 100) { + this.failureDetectionTimeout = failureDetectionTimeout; + + int startNode = RANDOM_NODE ? 1 + new Random().nextInt(GRID_CNT - 1) : 1; + int endNode = RANDOM_NODE ? startNode : GRID_CNT - 1; + + for (int i = 0; i < CYCLES_PER_SETTINGS; ++i) { + for (int failedNodeIdx = startNode; failedNodeIdx <= endNode; ++failedNodeIdx) { + log.info("Cycle " + i + ", badNodeIdx = " + failedNodeIdx + ", FDT = " + + failureDetectionTimeout); + + launchTest(WITH_LOAD, failedNodeIdx); + } + } + } + } + + /** Launches grids, kills test node and waits for next cycle. */ + private void launchTest(boolean withLoad, int failedNodeIdx) throws Exception { + final Exchanger sem = new Exchanger<>(); + + final AtomicLong timer = new AtomicLong(); + + final IgniteEx node0 = startGrids(GRID_CNT); + + if (withLoad) + node0.createCache("cache"); + + listenNodeFailed(timer, failedNodeIdx, withLoad, sem); + + breathe(); + + if (withLoad) { + startCachePutter(node0, 8, -1, "cache"); + + breathe(); + } + + failNode(failedNodeIdx, timer); + + //Wait untill failed noded is detected. 
+ sem.exchange(Object.class); + + assertTrue( "Too long failure detection delay: " + timer.get(), timer.get() <= maxNodeFailureDetectionDelay() ); + + if(log.isDebugEnabled()) + log.debug("Stopping all the grids."); + + stopAllGrids(true); + } + + /** + * Kills node process in multiJVM tests or simulates unacceptable delays in the target node. + * + * @param nodeIdx idx of the node to simulate its failure or kill its process. + * @param timer To store failure time. + */ + private void failNode(int nodeIdx, AtomicLong timer) throws Exception { + IgniteEx node = grid(nodeIdx); + + IgniteProcessProxy procProxy = (IgniteProcessProxy)node; + + procProxy.kill(); + + timer.set(System.currentTimeMillis()); + + log.info("Terminated node with idx " + nodeIdx + "."); + } + + /** + * Registers listener of {@code TcpDiscoveryNodeFailedMessage}. + * @param badNodeNum number of the node failed in the test. + * @param withLoad {@code True} if test was started with loading. + * @param sem {@code True} if test was started with loading. + */ + private void listenNodeFailed(AtomicLong timer, int badNodeNum, boolean withLoad, Exchanger sem) { + TestDiscovery testDiscovery = (TestDiscovery)grid(0).context().discovery().getInjectedDiscoverySpi(); + + testDiscovery.writeMsgWatcher = new Consumer() { + + private final AtomicBoolean detected = new AtomicBoolean(); + + @Override public void accept(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryNodeFailedMessage && detected.compareAndSet(false, true)) { + timer.set(System.currentTimeMillis() - timer.get()); + + TcpDiscoveryNodeFailedMessage nodeFailedMsg = (TcpDiscoveryNodeFailedMessage)msg; + + // Run additional action. + if (withLoad) + stopPutter(); + + // Stop the cluster with a delay in separate thread. + new Thread(() -> { + try { + if(log.isDebugEnabled()) + log.debug("Releasing the semaphore for new test cycle."); + + //Release the test cycle, begin new. 
+ sem.exchange(Object.class); + } + catch (InterruptedException ex) { + ex.printStackTrace(); + } + }, "clusterStopper").start(); + + if (nodeFailedMsg.internalOrder() != badNodeNum + 1) + log.error("Wrong order of failed node: " + nodeFailedMsg.internalOrder()); + } + } + }; + } + + /** + * Here you can adjust any acceptable maximum delay of node failure detection + */ + private long maxNodeFailureDetectionDelay(){ + return failureDetectionTimeout + ServerImpl.CON_CHECK_INTERVAL + ACCEPTABLE_CODE_DELAYS; + } + + /** Test timeout: suggested 15mins. */ + @Override protected long getTestTimeout() { + return 15 * 60 * 1000; + } + + /** */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + //Disable 'system worker blocked messages.' + cfg.setSystemWorkerBlockedTimeout(20_000); + + cfg.setDiscoverySpi(new TestDiscovery()); + + TestDiscovery disco = (TestDiscovery)cfg.getDiscoverySpi(); + + disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setFailureDetectionTimeout(failureDetectionTimeout); + + // Prevent other discovery messages. + if (cfg.getMetricsUpdateFrequency() < failureDetectionTimeout) + cfg.setMetricsUpdateFrequency(failureDetectionTimeout - 1); + + cfg.setClientFailureDetectionTimeout(cfg.getMetricsUpdateFrequency()); + + return cfg; + } + + /** + * Launches cache operation. Emulates some load on the cluster. on multi-JVM test works only with first node. + * + * @param node - operation node. + * @param puttersCnt - putter number (threads). + * @param delay delay after each put operation. + * @param cacheName Name of the cache to use. 
+ */ + private void startCachePutter(IgniteEx node, int puttersCnt, long delay, String cacheName) { + putterActive = true; + + CountDownLatch cntDownLatch = new CountDownLatch(puttersCnt); + + for (int i = 0; i < puttersCnt; ++i) { + Thread putter = new Thread(() -> { + cntDownLatch.countDown(); + + while (putterActive) { + IgniteCache cache = node.cache(cacheName); + + cache.put(ThreadLocalRandom.current().nextInt(100000), ThreadLocalRandom.current().nextInt(100000)); + + if(delay>0) { + try { + Thread.sleep(delay); + } + catch (InterruptedException e) { + // No-op; + } + } + } + }, "putter_" + i); + + putter.setDaemon(true); + + putter.start(); + } + } + + /** Take a while before a significant operation. */ + private void breathe() throws InterruptedException { + Thread.sleep(BREATHE_DELAY); + } + + /** */ + private void stopPutter(){ + putterActive = false; + } + + /** */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(true); + } + + /** */ + @Override protected boolean isMultiJvm() { + return true; + } + + /** + * Can call a watcher before once a message is sent. 
+ */ + private static final class TestDiscovery extends TcpDiscoverySpi { + /** */ + public volatile Consumer writeMsgWatcher; + + /** */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + super.writeToSocket(sock, out, msg, timeout); + + Consumer writeMsgWatcher = this.writeMsgWatcher; + + if (writeMsgWatcher != null) + writeMsgWatcher.accept(msg); + } + } +} Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java (revision 9ef57363b872842a9758b0d9ba727c488141e00f) +++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java (revision 4a6a25b2378a513613da43fddfda61b31e73058d) @@ -1542,8 +1542,12 @@ assert addr != null; + log.error("Opening socket, connecting. sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)), timeout: " + timeoutHelper.nextTimeoutChunk(sockTimeout)); + sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); + log.error("Opening socket, writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)), timeout: " + timeoutHelper.nextTimeoutChunk(sockTimeout)); + writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); return sock; @@ -2017,6 +2021,16 @@ return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null; } + /** */ + protected void onMessageReceived(TcpDiscoveryAbstractMessage msg){ + + } + + /** */ + protected void onMessageSent(TcpDiscoveryAbstractMessage msg){ + + } + /** * @param msg Failed message. 
* @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise. @@ -2401,6 +2415,16 @@ impl.simulateNodeFailure(); } + /** */ + public void simulateDelay(long delay){ + impl.simulateDelay(delay); + } + + /** */ + public void simulateCheckPrevNodeConnectionFailed(long delay){ + impl.simulateCheckPrevNodeConnectionFailed(delay); + } + /** * FOR TEST PURPOSE ONLY! */ Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java (revision 0e49473aa368f1e697f5c662ee6447b856b0effa) +++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java (revision 4eb4c8d91b64bf8bd3c9acfb4b26c457f9b83f65) @@ -69,7 +69,7 @@ protected volatile TcpDiscoveryNode locNode; /** Debug mode. */ - protected boolean debugMode; + public static boolean debugMode; /** Debug messages history. */ private int debugMsgHist = 512; @@ -88,6 +88,11 @@ @Override public void debug(String msg) { log.debug(msg); } + + /** {@inheritDoc} */ + @Override public void error(String msg) { + log.error(msg); + } }; /** */ @@ -101,6 +106,11 @@ @Override public void debug(String msg) { log.trace(msg); } + + /** {@inheritDoc} */ + @Override public void error(String msg) { + log.error(msg); + } }; /** @@ -329,9 +339,15 @@ */ abstract void simulateNodeFailure(); - /** - * FOR TEST PURPOSE ONLY! - */ + /** */ + abstract void simulateDelay(long delay); + + /** */ + abstract void simulateCheckPrevNodeConnectionFailed(long delay); + + /** + * FOR TEST PURPOSE ONLY! + */ public abstract void brakeConnection(); /** @@ -442,5 +458,8 @@ * @param msg Message to log. 
*/ void debug(String msg); + + /** */ + void error(String msg); } } Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java (revision 0973a013eca04baa4bbedea6e67a075c72664652) +++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java (revision c714aec847a01e52dc5cc49113c501ab3c0dcbbb) @@ -265,6 +265,16 @@ return "disconnected"; } + /** */ + @Override void simulateDelay(long delay) { + // No-op. + } + + /** */ + @Override void simulateCheckPrevNodeConnectionFailed(long delay) { + // No-op. + } + /** {@inheritDoc} */ @Override public int getMessageWorkerQueueSize() { return msgWorker.queueSize(); Index: modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/NodeFailureResearchTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/NodeFailureResearchTest.java (revision 4eb4c8d91b64bf8bd3c9acfb4b26c457f9b83f65) +++ modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/NodeFailureResearchTest.java (revision 4eb4c8d91b64bf8bd3c9acfb4b26c457f9b83f65) @@ -0,0 +1,434 @@ +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import 
java.util.function.Consumer; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import static org.apache.ignite.internal.util.IgniteUtils.defaultWorkDirectory; + +/** + * Measures delays of node failure detection and writes it in {@code #resultsOutputPath()}. Default: working dir. + */ +public class NodeFailureResearchTest extends GridCommonAbstractTest { + /** + * Total grids to start. The more the longer the tests are. The test check crash of every node from 1 to GRID_CNT - 1. + */ + private static final int GRID_CNT = 7; + + /** + * Cycles number to run test with one settings set. Test measures maximal and average values. + * The more cycles the longer ttests are but issues more accurate values. + */ + private static final int CYCLES_PER_SETTINGS = 7; + + /** Max. value of the failureDetectionTimeout. */ + private static final int MAX_FAILURE_DETECTION_TIMEOUT = 500; + + /** Max value of {@code ServerImpl.CON_CHECK_INTERVAL}. */ + private static final int MAX_CHECK_CONNECTION_INTERVAL = 500; + + /** + * Sets delay on message processing of 'bad' node simulating its failure with no + * network errors. This emulates the worst case when failed node is not seen by socket exceptions like + * 'Connection refused' or 'Connection reset'. + * The value is somewhat bigger that the timeouts. 
+ */ + private static final int DELAY_OF_FAILED_NODE = 2_000; + + /** + * Emulates delay and result of connection checking in {@code ServerImpl#isConnectionRefused} used to ensure if + * previous node in the ring is alive. Duration and result of this checking can significantly affect duration of + * node failure detection. Long freezes with IOException/timeout can duplicate cycle of node ping + * with failureDetectionTimeout intervals. + * + * Positive value forces to wait and simulates 'connection refused' after. This indicates failed connection/node + * with gived delay. It is relatively good case depending of the value. You can set 1 to indicate failed connection + * immediatelly. + * + * Negative value gives worst case. This is same delay as positive value but emulates 'connection timeout' after the + * delay. Currectly, сonnection timeout is not considered as lost connection. This often causes rechecking status of + * the failed node with additional delays. + */ + private static final long SIMULATE_CHECK_CONNECTION_DELAY = -100; + + /** + * If {@code false}, each parameter set will be applied chashing every node starting from second in every cycle up + * to {@code #CYCLES_PER_SETTINGS}. + * If {@code true}, each parameter set will be applied only for one randomly choosen node in every cycle up to + * {@code #CYCLES_PER_SETTINGS}. + * + * If {@code false}, the tests take more time but gives more accuracy. If {@code true}, test run faster with cost of + * lesse accuracy. + */ + private static final boolean RANDOM_NODE = true; + + /** {@code True} to activate cache puts while testing. */ + private static final boolean WITH_LOAD = false; + + /** Enabled or disables debug logging on ServerImpl. Mostly is used to watch actual socket timeouts. */ + private static final boolean DEBUG_MODE = false; + + /** Not a param. */ + private volatile boolean putterActive; + + /** Not a test param. Is used in {@code #getConfiguration()}. 
*/ + private volatile long failureDetectionTimeout; + + /** */ + @Test + public void testFailureDetectionTimeout() throws Exception { + ServerImpl.debugMode = DEBUG_MODE; + + //The print writer is test results output. + try (PrintWriter printWriter = new PrintWriter(resultsOutputPath())) { + //Iterate with various {@link ServerImpl#CON_CHECK_INTERVAL}. + for (int cci = 100; cci <= MAX_CHECK_CONNECTION_INTERVAL; cci += 100) { + //Iterate with various failureDetectionTimeout + for (long failureDetectionTimeout = minimalFailureDetectionTimeout(cci); + failureDetectionTimeout <= MAX_FAILURE_DETECTION_TIMEOUT; + failureDetectionTimeout += 100) + { + this.failureDetectionTimeout = failureDetectionTimeout; + + ServerImpl.CON_CHECK_INTERVAL = cci; + + //Here we store delays of node failure detection. + final List delays = new ArrayList<>(CYCLES_PER_SETTINGS * GRID_CNT); + + int startNode = RANDOM_NODE ? 1 + new Random().nextInt(GRID_CNT - 1) : 1; + int endNode = RANDOM_NODE ? startNode + 1 : GRID_CNT; + + for (int i = 0; i < CYCLES_PER_SETTINGS; ++i) { + for (int failedNodeIdx = startNode; failedNodeIdx < endNode; ++failedNodeIdx) { + log.info("Cycle " + i + ", badNodeIdx = " + failedNodeIdx + ", FDT = " + + failureDetectionTimeout + ", CCI = " + ServerImpl.CON_CHECK_INTERVAL); + + launchTest(WITH_LOAD, delays, failedNodeIdx); + } + } + + storeResults(printWriter, delays); + } + } + } + } + + /** */ + private String resultsOutputPath() throws Exception { + return defaultWorkDirectory() + '/' + getClass().getSimpleName() + ".txt"; + } + + /** + * Keeps the detecting node away from segmentation which makes this test non-working. + * In some cases failureDetectionTimeout has to be bigger some threshold due to the hardcoded constants. 
+ */ + private long minimalFailureDetectionTimeout(long connectionCheckInterval) { + long failureDetectionTimeout = 100; + + if (SIMULATE_CHECK_CONNECTION_DELAY >= failureDetectionTimeout) + failureDetectionTimeout = 2 * SIMULATE_CHECK_CONNECTION_DELAY; + else if (SIMULATE_CHECK_CONNECTION_DELAY < 0 && failureDetectionTimeout < connectionCheckInterval - 100) + failureDetectionTimeout = connectionCheckInterval - 100; + + //Cut to hundered. + failureDetectionTimeout = (int)Math.ceil(failureDetectionTimeout / 100.0) * 100; + + return failureDetectionTimeout; + } + + /** */ + private void launchTest(boolean withLoad, List delays, int failedNodeIdx) throws Exception { + final Exchanger sem = new Exchanger<>(); + + final AtomicLong timer = new AtomicLong(); + + final IgniteEx node0 = startGrids(GRID_CNT); + + if (withLoad) + node0.createCache("cache"); + + registerFailNodeMessageListener(delays, timer, failedNodeIdx, withLoad, sem); + + if (withLoad) + startCachePutter(node0, 8, -1, "cache"); + + failNode(failedNodeIdx, timer); + + //Wait untill failed noded is detected. + sem.exchange(Object.class); + + log.info("Stopping all the grids."); + + stopAllGrids(true); + } + + /** + * Simulates unacceptable delays in the target node. + * + * @param nodeIdx idx of the node to simulate its failure or kill its process. + * @param timer To store failure time. + */ + private void failNode(int nodeIdx, AtomicLong timer) throws Exception { + IgniteEx node = grid(nodeIdx); + + int prevNodeIdx = nodeIdx == 0 ? GRID_CNT - 1 : nodeIdx - 1; + + TestDiscovery prevDiscovery = (TestDiscovery)grid(prevNodeIdx).context().discovery().getInjectedDiscoverySpi(); + + log.info("Will simulate failure of node with idx " + nodeIdx + " once previous node " + prevNodeIdx + + " sends TcpDiscoveryConnectionCheckMessage."); + + //Fail node in the worst case: right after the ping message is sucessufuly sent. 
+ prevDiscovery.sentMessageWatcher = new Consumer() { + private final AtomicBoolean failed = new AtomicBoolean(); + + @Override public void accept(TcpDiscoveryAbstractMessage message) { + if (message instanceof TcpDiscoveryConnectionCheckMessage && failed.compareAndSet(false, true)) { + int nextNodeIdx = nodeIdx == GRID_CNT - 1 ? 0 : nodeIdx + 1; + + TestDiscovery nextDiscovery = (TestDiscovery)grid(nextNodeIdx).context().discovery().getInjectedDiscoverySpi(); + + nextDiscovery.simulateCheckPrevNodeConnectionFailed(SIMULATE_CHECK_CONNECTION_DELAY); + + TestDiscovery discoveryToFail = (TestDiscovery)node.context().discovery().getInjectedDiscoverySpi(); + + discoveryToFail.simulateDelay(DELAY_OF_FAILED_NODE); + + timer.set(System.currentTimeMillis()); + + log.info("Simulated failure of node with idx " + nodeIdx + "."); + } + } + }; + } + + /** + * Registers listener of {@code TcpDiscoveryNodeFailedMessage}. + * @param delays There to stome delay since {@code timer}. + * @param badNodeNum number of the node failed in the test. + * @param withLoad {@code True} if test was started with loading. + * @param sem {@code True} if test was started with loading. + */ + private void registerFailNodeMessageListener(List delays, AtomicLong timer, int badNodeNum, boolean withLoad, + Exchanger sem) { + + int detectingNodeIdx = badNodeNum - 1; + + if(detectingNodeIdx<0) + detectingNodeIdx = GRID_CNT - 1; + + TestDiscovery testDiscovery = (TestDiscovery)grid(detectingNodeIdx).context().discovery().getInjectedDiscoverySpi(); + + testDiscovery.writeMessageWatcher = new Consumer() { + + private final AtomicBoolean detected = new AtomicBoolean(); + + @Override public void accept(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryNodeFailedMessage && detected.compareAndSet(false, true)) { + TcpDiscoveryNodeFailedMessage nodeFailedMsg = (TcpDiscoveryNodeFailedMessage)msg; + + delays.add(System.currentTimeMillis() - timer.get()); + + // Run additional action. 
+ if (withLoad) + stopPutter(); + + log.info("Failed node detection delay: " + delays.get(delays.size() - 1)); + + // Stop the cluster with a delay in separate thread. + new Thread(() -> { + try { + log.info("Releasing the semaphore to begin new test cycle."); + + //Release the test cycle, begin new. + sem.exchange(Object.class); + } + catch (InterruptedException ex) { + ex.printStackTrace(); + } + }, "clusterStopper").start(); + + if (nodeFailedMsg.internalOrder() != badNodeNum + 1) + throw new RuntimeException("Wrong order of failed node: " + nodeFailedMsg.internalOrder()); + } + } + }; + } + + /** */ + private void storeResults(PrintWriter printWriter, List delays) { + printWriter.println("--------------------------------------------------------------"); + + printWriter.println("failureDetectionTimeout = " + failureDetectionTimeout + + ", CON_CHECK_INTERVAL = " + ServerImpl.CON_CHECK_INTERVAL + + ", previousNodeConnectionCheckDelay = " + SIMULATE_CHECK_CONNECTION_DELAY + ); + + printWriter.println("Average detection delay of failed node: " + + (long)delays.stream().mapToLong(v -> v).average().getAsDouble() + ); + + printWriter.println("Maximum detection delay of failed node: " + + delays.stream().mapToLong(v -> v).max().getAsLong() + ); + + printWriter.println("--------------------------------------------------------------"); + + printWriter.println(); + + printWriter.flush(); + } + + /** */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setSystemWorkerBlockedTimeout(20_000); + + cfg.setDiscoverySpi(new TestDiscovery()); + + TestDiscovery disco = (TestDiscovery)cfg.getDiscoverySpi(); + + disco.setJoinTimeout(20_000); + + disco.setIpFinder(LOCAL_IP_FINDER); + + cfg.setFailureDetectionTimeout(failureDetectionTimeout); + + // Prevent other discovery messages. 
+ cfg.setMetricsUpdateFrequency(24 * 60 * 60_000); + + cfg.setClientFailureDetectionTimeout( cfg.getMetricsUpdateFrequency() ); + + return cfg; + } + + /** */ + @Override protected long getTestTimeout() { + return 1_000_000_000; + } + + /** + * Launches cache operation. Emulates some load on the cluster. + * + * @param node - operation node. + * @param puttersCnt - putter number (threads). + * @param delay delay after each put operation. + * @param cacheName Name of the cache to use. + */ + private void startCachePutter(IgniteEx node, int puttersCnt, long delay, String cacheName) { + putterActive = true; + + CountDownLatch cntDownLatch = new CountDownLatch(puttersCnt); + + for (int i = 0; i < puttersCnt; ++i) { + Thread putter = new Thread(() -> { + cntDownLatch.countDown(); + + while (putterActive) { + IgniteCache cache = node.cache(cacheName); + + cache.put(ThreadLocalRandom.current().nextInt(100000), ThreadLocalRandom.current().nextInt(100000)); + + if(delay>0) { + try { + Thread.sleep(delay); + } + catch (InterruptedException e) { + // No-op; + } + } + } + }, "putter_" + i); + + putter.setDaemon(true); + + putter.start(); + } + } + + /** */ + private void stopPutter(){ + putterActive = false; + } + + /** */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(true); + } + + /** */ + private static final class TestDiscovery extends TcpDiscoverySpi { + /** */ + public volatile Consumer writeMessageWatcher; + + /** */ + public volatile Consumer readMessageWatcher; + + /** */ + public volatile Consumer sentMessageWatcher; + + /** */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + super.writeToSocket(sock, out, msg, timeout); + + Consumer writeMessageWatcher = this.writeMessageWatcher; + + if (writeMessageWatcher != null) + writeMessageWatcher.accept(msg); + } + + /** */ + @Override protected void 
onMessageReceived(TcpDiscoveryAbstractMessage msg){ + super.onMessageReceived(msg); + + Consumer readMessageWatcher = this.readMessageWatcher; + + if (readMessageWatcher != null && msg != null) + readMessageWatcher.accept(msg); + } + + /** */ + @Override protected T readMessage(Socket sock, @Nullable InputStream in, + long timeout) throws IOException, IgniteCheckedException { + TcpDiscoveryAbstractMessage msg = super.readMessage(sock, in, timeout); + + onMessageReceived(msg); + + return (T)msg; + } + + @Override protected void onMessageSent(TcpDiscoveryAbstractMessage msg) { + super.onMessageSent(msg); + + Consumer sentMessageWatcher = this.sentMessageWatcher; + + if (sentMessageWatcher != null) + sentMessageWatcher.accept(msg); + } + } +}