Index: modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/disco/JmhNodeFailureDetection.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/disco/JmhNodeFailureDetection.java (date 1592824452000) +++ modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/disco/JmhNodeFailureDetection.java (date 1592824452000) @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.disco; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark; +import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; + +/** * JMH Node failure detection timeout runner. */ +@State(Scope.Benchmark) +public class JmhNodeFailureDetection extends JmhAbstractBenchmark { + /** Enabled or disabled load on cluster with cache puts. */ + private static final boolean WITH_LOADING = true; + + /** */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int ITERATIONS = 20; + + /** Node which detects failure. */ + private Ignite detectingNode; + + /** Node which gets failure simulation. */ + private Ignite illNode; + + /** */ + private volatile long beginTime; + + /** */ + private volatile long endTime; + + /** */ + private TcpDiscoverySpi specialSpi; + + /** */ + private final Object mux = new Object(); + + /** */ + private final List results = new ArrayList<>(); + + /** */ + @TearDown + public void stop() { + Ignition.allGrids().forEach(this::silentStop); + } + + /** {@inheritDoc} */ + @Setup + public void setup() throws InterruptedException { + Ignition.start(configuration("grid0")); + + // SPI of node 1 detects the failure. + specialSpi = new TcpDiscoverySpi() { + /** + * {@inheritDoc} + */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + // Stop the test once the failure detected. + if (msg instanceof TcpDiscoveryNodeFailedMessage) { + synchronized (mux) { + if (endTime == 0 && beginTime > 0) { + endTime = System.nanoTime(); + + mux.notifyAll(); + } + } + } + + super.writeToSocket(sock, out, msg, timeout); + } + }; + + detectingNode = Ignition.start(configuration("grid1")); + + if(WITH_LOADING) + startLoad(detectingNode); + + for (int i = 0; i < ITERATIONS; ++i) { + beginTime = endTime = 0; + + // SPI of node 1 simulates node failure. + specialSpi = illTcpDiscoSpi(detectingNode.configuration().getFailureDetectionTimeout() * 2); + + illNode = Ignition.start(configuration("grid2")); + + // Randomizes time of node failure. + Thread.sleep(new Random().nextInt(2000)); + + // Next step: can launch simulation. + synchronized (mux) { + beginTime = System.nanoTime(); + + mux.wait(5000); + + if (endTime == 0) + throw new IllegalStateException("Failure not detected."); + } + + long delay = U.nanosToMillis(endTime - beginTime); + + beginTime = 0; + + System.err.println("Detection delay: " + delay + ". Failure detection timeout: " + + detectingNode.configuration().getFailureDetectionTimeout()); + + results.add(delay); + + silentStop(illNode); + } + + System.out.println("Total detection delay: " + results.stream().mapToLong(v -> v).sum()); + + stop(); + } + + /** */ + private void startLoad(Ignite node) { + CacheConfiguration cacheConfiguration = new CacheConfiguration<>("loadCache"); + cacheConfiguration.setCacheMode(CacheMode.REPLICATED); + + IgniteCache cache = node.getOrCreateCache(cacheConfiguration); + + Stream.generate(()->{ + return new Thread(()->{ + Random rnd = new Random(); + + System.out.println("Start loading."); + + while (true) { + try { + cache.put(rnd.nextInt(10_000), rnd.nextInt(10_000)); + } catch (Exception e){ + break; + } + +// System.out.println("Put..."); + + try { + Thread.sleep(5); + } + catch (InterruptedException ignored) { + // No-op. + } + } + + System.out.println("Stop loading."); + }); + }).limit(6).forEach(Thread::start); + } + + /** */ + @Benchmark + public void measureTotalForTheOutput() throws InterruptedException { + Thread.sleep(results.stream().mapToLong(v -> v).sum()); + } + + /** */ + private TcpDiscoverySpi illTcpDiscoSpi(long simulatedDelay) { + return new TcpDiscoverySpi() { + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + simulateUnacceptableDelay(); + + super.writeToSocket(sock, out, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, + long timeout) throws IOException { + simulateUnacceptableDelay(); + + super.writeToSocket(msg, sock, res, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, + long timeout) throws IOException { + simulateUnacceptableDelay(); + + super.writeToSocket(sock, msg, data, timeout); + } + + /** Simulates node delay like GC or unknown network issues. */ + private void simulateUnacceptableDelay() { + if (beginTime > 0) { + try { + Thread.sleep(simulatedDelay); + } + catch (InterruptedException ignored) { + // No-op. + } + } + } + }; + } + + /** + * Run benchmark. + * + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + JmhIdeBenchmarkRunner.create() + .threads(1) + .warmupIterations(0).measurementIterations(1) + .benchmarks(JmhNodeFailureDetection.class.getSimpleName()) + .jvmArguments("-Xms512m", "-Xmx512m") + .outputTimeUnit(TimeUnit.MINUTES) + .run(); + } + + /** */ + protected IgniteConfiguration configuration(String igniteInstanceName) { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setIgniteInstanceName(igniteInstanceName); + + cfg.setFailureDetectionTimeout(300); + + // Disable other message traffic. + cfg.setMetricsUpdateFrequency(5_000); + + // Avoid useless warning. We do really block the threads. + cfg.setSystemWorkerBlockedTimeout(10_000); + + TcpDiscoverySpi discoSpi = specialSpi == null ? new TcpDiscoverySpi() : specialSpi; + discoSpi.setIpFinder(IP_FINDER); + discoSpi.setLocalAddress("127.0.0.1"); + // We won't try recovering connection. We'll remove node from the grid asap. + discoSpi.setConnectionRecoveryTimeout(0); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** */ + private void silentStop(Ignite node) { + try { + Ignition.stop(node.name(), true); + } + catch (Exception ignored) { + // No-op; + } + } +} Index: modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java (date 1592231937000) +++ modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java (date 1592824452000) @@ -22,16 +22,23 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.util.Map; +import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -88,11 +95,140 @@ /** */ private int connectionRecoveryTimeout = -1; + /** */ + private int failureDetectionTimeout = 2_000; + + /** */ + private int metricsUpdateFreq = 1_000; + + /** */ + private Long sysWorkerBlockedTimeout; + /** {@inheritDoc} */ @Override protected void afterTest() { stopAllGrids(); } + /** Checks node failure is detected within failure detection timeout. */ + @Test + public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { + // We won't try recovering connection. We'll remove node from the grid asap. + connectionRecoveryTimeout = 0; + + // Makes test faster. Also the value is closer to previous fixed ping rate 500ms. + failureDetectionTimeout = 700; + + // A message traffic. + metricsUpdateFreq = 400; + + // Avoid useless warnings. We do block threads specially. + sysWorkerBlockedTimeout = 5000L; + + // Running several times to be sure. Let's keep it within 1min. + for (int i = 0; i < 7; ++i) { + // Holder of falure detection delay. Also is test start and end regulator. + final AtomicLong timer = new AtomicLong(); + + // SPI of node 0 detects the failure. + specialSpi = new TcpDiscoverySpi() { + /** */ + private AtomicBoolean detected = new AtomicBoolean(); + + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + + if (msg instanceof TcpDiscoveryNodeFailedMessage && detected.compareAndSet(false, true)) { + synchronized (timer) { + timer.set(System.nanoTime() - timer.get()); + + // Failure detected. Stop the test. + timer.notifyAll(); + } + } + + super.writeToSocket(sock, out, msg, timeout); + } + }; + + IgniteEx grid0 = startGrid(0); + + assert ((IgniteSpiAdapter)grid0.configuration().getDiscoverySpi()).failureDetectionTimeoutEnabled() : + "Failure detection timeout is not active."; + + long nodeDelay = failureDetectionTimeout * 2; + + // SPI of node 1 simulates node failure. + specialSpi = new TcpDiscoverySpi() { + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + simulateUnacceptableDelay(); + + super.writeToSocket(sock, out, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, + long timeout) throws IOException { + simulateUnacceptableDelay(); + + super.writeToSocket(msg, sock, res, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, + long timeout) throws IOException { + simulateUnacceptableDelay(); + + super.writeToSocket(sock, msg, data, timeout); + } + + /** Simulates node delay like GC or unknown network issues. */ + private void simulateUnacceptableDelay() { + if (timer.get() > 0) { + try { + Thread.sleep(nodeDelay); + } + catch (InterruptedException ignored) { + // No-op. + } + } + } + }; + + startGrid(1); + + specialSpi = null; + + // Other node to send TcpDiscoveryNodeFailedMessage to. + startGrid(2); + + // Wait for exchanging various frequent messages like TcpDiscoveryCustomEventMessage. + awaitPartitionMapExchange(); + + // Randimizes failure time since cluster start. + Thread.sleep(new Random().nextInt(500)); + + synchronized (timer) { + // Failure simulated. + timer.set(System.nanoTime()); + + // Wait until failure is detected. + timer.wait(getTestTimeout()); + } + + long failureDetectionDelay = U.nanosToMillis(timer.get()); + + stopAllGrids(true); + + // Previous delay is up to 'failure detection timeout + 500ms'. Where 500ms is fixed ping rate. + // To avoid flaky test, we give anoter 100ms to work with GC pauses, platform delays and the timer + // granulation in IgniteUtils.currentTimeMillis(). + assertTrue("Long failure detection delay: " + failureDetectionDelay, + failureDetectionDelay <= failureDetectionTimeout + 100); + } + } + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -107,7 +243,14 @@ if (connectionRecoveryTimeout >= 0) spi.setConnectionRecoveryTimeout(connectionRecoveryTimeout); - cfg.setFailureDetectionTimeout(2_000); + cfg.setFailureDetectionTimeout(failureDetectionTimeout); + + cfg.setMetricsUpdateFrequency(metricsUpdateFreq); + + if (sysWorkerBlockedTimeout != null) + cfg.setSystemWorkerBlockedTimeout(sysWorkerBlockedTimeout); + + cfg.setDiscoverySpi(spi); cfg.setDiscoverySpi(spi);