Index: modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/disco/JmhNodeFailureDetection.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/disco/JmhNodeFailureDetection.java (date 1592409874000) +++ modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/disco/JmhNodeFailureDetection.java (date 1592409874000) @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.disco; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark; +import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; + +/** * JMH Node failure detection timeout runner. */ +@State(Scope.Benchmark) +public class JmhNodeFailureDetection extends JmhAbstractBenchmark { + /** Enabled or disabled load on cluster with cache puts. */ + private static final boolean WITH_LOADING = true; + + /** */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int ITERATIONS = 20; + + /** Node which gets failure simulation. */ + private Ignite illNode; + + /** */ + private volatile long beginTime; + + /** */ + private long endTime; + + /** */ + private final List results = new ArrayList<>(); + + /** */ + private TcpDiscoverySpi specialSpi; + + /** */ + private final Object mux = new Object(); + + /** */ + @TearDown + public void stop() { + Ignition.allGrids().forEach(this::silentStop); + } + + /** + * {@inheritDoc} + */ + @Setup + public void setup() throws Exception { + Ignition.start(configuration("grid0")); + + Ignite node1 = Ignition.start(configuration("grid1")); + + if(WITH_LOADING) + startLoad(node1); + + for (int i = 0; i < ITERATIONS; ++i) { + beginTime = endTime = 0; + + // SPI of node 1 simulates node failure. + specialSpi = illTcpDiscoSpi(); + + illNode = Ignition.start(configuration("grid2")); + + illNode.events().localListen((e) -> { + synchronized (mux) { + if (beginTime > 0 && endTime == 0) { + if (e.node().isLocal()) + endTime = System.nanoTime(); + else { + endTime = -1; + + System.err.println("Wrong node failed: " + e.node().order() + ". Expected: " + + illNode.cluster().localNode().order()); + } + + mux.notifyAll(); + } + } + + return false; + }, EventType.EVT_NODE_SEGMENTED, EventType.EVT_NODE_FAILED); + + specialSpi = null; + + // Randomizes node failure. + Thread.sleep(new Random().nextInt(2000)); + + synchronized (mux) { + beginTime = System.nanoTime(); + + mux.wait(10_000); + + if (endTime <= 0) + throw new IllegalStateException("Wrong node failed or not failed at all."); + else { + long delay = U.nanosToMillis(endTime - beginTime); + + beginTime = 0; + + System.err.println("Detection delay: " + delay + ". Failure detection timeout: " + + illNode.configuration().getFailureDetectionTimeout() + ", connection recovery timeout: " + + ((TcpDiscoverySpi)illNode.configuration().getDiscoverySpi()).getConnectionRecoveryTimeout()); + + results.add(delay); + } + } + + silentStop(illNode); + } + + System.out.println("Total detection delay: " + results.stream().mapToLong(v->v).sum()); + + stop(); + } + + /** */ + @Benchmark + public void measureTotalForTheOutput() throws InterruptedException { + Thread.sleep(results.stream().mapToLong(v->v).sum()); + } + + /** */ + private TcpDiscoverySpi illTcpDiscoSpi() { + return new TcpDiscoverySpi() { + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, + TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + simulateSocketTimeout(timeout); + + super.writeToSocket(sock, out, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, + long timeout) throws IOException { + simulateSocketTimeout(timeout); + + super.writeToSocket(msg, sock, res, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, + long timeout) throws IOException { + simulateSocketTimeout(timeout); + + super.writeToSocket(sock, msg, data, timeout); + } + + /** */ + private void simulateSocketTimeout(long timeout) throws SocketTimeoutException { + if (beginTime > 0) { + try { + Thread.sleep(timeout); + + throw new SocketTimeoutException("Simulated timeout " + timeout + "ms."); + } + catch (InterruptedException ignored) { + // No-op. + } + } + } + }; + } + + /** + * Run benchmark. + * + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + JmhIdeBenchmarkRunner.create() + .threads(1) + .warmupIterations(0).measurementIterations(1) + .benchmarks(JmhNodeFailureDetection.class.getSimpleName()) + .jvmArguments("-Xms512m", "-Xmx512m") + .outputTimeUnit(TimeUnit.MINUTES) + .run(); + } + + /** */ + protected IgniteConfiguration configuration(String igniteInstanceName) { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setIgniteInstanceName(igniteInstanceName); + + cfg.setFailureDetectionTimeout(1000); + + // Disable other message traffic. + cfg.setMetricsUpdateFrequency(5_000); + + // Avoid useless warning. We do really block the threads. + cfg.setSystemWorkerBlockedTimeout(10_000); + + TcpDiscoverySpi discoSpi = specialSpi == null ? new TcpDiscoverySpi() : specialSpi; + discoSpi.setIpFinder(IP_FINDER); + discoSpi.setLocalAddress("127.0.0.1"); + + discoSpi.setConnectionRecoveryTimeout(500); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** */ + private void silentStop(Ignite ignite) { + try { + Ignition.stop(ignite.name(), true); + } + catch (Exception ignored) { + // No-op. + } + } + + /** */ + private void startLoad(Ignite node) { + CacheConfiguration cacheConfiguration = new CacheConfiguration<>("loadCache"); + cacheConfiguration.setCacheMode(CacheMode.REPLICATED); + + IgniteCache cache = node.getOrCreateCache(cacheConfiguration); + + Stream.generate(() -> { + return new Thread(() -> { + Random rnd = new Random(); + + System.out.println("Start loading."); + + while (true) { + try { + cache.put(rnd.nextInt(10_000), rnd.nextInt(10_000)); + } + catch (Exception e) { + break; + } + +// System.out.println("Put..."); + + try { + Thread.sleep(5); + } + catch (InterruptedException ignored) { + // No-op. + } + } + + System.out.println("Stop loading."); + }); + }).limit(6).forEach(Thread::start); + } +} Index: modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java (date 1592310728000) +++ modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java (date 1592409874000) @@ -23,15 +23,19 @@ import java.net.SocketTimeoutException; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -88,6 +92,12 @@ /** */ private int connectionRecoveryTimeout = -1; + /** */ + private int failureDetectionTimeout = 2_000; + + /** */ + private Long systemWorkerBlockedTimeout; + /** {@inheritDoc} */ @Override protected void afterTest() { stopAllGrids(); @@ -107,7 +117,10 @@ if (connectionRecoveryTimeout >= 0) spi.setConnectionRecoveryTimeout(connectionRecoveryTimeout); - cfg.setFailureDetectionTimeout(2_000); + cfg.setFailureDetectionTimeout(failureDetectionTimeout); + + if (systemWorkerBlockedTimeout != null) + cfg.setSystemWorkerBlockedTimeout(systemWorkerBlockedTimeout); cfg.setDiscoverySpi(spi); @@ -186,6 +199,138 @@ failedNodes.isEmpty()); } + /** + * Checks node get failed, segmented within failureDetectionTimeout + connectionRecoveryTimeout with some small + * timeouts. + */ + @Test + public void testConnectionRecoveryTimeoutSmallValues() throws Exception { + checkConnectionRecoveryTimeout(200, 300, 100); + } + + /** + * Checks node get failed, segmented within failureDetectionTimeout + connectionRecoveryTimeout with some medium + * timeouts. + */ + @Test + public void testConnectionRecoveryTimeoutMediumValues() throws Exception { + checkConnectionRecoveryTimeout(300, 500, 200); + } + + /** + * Checks node get failed, segmented within failureDetectionTimeout + connectionRecoveryTimeout with relatively + * long timeouts. + */ + @Test + public void testConnectionRecoveryTimeoutLongValues() throws Exception { + checkConnectionRecoveryTimeout(1000, 1000, 100); + } + + /** Checks node get failed, segmented within failureDetectionTimeout + connectionRecoveryTimeout. */ + void checkConnectionRecoveryTimeout(int minTimeout, int maxTimeout, int step) throws Exception { + // Avoid useless warnings. We do block threads specially. + systemWorkerBlockedTimeout = 5_000L; + + for (failureDetectionTimeout = minTimeout; failureDetectionTimeout <= maxTimeout; + failureDetectionTimeout += step) { + for (connectionRecoveryTimeout = minTimeout; connectionRecoveryTimeout <= maxTimeout; + connectionRecoveryTimeout += step) { + AtomicLong timer = new AtomicLong(); + + startGrids(2); + + specialSpi = new TcpDiscoverySpi() { + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, + TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + simulateSocketTimeout(timeout); + + super.writeToSocket(sock, out, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, + long timeout) throws IOException { + simulateSocketTimeout(timeout); + + super.writeToSocket(msg, sock, res, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, + long timeout) throws IOException { + simulateSocketTimeout(timeout); + + super.writeToSocket(sock, msg, data, timeout); + } + + /** */ + private void simulateSocketTimeout(long timeout) throws SocketTimeoutException { + if (timer.get() <= 0) + return; + + try { + Thread.sleep(timeout); + + throw new SocketTimeoutException("Simulated timeout " + timeout + "ms."); + } + catch (InterruptedException e) { + // No-op. + } + } + }; + + IgniteEx failedGrid = startGrid(2); + + failedGrid.events().localListen((e) -> { + if (e.node().isLocal()) + timer.set(System.nanoTime() - timer.get()); + else { + log.error("Wrong node failed: " + e.node().order() + ". Expected: " + + failedGrid.localNode().order()); + + timer.set(-1); + } + + synchronized (timer) { + timer.notifyAll(); + } + + return false; + }, EventType.EVT_NODE_SEGMENTED, EventType.EVT_NODE_FAILED); + + specialSpi = null; + + startGrid(3); + + startGrid(4); + + synchronized (timer) { + timer.set(System.nanoTime()); + + timer.wait(getTestTimeout()); + } + + if (timer.get() < 0) + fail("Wrong node failed. See the log above."); + else { + timer.set(U.nanosToMillis(timer.get())); + + long expectedDelay = failureDetectionTimeout + connectionRecoveryTimeout; + + // Give additional 100ms on GC, timer granulation and platform delays. + assertTrue("Too long delay of connection recovery failure: " + timer.get() + ". Expected: " + + expectedDelay + ". failureDetectionTimeout: " + failureDetectionTimeout + + ", connectionRecoveryTimeout: " + connectionRecoveryTimeout, + timer.get() < expectedDelay + 100); + } + + stopAllGrids(true); + } + } + } + /** * @param ig Ignite instance to get failedNodes collection from. */