Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java (revision da02f3556ebeb80f4aa87335fa41932c41f61c0a) +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java (date 1680766347668) @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; +import java.util.function.Function; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCluster; import org.apache.ignite.IgniteLogger; @@ -89,6 +90,13 @@ * */ public class CacheGroupContext { + /** Default offheap manager provider. */ + public static final Function DEFAULT_OFFHEAP_MANAGER_PROVIDER = + persistenceEnabled -> persistenceEnabled ? new GridCacheOffheapManager() : new IgniteCacheOffheapManagerImpl(); + + /** Offheap manager provider. */ + public static Function offheapMgrProvider = DEFAULT_OFFHEAP_MANAGER_PROVIDER; + /** * Unique group ID. Currently for shared group it is generated as group name hash, * for non-shared as cache name hash (see {@link ClusterCachesInfo#checkCacheConflict}). @@ -1072,9 +1080,7 @@ metrics.onTopologyInitialized(); try { - offheapMgr = ctx.kernalContext().resource().resolve(persistenceEnabled - ? new GridCacheOffheapManager() - : new IgniteCacheOffheapManagerImpl()); + offheapMgr = ctx.kernalContext().resource().resolve(offheapMgrProvider.apply(persistenceEnabled)); } catch (Exception e) { throw new IgniteCheckedException("Failed to initialize offheap manager", e); Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/BlockingThreadsOnSnapshotRestoreReproducerTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/BlockingThreadsOnSnapshotRestoreReproducerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/BlockingThreadsOnSnapshotRestoreReproducerTest.java new file mode 100644 --- /dev/null (date 1680767414424) +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/BlockingThreadsOnSnapshotRestoreReproducerTest.java (date 1680767414424) @@ -0,0 +1,140 @@ +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE; +import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT; +import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS; + +/** + * + */ +public class BlockingThreadsOnSnapshotRestoreReproducerTest extends GridCommonAbstractTest { + /** Keys count. */ + public static final int KEYS_CNT = 10000; + + /** Delay of partitions states restoring. */ + public static final long DELAY = DFLT_FAILURE_DETECTION_TIMEOUT * 2; + + /** Failure handler invoked flag. */ + private final AtomicBoolean failureHndInvoked = new AtomicBoolean(); + + /** Delay partition states restore flag. */ + private final AtomicBoolean delayPartitionStatesRestore = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setFailureHandler((ign, ctx) -> { + failureHndInvoked.set(true); + + return false; + }) + .setFailureDetectionTimeout(DFLT_FAILURE_DETECTION_TIMEOUT) + .setDataStorageConfiguration( + new DataStorageConfiguration() + .setCheckpointFrequency(DFLT_FAILURE_DETECTION_TIMEOUT / 2) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true))); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + failureHndInvoked.set(false); + delayPartitionStatesRestore.set(false); + + CacheGroupContext.offheapMgrProvider = b -> new PartitionStatesRestoreDelayingOffheapManager(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + CacheGroupContext.offheapMgrProvider = CacheGroupContext.DEFAULT_OFFHEAP_MANAGER_PROVIDER; + } + + /** + * + */ + @Test + @WithSystemProperty(key = IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS, value = "true") + @WithSystemProperty(key = IGNITE_DUMP_THREADS_ON_FAILURE, value = "false") + public void test() throws Exception { + prepareSnapshotToRestore(); + + delayPartitionStatesRestore.set(true); + + grid(0).snapshot().restoreSnapshot("test", null).get(); + + awaitPartitionMapExchange(); + + doSleep(DELAY); + + IgniteCache cache = grid(0).cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < KEYS_CNT; i++) + assertEquals(i, cache.get(i).intValue()); + + assertFalse("Failure handler called", failureHndInvoked.get()); + } + + /** + * Create and populate cache, make snapshot and clear cluster. + */ + private void prepareSnapshotToRestore() throws Exception { + startGrid(0).cluster().state(ClusterState.ACTIVE); + + IgniteCache cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + IntStream.range(0, KEYS_CNT).forEach(i -> cache.put(i, i)); + + grid(0).snapshot().createSnapshot("test").get(); + awaitPartitionMapExchange(); + + grid(0).destroyCache(DEFAULT_CACHE_NAME); + awaitPartitionMapExchange(); + } + + /** + * + */ + private class PartitionStatesRestoreDelayingOffheapManager extends GridCacheOffheapManager { + /** Latch. */ + private final CountDownLatch latch = new CountDownLatch(1); + + /** {@inheritDoc} */ + @Override public void restorePartitionStates() throws IgniteCheckedException { + try { + if (delayPartitionStatesRestore.get()) + latch.await(DELAY, + TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + throw new IgniteCheckedException(e); + } + + super.restorePartitionStates(); + } + } +}