Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java (revision f82fb5ca6d1c628d6e423e470305734ced31dc2b) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java (revision 325458125fe5cb34c1cf79b3e7ece161b6cff05c) @@ -32,6 +32,7 @@ TestSuite suite = new TestSuite("Communication SPI Test Suite"); suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckSelfTest.class)); + suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckFutureSelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiConcurrentConnectSelfTest.class)); Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java (revision 5c3d82973cb80601540e6552c7b7b0b81c3f1fce) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -36,6 +36,7 @@ import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.thread.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -974,6 +975,7 @@ * @param 
ordered Ordered flag. * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ private void send( @@ -984,7 +986,8 @@ byte plc, boolean ordered, long timeout, - boolean skipOnTimeout + boolean skipOnTimeout, + IgniteInClosure ackClosure ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -1004,13 +1007,19 @@ processOrderedMessage(locNodeId, ioMsg, plc, null); else processRegularMessage0(ioMsg, locNodeId); + + if (ackClosure != null) + ackClosure.apply(null); } else { if (topicOrd < 0) ioMsg.topicBytes(marsh.marshal(topic)); try { + if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) + ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessageWithAck(node, ioMsg, ackClosure); + else - getSpi().sendMessage(node, ioMsg); + getSpi().sendMessage(node, ioMsg); } catch (IgniteSpiException e) { throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + @@ -1030,7 +1039,7 @@ *

<p> * How to use it: * <ol> - * <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean)} + * <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean, IgniteInClosure)} * with this method.</li> * <li>Start all grids for your test, then set {@link #TURBO_DEBUG_MODE} to {@code true}.</li> * <li>Perform test operations on the topology. No network will be there.</li>
@@ -1132,7 +1141,7 @@ if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, topic.ordinal(), msg, plc, false, 0, false); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); } /** @@ -1144,7 +1153,7 @@ */ public void send(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false); + send(node, topic, -1, msg, plc, false, 0, false, null); } /** @@ -1154,9 +1163,21 @@ * @param plc Type of processing. * @throws IgniteCheckedException Thrown in case of any errors. */ + public void sendWithAck(ClusterNode node, Object topic, Message msg, byte plc) + throws IgniteCheckedException { + send(node, topic, -1, msg, plc, false, 0, false, null); + } + + /** + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @throws IgniteCheckedException Thrown in case of any errors. + */ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); } /** @@ -1164,6 +1185,19 @@ * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendWithAck(ClusterNode node, GridTopic topic, Message msg, byte plc, + IgniteInClosure ackClosure) throws IgniteCheckedException { + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure); + } + + /** + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. 
* @param skipOnTimeout Whether message can be skipped on timeout. * @throws IgniteCheckedException Thrown in case of any errors. @@ -1178,16 +1212,41 @@ ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); } /** + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @param timeout Timeout to keep a message on receiving queue. + * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendOrderedMessageWithAck( + ClusterNode node, + Object topic, + Message msg, + byte plc, + long timeout, + boolean skipOnTimeout, + IgniteInClosure ackClosure + ) throws IgniteCheckedException { + assert timeout > 0 || skipOnTimeout; + + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + } + + /** * @param nodeId Destination node. * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. 
*/ public void sendOrderedMessage( @@ -1196,7 +1255,8 @@ Message msg, byte plc, long timeout, - boolean skipOnTimeout + boolean skipOnTimeout, + IgniteInClosure ackClosure ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; @@ -1205,10 +1265,39 @@ if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); } /** + * @param nodeId Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @param timeout Timeout to keep a message on receiving queue. + * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendOrderedMessageWithAck( + UUID nodeId, + Object topic, + Message msg, + byte plc, + long timeout, + boolean skipOnTimeout, + IgniteInClosure ackClosure + ) throws IgniteCheckedException { + assert timeout > 0 || skipOnTimeout; + + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) + throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); + + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + } + + /** * @param nodes Destination nodes. * @param topic Topic to send the message to. * @param msg Message to send. @@ -1416,7 +1505,7 @@ // messages to one node vs. many. 
if (!nodes.isEmpty()) { for (ClusterNode node : nodes) - send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout); + send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null); } else if (log.isDebugEnabled()) log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java (revision f82fb5ca6d1c628d6e423e470305734ced31dc2b) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -19,6 +19,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -94,10 +95,12 @@ /** * @param nodeId Node ID (provided only if versions of local and remote nodes are different). * @param msg Message to send. + * @param closure Ack closure. * @throws IgniteCheckedException If failed. * @return {@code True} if should try to resend message. */ - public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException; + public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure closure) + throws IgniteCheckedException; /** * @return {@code True} if send is asynchronous. 
Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java (revision 5c3d82973cb80601540e6552c7b7b0b81c3f1fce) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -19,6 +19,7 @@ import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; /** * Future that represents already completed result. @@ -54,6 +55,16 @@ /** {@inheritDoc} */ @Override public boolean skipRecovery() { return true; + } + + /** {@inheritDoc} */ + @Override public void ackClosure(IgniteInClosure closure) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteInClosure ackClosure() { + return null; } /** {@inheritDoc} */ Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java (revision 5c3d82973cb80601540e6552c7b7b0b81c3f1fce) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; /** * NIO future. @@ -39,4 +40,16 @@ * @return {@code True} if skip recovery for this operation. 
*/ public boolean skipRecovery(); + + /** + * Sets ack closure which will be applied when ack is received. + * + * @param closure Ack closure. + */ + public void ackClosure(IgniteInClosure closure); + + /** + * @return Ack closure. + */ + public IgniteInClosure ackClosure(); } Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java (revision 5c3d82973cb80601540e6552c7b7b0b81c3f1fce) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -19,6 +19,7 @@ import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; /** * Default future implementation. 
@@ -30,6 +31,9 @@ /** */ protected boolean msgThread; + /** */ + protected IgniteInClosure ackClosure; + /** {@inheritDoc} */ @Override public void messageThread(boolean msgThread) { this.msgThread = msgThread; @@ -43,6 +47,16 @@ /** {@inheritDoc} */ @Override public boolean skipRecovery() { return false; + } + + /** {@inheritDoc} */ + @Override public void ackClosure(IgniteInClosure closure) { + ackClosure = closure; + } + + /** {@inheritDoc} */ + @Override public IgniteInClosure ackClosure() { + return ackClosure; } /** {@inheritDoc} */ Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java (revision f82fb5ca6d1c628d6e423e470305734ced31dc2b) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -19,6 +19,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; @@ -182,6 +183,9 @@ assert fut.isDone() : fut; + if (fut.ackClosure() != null) + fut.ackClosure().apply(null); + acked++; } } @@ -358,8 +362,14 @@ * @param futs Futures to complete. 
*/ private void completeOnNodeLeft(GridNioFuture[] futs) { - for (GridNioFuture msg : futs) - ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id())); + for (GridNioFuture msg : futs) { + IOException e = new IOException("Failed to send message, node has left: " + node.id()); + + ((GridNioFutureImpl)msg).onDone(e); + + if (msg.ackClosure() != null) + msg.ackClosure().apply(e); + } } /** {@inheritDoc} */ Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java (revision 5c3d82973cb80601540e6552c7b7b0b81c3f1fce) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -21,6 +21,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.nio.ssl.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -390,6 +391,11 @@ assert fut != null; int msgCnt = sys ? 
ses.offerSystemFuture(fut) : ses.offerFuture(fut); + + IgniteInClosure ackClosure; + + if (!sys && (ackClosure = ses.removeMeta(ACK_CLOSURE.ordinal())) != null) + fut.ackClosure(ackClosure); if (ses.closed()) { if (ses.removeFuture(fut)) Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java (revision 5c3d82973cb80601540e6552c7b7b0b81c3f1fce) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -45,7 +45,10 @@ MSG_WRITER, /** SSL engine. */ - SSL_ENGINE; + SSL_ENGINE, + + /** Ack closure. */ + ACK_CLOSURE; /** Maximum count of NIO session keys in system. 
*/ public static final int MAX_KEYS_CNT = 64; Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java (revision f82fb5ca6d1c628d6e423e470305734ced31dc2b) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -21,6 +21,7 @@ import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -113,7 +114,8 @@ } /** {@inheritDoc} */ - @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg) + @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg, + IgniteInClosure closure) throws IgniteCheckedException { if (closed()) throw new IgniteCheckedException("Communication client was closed: " + this); Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java (revision f82fb5ca6d1c628d6e423e470305734ced31dc2b) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -20,6 +20,7 @@ 
import org.apache.ignite.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -27,6 +28,8 @@ import java.nio.*; import java.util.*; +import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; + /** * Grid client for NIO server. */ @@ -97,11 +100,14 @@ } /** {@inheritDoc} */ - @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg) + @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure closure) throws IgniteCheckedException { // Node ID is never provided in asynchronous send mode. assert nodeId == null; + if (closure != null) + ses.addMeta(ACK_CLOSURE.ordinal(), closure); + GridNioFuture fut = ses.send(msg); if (fut.isDone()) { @@ -109,6 +115,9 @@ fut.get(); } catch (IgniteCheckedException e) { + if (closure != null) + ses.removeMeta(ACK_CLOSURE.ordinal()); + if (log.isDebugEnabled()) log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); @@ -118,6 +127,9 @@ throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); } } + + if (closure != null) + ses.removeMeta(ACK_CLOSURE.ordinal()); return false; } Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java (revision f82fb5ca6d1c628d6e423e470305734ced31dc2b) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -1350,7 +1350,7 @@ if (slowClientQueueLimit > 0 && msgQueueLimit > 
0 && slowClientQueueLimit >= msgQueueLimit) { U.quietAndWarn(log, "Slow client queue limit is set to a value greater than message queue limit " + "(slow client queue limit will have no effect) [msgQueueLimit=" + msgQueueLimit + - ", slowClientQueueLimit=" + slowClientQueueLimit + ']'); + ", slowClientQueueLimit=" + slowClientQueueLimit + ']'); } registerMBean(gridName, this, TcpCommunicationSpiMBean.class); @@ -1555,6 +1555,7 @@ /** * Creates new shared memory communication server. + * * @return Server. * @throws IgniteCheckedException If failed. */ @@ -1696,16 +1697,50 @@ /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + sendMessage(node, msg, null); + } + + /** + * Sends given message to destination node. Note that characteristics of the + * exchange such as durability, guaranteed delivery or error notification is + * dependant on SPI implementation. + * + * @param node Destination node. + * @param msg Message to send. + * @param ackClosure Ack closure. + * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message. + * Note that this is not guaranteed that failed communication will result + * in thrown exception as this is dependant on SPI implementation. + */ + public void sendMessageWithAck(ClusterNode node, Message msg, IgniteInClosure ackClosure) + throws IgniteSpiException { + sendMessage(node, msg, ackClosure); + } + + /** + * @param node Destination node. + * @param msg Message to send. + * @param ackClosure Ack closure. + * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message. + * Note that this is not guaranteed that failed communication will result + * in thrown exception as this is dependant on SPI implementation. 
+ */ + private void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackClosure) + throws IgniteSpiException { assert node != null; assert msg != null; if (log.isTraceEnabled()) - log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); + log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']'); UUID locNodeId = getLocalNodeId(); - if (node.id().equals(locNodeId)) + if (node.id().equals(locNodeId)) { notifyListener(locNodeId, msg, NOOP); + + if (ackClosure != null) + ackClosure.apply(null); + } else { GridCommunicationClient client = null; @@ -1720,7 +1755,7 @@ if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) nodeId = node.id(); - retry = client.sendMessage(nodeId, msg); + retry = client.sendMessage(nodeId, msg, ackClosure); client.release(); @@ -1783,7 +1818,7 @@ GridCommunicationClient old = clients.put(nodeId, client0); assert old == null : "Client already created " + - "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; + "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; if (client0 instanceof GridTcpNioCommunicationClient) { GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); @@ -1879,7 +1914,8 @@ * @return Client. * @throws IgniteCheckedException If failed. 
*/ - @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException { + @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, + Integer port) throws IgniteCheckedException { int attempt = 1; int connectAttempts = 1; @@ -2086,6 +2122,7 @@ meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine); } + if (recoveryDesc != null) { recoveryDesc.onHandshake(rcvCnt); @@ -2285,7 +2322,7 @@ else if (log.isDebugEnabled()) log.debug("Received remote node ID: " + rmtNodeId0); - if (isSslEnabled() ) { + if (isSslEnabled()) { assert sslHnd != null; ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER))); Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java (revision e7bd078b4133022786271d748664419d1dee0a8f) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java (revision e7bd078b4133022786271d748664419d1dee0a8f) @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.*; +import org.apache.ignite.testframework.junits.spi.*; + +import org.eclipse.jetty.util.*; + +import java.net.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") +public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest extends GridSpiAbstractTest { + /** */ + private static final Collection spiRsrcs = new ArrayList<>(); + + /** */ + protected static final List spis = new ArrayList<>(); + + /** */ + protected static final List nodes = new ArrayList<>(); + + /** */ + private static final int SPI_CNT = 2; + + /** + * + */ + static { + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); + } + + /** + * Disable SPI auto-start. 
+ */ + public GridTcpCommunicationSpiRecoveryAckFutureSelfTest() { + super(false); + } + + /** */ + @SuppressWarnings({"deprecation"}) + private class TestListener implements CommunicationListener { + /** */ + private ConcurrentHashSet msgIds = new ConcurrentHashSet<>(); + + /** */ + private AtomicInteger rcvCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { + info("Test listener received message: " + msg); + + assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage); + + GridTestMessage msg0 = (GridTestMessage)msg; + + assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId())); + + rcvCnt.incrementAndGet(); + + msgC.run(); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(UUID nodeId) { + // No-op. + } + } + + /** + * @throws Exception If failed. + */ + public void testAckOnIdle() throws Exception { + checkAck(10, 2000, 9); + } + + /** + * @throws Exception If failed. + */ + public void testAckOnCount() throws Exception { + checkAck(10, 60_000, 10); + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param msgPerIter Messages per iteration. + * @throws Exception If failed. 
+     */
+    private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception {
+        try {
+            // Started inside the try block so that the finally clause stops the SPIs even if startup fails.
+            createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT);
+
+            TcpCommunicationSpi spi0 = spis.get(0);
+            TcpCommunicationSpi spi1 = spis.get(1);
+
+            ClusterNode node0 = nodes.get(0);
+            ClusterNode node1 = nodes.get(1);
+
+            int msgId = 0;
+
+            // Number of messages each SPI is expected to have received in total after the current iteration.
+            int expMsgs = 0;
+
+            for (int i = 0; i < 5; i++) {
+                info("Iteration: " + i);
+
+                // Counts ack closure invocations for this iteration only.
+                final AtomicInteger ackMsgs = new AtomicInteger(0);
+
+                IgniteInClosure<Exception> ackClosure = new CI1<Exception>() {
+                    @Override public void apply(Exception o) {
+                        assert o == null;
+
+                        ackMsgs.incrementAndGet();
+                    }
+                };
+
+                // Send the same number of messages in both directions.
+                for (int j = 0; j < msgPerIter; j++) {
+                    spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+                    spi1.sendMessageWithAck(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure);
+                }
+
+                expMsgs += msgPerIter;
+
+                // For every SPI, find a session with a recovery descriptor and wait until it has no pending futures.
+                for (TcpCommunicationSpi spi : spis) {
+                    GridNioServer srv = U.field(spi, "nioSrvr");
+
+                    Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+                    assertFalse(sessions.isEmpty());
+
+                    boolean found = false;
+
+                    for (GridNioSession ses : sessions) {
+                        final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+
+                        if (recoveryDesc != null) {
+                            found = true;
+
+                            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                                @Override public boolean apply() {
+                                    return recoveryDesc.messagesFutures().isEmpty();
+                                }
+                            }, 10_000);
+
+                            assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,
+                                recoveryDesc.messagesFutures().size());
+
+                            // Only the first session that has a recovery descriptor is checked.
+                            break;
+                        }
+                    }
+
+                    assertTrue(found);
+                }
+
+                // Final copy for use inside the anonymous predicate below.
+                final int expMsgs0 = expMsgs;
+
+                for (TcpCommunicationSpi spi : spis) {
+                    final TestListener lsnr = (TestListener)spi.getListener();
+
+                    GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                        @Override public boolean apply() {
+                            return lsnr.rcvCnt.get() >= expMsgs0;
+                        }
+                    }, 5000);
+
+                    assertEquals(expMsgs, lsnr.rcvCnt.get());
+                }
+
+                // One ack per message, msgPerIter messages in each of the two directions.
+                assertEquals(msgPerIter * 2, ackMsgs.get());
+            }
+        }
+        finally {
+            stopSpis();
+        }
+    }
+
+    /**
+     * Runs the queue overflow scenario with an ack threshold of 5 and a message queue limit of 10.
+     * The whole scenario is attempted up to three times when it fails because of a {@link BindException}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testQueueOverflow() throws Exception {
+        for (int i = 0; i < 3; i++) {
+            try {
+                startSpis(5, 60_000, 10);
+
+                checkOverflow();
+
+                break;
+            }
+            catch (IgniteCheckedException e) {
+                // Only bind failures are retried, and only while attempts remain.
+                if (!e.hasCause(BindException.class) || i >= 2)
+                    throw e;
+
+                info("Got exception caused by BindException, will retry after delay: " + e);
+
+                // Stop before sleeping: the finally clause below runs only after the delay.
+                stopSpis();
+
+                U.sleep(10_000);
+            }
+            finally {
+                stopSpis();
+            }
+        }
+    }
+
+    /**
+     * Sends one message to establish the connection, sets the {@code skipWrite} flag on the second SPI's NIO
+     * server, sends 150 more messages and waits for the sender's session to be closed. Then clears the flag,
+     * sends 100 more messages and checks that every sent message was both received and acknowledged.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkOverflow() throws Exception {
+        TcpCommunicationSpi spi0 = spis.get(0);
+        TcpCommunicationSpi spi1 = spis.get(1);
+
+        ClusterNode node0 = nodes.get(0);
+        ClusterNode node1 = nodes.get(1);
+
+        final GridNioServer srv1 = U.field(spi1, "nioSrvr");
+
+        final AtomicInteger ackMsgs = new AtomicInteger(0);
+
+        IgniteInClosure<Exception> ackClosure = new CI1<Exception>() {
+            @Override public void apply(Exception o) {
+                assert o == null;
+
+                ackMsgs.incrementAndGet();
+            }
+        };
+
+        int msgId = 0;
+
+        // Send message to establish connection.
+        spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+        // Prevent node1 from sending.
+        GridTestUtils.setFieldValue(srv1, "skipWrite", true);
+
+        final GridNioSession ses0 = communicationSession(spi0);
+
+        for (int i = 0; i < 150; i++)
+            spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+        // Wait until the session is closed because of queue overflow.
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return ses0.closeTime() != 0;
+            }
+        }, 5000);
+
+        assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
+
+        GridTestUtils.setFieldValue(srv1, "skipWrite", false);
+
+        for (int i = 0; i < 100; i++)
+            spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+        // Message IDs start at 1 and grow by one per send, so the last ID is the number sent: 1 + 150 + 100 = 251.
+        final int expMsgs = msgId;
+
+        final TestListener lsnr = (TestListener)spi1.getListener();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return lsnr.rcvCnt.get() >= expMsgs;
+            }
+        }, 5000);
+
+        assertEquals(expMsgs, lsnr.rcvCnt.get());
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return expMsgs == ackMsgs.get();
+            }
+        }, 5000);
+
+        // The result of the wait above is not checked, so verify explicitly that every message was acknowledged.
+        assertEquals(expMsgs, ackMsgs.get());
+    }
+
+    /**
+     * Waits up to 5 seconds for the SPI's NIO server to have at least one session, then asserts that there is
+     * exactly one and returns it.
+     *
+     * @param spi SPI.
+     * @return Session.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
+        final GridNioServer srv = U.field(spi, "nioSrvr");
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+                return !sessions.isEmpty();
+            }
+        }, 5000);
+
+        // Read the field again: the value seen inside the predicate is not available here.
+        Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+        assertEquals(1, sessions.size());
+
+        return sessions.iterator().next();
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @return SPI instance.
+     */
+    protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) {
+        TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+        spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+        spi.setIdleConnectionTimeout(idleTimeout);
+        spi.setTcpNoDelay(true);
+        spi.setAckSendThreshold(ackCnt);
+        spi.setMessageQueueLimit(queueLimit);
+
+        // NOTE(review): -1 presumably disables the shared memory endpoint - confirm against TcpCommunicationSpi.
+        spi.setSharedMemoryPort(-1);
+
+        return spi;
+    }
+
+    /**
+     * Creates and starts {@code SPI_CNT} communication SPIs, each with its own test node, test resources and
+     * SPI context, and then adds every other node to each context's remote nodes.
+     * Anything left in {@code spis}, {@code nodes} and {@code spiRsrcs} is discarded first without being stopped.
+     *
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @throws Exception If failed.
+     */
+    private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+        spis.clear();
+        nodes.clear();
+        spiRsrcs.clear();
+
+        // Context of every started node, used below to wire up the remote nodes.
+        Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+
+        for (int i = 0; i < SPI_CNT; i++) {
+            TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
+
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
+            IgniteTestResources rsrcs = new IgniteTestResources();
+
+            GridTestNode node = new GridTestNode(rsrcs.getNodeId());
+
+            GridSpiTestContext ctx = initSpiContext();
+
+            ctx.setLocalNode(node);
+
+            // Registered before the SPI is started, so the resources are tracked even if the start below fails.
+            spiRsrcs.add(rsrcs);
+
+            rsrcs.inject(spi);
+
+            spi.setListener(new TestListener());
+
+            node.setAttributes(spi.getNodeAttributes());
+
+            nodes.add(node);
+
+            spi.spiStart(getTestGridName() + (i + 1));
+
+            // Only SPIs whose start succeeded are added to the list.
+            spis.add(spi);
+
+            spi.onContextInitialized(ctx);
+
+            ctxs.put(node, ctx);
+        }
+
+        // For each context set remote nodes.
+        for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) {
+            for (ClusterNode n : nodes) {
+                if (!n.equals(e.getKey()))
+                    e.getValue().remoteNodes().add(n);
+            }
+        }
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @throws Exception If failed.
+     */
+    private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+        for (int i = 0; i < 3; i++) {
+            try {
+                startSpis(ackCnt, idleTimeout, queueLimit);
+
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                // Stop whatever was started before the failure, whether or not another attempt follows.
+                stopSpis();
+
+                // Only bind failures are retried, and only while attempts remain.
+                if (!e.hasCause(BindException.class) || i >= 2)
+                    throw e;
+
+                info("Failed to start SPIs because of BindException, will retry after delay.");
+
+                U.sleep(10_000);
+            }
+        }
+    }
+
+    /**
+     * Stops all started SPIs and the threads of all test resources, then clears the SPI, node and resource lists.
+     * A failure on one element does not prevent the remaining ones from being stopped; the first failure is
+     * rethrown at the end with any later failures attached as suppressed exceptions.
+     *
+     * @throws Exception If failed.
+     */
+    private void stopSpis() throws Exception {
+        Exception err = null;
+
+        for (CommunicationSpi spi : spis) {
+            try {
+                spi.onContextDestroyed();
+
+                spi.setListener(null);
+
+                spi.spiStop();
+            }
+            catch (Exception e) {
+                // Remember the failure, but keep stopping the remaining SPIs.
+                if (err == null)
+                    err = e;
+                else
+                    err.addSuppressed(e);
+            }
+        }
+
+        for (IgniteTestResources rsrcs : spiRsrcs) {
+            try {
+                rsrcs.stopThreads();
+            }
+            catch (Exception e) {
+                if (err == null)
+                    err = e;
+                else
+                    err.addSuppressed(e);
+            }
+        }
+
+        // The lists are cleared even after a failure, so a later call does not see stale entries.
+        spis.clear();
+        nodes.clear();
+        spiRsrcs.clear();
+
+        if (err != null)
+            throw err;
+    }
+}