Index: modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java (revision db343b649e4289ac28b769a741eee7ea77db8018) +++ modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java (revision 0433f4c6b30b12abbfd363eae2a0aba9bf3b3efe) @@ -77,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; @@ -655,6 +656,9 @@ topSnap.set(new Snapshot(snapshot.topVer, discoCache)); + if (GridClosureProcessor.awaitTaskExecute != null) + GridClosureProcessor.awaitTaskExecute.countDown(); + incMinorTopVer = false; } else { @@ -718,6 +722,9 @@ ", evt=" + U.gridEventName(type) + ']'; topSnap.set(new Snapshot(nextTopVer, discoCache)); + + if (GridClosureProcessor.awaitTaskExecute != null) + GridClosureProcessor.awaitTaskExecute.countDown(); } else // Current version. Index: modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java (revision db343b649e4289ac28b769a741eee7ea77db8018) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java (revision 0433f4c6b30b12abbfd363eae2a0aba9bf3b3efe) @@ -23,10 +23,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.binary.BinaryRawReader; import org.apache.ignite.binary.BinaryRawWriter; @@ -511,6 +513,14 @@ ctx.task().setThreadContext(TC_SUBGRID, nodes); + if (awaitTaskExecute != null) + try { + awaitTaskExecute.await(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + return ctx.task().execute(new T5(node, job, cacheNames, partId, mapTopVer), null, false, execName); } @@ -519,6 +529,8 @@ } } + public static volatile CountDownLatch awaitTaskExecute; + /** * @param cacheNames Cache names. * @param partId Partition. Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java (revision db343b649e4289ac28b769a741eee7ea77db8018) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java (revision 0433f4c6b30b12abbfd363eae2a0aba9bf3b3efe) @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; @@ -28,6 +29,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteRunnable; @@ -50,7 +52,7 @@ private static final String CACHE_NAME = "myCache"; /** */ - private static final int SRVS = 4; + private static final int SRVS = 32; /** */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); @@ -126,6 +128,8 @@ AffinityTopologyVersion topVer = grid(0).context().discovery().topologyVersionEx(); + GridClosureProcessor.awaitTaskExecute = new CountDownLatch(1); + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { @Override public Object call() throws Exception { U.sleep(500);