Index: modules/core/src/main/java/org/gridgain/grid/cache/store/local/GridCacheFileLocalStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/gridgain/grid/cache/store/local/GridCacheFileLocalStore.java (revision aed9340bf948c07897cd70e49dc71e841f08a53b) +++ modules/core/src/main/java/org/gridgain/grid/cache/store/local/GridCacheFileLocalStore.java (revision a11786eb3bf1381a6d73e4036d5ea18a6d86a303) @@ -654,7 +654,7 @@ * @return Consistent ID. */ String consistentId() { - return G.ignite(ignite.name()).cluster().localNode().consistentId().toString(); + return ignite.cluster().localNode().consistentId().toString(); } /** Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java (revision e86c69ee8ecbe4ef1ecb5dfa903f40f636a1b0de) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java (revision c3b7db8eacb778d6ab303265941f59cf5094115a) @@ -3130,6 +3130,15 @@ cctx.dataStructures().onEntryUpdated(key, false); } + if (cctx.store().isLocalStore()) { + if (val != null || valBytes != null) { + if (val == null) + val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); + + cctx.store().putToStore(null, key, val, ver); + } + } + return true; } Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java (revision f33c07486ff932672b838604af3acb67397f34fc) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java (revision f33c07486ff932672b838604af3acb67397f34fc) @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCachePartitionedLocalStoreSelfTest extends GridCacheAbstractLocalStoreSelfTest { + /** + * + */ + public GridCachePartitionedLocalStoreSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return PARTITIONED; + } +} Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java (revision f33c07486ff932672b838604af3acb67397f34fc) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java (revision f33c07486ff932672b838604af3acb67397f34fc) @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCacheReplicatedLocalStoreSelfTest extends GridCacheAbstractLocalStoreSelfTest { + /** + * + */ + public GridCacheReplicatedLocalStoreSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return REPLICATED; + } +} Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java (revision f33c07486ff932672b838604af3acb67397f34fc) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java (revision f33c07486ff932672b838604af3acb67397f34fc) @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCacheTxPartitionedLocalStoreSelfTest extends GridCacheAbstractLocalStoreSelfTest { + /** + * + */ + public GridCacheTxPartitionedLocalStoreSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return PARTITIONED; + } +} Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java (revision e86c69ee8ecbe4ef1ecb5dfa903f40f636a1b0de) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java (revision f33c07486ff932672b838604af3acb67397f34fc) @@ -138,6 +138,10 @@ suite.addTestSuite(GridCacheOffHeapTieredAtomicSelfTest.class); suite.addTestSuite(GridCacheOffHeapTieredSelfTest.class); suite.addTestSuite(GridCacheGlobalLoadTest.class); + suite.addTestSuite(GridCachePartitionedLocalStoreSelfTest.class); + suite.addTestSuite(GridCacheReplicatedLocalStoreSelfTest.class); + suite.addTestSuite(GridCachePartitionedOffHeapLocalStoreSelfTest.class); + suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class); // Heuristic exception handling. TODO IGNITE-257 // suite.addTestSuite(GridCacheColocatedTxExceptionSelfTest.class); @@ -393,8 +397,7 @@ // suite.addTestSuite(IgniteCacheInvokeReadThroughTest.class); // suite.addTestSuite(GridCacheVersionMultinodeTest.class); - // TODO IGNITE-285. - // suite.addTestSuite(IgniteCacheNearReadCommittedTest.class); + suite.addTestSuite(IgniteCacheNearReadCommittedTest.class); return suite; } Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java (revision 8899128028dd3f84b93394f3ef425c24d73e8d67) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java (revision 8899128028dd3f84b93394f3ef425c24d73e8d67) @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCachePartitionedOffHeapLocalStoreSelfTest extends GridCacheAbstractLocalStoreSelfTest { + /** + * + */ + public GridCachePartitionedOffHeapLocalStoreSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected boolean isOffHeapTieredMode() { + return true; + } +} Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java (revision 75ba3e44cd8cb4268a26203f39736ae1db420ae5) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java (revision 3a466e8cbdbde90bbc4debb5f9b998621e2804e7) @@ -563,6 +563,9 @@ log.debug("Entry has been cleared from swap storage: " + this); } + if (cctx.store().isLocalStore()) + cctx.store().removeFromStore(null, key()); + rmv = true; return true; Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java (revision e86c69ee8ecbe4ef1ecb5dfa903f40f636a1b0de) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java (revision 3a466e8cbdbde90bbc4debb5f9b998621e2804e7) @@ -488,6 +488,8 @@ try { GridCloseableIterator>> it = cctx.swap().iterator(id, false); + boolean isLocStore = cctx.store().isLocalStore(); + if (it != null) { // We can safely remove these values because no entries will be created for evicted partition. while (it.hasNext()) { @@ -498,6 +500,9 @@ K key = cctx.marshaller().unmarshal(keyBytes, cctx.deploy().globalLoader()); cctx.swap().remove(key, keyBytes); + + if (isLocStore) + cctx.store().removeFromStore(null, key); } } } Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java (revision 2e115bf663aef70f6141119adcbd11e2c7bbbaaa) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java (revision 2e115bf663aef70f6141119adcbd11e2c7bbbaaa) @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import com.google.common.collect.*; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.processors.cache.store.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.expiry.*; +import javax.cache.integration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheMemoryMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CachePreloadMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + public static final TestLocalStore LOCAL_STORE_1 = new TestLocalStore<>(); + + /** */ + public static final TestLocalStore LOCAL_STORE_2 = new TestLocalStore<>(); + + /** */ + public static final TestLocalStore LOCAL_STORE_3 = new TestLocalStore<>(); + + /** */ + public static final int KEYS = 1000; + + /** */ + public static final String BACKUP_CACHE = "backup"; + + /** + * + */ + public GridCacheAbstractLocalStoreSelfTest() { + super(false /* doesn't start grid */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = cache(gridName, null, 0); + + CacheConfiguration cacheBackupCfg = cache(gridName, BACKUP_CACHE, 2); + + cfg.setCacheConfiguration(cacheCfg, cacheBackupCfg); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + LOCAL_STORE_1.clear(); + LOCAL_STORE_2.clear(); + LOCAL_STORE_3.clear(); + } + + /** + * @param gridName Grid name. + * @param cacheName Cache name. + * @param backups Number of backups. + * @return Configuration. + */ + @SuppressWarnings("unchecked") + private CacheConfiguration cache(String gridName, String cacheName, int backups) { + CacheConfiguration cacheCfg = new CacheConfiguration(); + + cacheCfg.setName(cacheName); + cacheCfg.setCacheMode(getCacheMode()); + cacheCfg.setAtomicityMode(getAtomicMode()); + cacheCfg.setDistributionMode(getDistributionMode()); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setPreloadMode(SYNC); + + if (gridName.endsWith("1")) + cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(LOCAL_STORE_1)); + else if (gridName.endsWith("2")) + cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(LOCAL_STORE_2)); + else + cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(LOCAL_STORE_3)); + + cacheCfg.setWriteThrough(true); + cacheCfg.setReadThrough(true); + cacheCfg.setBackups(backups); + cacheCfg.setOffHeapMaxMemory(0); + cacheCfg.setSwapEnabled(true); + + if (isOffHeapTieredMode()) + cacheCfg.setMemoryMode(OFFHEAP_TIERED); + + return cacheCfg; + } + + /** + * @return Distribution mode. + */ + protected abstract CacheDistributionMode getDistributionMode(); + + /** + * @return Cache atomicity mode. + */ + protected abstract CacheAtomicityMode getAtomicMode(); + + /** + * @return Cache mode. + */ + protected abstract CacheMode getCacheMode(); + + /** + * @return {@code True} if {@link CacheMemoryMode#OFFHEAP_TIERED} memory mode should be used. + */ + protected boolean isOffHeapTieredMode() { + return false; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testEvict() throws Exception { + Ignite ignite1 = startGrid(1); + + IgniteCache cache = ignite1.jcache(null).withExpiryPolicy(new CreatedExpiryPolicy( + new Duration(TimeUnit.MILLISECONDS, 100L))); + + // Putting entry. + for (int i = 0; i < KEYS; i++) + cache.put(i, i); + + // Wait when entry + U.sleep(200); + + // Check that entry is evicted from cache, but local store does contain it. + for (int i = 0; i < KEYS; i++) { + cache.localEvict(Arrays.asList(i)); + + assertNull(cache.localPeek(i)); + + assertEquals(i, (int)LOCAL_STORE_1.load(i).get1()); + + assertEquals(i, cache.get(i)); + } + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNode() throws Exception { + Ignite ignite1 = startGrid(1); + + IgniteCache cache = ignite1.jcache(null); + + // Populate cache and check that local store has all value. + for (int i = 0; i < KEYS; i++) + cache.put(i, i); + + checkLocalStore(ignite1, LOCAL_STORE_1); + + final AtomicInteger evtCnt = new AtomicInteger(0); + + if (getCacheMode() != REPLICATED) { + ignite1.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + evtCnt.incrementAndGet(); + + return true; + } + }, EventType.EVT_CACHE_PRELOAD_PART_UNLOADED); + } + + final Ignite ignite2 = startGrid(2); + + if (getCacheMode() != REPLICATED) { + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + // Partition count which must be transferred to 2'nd node. + int parts = ignite2.affinity(null).allPartitions(ignite2.cluster().localNode()).length; + + return evtCnt.get() >= parts; + } + }, 5000); + + assertTrue(wait); + } + + assertEquals(Ignition.allGrids().size(), 2); + + checkLocalStore(ignite1, LOCAL_STORE_1); + checkLocalStore(ignite2, LOCAL_STORE_2); + } + + /** + * @throws Exception If failed. + */ + public void testBackupNode() throws Exception { + Ignite ignite1 = startGrid(1); + + IgniteCache cache = ignite1.jcache(BACKUP_CACHE); + + for (int i = 0; i < KEYS; i++) + cache.put(i, i); + + for (int i = 0; i < KEYS; i++) + assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i); + + // Start 2'nd node. + Ignite ignite2 = startGrid(2); + + assertEquals(2, Ignition.allGrids().size()); + + checkLocalStoreForBackup(ignite2, LOCAL_STORE_2); + + // Start 3'nd node. + Ignite ignite3 = startGrid(3); + + assertEquals(Ignition.allGrids().size(), 3); + + for (int i = 0; i < KEYS; i++) + cache.put(i, i * 3); + + checkLocalStoreForBackup(ignite2, LOCAL_STORE_2); + checkLocalStoreForBackup(ignite3, LOCAL_STORE_3); + + // Stop 3'nd node. + stopGrid(3, true); + + assertEquals(Ignition.allGrids().size(), 2); + + checkLocalStoreForBackup(ignite2, LOCAL_STORE_2); + } + + /** + * @throws Exception If failed. + */ + public void testSwap() throws Exception { + Ignite ignite1 = startGrid(1); + + IgniteCache cache = ignite1.jcache(null); + + // Populate cache and check that local store has all value. + for (int i = 0; i < KEYS; i++) + cache.put(i, i); + + checkLocalStore(ignite1, LOCAL_STORE_1); + + // Push entry to swap. + for (int i = 0; i < KEYS; i++) + cache.localEvict(Lists.newArrayList(i)); + + for (int i = 0; i < KEYS; i++) + assertNull(cache.localPeek(i, CachePeekMode.ONHEAP)); + + final AtomicInteger evtCnt = new AtomicInteger(0); + + if (getCacheMode() != REPLICATED) { + ignite1.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + evtCnt.incrementAndGet(); + + return true; + } + }, EventType.EVT_CACHE_PRELOAD_PART_UNLOADED); + } + + final Ignite ignite2 = startGrid(2); + + if (getCacheMode() != REPLICATED) { + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + // Partition count which must be transferred to 2'nd node. + int parts = ignite2.affinity(null).allPartitions(ignite2.cluster().localNode()).length; + + return evtCnt.get() >= parts; + } + }, 5000); + + assertTrue(wait); + } + + assertEquals(Ignition.allGrids().size(), 2); + + checkLocalStore(ignite1, LOCAL_STORE_1); + checkLocalStore(ignite2, LOCAL_STORE_2); + } + + /** + * Checks that local stores contains only primary entry. + * + * @param ignite Ignite. + * @param store Store. + */ + private void checkLocalStore(Ignite ignite, CacheStore> store) { + for (int i = 0; i < KEYS; i++) { + if (ignite.affinity(null).isPrimary(ignite.cluster().localNode(), i)) + assertEquals(store.load(i).get1().intValue(), i); + else if (!ignite.affinity(null).isPrimaryOrBackup(ignite.cluster().localNode(), i)) + assertNull(store.load(i)); + } + } + + /** + * Checks that local stores contains only primary entry. + * + * @param ignite Ignite. + * @param store Store. + */ + private void checkLocalStoreForBackup(Ignite ignite, CacheStore> store) { + for (int i = 0; i < KEYS; i++) { + if (ignite.affinity(BACKUP_CACHE).isBackup(ignite.cluster().localNode(), i)) + assertEquals(store.load(i).get1().intValue(), i); + else if (!ignite.affinity(BACKUP_CACHE).isPrimaryOrBackup(ignite.cluster().localNode(), i)) + assertNull(store.load(i).get1()); + } + } + + /** + * + */ + @CacheLocalStore + public static class TestLocalStore implements CacheStore> { + /** */ + private Map> map = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure> clo, @Nullable Object... args) + throws CacheLoaderException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void txEnd(boolean commit) throws CacheWriterException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteBiTuple load(K key) throws CacheLoaderException { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public Map> loadAll(Iterable keys) throws CacheLoaderException { + Map> res = new HashMap<>(); + + for (K key : keys) { + IgniteBiTuple val = map.get(key); + + if (val != null) + res.put(key, val); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry> entry) + throws CacheWriterException { + map.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection>> entries) + throws CacheWriterException { + for (Cache.Entry> e : entries) + map.put(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + map.remove(key); + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection keys) throws CacheWriterException { + for (Object key : keys) + map.remove(key); + } + + /** + * Clear store. + */ + public void clear(){ + map.clear(); + } + } +}