diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0fe2881..c1ecd50 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2101,6 +2101,48 @@ class ServerImpl extends TcpDiscoveryImpl {
             initConnectionCheckFrequency();
         }
 
+        /**
+         * Runs the worker loop of the superclass. If it terminates with any {@link Throwable} while the node
+         * is not already stopping, the failure is logged, closing of the local node is started on a separate
+         * thread, and the original throwable is rethrown.
+         *
+         * @throws InterruptedException Propagated from {@code super.body()}.
+         */
+        @Override protected void body() throws InterruptedException {
+            try {
+                super.body();
+            }
+            catch (Throwable e) {
+                if (!spi.isNodeStopping0()) {
+                    // Read the node reference once so the null check and the use see the same value.
+                    final org.apache.ignite.Ignite ignite = spi.ignite();
+
+                    if (ignite != null) {
+                        log.error("TcpDiscoverySpi's message worker thread failed abnormally. Stopping the grid " +
+                            "in order to prevent cluster wide instability.", e);
+
+                        // NOTE(review): closing the node presumably stops discovery and waits for this worker
+                        // thread, so closing it from the worker itself could block on its own termination.
+                        // Close from a helper thread instead - confirm against the node stop sequence.
+                        new Thread(new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    ignite.close();
+                                }
+                                catch (Exception closeErr) {
+                                    log.error("Failed to stop the grid after TcpDiscoverySpi's message worker " +
+                                        "thread failure.", closeErr);
+                                }
+                            }
+                        }, "tcp-disco-msg-worker-failure-stopper").start();
+                    }
+                }
+
+                // Must be processed by IgniteSpiThread as well.
+                throw e;
+            }
+        }
+
         /**
          * Initializes connection check frequency. Used only when failure detection timeout is enabled.
*/ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 51d8a2d..874ae41 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -65,7 +65,6 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessa import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -355,6 +354,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private void testFailureDetectionOnNodePing(Ignite pingingNode, Ignite failedNode) throws Exception { final CountDownLatch cnt = new CountDownLatch(1); + final UUID failedNodeId = failedNode.cluster().localNode().id(); + pingingNode.events().localListen( new IgnitePredicate() { @Override public boolean apply(Event evt) { @@ -372,9 +373,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { TcpDiscoverySpi spi = discoMap.get(pingingNode.name()); - boolean res = spi.pingNode(failedNode.cluster().localNode().id()); + boolean res = spi.pingNode(failedNodeId); - assertFalse("Ping is ok for node " + failedNode.cluster().localNode().id() + ", but had to fail.", res); + assertFalse("Ping is ok for node " + failedNodeId + ", but had to fail.", res); // Heartbeat interval is 40 seconds, but we should detect node failure faster. 
assert cnt.await(7, SECONDS);
@@ -391,6 +392,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
         ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse = true;
 
+        final UUID failedNodeId = failedNode.cluster().localNode().id();
+
         final CountDownLatch pingLatch = new CountDownLatch(1);
 
         final CountDownLatch eventLatch = new CountDownLatch(1);
@@ -404,7 +407,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         pingingNode.events().localListen(
             new IgnitePredicate() {
                 @Override public boolean apply(Event event) {
-                    if (((DiscoveryEvent)event).eventNode().id().equals(failedNode.cluster().localNode().id())) {
+                    if (((DiscoveryEvent)event).eventNode().id().equals(failedNodeId)) {
                         failRes.set(true);
                         eventLatch.countDown();
                     }
@@ -420,7 +423,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 pingLatch.countDown();
 
                 pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
-                    failedNode.cluster().localNode().id()));
+                    failedNodeId));
 
                 return null;
             }
@@ -1148,7 +1151,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         for (IgniteKernal grid : grids)
             assertEquals(startTime, (Long)grid.context().discovery().gridStartTime());
 
-        grids.add((IgniteKernal) startGrid(5));
+        grids.add((IgniteKernal)startGrid(5));
 
         for (IgniteKernal grid : grids)
             assertEquals(startTime, (Long)grid.context().discovery().gridStartTime());
@@ -1308,6 +1311,54 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that once node 0's discovery SPI starts throwing from {@code writeToSocket}, node 1
+     * receives {@code EVT_NODE_FAILED} for node 0 within 15 seconds.
+     *
+     * @throws Exception If failed
+     */
+    public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception {
+        try {
+            TestMessageWorkerFailureSpi spi0 = new TestMessageWorkerFailureSpi();
+
+            nodeSpi = spi0;
+
+            final Ignite ignite0 = startGrid(0);
+
+            nodeSpi = new TcpDiscoverySpi();
+
+            Ignite ignite1 = startGrid(1);
+
+            final AtomicBoolean disconnected = new AtomicBoolean();
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            final UUID failedNodeId = ignite0.cluster().localNode().id();
+
+            ignite1.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event event) {
+                    if (event.type() == EventType.EVT_NODE_FAILED &&
+                        failedNodeId.equals(((DiscoveryEvent)event).eventNode().id()))
+                        disconnected.set(true);
+
+                    latch.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_NODE_FAILED);
+
+            // Make every writeToSocket call on node 0's SPI throw; this is meant to fail its ring message worker.
+            spi0.stop = true;
+
+            assertTrue("No node failure event received within 15 seconds.", latch.await(15, TimeUnit.SECONDS));
+
+            assertTrue("Failure event was not for node " + failedNodeId + '.', disconnected.get());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      * @param twoNodes If {@code true} starts two nodes, otherwise three.
      * @throws Exception If failed
      */
@@ -1458,6 +1509,23 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Discovery SPI that throws a {@link RuntimeException} from {@code writeToSocket} once {@code stop} is set.
+     */
+    private static class TestMessageWorkerFailureSpi extends TcpDiscoverySpi {
+        /** When {@code true}, {@code writeToSocket} throws instead of delegating to the superclass. */
+        private volatile boolean stop;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+            if (stop)
+                throw new RuntimeException("Failing ring message worker explicitly");
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+    }
+
+    /**
      * Starts new grid with given index. Method optimize is not invoked.
      *
      * @param idx Index of the grid to start.