diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 823ddcd..3402e55 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -190,6 +190,9 @@ public class IgniteConfiguration { /** Default value for cache sanity check enabled flag. */ public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true; + /** Default failure detection threshold in millis. */ + public static final long DFLT_FAILURE_DETECTION_THRESHOLD = 10_000; + /** Optional grid name. */ private String gridName; @@ -367,6 +370,9 @@ public class IgniteConfiguration { /** Port number range for time server. */ private int timeSrvPortRange = DFLT_TIME_SERVER_PORT_RANGE; + /** Failure detection threshold. */ + private long failureDetectionThreshold = DFLT_FAILURE_DETECTION_THRESHOLD; + /** Property names to include into node attributes. 
*/ private String[] includeProps; @@ -449,7 +455,7 @@ public class IgniteConfiguration { consistentId = cfg.getConsistentId(); deployMode = cfg.getDeploymentMode(); discoStartupDelay = cfg.getDiscoveryStartupDelay(); - pubPoolSize = cfg.getPublicThreadPoolSize(); + failureDetectionThreshold = cfg.getFailureDetectionThreshold(); ggHome = cfg.getIgniteHome(); ggWork = cfg.getWorkDirectory(); gridName = cfg.getGridName(); @@ -479,6 +485,7 @@ public class IgniteConfiguration { p2pMissedCacheSize = cfg.getPeerClassLoadingMissedResourcesCacheSize(); p2pPoolSize = cfg.getPeerClassLoadingThreadPoolSize(); pluginCfgs = cfg.getPluginConfigurations(); + pubPoolSize = cfg.getPublicThreadPoolSize(); segChkFreq = cfg.getSegmentCheckFrequency(); segPlc = cfg.getSegmentationPolicy(); segResolveAttempts = cfg.getSegmentationResolveAttempts(); @@ -1655,6 +1662,39 @@ public class IgniteConfiguration { } /** + * Returns failure detection threshold used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. + *

+ * Default is {@link #DFLT_FAILURE_DETECTION_THRESHOLD}. + * + * @see #setFailureDetectionThreshold(long) + * @return Failure detection threshold in milliseconds. + */ + public long getFailureDetectionThreshold() { + return failureDetectionThreshold; + } + + /** + * Sets failure detection threshold to use in {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. + *

+ * The threshold makes it possible to detect a node's communication failures with the rest of the cluster faster. Every time + * the threshold is reached during communication between the node and the cluster, appropriate actions are + * performed depending on the kind of node (server or client node) and a message with the error is printed out. + *

+ * The failure detection threshold is an easy and straightforward way to setup {@link TcpDiscoverySpi} and + * {@link TcpCommunicationSpi} depending on network conditions of a cluster. On the other hand if advanced setting + * of socket write, acknowledgement timeouts or other parameters is needed it can be done using specific + * {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi} APIs. However, sometimes the failure detection threshold + * will be ignored when such a timeout or parameter is set explicitly. The full list of such timeouts and + * parameters is available as a part of {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi} + * documentations. + * + * @param failureDetectionThreshold Failure detection threshold in milliseconds. + */ + public void setFailureDetectionThreshold(long failureDetectionThreshold) { + this.failureDetectionThreshold = failureDetectionThreshold; + } + + /** * Should return fully configured load balancing SPI implementation. If not provided, * {@link RoundRobinLoadBalancingSpi} will be used. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 2f3def9..6f5e9e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.spi; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; @@ -74,6 +75,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** Local node. */ private ClusterNode locNode; + /** Failure detection threshold usage switch. */ + private boolean failureDetectionThresholdEnabled = true; + + /** + * Failure detection threshold. 
Initialized with the value of + * {@link IgniteConfiguration#getFailureDetectionThreshold()}. + */ + private long failureDetectionThreshold; + /** * Creates new adapter and initializes it from the current (this) class. * SPI name will be initialized to the simple name of the class @@ -583,6 +593,49 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement } /** + * Initiates and checks failure detection threshold value. + */ + protected void initFailureDetectionThreshold() { + if (failureDetectionThresholdEnabled) { + failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold(); + + if (failureDetectionThreshold <= 0) + throw new IgniteSpiException("Invalid failure detection threshold value: " + failureDetectionThreshold); + else if (failureDetectionThreshold <= 10) + // Because U.currentTimeInMillis() is updated once in 10 milliseconds. + log.warning("Failure detection threshold is too low, it may lead to unpredictable behaviour " + + "[failureDetectionThreshold=" + failureDetectionThreshold + ']'); + } + } + + /** + * Enables or disables failure detection threshold. + * + * @param enabled {@code true} if enable, {@code false} otherwise. + */ + public void failureDetectionThresholdEnabled(boolean enabled) { + failureDetectionThresholdEnabled = enabled; + } + + /** + * Checks whether failure detection threshold is enabled for this {@link IgniteSpi}. + * + * @return {@code true} if enabled, {@code false} otherwise. + */ + public boolean failureDetectionThresholdEnabled() { + return failureDetectionThresholdEnabled; + } + + /** + * Returns failure detection threshold set to use for network related operations. + * + * @return failure detection threshold in milliseconds or {@code 0} if the threshold is disabled. + */ + public long failureDetectionThreshold() { + return failureDetectionThreshold; + } + + /** * Temporarily SPI context. 
*/ private class GridDummySpiContext implements IgniteSpiContext { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java new file mode 100644 index 0000000..235fd2b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi; + +import org.apache.ignite.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.configuration.*; + +/** + * Kind of exception that is used when failure detection threshold is enabled for {@link TcpDiscoverySpi} or + * {@link TcpCommunicationSpi}. + * + * For more information refer to {@link IgniteConfiguration#setFailureDetectionThreshold(long)} and + * {@link IgniteSpiOperationTimeoutHelper}. + */ +public class IgniteSpiOperationTimeoutException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Constructor. + * @param msg Error message. 
+ */ + public IgniteSpiOperationTimeoutException(String msg) { + super(msg); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java new file mode 100644 index 0000000..03858d9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.spi; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.net.*; + +/** + * Object that incorporates logic that determines a timeout value for the next network related operation and checks + * whether a failure detection threshold is reached or not. + * + * A new instance of the class should be created for every complex network based operations that usually consists of + * request and response parts. 
+ */ +public class IgniteSpiOperationTimeoutHelper { + /** */ + private long lastOperStartTs; + + /** */ + private long timeout; + + /** */ + private final boolean failureDetectionThresholdEnabled; + + /** */ + private final long failureDetectionThreshold; + + /** + * Constructor. + * + * @param adapter SPI adapter. + */ + public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) { + failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled(); + failureDetectionThreshold = adapter.failureDetectionThreshold(); + } + + /** + * Returns a timeout value to use for the next network operation. + * + * If failure detection threshold is enabled then the returned value is a portion of time left since the last time + * this method is called. If the threshold is disabled then {@code dfltTimeout} is returned. + * + * @param dfltTimeout Timeout to use if failure detection threshold is disabled. + * @return Timeout in milliseconds. + * @throws IgniteSpiOperationTimeoutException If failure detection threshold is reached for an operation that uses + * this {@code IgniteSpiOperationTimeoutController}. + */ + public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException { + if (!failureDetectionThresholdEnabled) + return dfltTimeout; + + if (lastOperStartTs == 0) { + timeout = failureDetectionThreshold; + lastOperStartTs = U.currentTimeMillis(); + } + else { + long curTs = U.currentTimeMillis(); + + timeout = timeout - (curTs - lastOperStartTs); + + lastOperStartTs = curTs; + + if (timeout <= 0) + throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " + + "'failureDetectionThreshold' configuration property or set SPI specific timeouts" + + " manually. Current failure detection threshold: " + failureDetectionThreshold); + } + + return timeout; + } + + /** + * Checks whether the given {@link Exception} is generated because failure detection threshold has been reached. + * + * @param e Exception. 
+ * @return {@code true} if failure detection threshold is reached, {@code false} otherwise. + */ + public boolean checkThresholdReached(Exception e) { + if (!failureDetectionThresholdEnabled) + return false; + + return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException || + X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index e9fd696..b55dde2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -73,7 +73,19 @@ import static org.apache.ignite.events.EventType.*; * {@link #DFLT_IDLE_CONN_TIMEOUT} period and then are closed. Use * {@link #setIdleConnectionTimeout(long)} configuration parameter to configure * you own idle connection timeout. + *

Failure Detection

+ * Configuration defaults (see Configuration section below and + * {@link IgniteConfiguration#getFailureDetectionThreshold()} for details) are chosen to make it possible for + * communication SPI to work reliably on most hardware and virtual deployments, but this has made failure detection + * time worse. *

+ * If failure detection needs to be tuned then it's highly recommended to do this using + * {@link IgniteConfiguration#setFailureDetectionThreshold(long)}. This is the easiest and most straightforward way + * to set up failure detection based on the network conditions of a cluster. + *

+ * If advanced tuning of failure detection is required and + * {@link IgniteConfiguration#getFailureDetectionThreshold()} is unsuitable then various {@code TcpCommunicationSpi} + * configuration parameters may be used. *

Configuration

*

Mandatory

* This SPI has no mandatory configuration parameters. @@ -991,12 +1003,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * {@code 0} is interpreted as infinite timeout. *

* If not provided, default value is {@link #DFLT_CONN_TIMEOUT}. + *

+ * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionThreshold()} is ignored. * * @param connTimeout Connect timeout. */ @IgniteSpiConfiguration(optional = true) public void setConnectTimeout(long connTimeout) { this.connTimeout = connTimeout; + + failureDetectionThresholdEnabled(false); } /** {@inheritDoc} */ @@ -1013,12 +1029,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * {@code 0} is interpreted as infinite timeout. *

* If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}. + *

+ * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionThreshold()} is ignored. * * @param maxConnTimeout Maximum connect timeout. */ @IgniteSpiConfiguration(optional = true) public void setMaxConnectTimeout(long maxConnTimeout) { this.maxConnTimeout = maxConnTimeout; + + failureDetectionThresholdEnabled(false); } /** {@inheritDoc} */ @@ -1031,12 +1051,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * with remote nodes. *

* If not provided, default value is {@link #DFLT_RECONNECT_CNT}. + *

+ * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionThreshold()} is ignored. * * @param reconCnt Maximum number of reconnection attempts. */ @IgniteSpiConfiguration(optional = true) public void setReconnectCount(int reconCnt) { this.reconCnt = reconCnt; + + failureDetectionThresholdEnabled(false); } /** {@inheritDoc} */ @@ -1264,6 +1288,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public Map getNodeAttributes() throws IgniteSpiException { + initFailureDetectionThreshold(); + assertParameter(locPort > 1023, "locPort > 1023"); assertParameter(locPort <= 0xffff, "locPort < 0xffff"); assertParameter(locPortRange >= 0, "locPortRange >= 0"); @@ -1272,10 +1298,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0"); assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1"); - assertParameter(reconCnt > 0, "reconnectCnt > 0"); assertParameter(selectorsCnt > 0, "selectorsCnt > 0"); - assertParameter(connTimeout >= 0, "connTimeout >= 0"); - assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout"); + + if (!failureDetectionThresholdEnabled()) { + assertParameter(reconCnt > 0, "reconnectCnt > 0"); + assertParameter(connTimeout >= 0, "connTimeout >= 0"); + assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout"); + } + assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0"); assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0"); assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0"); @@ -1351,9 +1381,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("sockRcvBuf", sockRcvBuf)); log.debug(configInfo("shmemPort", shmemPort)); log.debug(configInfo("msgQueueLimit", msgQueueLimit)); - log.debug(configInfo("connTimeout", connTimeout)); - 
log.debug(configInfo("maxConnTimeout", maxConnTimeout)); - log.debug(configInfo("reconCnt", reconCnt)); + + if (failureDetectionThresholdEnabled()) { + log.debug(configInfo("connTimeout", connTimeout)); + log.debug(configInfo("maxConnTimeout", maxConnTimeout)); + log.debug(configInfo("reconCnt", reconCnt)); + } + else + log.debug(configInfo("failureDetectionThreshold", failureDetectionThreshold())); + log.debug(configInfo("sockWriteTimeout", sockWriteTimeout)); log.debug(configInfo("ackSndThreshold", ackSndThreshold)); log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize)); @@ -1906,17 +1942,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter long connTimeout0 = connTimeout; + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this); + while (true) { GridCommunicationClient client; try { client = new GridShmemCommunicationClient(metricsLsnr, port, - connTimeout, + timeoutHelper.nextTimeoutChunk(connTimeout), log, getSpiContext().messageFormatter()); } catch (IgniteCheckedException e) { + if (timeoutHelper.checkThresholdReached(e)) + throw e; + // Reconnect for the second time, if connection is not established. 
if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) { connectAttempts++; @@ -1928,15 +1969,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - safeHandshake(client, null, node.id(), connTimeout0); + safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0)); } - catch (HandshakeTimeoutException e) { + catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { + client.forceClose(); + + if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException || + timeoutHelper.checkThresholdReached(e))) { + log.debug("Handshake timed out (failure threshold reached) [failureDetectionThreshold=" + + failureDetectionThreshold() + ", err=" + e.getMessage() + ", client=" + client + ']'); + + throw e; + } + + assert !failureDetectionThresholdEnabled(); + if (log.isDebugEnabled()) - log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + + log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", err=" + e.getMessage() + ", client=" + client + ']'); - client.forceClose(); - if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { if (log.isDebugEnabled()) log.debug("Handshake timedout (will stop attempts to perform the handshake) " + @@ -2050,6 +2101,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter int attempt = 1; + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this); + while (!conn) { // Reconnection on handshake timeout. 
try { SocketChannel ch = SocketChannel.open(); @@ -2076,9 +2129,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter long rcvCnt = -1; try { - ch.socket().connect(addr, (int)connTimeout); + ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout)); - rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0); + rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), + timeoutHelper.nextTimeoutChunk(connTimeout0)); if (rcvCnt == -1) return null; @@ -2112,19 +2166,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } } - catch (HandshakeTimeoutException e) { + catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { if (client != null) { client.forceClose(); client = null; } + if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException || + timeoutHelper.checkThresholdReached(e))) { + + String msg = "Handshake timed out (failure detection threshold is reached) " + + "[failureDetectionThreshold=" + failureDetectionThreshold() + ", addr=" + addr + ']'; + + onException(msg, e); + + if (log.isDebugEnabled()) + log.debug(msg); + + if (errs == null) + errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). 
" + + "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " + + "in order to prevent parties from waiting forever in case of network issues " + + "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); + + errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); + + break; + } + + assert !failureDetectionThresholdEnabled(); + onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", addr=" + addr + ']', e); if (log.isDebugEnabled()) log.debug( - "Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + + "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", addr=" + addr + ", err=" + e + ']'); if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { @@ -2164,7 +2242,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']'); - if (X.hasCause(e, SocketTimeoutException.class)) + boolean failureDetThrReached = timeoutHelper.checkThresholdReached(e); + + if (failureDetThrReached) + LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionThreshold' " + + "configuration property) [addr=" + addr + ", failureDetectionThreshold=" + + failureDetectionThreshold() + ']'); + else if (X.hasCause(e, SocketTimeoutException.class)) LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " + "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']'); @@ -2177,7 +2261,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); // Reconnect for the second time, if connection is not established. 
- if (connectAttempts < 2 && + if (!failureDetThrReached && connectAttempts < 2 && (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) { connectAttempts++; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 572ba2c..f6a1cdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -480,13 +480,17 @@ class ClientImpl extends TcpDiscoveryImpl { Collection errs = null; - long ackTimeout0 = spi.ackTimeout; + long ackTimeout0 = spi.getAckTimeout(); + + int reconCnt = 0; int connectAttempts = 1; UUID locNodeId = getLocalNodeId(); - for (int i = 0; i < spi.reconCnt; i++) { + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + + while (true) { boolean openSock = false; Socket sock = null; @@ -494,7 +498,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutHelper); openSock = true; @@ -502,7 +506,7 @@ class ClientImpl extends TcpDiscoveryImpl { req.client(true); - spi.writeToSocket(sock, req); + spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); @@ -532,7 +536,7 @@ class ClientImpl extends TcpDiscoveryImpl { msg.client(true); - spi.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -540,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl { log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + rmtNodeId + ']'); - return new T3<>(sock, spi.readReceipt(sock, ackTimeout0), 
res.clientAck()); + return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), + res.clientAck()); } catch (IOException | IgniteCheckedException e) { U.closeQuiet(sock); @@ -555,6 +560,12 @@ class ClientImpl extends TcpDiscoveryImpl { errs.add(e); + if (timeoutHelper.checkThresholdReached(e)) + break; + + if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) + break; + if (!openSock) { // Reconnect for the second time, if connection is not established. if (connectAttempts < 2) { @@ -566,7 +577,8 @@ class ClientImpl extends TcpDiscoveryImpl { break; // Don't retry if we can not establish connection. } - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { + if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException || + X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; if (!checkAckTimeout(ackTimeout0)) @@ -868,6 +880,9 @@ class ClientImpl extends TcpDiscoveryImpl { private final Queue queue = new ArrayDeque<>(); /** */ + private final long socketTimeout; + + /** */ private TcpDiscoveryAbstractMessage unackedMsg; /** @@ -875,6 +890,9 @@ class ClientImpl extends TcpDiscoveryImpl { */ protected SocketWriter() { super(spi.ignite().name(), "tcp-client-disco-sock-writer", log); + + socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : + spi.getSocketTimeout(); } /** @@ -968,12 +986,13 @@ class ClientImpl extends TcpDiscoveryImpl { } } - spi.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg, socketTimeout); msg = null; if (ack) { - long waitEnd = U.currentTimeMillis() + spi.ackTimeout; + long waitEnd = U.currentTimeMillis() + (spi.failureDetectionThresholdEnabled() ? 
+ spi.failureDetectionThreshold() : spi.getAckTimeout()); TcpDiscoveryAbstractMessage unacked; @@ -989,7 +1008,10 @@ class ClientImpl extends TcpDiscoveryImpl { if (unacked != null) { if (log.isDebugEnabled()) log.debug("Failed to get acknowledge for message, will try to reconnect " + - "[msg=" + unacked + ", timeout=" + spi.ackTimeout + ']'); + "[msg=" + unacked + + (spi.failureDetectionThresholdEnabled() ? + ", failureDetectionThershold=" + spi.failureDetectionThreshold() : + ", timeout=" + spi.getAckTimeout()) + ']'); throw new IOException("Failed to get acknowledge for message: " + unacked); } @@ -1068,12 +1090,12 @@ class ClientImpl extends TcpDiscoveryImpl { if (join) { joinError(new IgniteSpiException("Join process timed out, connection failed and " + "failed to reconnect (consider increasing 'joinTimeout' configuration property) " + - "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); + "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); } else - U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + - "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); - + U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" + + " configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); + return; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index dc343eb..b085b3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -80,14 +80,6 @@ class ServerImpl extends TcpDiscoveryImpl { /** Client message workers. */ protected ConcurrentMap clientMsgWorkers = new ConcurrentHashMap8<>(); - /** Metrics sender. 
*/ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private HeartbeatsSender hbsSnd; - - /** Status checker. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private CheckStatusSender chkStatusSnd; - /** IP finder cleaner. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private IpFinderCleaner ipFinderCleaner; @@ -229,12 +221,6 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onJoinFinished(); - hbsSnd = new HeartbeatsSender(); - hbsSnd.start(); - - chkStatusSnd = new CheckStatusSender(); - chkStatusSnd.start(); - if (spi.ipFinder.isShared()) { ipFinderCleaner = new IpFinderCleaner(); ipFinderCleaner.start(); @@ -278,10 +264,10 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id())); synchronized (mux) { - long threshold = U.currentTimeMillis() + spi.netTimeout; - long timeout = spi.netTimeout; + long threshold = U.currentTimeMillis() + timeout; + while (spiState != LEFT && timeout > 0) { try { mux.wait(timeout); @@ -319,12 +305,6 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(tmp); U.joinThreads(tmp, log); - U.interrupt(hbsSnd); - U.join(hbsSnd, log); - - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); @@ -482,6 +462,8 @@ class ServerImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + if (F.contains(spi.locNodeAddrs, addr)) { if (clientNodeId == null) return F.t(getLocalNodeId(), false); @@ -494,7 +476,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean clientPingRes; try { - clientPingRes = clientWorker.ping(); + clientPingRes = clientWorker.ping(timeoutHelper); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -517,18 +499,26 @@ class ServerImpl extends TcpDiscoveryImpl { try { Socket sock = null; - for (int i = 0; i < 
spi.reconCnt; i++) { + int reconCnt = 0; + + boolean openedSock = false; + + while (true) { try { if (addr.isUnresolved()) addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()); long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutHelper); + + openedSock = true; - spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId)); + spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout); + TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( + spi.getAckTimeout())); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -550,6 +540,16 @@ class ServerImpl extends TcpDiscoveryImpl { errs = new ArrayList<>(); errs.add(e); + + reconCnt++; + + if (!openedSock && reconCnt == 2) + break; + + if (timeoutHelper.checkThresholdReached(e)) + break; + else if (!spi.failureDetectionThresholdEnabled() && reconCnt == spi.getReconnectCount()) + break; } finally { U.closeQuiet(sock); @@ -607,6 +607,12 @@ class ServerImpl extends TcpDiscoveryImpl { } } + /** {@inheritDoc} */ + @Override protected void onDataReceived() { + if (spi.failureDetectionThresholdEnabled() && locNode != null) + locNode.lastDataReceivedTime(U.currentTimeMillis()); + } + /** * Tries to join this node to topology. 
* @@ -678,10 +684,10 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Join request message has been sent (waiting for coordinator response)."); synchronized (mux) { - long threshold = U.currentTimeMillis() + spi.netTimeout; - long timeout = spi.netTimeout; + long threshold = U.currentTimeMillis() + timeout; + while (spiState == CONNECTING && timeout > 0) { try { mux.wait(timeout); @@ -883,15 +889,19 @@ class ServerImpl extends TcpDiscoveryImpl { Collection errs = null; - long ackTimeout0 = spi.ackTimeout; + long ackTimeout0 = spi.getAckTimeout(); int connectAttempts = 1; - boolean joinReqSent = false; + boolean joinReqSent; UUID locNodeId = getLocalNodeId(); - for (int i = 0; i < spi.reconCnt; i++) { + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + + int reconCnt = 0; + + while (true){ // Need to set to false on each new iteration, // since remote node may leave in the middle of the first iteration. joinReqSent = false; @@ -903,14 +913,16 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutHelper); openSock = true; // Handshake. - spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); + spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk( + spi.getSocketTimeout())); - TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( + ackTimeout0)); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -924,7 +936,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Send message. 
tstamp = U.currentTimeMillis(); - spi.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -941,7 +953,7 @@ class ServerImpl extends TcpDiscoveryImpl { // E.g. due to class not found issue. joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; - return spi.readReceipt(sock, ackTimeout0); + return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); } catch (ClassCastException e) { // This issue is rarely reproducible on AmazonEC2, but never @@ -967,6 +979,12 @@ class ServerImpl extends TcpDiscoveryImpl { errs.add(e); + if (timeoutHelper.checkThresholdReached(e)) + break; + + if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) + break; + if (!openSock) { // Reconnect for the second time, if connection is not established. if (connectAttempts < 2) { @@ -978,7 +996,8 @@ class ServerImpl extends TcpDiscoveryImpl { break; // Don't retry if we can not establish connection. 
} - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { + if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException || + X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; if (!checkAckTimeout(ackTimeout0)) @@ -1256,12 +1275,6 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(tcpSrvr); U.join(tcpSrvr, log); - U.interrupt(hbsSnd); - U.join(hbsSnd, log); - - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); @@ -1350,8 +1363,7 @@ class ServerImpl extends TcpDiscoveryImpl { b.append("Internal threads: ").append(U.nl()); b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); - b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); - b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); + b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); @@ -1398,7 +1410,8 @@ class ServerImpl extends TcpDiscoveryImpl { private boolean recordable(TcpDiscoveryAbstractMessage msg) { return !(msg instanceof TcpDiscoveryHeartbeatMessage) && !(msg instanceof TcpDiscoveryStatusCheckMessage) && - !(msg instanceof TcpDiscoveryDiscardMessage); + !(msg instanceof TcpDiscoveryDiscardMessage) && + !(msg instanceof TcpDiscoveryConnectionCheckMessage); } /** @@ -1434,112 +1447,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Thread that sends heartbeats. - */ - private class HeartbeatsSender extends IgniteSpiThread { - /** - * Constructor. 
- */ - private HeartbeatsSender() { - super(spi.ignite().name(), "tcp-disco-hb-sender", log); - - setPriority(spi.threadPri); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void body() throws InterruptedException { - while (!isLocalNodeCoordinator()) - Thread.sleep(1000); - - if (log.isDebugEnabled()) - log.debug("Heartbeats sender has been started."); - - UUID nodeId = getConfiguredNodeId(); - - while (!isInterrupted()) { - if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping heartbeats sender (SPI is not connected to topology)."); - - return; - } - - TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId); - - msg.verify(getLocalNodeId()); - - msgWorker.addMessage(msg); - - Thread.sleep(spi.hbFreq); - } - } - } - - /** - * Thread that sends status check messages to next node if local node has not - * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage}) - * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} * - * {@link TcpDiscoverySpi#getHeartbeatFrequency()}. - */ - private class CheckStatusSender extends IgniteSpiThread { - /** - * Constructor. - */ - private CheckStatusSender() { - super(spi.ignite().name(), "tcp-disco-status-check-sender", log); - - setPriority(spi.threadPri); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Status check sender has been started."); - - // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm. - long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50; - - long lastSent = 0; - - while (!isInterrupted()) { - // 1. Determine timeout. - if (lastSent < locNode.lastUpdateTime()) - lastSent = locNode.lastUpdateTime(); - - long timeout = (lastSent + checkTimeout) - U.currentTimeMillis(); - - if (timeout > 0) - Thread.sleep(timeout); - - // 2. Check if SPI is still connected. 
- if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping status check sender (SPI is not connected to topology)."); - - return; - } - - // 3. Was there an update? - if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) { - if (log.isDebugEnabled()) - log.debug("Skipping status check send " + - "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) + - ", hasRmts=" + ring.hasRemoteNodes() + ']'); - - continue; - } - - // 4. Send status check message. - lastSent = U.currentTimeMillis(); - - msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); - } - } - } - - /** * Thread that cleans IP finder and keeps it in the correct state, unregistering * addresses of the nodes that has left the topology. *

@@ -1861,10 +1768,49 @@ class ServerImpl extends TcpDiscoveryImpl { /** Socket. */ private Socket sock; + /** Last time status message has been sent. */ + private long lastTimeStatusMsgSent; + + /** Incoming heartbeats check frequency. */ + private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50; + + /** Last time heartbeat message has been sent. */ + private long lastTimeHbMsgSent; + + /** Last time connection check message has been sent. */ + private long lastTimeConnCheckMsgSent; + + /** Flag that keeps info on whether the threshold is reached or not. */ + private boolean failureThresholdReached; + + /** Connection check frequency. */ + private long connCheckFreq; + /** */ protected RingMessageWorker() { - super("tcp-disco-msg-worker"); + super("tcp-disco-msg-worker", 10); + + initConnectionCheckFrequency(); + } + + /** + * Initializes connection check frequency. Used only when failure detection threshold is enabled. + */ + private void initConnectionCheckFrequency() { + if (spi.failureDetectionThresholdEnabled()) { + for (int i = 3; i > 0; i--) { + connCheckFreq = spi.failureDetectionThreshold() / i; + + if (connCheckFreq > 0) + break; + } + + assert connCheckFreq > 0; + + if (log.isDebugEnabled()) + log.debug("Connection check frequency is calculated: " + connCheckFreq); + } } /** @@ -1882,6 +1828,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); + else if (msg instanceof TcpDiscoveryConnectionCheckMessage) + processConnectionCheckMessage((TcpDiscoveryConnectionCheckMessage)msg); + else if (msg instanceof TcpDiscoveryClientReconnectMessage) processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); @@ -1921,9 +1870,25 @@ class ServerImpl extends TcpDiscoveryImpl { if (spi.ensured(msg)) msgHist.add(msg); + if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) + // Reset the flag. 
+ failureThresholdReached = false; + spi.stats.onMessageProcessingFinished(msg); } + /** {@inheritDoc} */ + @Override protected void noMessageLoop() { + if (locNode == null) + return; + + checkConnection(); + + sendHeartbeatMessage(); + + checkHeartbeatsReceiving(); + } + /** * Sends message across the ring. * @@ -1990,7 +1955,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) debugLog("No next node in topology."); - if (ring.hasRemoteNodes()) { + if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) && + !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) { msg.senderNodeId(locNodeId); addMessage(msg); @@ -2027,7 +1993,7 @@ class ServerImpl extends TcpDiscoveryImpl { List locNodeAddrs = U.arrayList(locNode.socketAddresses()); addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) { - long ackTimeout0 = spi.ackTimeout; + long ackTimeout0 = spi.getAckTimeout(); if (locNodeAddrs.contains(addr)){ if (log.isDebugEnabled()) @@ -2037,8 +2003,15 @@ class ServerImpl extends TcpDiscoveryImpl { continue; } - for (int i = 0; i < spi.reconCnt; i++) { + int reconCnt = 0; + + IgniteSpiOperationTimeoutHelper timeoutHelper = null; + + while (true) { if (sock == null) { + if (timeoutHelper == null) + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + nextNodeExists = false; boolean success = false; @@ -2049,14 +2022,16 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutHelper); openSock = true; // Handshake. 
- writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); + writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, + timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -2140,8 +2115,13 @@ class ServerImpl extends TcpDiscoveryImpl { if (!openSock) break; // Don't retry if we can not establish connection. - if (e instanceof SocketTimeoutException || - X.hasCause(e, SocketTimeoutException.class)) { + if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) + break; + + if (timeoutHelper.checkThresholdReached(e)) + break; + else if (!spi.failureDetectionThresholdEnabled() && (e instanceof + SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; if (!checkAckTimeout(ackTimeout0)) @@ -2156,9 +2136,13 @@ class ServerImpl extends TcpDiscoveryImpl { sock = null; } - else + else { // Next node exists and accepts incoming messages. nextNodeExists = true; + // Resetting timeout control object to let the code below to use a new one + // for the next bunch of operations. 
+ timeoutHelper = null; + } } } @@ -2195,8 +2179,12 @@ class ServerImpl extends TcpDiscoveryImpl { prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); + if (timeoutHelper == null) + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + try { - writeToSocket(sock, pendingMsg); + writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk( + spi.getSocketTimeout())); } finally { clearNodeAddedMessage(pendingMsg); @@ -2204,7 +2192,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp); - int res = spi.readReceipt(sock, ackTimeout0); + int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) log.debug("Pending message has been sent to next node [msg=" + msg.id() + @@ -2215,6 +2203,10 @@ class ServerImpl extends TcpDiscoveryImpl { debugLog("Pending message has been sent to next node [msg=" + msg.id() + ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ", res=" + res + ']'); + + // Resetting timeout control object to create a new one for the next bunch of + // operations. 
+ timeoutHelper = null; } } @@ -2223,11 +2215,14 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - writeToSocket(sock, msg); + if (timeoutHelper == null) + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + + writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); - int res = spi.readReceipt(sock, ackTimeout0); + int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) log.debug("Message has been sent to next node [msg=" + msg + @@ -2262,11 +2257,19 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']', e); - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { - ackTimeout0 *= 2; + if (timeoutHelper.checkThresholdReached(e)) + break; - if (!checkAckTimeout(ackTimeout0)) + if (!spi.failureDetectionThresholdEnabled()) { + if (++reconCnt == spi.getReconnectCount()) break; + else if (e instanceof SocketTimeoutException || + X.hasCause(e, SocketTimeoutException.class)) { + ackTimeout0 *= 2; + + if (!checkAckTimeout(ackTimeout0)) + break; + } } } finally { @@ -2279,7 +2282,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg + - ", i=" + i + ']'); + (!spi.failureDetectionThresholdEnabled() ? ", i=" + reconCnt : "") + ']'); } } } // Try to reconnect. @@ -3342,7 +3345,8 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (leftNode.equals(next) && sock != null) { try { - writeToSocket(sock, msg); + writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? 
+ spi.failureDetectionThreshold() : spi.getSocketTimeout()); if (log.isDebugEnabled()) log.debug("Sent verified node left message to leaving node: " + msg); @@ -3790,6 +3794,37 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Processes connection check message. + * + * @param msg Connection check message. + */ + private void processConnectionCheckMessage(TcpDiscoveryConnectionCheckMessage msg) { + assert msg.creatorNodeId().equals(getLocalNodeId()) && msg.senderNodeId() == null; + + if (spiStateCopy() != CONNECTED) { + if (log.isDebugEnabled()) + log.debug("Connection check message discarded (local node is leaving topology)."); + + return; + } + + if (next == null) { + if (log.isDebugEnabled()) + log.debug("Connection check message discarded (no next node in topology)."); + + return; + } + + // Link to the 'next' node is updated only inside RingMessageWorker thread, no need to check on 'null'. + if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, + TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER, TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) { + // Preserve backward compatibility with nodes of older versions. + processStatusCheckMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); + } else if (ring.hasRemoteNodes()) + sendMessageAcrossRing(msg); + } + + /** * @param nodeId Node ID. * @param metrics Metrics. * @param cacheMetrics Cache metrics. @@ -3991,6 +4026,74 @@ class ServerImpl extends TcpDiscoveryImpl { } } } + + /** + * Sends heartbeat message if needed. + */ + private void sendHeartbeatMessage() { + if (!isLocalNodeCoordinator()) + return; + + long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis(); + + if (elapsed > 0) + return; + + TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId()); + + msg.verify(getLocalNodeId()); + + msgWorker.addMessage(msg); + + lastTimeHbMsgSent = U.currentTimeMillis(); + } + + /** + * Check the last time a heartbeat message received. 
If the time is greater than {@code hbCheckFreq} then + * {@link TcpDiscoveryStatusCheckMessage} is sent across the ring. + */ + private void checkHeartbeatsReceiving() { + if (lastTimeStatusMsgSent < locNode.lastUpdateTime()) + lastTimeStatusMsgSent = locNode.lastUpdateTime(); + + long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis(); + + if (elapsed > 0) + return; + + msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); + + lastTimeStatusMsgSent = U.currentTimeMillis(); + } + + /** + * Check connection aliveness status. + */ + private void checkConnection() { + if (!spi.failureDetectionThresholdEnabled()) + return; + + if (!failureThresholdReached && U.currentTimeMillis() - locNode.lastDataReceivedTime() + >= spi.failureDetectionThreshold() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) { + + log.info("Local node seems to be disconnected from topology (failure detection threshold " + + "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() + + ", connCheckFreq=" + connCheckFreq + ']'); + + failureThresholdReached = true; + } + + long elapsed = (lastTimeConnCheckMsgSent + connCheckFreq) - U.currentTimeMillis(); + + if (elapsed > 0) + return; + + if (ring.hasRemoteNodes()) { + processConnectionCheckMessage(new TcpDiscoveryConnectionCheckMessage(locNode)); + + lastTimeConnCheckMsgSent = U.currentTimeMillis(); + } + } } /** @@ -4186,14 +4289,17 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId); + IgniteSpiOperationTimeoutHelper timeoutHelper = + new IgniteSpiOperationTimeoutHelper(spi); + if (req.clientNodeId() != null) { ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId()); if (clientWorker != null) - res.clientExists(clientWorker.ping()); + res.clientExists(clientWorker.ping(timeoutHelper)); } - spi.writeToSocket(sock, res); + spi.writeToSocket(sock, res, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); } else if (log.isDebugEnabled()) log.debug("Ignore ping request, node is stopping."); @@ -4214,7 +4320,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (req.client()) res.clientAck(true); - spi.writeToSocket(sock, res); + spi.writeToSocket(sock, res, spi.failureDetectionThresholdEnabled() ? + spi.failureDetectionThreshold() : spi.getSocketTimeout()); // It can happen if a remote node is stopped and it has a loopback address in the list of addresses, // the local node sends a handshake request message on the loopback address, so we get here. @@ -4323,6 +4430,9 @@ class ServerImpl extends TcpDiscoveryImpl { return; } + long socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : + spi.getSocketTimeout(); + while (!isInterrupted()) { try { TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); @@ -4337,7 +4447,12 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode && recordable(msg)) debugLog("Message has been received: " + msg); - if (msg instanceof TcpDiscoveryJoinRequestMessage) { + if (msg instanceof TcpDiscoveryConnectionCheckMessage) { + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + + continue; + } + else if (msg instanceof TcpDiscoveryJoinRequestMessage) { TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg; if (!req.responded()) { @@ -4355,7 +4470,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); if (clientMsgWrk.getState() == State.NEW) clientMsgWrk.start(); @@ -4365,7 +4480,7 @@ class ServerImpl extends TcpDiscoveryImpl { continue; } else { - spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN); + spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, socketTimeout); break; } @@ -4373,7 +4488,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if 
(msg instanceof TcpDiscoveryDuplicateIdMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); boolean ignored = false; @@ -4402,7 +4517,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryAuthFailedMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); boolean ignored = false; @@ -4431,7 +4546,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryCheckFailedMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); boolean ignored = false; @@ -4460,7 +4575,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); boolean ignored = false; @@ -4509,7 +4624,7 @@ class ServerImpl extends TcpDiscoveryImpl { clientMsgWrk.addMessage(ack); } else - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -4610,8 +4725,11 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); + long socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : + spi.getSocketTimeout(); + if (state == CONNECTED) { - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']'); @@ -4648,7 +4766,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Local node is stopping. Remote node should try next one. 
res = RES_CONTINUE_JOIN; - spi.writeToSocket(msg, sock, res); + spi.writeToSocket(msg, sock, res, socketTimeout); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']'); @@ -4741,7 +4859,7 @@ class ServerImpl extends TcpDiscoveryImpl { * @param clientNodeId Node ID. */ protected ClientMessageWorker(Socket sock, UUID clientNodeId) { - super("tcp-disco-client-message-worker"); + super("tcp-disco-client-message-worker", 2000); this.sock = sock; this.clientNodeId = clientNodeId; @@ -4791,7 +4909,8 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - writeToSocket(sock, msg); + writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? + spi.failureDetectionThreshold() : spi.getSocketTimeout()); } } else { @@ -4802,7 +4921,8 @@ class ServerImpl extends TcpDiscoveryImpl { prepareNodeAddedMessage(msg, clientNodeId, null, null); - writeToSocket(sock, msg); + writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? + spi.failureDetectionThreshold() : spi.getSocketTimeout()); } finally { clearNodeAddedMessage(msg); @@ -4836,10 +4956,11 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @param timeoutHelper Timeout controller. * @return Ping result. * @throws InterruptedException If interrupted. 
*/ - public boolean ping() throws InterruptedException { + public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException { if (spi.isNodeStopping0()) return false; @@ -4865,7 +4986,8 @@ class ServerImpl extends TcpDiscoveryImpl { } try { - return fut.get(spi.ackTimeout, TimeUnit.MILLISECONDS); + return fut.get(timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()), + TimeUnit.MILLISECONDS); } catch (IgniteInterruptedCheckedException ignored) { throw new InterruptedException(); @@ -4904,12 +5026,18 @@ class ServerImpl extends TcpDiscoveryImpl { /** Backed interrupted flag. */ private volatile boolean interrupted; + /** Polling timeout. */ + private final long pollingTimeout; + /** * @param name Thread name. + * @param pollingTimeout Messages polling timeout. */ - protected MessageWorkerAdapter(String name) { + protected MessageWorkerAdapter(String name, long pollingTimeout) { super(spi.ignite().name(), name, log); + this.pollingTimeout = pollingTimeout; + setPriority(spi.threadPri); } @@ -4919,12 +5047,12 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']'); while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS); + TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS); if (msg == null) - continue; - - processMessage(msg); + noMessageLoop(); + else + processMessage(msg); } } @@ -4968,16 +5096,24 @@ class ServerImpl extends TcpDiscoveryImpl { protected abstract void processMessage(TcpDiscoveryAbstractMessage msg); /** + * Called when there is no message to process giving ability to perform other activity. + */ + protected void noMessageLoop() { + // No-op. + } + + /** * @param sock Socket. * @param msg Message. + * @param timeout Socket timeout. * @throws IOException If IO failed. * @throws IgniteCheckedException If marshalling failed. 
*/ - protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) + protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { bout.reset(); - spi.writeToSocket(sock, msg, bout); + spi.writeToSocket(sock, msg, bout, timeout); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index c271b7c..4dacf45 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -131,6 +131,13 @@ abstract class TcpDiscoveryImpl { } /** + * Called when a chunk of data is received from a remote node. + */ + protected void onDataReceived() { + // No-op + } + + /** * @param log Logger. */ public abstract void dumpDebugInfo(IgniteLogger log); @@ -273,10 +280,10 @@ abstract class TcpDiscoveryImpl { * maximum acknowledgement timeout, {@code false} otherwise. 
*/ protected boolean checkAckTimeout(long ackTimeout) { - if (ackTimeout > spi.maxAckTimeout) { + if (ackTimeout > spi.getMaxAckTimeout()) { LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + "(consider increasing 'maxAckTimeout' configuration property) " + - "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']'); + "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.getMaxAckTimeout() + ']'); return false; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index b7d6e3f..be042eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -65,12 +65,18 @@ import java.util.concurrent.atomic.*; * and then this info goes to coordinator. When coordinator processes join request * and issues node added messages and all other nodes then receive info about new node. *

Failure Detection

- * Configuration defaults (see Configuration section below for details) - * are chosen to make possible for discovery SPI work reliably on - * most of hardware and virtual deployments, but this has made failure detection time worse. + * Configuration defaults (see Configuration section below and + * {@link IgniteConfiguration#getFailureDetectionThreshold()} for details) are chosen to make it possible for discovery + * SPI to work reliably on most hardware and virtual deployments, but this has made failure detection time worse. *

- * For stable low-latency networks the following more aggressive settings are recommended - * (which allows failure detection time ~200ms): + * If failure detection needs to be tuned, it is highly recommended to do this using + * {@link IgniteConfiguration#setFailureDetectionThreshold(long)}. This is the easiest and most straightforward way + * to set up failure detection based on the network conditions of a cluster. *

+ * If advanced tuning of failure detection is required and + * {@link IgniteConfiguration#getFailureDetectionThreshold()} is unsuitable, then various {@code TcpDiscoverySpi} + * configuration parameters may be used. As an example, for stable low-latency networks the following more aggressive + * settings are recommended (which allow a failure detection time of ~200ms): *