From b74323a640d2c5ebbe2c4e33f7676059e5e7e491 Mon Sep 17 00:00:00 2001 From: agura Date: Tue, 7 Apr 2015 02:03:50 +0300 Subject: [PATCH] ignite-688 Age related cluster group doesn't refresh dynamically --- .../internal/cluster/ClusterGroupAdapter.java | 150 ++++++++++++++++++--- .../ignite/internal/GridProjectionSelfTest.java | 82 +++++++++++ 2 files changed, 215 insertions(+), 17 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java index 0daffcc..e52bed4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java @@ -60,7 +60,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { private String gridName; /** Subject ID. */ - private UUID subjId; + protected UUID subjId; /** Cluster group predicate. */ protected IgnitePredicate p; @@ -320,12 +320,12 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { } /** {@inheritDoc} */ - @Override public final IgnitePredicate predicate() { + @Override public IgnitePredicate predicate() { return p != null ? p : F.alwaysTrue(); } /** {@inheritDoc} */ - @Override public final ClusterGroup forPredicate(IgnitePredicate p) { + @Override public ClusterGroup forPredicate(IgnitePredicate p) { A.notNull(p, "p"); guard(); @@ -657,7 +657,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { IgniteKernal g = IgnitionEx.gridx(gridName); return ids != null ? new ClusterGroupAdapter(g.context(), subjId, ids) : - p != null ? new ClusterGroupAdapter(g.context(), subjId, p) : g; + new ClusterGroupAdapter(g.context(), subjId, p); } catch (IllegalStateException e) { throw U.withCause(new InvalidObjectException(e.getMessage()), e); @@ -788,11 +788,8 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { /** Oldest flag. */ private boolean isOldest; - /** Selected node. */ - private volatile ClusterNode node; - - /** Last topology version. */ - private volatile long lastTopVer; + /** State. */ + private volatile AgeClusterGroupState state; /** * Required for {@link Externalizable}. @@ -806,7 +803,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { * @param isOldest Oldest flag. */ private AgeClusterGroup(ClusterGroupAdapter parent, boolean isOldest) { - super(parent.ctx, parent.subjId, (IgnitePredicate) null); + super(parent.ctx, parent.subjId, parent.p, parent.ids); this.isOldest = isOldest; @@ -820,10 +817,13 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { guard(); try { - lastTopVer = ctx.discovery().topologyVersion(); + long lastTopVer = ctx.discovery().topologyVersion(); + + ClusterNode node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(), null); + + IgnitePredicate p = F.nodeForNodes(node); - this.node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(), null); - this.p = F.nodeForNodes(node); + state = new AgeClusterGroupState(node, p, lastTopVer); } finally { unguard(); @@ -832,20 +832,136 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { /** {@inheritDoc} */ @Override public ClusterNode node() { - if (ctx.discovery().topologyVersion() != lastTopVer) + if (ctx.discovery().topologyVersion() != state.lastTopVer) reset(); - return node; + return state.node; } /** {@inheritDoc} */ @Override public Collection nodes() { - if (ctx.discovery().topologyVersion() != lastTopVer) + if (ctx.discovery().topologyVersion() != state.lastTopVer) reset(); - ClusterNode node = this.node; + ClusterNode node = state.node; return node == null ? Collections.emptyList() : Collections.singletonList(node); } + + /** {@inheritDoc} */ + @Override public IgnitePredicate predicate() { + if (ctx.discovery().topologyVersion() != state.lastTopVer) + reset(); + + return state.p; + } + + /** {@inheritDoc} */ + @Override public ClusterGroup forPredicate(IgnitePredicate p) { + A.notNull(p, "p"); + + guard(); + + try { + if (p != null) + ctx.resource().injectGeneric(p); + + return new ClusterGroupAdapter(ctx, this.subjId, new GroupPredicate(this, p)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeBoolean(isOldest); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + isOldest = in.readBoolean(); + } + + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. + */ + protected Object readResolve() throws ObjectStreamException { + ClusterGroupAdapter parent = (ClusterGroupAdapter)super.readResolve(); + + return new AgeClusterGroup(parent, isOldest); + } + } + + /** + * Container for age-based cluster group state. + */ + private static class AgeClusterGroupState { + /** Selected node. */ + private final ClusterNode node; + + /** Node predicate. */ + private final IgnitePredicate p; + + /** Last topology version. */ + private final long lastTopVer; + + /** + * @param node Node. + * @param p Predicate. + * @param lastTopVer Last topology version. + */ + public AgeClusterGroupState(ClusterNode node, IgnitePredicate p, long lastTopVer) { + this.node = node; + this.p = p; + this.lastTopVer = lastTopVer; + } + } + + /** + * Dynamic cluster group based predicate. + */ + private static class GroupPredicate implements IgnitePredicate { + /** */ + private static final long serialVersionUID = 0L; + + /** Target cluster group. */ + private final ClusterGroup grp; + + /** Predicate. */ + private final IgnitePredicate p; + + /** + * @param grp Cluster group. + * @param p Predicate. + */ + public GroupPredicate(ClusterGroup grp, IgnitePredicate p) { + this.grp = grp; + this.p = p; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + A.notNull(node, "node is null"); + + return grp.predicate().apply(node) && p.apply(node); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return getClass().getName() + + " [grp='" + grp.getClass().getName() + + "', p='" + p.getClass().getName() + "']"; + } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java index 1fc5535..9fbad80 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; import org.apache.ignite.testframework.junits.common.*; import java.util.*; @@ -153,6 +155,74 @@ public class GridProjectionSelfTest extends GridProjectionAbstractTest { /** * @throws Exception If failed. */ + public void testForPredicate() throws Exception { + IgnitePredicate evenP = new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return node.order() % 2 == 0; + } + }; + + IgnitePredicate oddP = new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return node.order() % 2 == 1; + } + }; + + ClusterGroup remotes = ignite.cluster().forRemotes(); + + ClusterGroup evenYoungest = remotes.forPredicate(evenP).forYoungest(); + ClusterGroup evenOldest = remotes.forPredicate(evenP).forOldest(); + + ClusterGroup oddYoungest = remotes.forPredicate(oddP).forYoungest(); + ClusterGroup oddOldest = remotes.forPredicate(oddP).forOldest(); + + int clusterSize = ignite.cluster().nodes().size(); + + assertEquals(grid(gridMaxOrder(clusterSize, true)).localNode().id(), evenYoungest.node().id()); + assertEquals(grid(1).localNode().id(), evenOldest.node().id()); + + assertEquals(grid(gridMaxOrder(clusterSize, false)).localNode().id(), oddYoungest.node().id()); + assertEquals(grid(2).localNode().id(), oddOldest.node().id()); + + try (Ignite g4 = startGrid(NODES_CNT); + Ignite g5 = startGrid(NODES_CNT + 1)) + { + clusterSize = g4.cluster().nodes().size(); + + assertEquals(grid(gridMaxOrder(clusterSize, true)).localNode().id(), evenYoungest.node().id()); + assertEquals(grid(1).localNode().id(), evenOldest.node().id()); + + assertEquals(grid(gridMaxOrder(clusterSize, false)).localNode().id(), oddYoungest.node().id()); + assertEquals(grid(2).localNode().id(), oddOldest.node().id()); + } + } + + /** + * @throws Exception If failed. + */ + public void testAgeClusterGroupSerialization() throws Exception { + Marshaller marshaller = getConfiguration().getMarshaller(); + + ClusterGroup grp = ignite.cluster().forYoungest(); + ClusterNode node = grp.node(); + + byte[] arr = marshaller.marshal(grp); + + ClusterGroup obj = marshaller.unmarshal(arr, null); + + assertEquals(node.id(), obj.node().id()); + + try (Ignite ignore = startGrid()) { + obj = marshaller.unmarshal(arr, null); + + assertEquals(grp.node().id(), obj.node().id()); + assertFalse(node.id().equals(obj.node().id())); + } + } + + /** + * @throws Exception If failed. + */ public void testClientServer() throws Exception { ClusterGroup srv = ignite.cluster().forServers(); @@ -166,4 +236,16 @@ public class GridProjectionSelfTest extends GridProjectionAbstractTest { assertTrue(cli.nodes().contains(ignite(2).cluster().localNode())); assertTrue(cli.nodes().contains(ignite(3).cluster().localNode())); } + + /** + * @param cnt Count. + * @param even Even. + */ + private static int gridMaxOrder(int cnt, boolean even) { + assert cnt > 2; + + cnt = cnt - (cnt % 2); + + return even ? cnt - 1 : cnt - 2; + } } -- 1.9.5.msysgit.0