Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java (revision e04d83c4911bbd3e18f03ee1e80a99cd49fae87f) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java (revision ) @@ -17,13 +17,24 @@ package org.apache.ignite.internal.processors.cache.distributed; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCompute; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -33,6 +44,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -43,6 +55,12 @@ /** */ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + /** */ + public static final int THREAD_CNT = Math.max(8, Runtime.getRuntime().availableProcessors()); + + /** */ + private static final String REPLICATED_TEST_CACHE = "REPLICATED_TEST_CACHE"; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -53,7 +71,13 @@ ccfg.setAtomicityMode(TRANSACTIONAL); - cfg.setCacheConfiguration(ccfg); + CacheConfiguration ccfg1 = new CacheConfiguration(REPLICATED_TEST_CACHE) + .setCacheMode(REPLICATED) + .setAtomicityMode(TRANSACTIONAL); + + ccfg1.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(SECONDS, 1))); + + cfg.setCacheConfiguration(ccfg, ccfg1); return cfg; } @@ -111,6 +135,59 @@ fut2.get(5, SECONDS); } + /** {@inheritDoc} */ + @Override public boolean isDebug() { + return true; + } + + /** + * @throws Exception If failed. + */ + public void testLockNodeRestart() throws Exception { + int nodeCnt = 4; + int threadCnt = 8; + + try { + Queue> q = new ArrayDeque<>(nodeCnt); + + for (int i = 0; i < nodeCnt; i++) { + final Ignite ignite = startGrid(i); + + IgniteInternalFuture f = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + while (!Thread.currentThread().isInterrupted()) { + IgniteCache cache = ignite.cache(REPLICATED_TEST_CACHE); + for (int i = 0; i < 100; i++) { + Object value = cache.get(i); + + if (value == null) { + Lock lock = cache.lock(i); + lock.lock(); + + cache.put(i, i); + + lock.unlock(); + } + } + } + } + }, threadCnt, "test-lock-thread"); + + q.add(f); + + U.sleep(3_000); + } + + IgniteInternalFuture f = null; + + while ((f = q.poll()) != null) + f.cancel(); + } + finally { + stopAllGrids(); + } + } + /** * @throws Exception If failed. */