Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java (date 1590495220000) +++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java (date 1590496523000) @@ -218,6 +218,18 @@ /** TCP server for discovery SPI. */ private TcpServer tcpSrvr; + /** + * Simulates delay of node like GC pauses or unknown network problems. Slows down message reading and sending on + * established connections. + */ + private volatile long simulatedDelay; + + /** + * Simulates delay of backward connectino checking of previous node. + * @see SocketReader#isConnectionRefused(SocketAddress) + */ + private volatile long prevNodeConnectionCheckDelay; + /** Message worker. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private RingMessageWorker msgWorker; @@ -1453,6 +1465,8 @@ int receipt = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + spi.onMessageSent(msg); + spi.stats.onMessageSent(msg, U.nanosToMillis(tsNanos0 - tsNanos)); return receipt; @@ -1821,6 +1835,37 @@ msgWorker.addMessage(msg); } + /** + * For testing purposes only. + * Begins simulating delays on reading/writting on established connections. + */ + void simulateDelay(long delay){ + simulatedDelay = delay; + } + + /** + * For testing purposes only. + * Sets delay and result of backwardConnectionChecking. + * @see SocketReader#isConnectionRefused + */ + void simulateCheckPrevNodeConnectionFailed(long delay) { + prevNodeConnectionCheckDelay = delay; + } + + /** */ + private void simulateDelay(){ + long simulatedDelay = this.simulatedDelay; + + if (simulatedDelay > 0) { + try { + Thread.sleep(simulatedDelay); + } + catch (InterruptedException e) { + // No-op. + } + } + } + /** {@inheritDoc} */ @Override void simulateNodeFailure() { U.warn(log, "Simulating node failure: " + getLocalNodeId()); @@ -3346,6 +3391,8 @@ IgniteSpiOperationTimeoutHelper timeoutHelper = null; while (true) { + simulateDelay(); + if (sock == null) { if (timeoutHelper == null) timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); @@ -3530,9 +3577,6 @@ } try { - // Simulating delay of messages sending. - simulateDelay(); - boolean failure; synchronized (mux) { @@ -3577,6 +3621,8 @@ int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + spi.onMessageSent(pendingMsg); + spi.stats.onMessageSent(pendingMsg, U.nanosToMillis(tsNanos0 - tsNanos)); if (log.isDebugEnabled()) @@ -3624,6 +3670,8 @@ int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + spi.onMessageSent(msg); + if (latencyCheck && log.isInfoEnabled()) log.info("Latency check message has been acked: " + msg.id()); @@ -6509,6 +6557,8 @@ TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout); + simulateDelay(); + // Ping. if (msg instanceof TcpDiscoveryPingRequest) { if (!spi.isNodeStopping0()) { @@ -6748,6 +6798,8 @@ TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration())); + simulateDelay(); + msg.senderNodeId(nodeId); DebugLogger debugLog = messageLogger(msg); @@ -7069,6 +7121,19 @@ * @return {@code True} if got connection refused on connect try. */ private boolean isConnectionRefused(SocketAddress addr) { + long prevNodeConnectionCheckDelay = ServerImpl.this.prevNodeConnectionCheckDelay; + + if (prevNodeConnectionCheckDelay != 0) { + try { + Thread.sleep(Math.abs(prevNodeConnectionCheckDelay)); + } + catch (InterruptedException e) { + // No-op. + } + + return prevNodeConnectionCheckDelay > 0; + } + try (Socket sock = new Socket()) { sock.connect(addr, 100); } Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java (date 1590495220000) +++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java (date 1590496523000) @@ -2017,6 +2017,11 @@ return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null; } + /** */ + protected void onMessageSent(TcpDiscoveryAbstractMessage msg){ + + } + /** * @param msg Failed message. * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise. Index: modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/FailureDetectionResearch.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/FailureDetectionResearch.java (date 1590496523000) +++ modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/FailureDetectionResearch.java (date 1590496523000) @@ -0,0 +1,305 @@ +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Exchanger; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.internal.util.IgniteUtils.defaultWorkDirectory; + +/** + * Measures delays of node failure detection and writes it in {@link #resultsOutputPath()}. Default: working dir. + */ +public class FailureDetectionResearch extends GridCommonAbstractTest { + /** + * Total grids to start. The more the longer the tests are. The test check crash of every node from 1 to GRID_CNT - 1. + */ + private static final int GRID_CNT = 4; + + /** + * Cycles number to run test with one settings set. Test measures maximal and average values. + * The more cycles the longer ttests are but issues more accurate values. + */ + private static final int CYCLES_PER_SETTINGS = 5; + + /** + * Sets delay on message processing of 'bad' node simulating its failure with no + * network errors. This emulates the worst case when failed node is not seen by socket exceptions like + * 'Connection refused' or 'Connection reset'. The value is somewhat bigger that the timeouts. + */ + private static final int DELAY_OF_FAILED_NODE = 2_000; + + /** + * Emulates delay and result of bacward connection checking in {@link ServerImpl#SocketReader#isConnectionRefused} + * used to ensure if previous node in the ring is alive. Duration and result of this checking can significantly + * affect duration of node failure detection. Long freezes with IOException/timeout can duplicate cycle of node ping + * with failureDetectionTimeout intervals. + * + * Positive value forces to wait and simulates 'connection refused'. This indicates failed connection/node with + * given delay. It is relatively good case depending of the value. You can set 1 to indicate failed connection + * immediatelly. + * + * Negative value gives worst case. This is same delay as positive value but emulates 'connection timeout'. + * Currectly, сonnection timeout is not considered as lost connection. This often causes rechecking status of + * the failed node with additional delays. + */ + private static final long CHECK_CONNECTION_DELAY = -100; + + /** + * If {@code false}, each parameter set will be applied chashing every node starting from second in every cycle up + * to {@link #CYCLES_PER_SETTINGS}. + * If {@link true}, each parameter set will be applied only for one randomly choosen node in every cycle up to + * {@link #CYCLES_PER_SETTINGS}. + * + * If {@code false}, the tests take more time but gives more accuracy. If {@code true}, test run faster with cost of + * lesse accuracy. + */ + private static final boolean RANDOM_NODE = true; + + /** */ + private volatile long failureDetectionTimeout; + + /** */ + @Test + public void testFailureDetectionTimeout() throws Exception { + //The print writer is test results output. Default: work directory. + try (PrintWriter printWriter = new PrintWriter(resultsOutputPath())) { + for (long failureDetectionTimeout = 300; failureDetectionTimeout <= 500; failureDetectionTimeout += 100) { + this.failureDetectionTimeout = failureDetectionTimeout; + + //Here we store delays of node failure detection. + final List delays = new ArrayList<>(CYCLES_PER_SETTINGS * GRID_CNT); + + int startNode = RANDOM_NODE ? new Random().nextInt(GRID_CNT) : 0; + int endNode = RANDOM_NODE ? startNode + 1 : GRID_CNT; + + for (int i = 0; i < CYCLES_PER_SETTINGS; ++i) { + for (int failingNodeIdx = startNode; failingNodeIdx < endNode; ++failingNodeIdx) { + log.info("Cycle " + i + ", badNodeIdx = " + failingNodeIdx + ", failureDetectionTimeout = " + + failureDetectionTimeout); + + launchTest(delays, failingNodeIdx); + } + } + + storeResults(printWriter, delays); + } + } + } + + /** */ + private String resultsOutputPath() throws Exception { + return defaultWorkDirectory() + '/' + getClass().getSimpleName() + ".txt"; + } + + /** + * @param delays Where to store measured delays of node falure detection. + * @param failedNodeIdx Which node to simulate failure on. + */ + private void launchTest(List delays, int failedNodeIdx) throws Exception { + Exchanger sem = new Exchanger<>(); + + AtomicLong timer = new AtomicLong(); + + startGrids(GRID_CNT); + + // Let the cluster breathe. + Thread.sleep(2000); + + registerFailNodeMessageListener(delays, timer, failedNodeIdx, sem); + + failNode(failedNodeIdx, timer); + + //Wait untill failed noded is detected. + sem.exchange(Object.class); + + stopAllGrids(true); + } + + /** + * Simulates unacceptable delays in the target node. + * + * @param nodeIdx idx of the node to simulate its failure or kill its process. + * @param timer To store failure time. + */ + private void failNode(int nodeIdx, AtomicLong timer) { + IgniteEx node = grid(nodeIdx); + + int prevNodeIdx = nodeIdx == 0 ? GRID_CNT - 1 : nodeIdx - 1; + + TestDiscovery prevDiscovery = (TestDiscovery)grid(prevNodeIdx).context().discovery().getInjectedDiscoverySpi(); + + log.info("Will simulate failure of node with idx " + nodeIdx + " once previous node " + prevNodeIdx + + " sends TcpDiscoveryConnectionCheckMessage."); + + //Fail node in the worst case: right after the ping message is sucessufuly sent. + prevDiscovery.sentMsgWatcher = new Consumer() { + /** */ + private final AtomicBoolean detected = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public void accept(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryConnectionCheckMessage && detected.compareAndSet(false, true)) { + int nextNodeIdx = nodeIdx == GRID_CNT - 1 ? 0 : nodeIdx + 1; + + // Next to failed node. It does backward checking of previous node aliveness. + // See ServerImpl.SocketReader.isConnectionRefused(). + TcpDiscoverySpi nextDisco = (TcpDiscoverySpi)grid(nextNodeIdx).configuration().getDiscoverySpi(); + + ((ServerImpl)nextDisco.impl).simulateCheckPrevNodeConnectionFailed(CHECK_CONNECTION_DELAY); + + // Fail selected node. + TcpDiscoverySpi discoToFail = (TcpDiscoverySpi)node.context().discovery().getInjectedDiscoverySpi(); + + ((ServerImpl)discoToFail.impl).simulateDelay(DELAY_OF_FAILED_NODE); + + timer.set(System.currentTimeMillis()); + + log.info("Simulated failure of node with idx " + nodeIdx + "."); + } + } + }; + } + + /** + * Registers listener of {@code TcpDiscoveryNodeFailedMessage}. + * @param delays There to stome delay since {@code timer}. + * @param badNodeNum number of the node failed in the test. + * @param sem {@code True} if test was started with loading. + */ + private void registerFailNodeMessageListener(List delays, AtomicLong timer, int badNodeNum, + Exchanger sem) { + + int detectingNodeIdx = badNodeNum - 1; + + if (detectingNodeIdx < 0) + detectingNodeIdx = GRID_CNT - 1; + + TestDiscovery testDiscovery = (TestDiscovery)grid(detectingNodeIdx).context().discovery().getInjectedDiscoverySpi(); + + testDiscovery.writeMsgWatcher = new Consumer() { + /** */ + private final AtomicBoolean detected = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public void accept(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryNodeFailedMessage && detected.compareAndSet(false, true)) { + TcpDiscoveryNodeFailedMessage nodeFailedMsg = (TcpDiscoveryNodeFailedMessage)msg; + + delays.add(System.currentTimeMillis() - timer.get()); + + log.info("Failed node detection delay: " + delays.get(delays.size() - 1)); + + // Stop the cluster with a delay in separate thread. + new Thread(() -> { + try { + log.info("Releasing the semaphore to begin new test cycle."); + + //Release the test cycle, begin new. + sem.exchange(Object.class); + } + catch (InterruptedException ex) { + ex.printStackTrace(); + } + }, "clusterStopper").start(); + + if (nodeFailedMsg.internalOrder() != badNodeNum + 1) + throw new RuntimeException("Wrong order of failed node: " + nodeFailedMsg.internalOrder()); + } + } + }; + } + + /** */ + private void storeResults(PrintWriter printWriter, List delays) { + printWriter.println("IgniteConfiguration.failureDetectionTimeout: " + failureDetectionTimeout + "ms"); + + printWriter.println("Maximum detection delay of node failure: " + + delays.stream().mapToLong(v -> v).max().getAsLong() + "ms" + ); + + printWriter.println(); + + printWriter.flush(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setSystemWorkerBlockedTimeout(20_000); + + cfg.setDiscoverySpi(new TestDiscovery()); + + TestDiscovery disco = (TestDiscovery)cfg.getDiscoverySpi(); + + disco.setJoinTimeout(20_000); + + disco.setIpFinder(LOCAL_IP_FINDER); + + cfg.setFailureDetectionTimeout(failureDetectionTimeout); + + cfg.setMetricsUpdateFrequency(60 * 60 * 1000); + + cfg.setClientFailureDetectionTimeout( cfg.getMetricsUpdateFrequency() ); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 30 * 60 * 1000; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(true); + } + + /** */ + private static final class TestDiscovery extends TcpDiscoverySpi { + /** */ + public volatile Consumer writeMsgWatcher; + + /** */ + public volatile Consumer sentMsgWatcher; + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + super.writeToSocket(sock, out, msg, timeout); + + Consumer writeMsgWatcher = this.writeMsgWatcher; + + if (writeMsgWatcher != null) + writeMsgWatcher.accept(msg); + } + + /** {@inheritDoc} */ + @Override protected void onMessageSent(TcpDiscoveryAbstractMessage msg) { + super.onMessageSent(msg); + + Consumer sentMsgWatcher = this.sentMsgWatcher; + + if (sentMsgWatcher != null) + sentMsgWatcher.accept(msg); + } + } +}