From a990acfa4a394eaae8f67446ef7a1d0bc0de4b97 Mon Sep 17 00:00:00 2001 From: Andrey Gura Date: Wed, 1 Apr 2015 16:41:17 +0300 Subject: [PATCH] ignite-328 IgniteMessaging.remoteListen ignores forOldest() projection --- .../internal/cluster/ClusterGroupAdapter.java | 3 +- .../ignite/messaging/GridMessagingSelfTest.java | 50 ++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java index c1d31af..0daffcc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java @@ -806,7 +806,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { * @param isOldest Oldest flag. */ private AgeClusterGroup(ClusterGroupAdapter parent, boolean isOldest) { - super(parent.ctx, parent.subjId, parent.p, parent.ids); + super(parent.ctx, parent.subjId, (IgnitePredicate) null); this.isOldest = isOldest; @@ -823,6 +823,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { lastTopVer = ctx.discovery().topologyVersion(); this.node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(), null); + this.p = F.nodeForNodes(node); } finally { unguard(); diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index 4c844fc..c033750 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -67,6 +67,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { /** */ private static final Integer I_TOPIC_2 = 2; + /** Message count. */ + private static AtomicInteger MSG_CNT; + /** */ public static final String EXT_RESOURCE_CLS_NAME = "org.apache.ignite.tests.p2p.TestUserResource"; @@ -161,6 +164,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { + MSG_CNT = new AtomicInteger(); + ignite1 = startGrid(1); ignite2 = startGrid(2); } @@ -1088,4 +1093,49 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { assertEquals(1, msgCnt.get()); } + + /** + * Tests that message listener registers only for one oldest node. + * + * @throws Exception If an error occurred. + */ + public void testRemoteListenForOldest() throws Exception { + remoteListenForOldest(ignite1); + + // Restart oldest node. + stopGrid(1); + + ignite1 = startGrid(1); + + MSG_CNT.set(0); + + // Ignite2 is oldest now. + remoteListenForOldest(ignite2); + } + + /** + * @param expOldestIgnite Expected oldest ignite. + */ + private void remoteListenForOldest(Ignite expOldestIgnite) throws InterruptedException { + ClusterGroup grp = ignite1.cluster().forOldest(); + + assertEquals(1, grp.nodes().size()); + assertEquals(expOldestIgnite.cluster().localNode().id(), grp.node().id()); + + ignite1.message(grp).remoteListen(null, new P2() { + @Override public boolean apply(UUID nodeId, Object msg) { + System.out.println("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); + + MSG_CNT.incrementAndGet(); + + return true; + } + }); + + ignite1.message().send(null, MSG_1); + + Thread.sleep(3000); + + assertEquals(1, MSG_CNT.get()); + } } -- 1.9.5.msysgit.0