Index: modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java (revision cd50cf7010a4dcb1f58734983a29e5b7042c239d) +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java (date 1658329829922) @@ -25,9 +25,12 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.IntStream; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; import javax.management.DynamicMBean; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; @@ -427,6 +430,37 @@ fut.cancel(); } } + + /** + * + */ + @Test + public void testWithExpiryPolicy() throws Exception { + IgniteCache srcCache = createCache(srcCluster[0], ACTIVE_PASSIVE_CACHE); + IgniteCache destCache = createCache(destCluster[0], ACTIVE_PASSIVE_CACHE); + + List> futs = startActivePassiveCdc(ACTIVE_PASSIVE_CACHE); + + try { + srcCache.put(0, ConflictResolvableTestData.create()); + + assertTrue(srcCache.containsKey(0)); + + log.warning(">>>>>> Waiting for entry in destination cache"); + assertTrue(waitForCondition(() -> destCache.containsKey(0), getTestTimeout())); + + log.warning(">>>>>> Waiting for removing in source cache"); + assertTrue(waitForCondition(() -> !srcCache.containsKey(0), getTestTimeout())); + + log.warning(">>>>>> Waiting for removing in destination cache"); + assertTrue(waitForCondition(() -> !destCache.containsKey(0), 20_000)); + } + finally { + for (IgniteInternalFuture fut : futs) { + fut.cancel(); + } + } + } /** */ public Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) { @@ -483,7 +517,8 @@ CacheConfiguration ccfg = new CacheConfiguration() .setName(name) .setCacheMode(mode) - .setAtomicityMode(atomicity); + .setAtomicityMode(atomicity) + .setExpiryPolicyFactory(() -> new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 10))); if (mode != REPLICATED) ccfg.setBackups(backups);