diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 25b219b..7af00bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -390,6 +390,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ses.send(new RecoveryLastReceivedMessage(-1)); + fut.onDone(oldClient); + return; } else { @@ -430,15 +432,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ses.send(new RecoveryLastReceivedMessage(-1)); } else { + if (!hasShmemClient) + // Prevents a race between incoming TCP connection and 'outgoing' shmem connection. + hasShmemClient = oldFut instanceof ShmemConnectFuture; + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); - if (reserved) { - GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); - - fut.onDone(client); - } + if (reserved) + connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); } } } @@ -1413,7 +1415,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (slowClientQueueLimit > 0 && msgQueueLimit > 0 && slowClientQueueLimit >= msgQueueLimit) { U.quietAndWarn(log, "Slow client queue limit is set to a value greater than message queue limit " + "(slow client queue limit will have no effect) [msgQueueLimit=" + msgQueueLimit + - ", slowClientQueueLimit=" + slowClientQueueLimit + ']'); + ", slowClientQueueLimit=" + slowClientQueueLimit + ']'); } registerMBean(gridName, this, TcpCommunicationSpiMBean.class); @@ -1891,8 +1893,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (isNodeStopping()) throw new IgniteSpiException("Node is stopping."); + Integer shmemPort = shmemPortToUse(node); + // Do not allow concurrent connects. - GridFutureAdapter fut = new ConnectFuture(); + GridFutureAdapter fut = shmemPort != null ? new ShmemConnectFuture() : + new ConnectFuture(); GridFutureAdapter oldFut = clientFuts.putIfAbsent(nodeId, fut); @@ -1901,7 +1906,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient client0 = clients.get(nodeId); if (client0 == null) { - client0 = createNioClient(node); + client0 = createNioClient(node, shmemPort); if (client0 != null) { GridCommunicationClient old = clients.put(nodeId, client0); @@ -1969,16 +1974,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException { assert node != null; - Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); - - ClusterNode locNode = getSpiContext().localNode(); + return createNioClient(node, shmemPortToUse(node)); + } - if (locNode == null) - throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)"); + /** + * @param node Node to create client for. + * @param shmemPort Shared memory port to use or {@code null} if tcp client should be created instead. + * @return Client. + */ + @Nullable private GridCommunicationClient createNioClient(ClusterNode node, Integer shmemPort) + throws IgniteCheckedException { + assert node != null; - // If remote node has shared memory server enabled and has the same set of MACs - // then we are likely to run on the same host and shared memory communication could be tried. - if (shmemPort != null && U.sameMacs(locNode, node)) { + if (shmemPort != null) { try { return createShmemClient(node, shmemPort); } @@ -2096,6 +2104,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * Gets shared memory port to use for connection with {@code node}. + * + * @param node Node to open connection with. + * @return Shmem port or {@code null} if this kind of connection is not supported. + * @throws IgniteCheckedException If failed. + */ + private Integer shmemPortToUse(ClusterNode node) throws IgniteCheckedException { + assert node != null; + + Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); + + ClusterNode locNode = getSpiContext().localNode(); + + if (locNode == null) + throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)"); + + // If remote node has shared memory server enabled and has the same set of MACs + // then we are likely to run on the same host and shared memory communication could be tried. + return shmemPort != null && U.sameMacs(locNode, node) ? shmemPort : null; + } + + /** * Checks client message queue size and initiates client drop if message queue size exceeds the configured limit. * * @param ses Node communication session. @@ -3090,6 +3120,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ + private static class ShmemConnectFuture extends ConnectFuture { + /** */ + private static final long serialVersionUID = 0L; + + // No-op. + } + + /** + * + */ private static class HandshakeTimeoutObject implements IgniteSpiTimeoutObject { /** */ private final IgniteUuid id = IgniteUuid.randomUuid();