Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java (revision aed83af5f76c47bc9e4d0e8f60955fc6c6b42aac) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java (revision ) @@ -20,6 +20,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -60,6 +61,13 @@ return cfg; } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + /** * @throws Exception If failed. */ @@ -80,15 +88,27 @@ QueryCursor cur = clientNode.cache(null).query(qry); - Ignite joined = startGrid(4); + Ignite joined1 = startGrid(4); - IgniteCache joinedCache = joined.cache(null); + IgniteCache joinedCache1 = joined1.cache(null); - joinedCache.put(primaryKey(joinedCache), 1); + joinedCache1.put(primaryKey(joinedCache1), 1); assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); cur.close(); + + lsnr.latch = new CountDownLatch(1); + + Ignite joined2 = startGrid(5); + + IgniteCache joinedCache2 = joined2.cache(null); + + joinedCache2.put(primaryKey(joinedCache2), 2); + + U.sleep(1000); + + assertEquals("Unexpected event received.", 1, lsnr.latch.getCount()); } /** @@ -96,7 +116,7 @@ */ private static class CacheEventListener implements CacheEntryUpdatedListener { /** */ - private final CountDownLatch latch = new CountDownLatch(1); + private volatile CountDownLatch latch = new CountDownLatch(1); /** */ @LoggerResource Index: ../incubator-ignite/scripts/git-format-patch.sh IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/scripts/git-format-patch.sh (revision aed83af5f76c47bc9e4d0e8f60955fc6c6b42aac) +++ ../incubator-ignite/scripts/git-format-patch.sh (revision ) @@ -20,7 +20,7 @@ # Git patch-file maker. # echo 'Usage: scripts/git-format-patch.sh [-ih|--ignitehome ] [-idb|--ignitedefbranch ] [-ph|--patchhome ]' -echo 'It is a script to create patch between Current branch (branch with changes) and Default branche. The script is safe and do not broke or lose your changes.' +echo 'It is a script to create patch between Current branch (branch with changes) and Default branch. The script is safe and does not break or lose your changes.' echo "It should be called from IGNITE_HOME directory." echo "Patch will be created at PATCHES_HOME (= IGNITE_HOME, by default) between Default branch (IGNITE_DEFAULT_BRANCH) and Current branch." echo "Note: you can use ${IGNITE_HOME}/scripts/git-patch-prop-local.sh to set your own local properties (to rewrite settings at git-patch-prop-local.sh). " Index: ../incubator-ignite/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java (revision aed83af5f76c47bc9e4d0e8f60955fc6c6b42aac) +++ ../incubator-ignite/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java (revision ) @@ -98,6 +98,7 @@ suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class); Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java (revision aed83af5f76c47bc9e4d0e8f60955fc6c6b42aac) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java (revision ) @@ -193,10 +193,10 @@ unregisterRemote(routineId); if (snd.isClient()) { - Map infoMap = clientInfos.get(snd.id()); + Map clientRouteMap = clientInfos.get(snd.id()); - if (infoMap != null) - infoMap.remove(msg.routineId()); + if (clientRouteMap != null) + clientRouteMap.remove(msg.routineId()); } } } @@ -370,6 +370,34 @@ } for (Map.Entry> entry : data.clientInfos.entrySet()) { + UUID clientNodeId = entry.getKey(); + + Map clientRouteMap = entry.getValue(); + + for (Map.Entry e : clientRouteMap.entrySet()) { + UUID routineId = e.getKey(); + LocalRoutineInfo info = e.getValue(); + + try { + if (info.prjPred != null) + ctx.resource().injectGeneric(info.prjPred); + + if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) { + if (registerHandler(clientNodeId, + routineId, + info.hnd, + info.bufSize, + info.interval, + info.autoUnsubscribe, + false)) + info.hnd.onListenerRegistered(routineId, ctx); + } + } + catch (IgniteCheckedException err) { + U.error(log, "Failed to register continuous handler.", err); + } + } + Map map = clientInfos.get(entry.getKey()); if (map == null) {