Index: modules/core/src/test/java/org/apache/ignite/internal/processors/database/TtlHangReproducerTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/database/TtlHangReproducerTest.java (revision ) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/database/TtlHangReproducerTest.java (revision ) @@ -0,0 +1,225 @@ +package org.apache.ignite.internal.processors.database; + +import java.io.Serializable; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class TtlHangReproducerTest extends GridCommonAbstractTest { + /** Ip finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Cache name. */ + private static final String CACHE_NAME = "cache1"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + discoverySpi.setIpFinder(ipFinder); + + MemoryConfiguration dbCfg = new MemoryConfiguration(); + + dbCfg.setMemoryPolicies(new MemoryPolicyConfiguration().setMaxSize(2000 * 1024 * 1024).setName("dfltMemPlc").setMetricsEnabled(true)); + + dbCfg.setDefaultMemoryPolicyName("dfltMemPlc"); + + cfg.setMemoryConfiguration(dbCfg); + + CacheConfiguration ccfg1 = new CacheConfiguration(); + + ccfg1.setName(CACHE_NAME); + ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg1.setAffinity(new RendezvousAffinityFunction(false, 256)); + ccfg1.setIndexedTypes(Integer.class, Integer.class); + ccfg1.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MINUTES, 2))); + + cfg.setCacheConfiguration(ccfg1); + + cfg.setPersistentStoreConfiguration( + new PersistentStoreConfiguration() + .setWalMode(WALMode.BACKGROUND) + .setCheckpointingFrequency(10_000) + .setWalHistorySize(2) + .setCheckpointingThreads(4)); + + if ("client".equals(gridName)) + cfg.setClientMode(true); + + cfg.setConsistentId(gridName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60 * 1000; + } + + /** + * @throws Exception if failed. + */ + public void testSnapshotDuringLoadEvictions() throws Exception { + startGrids(2).active(true); + + try { + Ignite ig = ignite(0); + + final int keyCnt = 10_000_000; + + final AtomicBoolean run = new AtomicBoolean(true); + + HitRateMetrics getRate = new HitRateMetrics(5000, 5); + + GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + while (run.get()) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int key = rnd.nextInt(keyCnt * 2); + + ignite(0).cache(CACHE_NAME).get(key); + + getRate.onHit(); + } + + return null; + } + }, 2, "read-loader"); + + HitRateMetrics putRate = new HitRateMetrics(5000, 5); + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + while (run.get()) { + System.out.println("@@@ putsPerSec=" + (putRate.getRate() / 5) + " " + " getsPerSec=" + (getRate.getRate() / 5) + " " + "expiresPerSec=" + (CacheConfiguration.EXPIRES.getRate() / 5) + " " + ig.memoryMetrics("dfltMemPlc")); + + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + }, "metrics-view"); + + try (IgniteDataStreamer ds = ig.dataStreamer(CACHE_NAME)) { + ds.allowOverwrite(true); + + for (int i = 0; i < keyCnt; i++) { + ds.addData(ThreadLocalRandom.current().nextInt(keyCnt * 2), new TestValue(ThreadLocalRandom.current().nextInt(), + ThreadLocalRandom.current().nextInt())); + + putRate.onHit(); + } + } + + run.set(false); + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private static class TestValue implements Serializable { + /** */ + private final int v1; + + /** */ + private final int v2; + + /** */ + private byte[] payload = new byte[400]; + + /** + * @param v1 Value 1. + * @param v2 Value 2. + */ + private TestValue(int v1, int v2) { + this.v1 = v1; + this.v2 = v2; + payload[Math.abs(v1) % 400] = 1; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue value = (TestValue)o; + + return v1 == value.v1 && v2 == value.v2; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = v1; + + result = 31 * result + v2; + + return result; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void deleteWorkFiles() throws IgniteCheckedException { + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "snapshot", false)); + } +} Index: modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java (revision cf345b81820fbbcb82e5fcb2283b43ea286a40b4) +++ modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java (revision ) @@ -67,6 +67,7 @@ import org.apache.ignite.cache.store.CacheStoreSessionListener; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.binary.BinaryContext; +import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics; import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -93,6 +94,8 @@ */ @SuppressWarnings("RedundantFieldInitialization") public class CacheConfiguration extends MutableConfiguration { + public static final HitRateMetrics EXPIRES = new HitRateMetrics(5000, 5); + /** */ private static final long serialVersionUID = 0L; Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MemoryMetricsSnapshot.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MemoryMetricsSnapshot.java (revision cf345b81820fbbcb82e5fcb2283b43ea286a40b4) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MemoryMetricsSnapshot.java (revision ) @@ -109,4 +109,19 @@ @Override public long getPhysicalMemoryPages() { return physicalMemoryPages; } + + /** {@inheritDoc} */ + @Override public String toString() { + return "MemoryMetricsSnapshot{" + + "name='" + name + '\'' + + ", totalAllocatedPages=" + totalAllocatedPages + + ", allocationRate=" + allocationRate + + ", evictionRate=" + evictionRate + + ", largeEntriesPagesPercentage=" + largeEntriesPagesPercentage + + ", pagesFillFactor=" + pagesFillFactor + + ", dirtyPages=" + dirtyPages + + ", pageReplaceRate=" + pageReplaceRate + + ", physicalMemoryPages=" + physicalMemoryPages + + '}'; + } } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java (revision cf345b81820fbbcb82e5fcb2283b43ea286a40b4) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java (revision ) @@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.configuration.CacheConfiguration.EXPIRES; import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId; @@ -1005,6 +1006,8 @@ obsoleteVer = ctx.versions().next(); c.apply(cctx.cache().entryEx(row.key), obsoleteVer); + + EXPIRES.onHit(); } cleared++;