Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java (revision 12fd4976f482ebc43831754645e34042c9073b2d) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java (revision ) @@ -21,22 +21,19 @@ import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -51,6 +48,13 @@ /** */ private boolean client; + /** Always false filter. */ + private static class AlwaysFilter implements P1 { + @Override public boolean apply(ClusterNode node) { + return false; + } + } + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -62,6 +66,7 @@ ccfg.setCacheMode(PARTITIONED); ccfg.setAtomicityMode(ATOMIC); ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setNodeFilter(new AlwaysFilter()); cfg.setCacheConfiguration(ccfg); @@ -80,7 +85,7 @@ /** * @throws Exception If failed. */ - public void testNodeJoins() throws Exception { + public void testNodeFilter() throws Exception { startGrids(2); client = true; @@ -98,153 +103,6 @@ qry.setLocalListener(lsnr); QueryCursor cur = clientNode.cache(null).query(qry); - - for (int i = 0; i < 10; i++) { - log.info("Start iteration: " + i); - - lsnr.latch = new CountDownLatch(1); - - Ignite joined1 = startGrid(4); - - IgniteCache joinedCache1 = joined1.cache(null); - - joinedCache1.put(primaryKey(joinedCache1), 1); - - assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); - - lsnr.latch = new CountDownLatch(1); - - Ignite joined2 = startGrid(5); - - IgniteCache joinedCache2 = joined2.cache(null); - - joinedCache2.put(primaryKey(joinedCache2), 2); - - assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); - - stopGrid(4); - - stopGrid(5); - } - - cur.close(); - } - - /** - * @throws Exception If failed. - */ - public void testNodeJoinsRestartQuery() throws Exception { - startGrids(2); - - client = true; - - final int CLIENT_ID = 3; - - Ignite clientNode = startGrid(CLIENT_ID); - - client = false; - - for (int i = 0; i < 10; i++) { - log.info("Start iteration: " + i); - - final CacheEventListener lsnr = new CacheEventListener(); - - ContinuousQuery qry = new ContinuousQuery<>(); - - qry.setLocalListener(lsnr); - - QueryCursor cur = clientNode.cache(null).query(qry); - - lsnr.latch = new CountDownLatch(1); - - Ignite joined1 = startGrid(4); - - IgniteCache joinedCache1 = joined1.cache(null); - - joinedCache1.put(primaryKey(joinedCache1), 1); - - assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); - - cur.close(); - - lsnr.latch = new CountDownLatch(1); - - Ignite joined2 = startGrid(5); - - IgniteCache joinedCache2 = joined2.cache(null); - - joinedCache2.put(primaryKey(joinedCache2), 2); - - assertFalse("Unexpected event received.", GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return 1 != lsnr.latch.getCount(); - } - }, 1000)); - - stopGrid(4); - - stopGrid(5); - } - } - - /** - * @throws Exception If failed. - */ - public void testServerNodeLeft() throws Exception { - startGrids(3); - - client = true; - - final int CLIENT_ID = 3; - - Ignite clnNode = startGrid(CLIENT_ID); - - client = false; - - IgniteOutClosure> rndCache = - new IgniteOutClosure>() { - int cnt = 0; - - @Override public IgniteCache apply() { - ++cnt; - - return grid(CLIENT_ID).cache(null); - } - }; - - final CacheEventListener lsnr = new CacheEventListener(); - - ContinuousQuery qry = new ContinuousQuery<>(); - - qry.setLocalListener(lsnr); - - QueryCursor cur = clnNode.cache(null).query(qry); - - boolean first = true; - - int keyCnt = 1; - - for (int i = 0; i < 10; i++) { - log.info("Start iteration: " + i); - - if (first) - first = false; - else { - for (int srv = 0; srv < CLIENT_ID - 1; srv++) - startGrid(srv); - } - - lsnr.latch = new CountDownLatch(keyCnt); - - for (int key = 0; key < keyCnt; key++) - rndCache.apply().put(key, key); - - assertTrue("Failed to wait for event. Left events: " + lsnr.latch.getCount(), - lsnr.latch.await(10, SECONDS)); - - for (int srv = 0; srv < CLIENT_ID - 1; srv++) - stopGrid(srv); - } cur.close(); } \ No newline at end of file