From 068b4e321fe20fea14a81ecd771b6a0aedcb472d Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Tue, 11 Aug 2015 12:36:44 +0300 Subject: [PATCH 1/6] ignite-1229: stop ping process when node left topology --- .../ignite/spi/discovery/tcp/ServerImpl.java | 101 +++++++++++++-------- .../tcp/internal/IgniteNodeLeftException.java | 40 ++++++++ 2 files changed, 103 insertions(+), 38 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 92c21ed..331b286 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -386,14 +386,14 @@ class ServerImpl extends TcpDiscoveryImpl { if (nodeId == getLocalNodeId()) return true; - TcpDiscoveryNode node = ring.node(nodeId); - - if (node == null || !node.visible()) + if (!nodeAlive(nodeId)) return false; + TcpDiscoveryNode node = ring.node(nodeId); + boolean res = pingNode(node); - if (!res && !node.isClient()) { + if (!res && !node.isClient() && nodeAlive(nodeId)) { LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId); msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id())); @@ -421,14 +421,14 @@ class ServerImpl extends TcpDiscoveryImpl { node = ring.node(node.clientRouterNodeId()); - if (node == null || !node.visible()) + if (!nodeAlive(node.id())) return false; } for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) { try { // ID returned by the node should be the same as ID of the parameter for ping to succeed. 
- IgniteBiTuple t = pingNode(addr, clientNodeId); + IgniteBiTuple t = pingNode(addr, node.id(), clientNodeId); boolean res = node.id().equals(t.get1()) && (clientNodeId == null || t.get2()); @@ -437,6 +437,14 @@ class ServerImpl extends TcpDiscoveryImpl { return res; } + catch (IgniteNodeLeftException e) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e); + + return false; + } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); @@ -453,12 +461,13 @@ class ServerImpl extends TcpDiscoveryImpl { * Pings the node by its address to see if it's alive. * * @param addr Address of the node. + * @param nodeId Node ID to ping. In case when client node ID is not null this node ID is an ID of the router node. * @param clientNodeId Client node ID. * @return ID of the remote node and "client exists" flag if node alive. * @throws IgniteCheckedException If an error occurs. */ - private IgniteBiTuple pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId) - throws IgniteCheckedException { + private IgniteBiTuple pingNode(InetSocketAddress addr, @Nullable UUID nodeId, + @Nullable UUID clientNodeId) throws IgniteCheckedException { assert addr != null; UUID locNodeId = getLocalNodeId(); @@ -537,6 +546,10 @@ class ServerImpl extends TcpDiscoveryImpl { return t; } catch (IOException | IgniteCheckedException e) { + if (nodeId != null && !nodeAlive(nodeId)) + throw new IgniteNodeLeftException("Failed to ping node (node already left or leaving" + + " the ring) [nodeId=" + nodeId + ", addr=" + addr +']', e); + if (errs == null) errs = new ArrayList<>(); @@ -615,6 +628,28 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Checks whether a node is alive or not. + * + * @param nodeId Node ID. 
+ * @return {@code True} if node is in the ring and is not being removed from. + */ + private boolean nodeAlive(UUID nodeId) { + // Is node alive or about to be removed from the ring? + TcpDiscoveryNode node = ring.node(nodeId); + + boolean nodeAlive = node != null && node.visible(); + + if (nodeAlive) { + synchronized (mux) { + nodeAlive = !F.transform(failedNodes, F.node2id()).contains(nodeId) && + !F.transform(leavingNodes, F.node2id()).contains(nodeId); + } + } + + return nodeAlive; + } + + /** * Tries to join this node to topology. * * @throws IgniteSpiException If any error occurs. @@ -1387,15 +1422,17 @@ class ServerImpl extends TcpDiscoveryImpl { b.append("Leaving nodes: ").append(U.nl()); - for (TcpDiscoveryNode node : leavingNodes) - b.append(" ").append(node.id()).append(U.nl()); + synchronized (mux) { + for (TcpDiscoveryNode node : leavingNodes) + b.append(" ").append(node.id()).append(U.nl()); - b.append(U.nl()); + b.append(U.nl()); - b.append("Failed nodes: ").append(U.nl()); + b.append("Failed nodes: ").append(U.nl()); - for (TcpDiscoveryNode node : failedNodes) - b.append(" ").append(node.id()).append(U.nl()); + for (TcpDiscoveryNode node : failedNodes) + b.append(" ").append(node.id()).append(U.nl()); + } b.append(U.nl()); @@ -1520,7 +1557,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (res == null) { try { - res = pingNode(addr, null).get1() != null; + res = pingNode(addr, null, null).get1() != null; } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -3775,9 +3812,17 @@ class ServerImpl extends TcpDiscoveryImpl { else { int aliveCheck = clientNode.decrementAliveCheck(); - if (aliveCheck <= 0 && isLocalNodeCoordinator() && !failedNodes.contains(clientNode)) - processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, - clientNode.id(), clientNode.internalOrder())); + if (aliveCheck <= 0 && isLocalNodeCoordinator()) { + boolean failedNode; + + synchronized (mux) { + failedNode = failedNodes.contains(clientNode); + } + 
+ if (!failedNode) + processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, + clientNode.id(), clientNode.internalOrder())); + } } } } @@ -4689,26 +4734,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * @param nodeId Node ID. - * @return {@code True} if node is in the ring and is not being removed from. - */ - private boolean nodeAlive(UUID nodeId) { - // Is node alive or about to be removed from the ring? - TcpDiscoveryNode node = ring.node(nodeId); - - boolean nodeAlive = node != null && node.visible(); - - if (nodeAlive) { - synchronized (mux) { - nodeAlive = !F.transform(failedNodes, F.node2id()).contains(nodeId) && - !F.transform(leavingNodes, F.node2id()).contains(nodeId); - } - } - - return nodeAlive; - } - - /** * @param msg Join request message. * @param clientMsgWrk Client message worker to start. * @return Whether connection was successful. diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java new file mode 100644 index 0000000..80bcc67 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.internal; + +import org.apache.ignite.*; + +import org.jetbrains.annotations.*; + +/** + * Thrown when there is an attempt to talk to the node that has already left the ring. + */ +public class IgniteNodeLeftException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates new exception with given error message and optional nested exception. + * + * @param msg Error message. + * @param cause Optional nested exception (can be {@code null}). + */ + public IgniteNodeLeftException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } +} -- 1.9.5.msysgit.0 From a463d1dc6977b2f5514acbda49d090841182df71 Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Wed, 12 Aug 2015 10:53:16 +0300 Subject: [PATCH 2/6] ignite-946: reverted renaming topVer to avoid breaking compatibility --- .../ignite/spi/discovery/tcp/ServerImpl.java | 48 +++++++++++----------- .../tcp/internal/IgniteNodeLeftException.java | 40 ------------------ 2 files changed, 25 insertions(+), 63 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 331b286..dbfbc57 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -386,10 +386,13 @@ class ServerImpl extends TcpDiscoveryImpl { if (nodeId == getLocalNodeId()) return true; - if (!nodeAlive(nodeId)) + TcpDiscoveryNode node = ring.node(nodeId); + + if (node == null) return false; - TcpDiscoveryNode node = ring.node(nodeId); + if (!nodeAlive(nodeId)) + return false; 
boolean res = pingNode(node); @@ -430,6 +433,10 @@ class ServerImpl extends TcpDiscoveryImpl { // ID returned by the node should be the same as ID of the parameter for ping to succeed. IgniteBiTuple t = pingNode(addr, node.id(), clientNodeId); + if (t == null) + // Remote node left topology. + return false; + boolean res = node.id().equals(t.get1()) && (clientNodeId == null || t.get2()); if (res) @@ -437,14 +444,6 @@ class ServerImpl extends TcpDiscoveryImpl { return res; } - catch (IgniteNodeLeftException e) { - if (log.isDebugEnabled()) - log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); - - onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e); - - return false; - } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); @@ -463,10 +462,11 @@ class ServerImpl extends TcpDiscoveryImpl { * @param addr Address of the node. * @param nodeId Node ID to ping. In case when client node ID is not null this node ID is an ID of the router node. * @param clientNodeId Client node ID. - * @return ID of the remote node and "client exists" flag if node alive. + * @return ID of the remote node and "client exists" flag if node alive or {@code null} if the remote node has + * left a topology during the ping process. * @throws IgniteCheckedException If an error occurs. 
*/ - private IgniteBiTuple pingNode(InetSocketAddress addr, @Nullable UUID nodeId, + private @Nullable IgniteBiTuple pingNode(InetSocketAddress addr, @Nullable UUID nodeId, @Nullable UUID clientNodeId) throws IgniteCheckedException { assert addr != null; @@ -546,9 +546,13 @@ class ServerImpl extends TcpDiscoveryImpl { return t; } catch (IOException | IgniteCheckedException e) { - if (nodeId != null && !nodeAlive(nodeId)) - throw new IgniteNodeLeftException("Failed to ping node (node already left or leaving" + - " the ring) [nodeId=" + nodeId + ", addr=" + addr +']', e); + if (nodeId != null && !nodeAlive(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Failed to ping the node (has left or leaving topology): [nodeId=" + nodeId + + ']'); + + return null; + } if (errs == null) errs = new ArrayList<>(); @@ -1422,17 +1426,15 @@ class ServerImpl extends TcpDiscoveryImpl { b.append("Leaving nodes: ").append(U.nl()); - synchronized (mux) { - for (TcpDiscoveryNode node : leavingNodes) - b.append(" ").append(node.id()).append(U.nl()); + for (TcpDiscoveryNode node : leavingNodes) + b.append(" ").append(node.id()).append(U.nl()); - b.append(U.nl()); + b.append(U.nl()); - b.append("Failed nodes: ").append(U.nl()); + b.append("Failed nodes: ").append(U.nl()); - for (TcpDiscoveryNode node : failedNodes) - b.append(" ").append(node.id()).append(U.nl()); - } + for (TcpDiscoveryNode node : failedNodes) + b.append(" ").append(node.id()).append(U.nl()); b.append(U.nl()); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java deleted file mode 100644 index 80bcc67..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp.internal; - -import org.apache.ignite.*; - -import org.jetbrains.annotations.*; - -/** - * Thrown when there is an attempt to talk to the node that has already left the ring. - */ -public class IgniteNodeLeftException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Creates new exception with given error message and optional nested exception. - * - * @param msg Error message. - * @param cause Optional nested exception (can be {@code null}). 
- */ - public IgniteNodeLeftException(String msg, @Nullable Throwable cause) { - super(msg, cause); - } -} -- 1.9.5.msysgit.0 From ad54cae3c5918383d6ea978484105d9aab17445f Mon Sep 17 00:00:00 2001 From: sboikov Date: Wed, 12 Aug 2015 11:21:40 +0300 Subject: [PATCH 3/6] # ignite-1229 review --- .../src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index dbfbc57..76144e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -424,7 +424,7 @@ class ServerImpl extends TcpDiscoveryImpl { node = ring.node(node.clientRouterNodeId()); - if (!nodeAlive(node.id())) + if (node == null || !nodeAlive(node.id())) return false; } @@ -551,6 +551,8 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Failed to ping the node (has left or leaving topology): [nodeId=" + nodeId + ']'); + fut.onDone((IgniteBiTuple)null); + return null; } -- 1.9.5.msysgit.0 From aed83af5f76c47bc9e4d0e8f60955fc6c6b42aac Mon Sep 17 00:00:00 2001 From: sboikov Date: Thu, 13 Aug 2015 13:05:43 +0300 Subject: [PATCH 4/6] # Test for ignite-1245. 
--- .../IgniteCacheContinuousQueryClientTest.java | 114 +++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java new file mode 100644 index 0000000..bb413a0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.event.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testNodeJoins() throws Exception { + startGrids(2); + + client = true; + + Ignite clientNode = startGrid(3); + + client = false; + + CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor cur = clientNode.cache(null).query(qry); + + Ignite joined = startGrid(4); + + IgniteCache joinedCache = joined.cache(null); + + joinedCache.put(primaryKey(joinedCache), 1); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + cur.close(); + } + + /** + * + */ + private static class CacheEventListener implements CacheEntryUpdatedListener { + /** */ + private final CountDownLatch latch = new CountDownLatch(1); + + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> evts) { + for (CacheEntryEvent evt : evts) { + log.info("Received cache event: " + evt); + + latch.countDown(); + } + } + } +} -- 1.9.5.msysgit.0 From 4381bf7fbbcf6faa906cc9ed465d3feb2a302224 Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Thu, 13 Aug 2015 15:54:58 +0300 Subject: [PATCH 5/6] ignite-1229: interrupt ping queries if remote node leaves or fails --- .../ignite/spi/discovery/tcp/ServerImpl.java | 57 ++++++++++++++++++++-- .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 41 +++++++++++++--- 2 files changed, 87 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 76144e3..1f0266e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -114,7 +114,7 @@ class ServerImpl extends TcpDiscoveryImpl { protected TcpDiscoverySpiState spiState = DISCONNECTED; /** Map with proceeding ping requests. 
*/ - private final ConcurrentMap>> pingMap = + private final ConcurrentMap>> pingMap = new ConcurrentHashMap8<>(); /** @@ -497,9 +497,9 @@ class ServerImpl extends TcpDiscoveryImpl { return F.t(getLocalNodeId(), clientPingRes); } - GridFutureAdapter> fut = new GridFutureAdapter<>(); + GridPingFutureAdapter> fut = new GridPingFutureAdapter<>(); - IgniteInternalFuture> oldFut = pingMap.putIfAbsent(addr, fut); + GridPingFutureAdapter> oldFut = pingMap.putIfAbsent(addr, fut); if (oldFut != null) return oldFut.get(); @@ -520,7 +520,11 @@ class ServerImpl extends TcpDiscoveryImpl { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr, timeoutHelper); + sock = spi.createSocket(); + + fut.sock = sock; + + sock = spi.openSocket(sock, addr, timeoutHelper); openedSock = true; @@ -597,6 +601,21 @@ class ServerImpl extends TcpDiscoveryImpl { } } + /** + * Interrupts all existing 'ping' requests for the given node. + * + * @param node Node that may be pinged. + */ + private void interruptPing(TcpDiscoveryNode node) { + for (InetSocketAddress addr : spi.getNodeAddresses(node)) { + GridPingFutureAdapter fut = pingMap.get(addr); + + if (fut != null && fut.sock != null) + // Reference to the socket is not set to null. No need to assign it to a local variable. 
+ U.closeQuiet(fut.sock); + } + } + /** {@inheritDoc} */ @Override public void disconnect() throws IgniteSpiException { spiStop0(true); @@ -3366,6 +3385,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified() && !locNodeId.equals(leavingNodeId)) { TcpDiscoveryNode leftNode = ring.removeNode(leavingNodeId); + interruptPing(leavingNode); + assert leftNode != null; if (log.isDebugEnabled()) @@ -3533,6 +3554,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified()) { node = ring.removeNode(nodeId); + interruptPing(node); + assert node != null; long topVer; @@ -5142,4 +5165,30 @@ class ServerImpl extends TcpDiscoveryImpl { spi.writeToSocket(sock, msg, bout, timeout); } } + + /** + * + */ + private static class GridPingFutureAdapter extends GridFutureAdapter { + /** Socket. */ + private Socket sock; + + /** + * Returns socket associated with this ping future. + * + * @return Socket or {@code null} if no socket associated. + */ + public Socket sock() { + return sock; + } + + /** + * Associates socket with this ping future. + * + * @param sock Socket. + */ + public void sock(Socket sock) { + this.sock = sock; + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 18a540c..65ab8fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1167,18 +1167,49 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @param timeoutHelper Timeout helper. * @return Opened socket. * @throws IOException If failed. + * @throws IgniteSpiOperationTimeoutException In case of timeout. 
*/ protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { - assert sockAddr != null; + return openSocket(createSocket(), sockAddr, timeoutHelper); + } + + /** + * Connects to remote address sending {@code U.IGNITE_HEADER} when connection is established. + * + * @param sock Socket bound to a local host address. + * @param remAddr Remote address. + * @param timeoutHelper Timeout helper. + * @return Connected socket. + * @throws IOException If failed. + * @throws IgniteSpiOperationTimeoutException In case of timeout. + */ + Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) + throws IOException, IgniteSpiOperationTimeoutException { + + assert remAddr != null; - InetSocketAddress resolved = sockAddr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr; + InetSocketAddress resolved = remAddr.isUnresolved() ? + new InetSocketAddress(InetAddress.getByName(remAddr.getHostName()), remAddr.getPort()) : remAddr; InetAddress addr = resolved.getAddress(); assert addr != null; + sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); + + writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); + + return sock; + } + + /** + * Creates socket binding it to a local host address. This operation is not blocking. + * + * @return Created socket. + * @throws IOException If failed. 
+ */ + Socket createSocket() throws IOException { Socket sock; if (isSslEnabled()) @@ -1190,10 +1221,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T sock.setTcpNoDelay(true); - sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); - - writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); - return sock; } -- 1.9.5.msysgit.0 From 23dc6558e771382a244cd5d3ec7abe5ddbad1f61 Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Fri, 14 Aug 2015 10:35:57 +0300 Subject: [PATCH 6/6] ignite-1229: added tests that check ping interruption --- .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 6 +- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 151 ++++++++++++++++++++- .../tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java | 5 +- 3 files changed, 156 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 65ab8fd..2f3d410 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1184,7 +1184,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @throws IOException If failed. * @throws IgniteSpiOperationTimeoutException In case of timeout. */ - Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) + protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { assert remAddr != null; @@ -1277,8 +1277,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @throws IOException If IO failed or write timed out. * @throws IgniteCheckedException If marshalling failed. 
*/ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) - throws IOException, IgniteCheckedException { + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + IgniteCheckedException { writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K. } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 9a44c24..e5118e3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -70,7 +70,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi spi = new TcpDiscoverySpi(); + TcpDiscoverySpi spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ? + new TestTcpDiscoverySpi() : new TcpDiscoverySpi(); discoMap.put(gridName, spi); @@ -128,6 +129,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { if (U.isMacOs()) spi.setLocalAddress(F.first(U.allLocalIps())); } + else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode")) + cfg.setFailureDetectionTimeout(30_000); return cfg; } @@ -339,6 +342,152 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** * @throws Exception If any error occurs. 
*/ + public void testPingInterruptedOnNodeFailed() throws Exception { + try { + final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode"); + final Ignite failedNode = startGrid("testPingInterruptedOnNodeFailedFailingNode"); + startGrid("testPingInterruptedOnNodeFailedSimpleNode"); + + ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse = true; + + final CountDownLatch pingLatch = new CountDownLatch(1); + + final CountDownLatch eventLatch = new CountDownLatch(1); + + final AtomicBoolean pingRes = new AtomicBoolean(true); + + final AtomicBoolean failRes = new AtomicBoolean(false); + + long startTs = System.currentTimeMillis(); + + pingingNode.events().localListen( + new IgnitePredicate() { + @Override public boolean apply(Event event) { + if (((DiscoveryEvent)event).eventNode().id().equals(failedNode.cluster().localNode().id())) { + failRes.set(true); + eventLatch.countDown(); + } + + return true; + } + }, + EventType.EVT_NODE_FAILED); + + IgniteInternalFuture pingFut = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + pingLatch.countDown(); + + pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode( + failedNode.cluster().localNode().id())); + + return null; + } + }, 1); + + IgniteInternalFuture failingFut = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + pingLatch.await(); + + Thread.sleep(3000); + + ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).simulateNodeFailure(); + + return null; + } + }, 1); + + failingFut.get(); + pingFut.get(); + + assertFalse(pingRes.get()); + + assertTrue(System.currentTimeMillis() - startTs < + pingingNode.configuration().getFailureDetectionTimeout() / 2); + + assertTrue(eventLatch.await(7, TimeUnit.SECONDS)); + assertTrue(failRes.get()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. 
+ */ + public void testPingInterruptedOnNodeLeft() throws Exception { + try { + final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode"); + final Ignite leftNode = startGrid("testPingInterruptedOnNodeFailedFailingNode"); + startGrid("testPingInterruptedOnNodeFailedSimpleNode"); + + ((TestTcpDiscoverySpi)leftNode.configuration().getDiscoverySpi()).ignorePingResponse = true; + + final CountDownLatch pingLatch = new CountDownLatch(1); + + final AtomicBoolean pingRes = new AtomicBoolean(true); + + long startTs = System.currentTimeMillis(); + + IgniteInternalFuture pingFut = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + pingLatch.countDown(); + + pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode( + leftNode.cluster().localNode().id())); + + return null; + } + }, 1); + + IgniteInternalFuture stoppingFut = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + pingLatch.await(); + + Thread.sleep(3000); + + stopGrid("testPingInterruptedOnNodeFailedFailingNode"); + + return null; + } + }, 1); + + stoppingFut.get(); + pingFut.get(); + + assertFalse(pingRes.get()); + + assertTrue(System.currentTimeMillis() - startTs < + pingingNode.configuration().getFailureDetectionTimeout() / 2); + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private boolean ignorePingResponse; + + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + IgniteCheckedException { + if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse) + return; + else + super.writeToSocket(sock, msg, timeout); + } + } + + /** + * @throws Exception If any error occurs. 
+ */ public void testNodeAdded() throws Exception { try { final Ignite g1 = startGrid(1); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java index fbea187..630f2fd 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -305,7 +305,8 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf /** {@inheritDoc} */ - @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) + @Override protected Socket openSocket(Socket sock, InetSocketAddress sockAddr, + IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { if (openSocketTimeout) { @@ -330,7 +331,7 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf } } - Socket sock = super.openSocket(sockAddr, timeoutHelper); + super.openSocket(sock, sockAddr, timeoutHelper); try { Thread.sleep(1500); -- 1.9.5.msysgit.0