Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java	(revision 51c8ff1c2fa61fe7b375179bb65894d1422a5da5)
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java	(date 1651151934197)
@@ -26,6 +26,7 @@
 import javax.cache.CacheException;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteTransactions;
@@ -56,8 +57,11 @@
  *
  */
 public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommonAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void afterTest() throws Exception {
         stopAllGrids();
 
         super.afterTest();
@@ -68,7 +72,7 @@
      */
     @Test
     public void testConcurrentUpdatePartitionAtomic() throws Exception {
-        concurrentUpdatePartition(ATOMIC, false);
+        concurrentUpdatePartition(ATOMIC, false, false);
     }
 
     /**
@@ -76,7 +80,7 @@
      */
     @Test
     public void testConcurrentUpdatePartitionTx() throws Exception {
-        concurrentUpdatePartition(TRANSACTIONAL, false);
+        concurrentUpdatePartition(TRANSACTIONAL, false, false);
     }
 
     /**
@@ -84,7 +88,7 @@
      */
     @Test
     public void testConcurrentUpdatePartitionMvccTx() throws Exception {
-        concurrentUpdatePartition(TRANSACTIONAL_SNAPSHOT, false);
+        concurrentUpdatePartition(TRANSACTIONAL_SNAPSHOT, false, false);
     }
 
     /**
@@ -92,7 +96,7 @@
      */
     @Test
     public void testConcurrentUpdatePartitionAtomicCacheGroup() throws Exception {
-        concurrentUpdatePartition(ATOMIC, true);
+        concurrentUpdatePartition(ATOMIC, true, false);
     }
 
     /**
@@ -100,7 +104,7 @@
      */
     @Test
     public void testConcurrentUpdatePartitionTxCacheGroup() throws Exception {
-        concurrentUpdatePartition(TRANSACTIONAL, true);
+        concurrentUpdatePartition(TRANSACTIONAL, true, false);
     }
 
     /**
@@ -108,20 +112,68 @@
      */
     @Test
     public void testConcurrentUpdatePartitionMvccTxCacheGroup() throws Exception {
-        concurrentUpdatePartition(TRANSACTIONAL_SNAPSHOT, true);
+        concurrentUpdatePartition(TRANSACTIONAL_SNAPSHOT, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatePartitionAtomicServer() throws Exception {
+        concurrentUpdatePartition(ATOMIC, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatePartitionTxServer() throws Exception {
+        concurrentUpdatePartition(TRANSACTIONAL, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatePartitionMvccTxServer() throws Exception {
+        concurrentUpdatePartition(TRANSACTIONAL_SNAPSHOT, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatePartitionAtomicCacheGroupServer() throws Exception {
+        concurrentUpdatePartition(ATOMIC, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatePartitionTxCacheGroupServer() throws Exception {
+        concurrentUpdatePartition(TRANSACTIONAL, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatePartitionMvccTxCacheGroupServer() throws Exception {
+        concurrentUpdatePartition(TRANSACTIONAL_SNAPSHOT, true, true);
     }
 
     /**
      * @param atomicityMode Cache atomicity mode.
-     * @param cacheGrp {@code True} if test cache multiple caches in the same group.
+     * @param cacheGrp      {@code True} if test cache multiple caches in the same group.
      * @throws Exception If failed.
      */
-    private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode, boolean cacheGrp) throws Exception {
+    private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode, boolean cacheGrp, boolean useServerGrid) throws Exception {
         Ignite srv = startGrid(0);
         Ignite client = startClientGrid(1);
 
         List<AtomicInteger> cntrs = new ArrayList<>();
-        List<String> caches = new ArrayList<>();
+        List<IgniteCache<Object, Object>> caches = new ArrayList<>();
 
         if (cacheGrp) {
             for (int i = 0; i < 3; i++) {
@@ -131,27 +183,35 @@
                 ccfg.setWriteSynchronizationMode(FULL_SYNC);
                 ccfg.setAtomicityMode(atomicityMode);
 
-                IgniteCache<Object, Object> cache = client.createCache(ccfg);
+                IgniteCache<Object, Object> cache;
 
-                caches.add(cache.getName());
+                if (useServerGrid) cache = srv.createCache(ccfg);
+                else cache = client.createCache(ccfg);
+
+                caches.add(cache);
 
                 cntrs.add(startListener(cache).get1());
             }
-        }
-        else {
+        } else {
             CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
             ccfg.setWriteSynchronizationMode(FULL_SYNC);
             ccfg.setAtomicityMode(atomicityMode);
 
-            IgniteCache<Object, Object> cache = client.createCache(ccfg);
+            IgniteCache<Object, Object> cache;
 
-            caches.add(cache.getName());
+            if (useServerGrid) cache = srv.createCache(ccfg);
+            else cache = client.createCache(ccfg);
+
+            caches.add(cache);
 
             cntrs.add(startListener(cache).get1());
         }
 
-        Affinity<Integer> aff = srv.affinity(caches.get(0));
+        Affinity<Integer> aff;
+        if (useServerGrid) aff = srv.affinity(caches.get(0).getName());
+        else aff = client.affinity(caches.get(0).getName());
+
 
         final List<Integer> keys = new ArrayList<>();
 
@@ -171,24 +231,21 @@
         final int THREADS = SF.applyLB(10, 4);
         final int UPDATES = SF.applyLB(1000, 100);
 
-        final List<IgniteCache<Object, Object>> srvCaches = new ArrayList<>();
-
-        for (String cacheName : caches)
-            srvCaches.add(srv.cache(cacheName));
 
         for (int i = 0; i < SF.applyLB(15, 5); i++) {
             log.info("Iteration: " + i);
 
             GridTestUtils.runMultiThreaded(new Callable<Void>() {
-                @Override public Void call() throws Exception {
+                @Override
+                public Void call() throws Exception {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                     for (int i = 0; i < UPDATES; i++) {
-                        for (int c = 0; c < srvCaches.size(); c++) {
+                        for (int c = 0; c < caches.size(); c++) {
                             if (atomicityMode == ATOMIC)
-                                srvCaches.get(c).put(keys.get(rnd.nextInt(KEYS)), i);
+                                caches.get(c).put(keys.get(rnd.nextInt(KEYS)), i);
                             else {
-                                IgniteCache<Object, Object> cache0 = srvCaches.get(c);
+                                IgniteCache<Object, Object> cache0 = caches.get(c);
                                 IgniteTransactions txs = cache0.unwrap(Ignite.class).transactions();
 
                                 boolean committed = false;
@@ -200,8 +257,7 @@
                                         tx.commit();
 
                                         committed = true;
-                                    }
-                                    catch (CacheException e) {
+                                    } catch (CacheException e) {
                                         assertTrue(e.getCause() instanceof TransactionSerializationException);
                                         assertEquals(atomicityMode, TRANSACTIONAL_SNAPSHOT);
                                     }
@@ -216,7 +272,8 @@
 
             for (final AtomicInteger evtCnt : cntrs) {
                 GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                    @Override public boolean apply() {
+                    @Override
+                    public boolean apply() {
                         log.info("Events: " + evtCnt.get());
 
                         return evtCnt.get() >= THREADS * UPDATES;
@@ -240,12 +297,13 @@
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
         qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
-            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            @Override
+            public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
                 for (CacheEntryEvent evt : evts) {
                     assertNotNull(evt.getKey());
                     assertNotNull(evt.getValue());
 
-                    if ((Integer)evt.getValue() >= 0)
+                    if ((Integer) evt.getValue() >= 0)
                         evtCnt.incrementAndGet();
                 }
             }
@@ -261,7 +319,7 @@
      */
     @Test
     public void testConcurrentUpdatesAndQueryStartAtomic() throws Exception {
-        concurrentUpdatesAndQueryStart(ATOMIC, false);
+        concurrentUpdatesAndQueryStart(ATOMIC, false, false);
     }
 
     /**
@@ -269,7 +327,7 @@
      */
     @Test
     public void testConcurrentUpdatesAndQueryStartTx() throws Exception {
-        concurrentUpdatesAndQueryStart(TRANSACTIONAL, false);
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL, false, false);
     }
 
     /**
@@ -277,7 +335,7 @@
      */
     @Test
     public void testConcurrentUpdatesAndQueryStartMvccTx() throws Exception {
-        concurrentUpdatesAndQueryStart(TRANSACTIONAL_SNAPSHOT, false);
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL_SNAPSHOT, false, false);
     }
 
     /**
@@ -285,7 +343,7 @@
      */
     @Test
     public void testConcurrentUpdatesAndQueryStartAtomicCacheGroup() throws Exception {
-        concurrentUpdatesAndQueryStart(ATOMIC, true);
+        concurrentUpdatesAndQueryStart(ATOMIC, true, false);
     }
 
     /**
@@ -293,7 +351,7 @@
      */
     @Test
     public void testConcurrentUpdatesAndQueryStartTxCacheGroup() throws Exception {
-        concurrentUpdatesAndQueryStart(TRANSACTIONAL, true);
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL, true, false);
     }
 
     /**
@@ -301,45 +359,100 @@
      */
     @Test
     public void testConcurrentUpdatesAndQueryStartMvccTxCacheGroup() throws Exception {
-        concurrentUpdatesAndQueryStart(TRANSACTIONAL_SNAPSHOT, true);
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL_SNAPSHOT, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatesAndQueryStartAtomicServer() throws Exception {
+        concurrentUpdatesAndQueryStart(ATOMIC, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatesAndQueryStartTxServer() throws Exception {
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatesAndQueryStartMvccTxServer() throws Exception {
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL_SNAPSHOT, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatesAndQueryStartAtomicCacheGroupServer() throws Exception {
+        concurrentUpdatesAndQueryStart(ATOMIC, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatesAndQueryStartTxCacheGroupServer() throws Exception {
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdatesAndQueryStartMvccTxCacheGroupServer() throws Exception {
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL_SNAPSHOT, true, true);
     }
 
     /**
      * @param atomicityMode Cache atomicity mode.
-     * @param cacheGrp {@code True} if test cache multiple caches in the same group.
+     * @param cacheGrp      {@code True} if test cache multiple caches in the same group.
      * @throws Exception If failed.
      */
-    private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode, boolean cacheGrp) throws Exception {
+    private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode, boolean cacheGrp, boolean useServerGrid) throws Exception {
         Ignite srv = startGrid(0);
         Ignite client = startClientGrid(1);
 
-        List<String> caches = new ArrayList<>();
+        final List<IgniteCache<Object, Object>> caches = new ArrayList<>();
 
         if (cacheGrp) {
             for (int i = 0; i < 3; i++) {
-                CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME + i);
+                CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME + i);
 
                 ccfg.setGroupName("testGroup");
                 ccfg.setWriteSynchronizationMode(FULL_SYNC);
                 ccfg.setAtomicityMode(atomicityMode);
 
-                IgniteCache<?, ?> cache = client.createCache(ccfg);
+                IgniteCache<Object, Object> cache = null;
 
-                caches.add(cache.getName());
+                if (useServerGrid) cache = srv.createCache(ccfg);
+                else cache = client.createCache(ccfg);
+
+                caches.add(cache);
             }
-        }
-        else {
-            CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+        } else {
+            CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
             ccfg.setWriteSynchronizationMode(FULL_SYNC);
             ccfg.setAtomicityMode(atomicityMode);
 
-            IgniteCache<?, ?> cache = client.createCache(ccfg);
+            IgniteCache<Object, Object> cache;
 
-            caches.add(cache.getName());
+            if (useServerGrid) cache = srv.createCache(ccfg);
+            else cache = client.createCache(ccfg);
+
+            caches.add(cache);
         }
 
-        Affinity<Integer> aff = srv.affinity(caches.get(0));
+        Affinity<Integer> aff;
+        if (useServerGrid) aff = srv.affinity(caches.get(0).getName());
+        else aff = client.affinity(caches.get(0).getName());
 
         final List<Integer> keys = new ArrayList<>();
 
@@ -359,10 +472,6 @@
         final int THREADS = 10;
         final int UPDATES = 1000;
 
-        final List<IgniteCache<Object, Object>> srvCaches = new ArrayList<>();
-
-        for (String cacheName : caches)
-            srvCaches.add(srv.cache(cacheName));
 
         for (int i = 0; i < SF.applyLB(5, 2); i++) {
             log.info("Iteration: " + i);
@@ -373,27 +482,27 @@
 
             try {
                 IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
-                    @Override public Void call() throws Exception {
+                    @Override
+                    public Void call() throws Exception {
                         ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                         while (!stop.get()) {
-                            for (IgniteCache<Object, Object> srvCache : srvCaches) {
+                            for (IgniteCache<Object, Object> cache : caches) {
                                 if (atomicityMode == ATOMIC)
-                                    srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200);
+                                    cache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200);
                                 else {
-                                    IgniteTransactions txs = srvCache.unwrap(Ignite.class).transactions();
+                                    IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
 
                                     boolean committed = false;
 
                                     while (!committed) {
                                         try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                                            srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200);
+                                            cache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200);
 
                                             tx.commit();
 
                                             committed = true;
-                                        }
-                                        catch (CacheException e) {
+                                        } catch (CacheException e) {
                                             assertTrue(e.getCause() instanceof TransactionSerializationException);
                                             assertEquals(atomicityMode, TRANSACTIONAL_SNAPSHOT);
                                         }
@@ -408,42 +517,41 @@
 
                 U.sleep(1000);
 
-                for (String cache : caches)
+                for (IgniteCache<Object, Object> cache : caches)
                     for (int l = 0; l < 10; l++)
-                        qrys.add(startListener(client.cache(cache)));
+                        qrys.add(startListener(cache));
 
                 U.sleep(1000);
 
                 stop.set(true);
 
                 fut.get();
-            }
-            finally {
+            } finally {
                 stop.set(true);
             }
 
             GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
-                @Override public Void call() throws Exception {
+                @Override
+                public Void call() throws Exception {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                     for (int i = 0; i < UPDATES; i++) {
-                        for (IgniteCache<Object, Object> srvCache : srvCaches) {
+                        for (IgniteCache<Object, Object> cache : caches) {
                             if (atomicityMode == ATOMIC)
-                                srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+                                cache.put(keys.get(rnd.nextInt(KEYS)), i);
                             else {
-                                IgniteTransactions txs = srvCache.unwrap(Ignite.class).transactions();
+                                IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
 
                                 boolean committed = false;
 
                                 while (!committed) {
                                     try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                                        srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+                                        cache.put(keys.get(rnd.nextInt(KEYS)), i);
 
                                         tx.commit();
 
                                         committed = true;
-                                    }
-                                    catch (CacheException e) {
+                                    } catch (CacheException e) {
                                         assertTrue(e.getCause() instanceof TransactionSerializationException);
                                         assertEquals(atomicityMode, TRANSACTIONAL_SNAPSHOT);
                                     }
@@ -460,7 +568,8 @@
                 final AtomicInteger evtCnt = qry.get1();
 
                 GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                    @Override public boolean apply() {
+                    @Override
+                    public boolean apply() {
                         log.info("Events: " + evtCnt.get());
 
                         return evtCnt.get() >= THREADS * UPDATES;
