Index: incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java (date 1455107523000) +++ incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java (revision ) @@ -523,7 +523,7 @@ val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); // Set unswapped value. - update(val, e.expireTime(), e.ttl(), e.version()); + update(val, e.expireTime(), e.ttl(), e.version(), false); // Must update valPtr again since update() will reset it. if (cctx.offheapTiered() && e.offheapPointer() > 0) @@ -939,7 +939,7 @@ boolean hadValPtr = hasOffHeapPointer(); // Don't change version for read-through. - update(ret, expTime, ttl, nextVer); + update(ret, expTime, ttl, nextVer, true); if (hadValPtr && cctx.offheapTiered()) cctx.swap().removeOffheap(key); @@ -1034,7 +1034,7 @@ deletedUnlocked(true); } - update(ret, expTime, ttl, nextVer); + update(ret, expTime, ttl, nextVer, true); touch = true; @@ -1188,7 +1188,7 @@ if (updateCntr != null && updateCntr != 0) updateCntr0 = updateCntr; - update(val, expireTime, ttl, newVer); + update(val, expireTime, ttl, newVer, true); drReplicate(drType, val, newVer); @@ -1350,7 +1350,7 @@ boolean hadValPtr = hasOffHeapPointer(); - update(null, 0, 0, newVer); + update(null, 0, 0, newVer, true); if (cctx.offheapTiered() && hadValPtr) { boolean rmv = cctx.swap().removeOffheap(key); @@ -1566,7 +1566,7 @@ else clearIndex(null); - update(old, expireTime, ttl, ver); + update(old, expireTime, ttl, ver, true); } // Apply metrics. @@ -1713,7 +1713,7 @@ assert ttl != CU.TTL_ZERO; - update(updated, expireTime, ttl, ver); + update(updated, expireTime, ttl, ver, true); if (evt) { CacheObject evtOld = null; @@ -1750,7 +1750,7 @@ // in load methods without actually holding entry lock. clearIndex(old); - update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver); + update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true); if (cctx.offheapTiered() && hasValPtr) { boolean rmv = cctx.swap().removeOffheap(key); @@ -2125,7 +2125,7 @@ else clearIndex(null); - update(oldVal, initExpireTime, initTtl, ver); + update(oldVal, initExpireTime, initTtl, ver, true); if (deletedUnlocked() && oldVal != null && !isInternal()) deletedUnlocked(false); @@ -2339,7 +2339,7 @@ // in load methods without actually holding entry lock. updateIndex(updated, newExpireTime, newVer, oldVal); - update(updated, newExpireTime, newTtl, newVer); + update(updated, newExpireTime, newTtl, newVer, true); updateCntr0 = nextPartCounter(topVer); @@ -2424,7 +2424,7 @@ boolean hasValPtr = hasOffHeapPointer(); // Clear value on backup. Entry will be removed from cache when it got evicted from queue. - update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer); + update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true); assert newSysTtl == CU.TTL_NOT_CHANGED; assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE; @@ -2735,7 +2735,7 @@ if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) { if (!deletedUnlocked()) { - update(null, 0L, 0L, ver); + update(null, 0L, 0L, ver, true); deletedUnlocked(true); @@ -2923,24 +2923,24 @@ * @param ttl Time to live. * @param ver Update version. */ - protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver) { + protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver, boolean addTracked) { assert ver != null; assert Thread.holdsLock(this); assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl; long oldExpireTime = expireTimeExtras(); - if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl()) + if (addTracked && oldExpireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl()) cctx.ttl().removeTrackedEntry(this); value(val); ttlAndExpireTimeExtras(ttl, expireTime); - if (expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl()) - cctx.ttl().addTrackedEntry(this); - this.ver = ver; + + if (addTracked && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl()) + cctx.ttl().addTrackedEntry(this); } /** @@ -3252,7 +3252,7 @@ @Override public synchronized CacheObject rawPut(CacheObject val, long ttl) { CacheObject old = this.val; - update(val, CU.toExpireTime(ttl), ttl, nextVersion()); + update(val, CU.toExpireTime(ttl), ttl, nextVersion(), true); return old; } @@ -3280,7 +3280,7 @@ updateIndex(val, expTime, ver, null); // Version does not change for load ops. - update(val, expTime, ttl, ver); + update(val, expTime, ttl, ver, true); boolean skipQryNtf = false; @@ -3366,7 +3366,8 @@ update(val, unswapped.expireTime(), unswapped.ttl(), - unswapped.version() + unswapped.version(), + true ); return true; @@ -3425,7 +3426,7 @@ } // Version does not change for load ops. - update(val, expTime, ttl, newVer); + update(val, expTime, ttl, newVer, true); return newVer; } @@ -3640,7 +3641,7 @@ if (!obsolete()) { if (cctx.deferredDelete() && !detached() && !isInternal()) { if (!deletedUnlocked()) { - update(null, 0L, 0L, ver0 = ver); + update(null, 0L, 0L, ver0 = ver, true); deletedUnlocked(true); Index: incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java (date 1455107523000) +++ incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java (revision ) @@ -580,7 +580,7 @@ clearIndex(prev); // Give to GC. - update(null, 0L, 0L, ver); + update(null, 0L, 0L, ver, true); if (swap) { releaseSwap(); Index: incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java (date 1455107523000) +++ incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java (revision ) @@ -33,6 +33,7 @@ import javax.cache.expiry.Duration; import javax.cache.expiry.EternalExpiryPolicy; import javax.cache.expiry.ExpiryPolicy; +import javax.cache.expiry.ModifiedExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCache; @@ -109,6 +110,37 @@ cfg.setEagerTtl(false); return cfg; + } + + /** + * @throws Exception if failed. + */ + public void testCreateUpdate0() throws Exception { + startGrids(1); + + long ttl = 60L; + + final String key = "key1"; + + final IgniteCache cache = jcache(); + + for (int i = 0; i < 1000; i++) { + final IgniteCache cache0 = cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(TimeUnit.HOURS, ttl))); + + cache0.put(key, key); + + info("PUT DONE"); + } + + int pSize = grid(0).context().cache().internalCache(null).context().ttl().pendingSize(); + + assertTrue("Too many pending entries: " + pSize, pSize <= 1); + + cache.remove(key); + + pSize = grid(0).context().cache().internalCache(null).context().ttl().pendingSize(); + + assertEquals(0, pSize); } /** \ No newline at end of file Index: incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java (date 1455107523000) +++ incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java (revision ) @@ -18,11 +18,13 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridConcurrentSkipListSet; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.thread.IgniteThread; @@ -74,6 +76,9 @@ * @param entry Entry to add. */ public void addTrackedEntry(GridCacheMapEntry entry) { + assert Thread.holdsLock(entry); + assert cleanupWorker != null; + pendingEntries.add(new EntryWrapper(entry)); } @@ -82,10 +87,18 @@ */ public void removeTrackedEntry(GridCacheMapEntry entry) { assert Thread.holdsLock(entry); + assert cleanupWorker != null; pendingEntries.remove(new EntryWrapper(entry)); } + /** + * @return The size of pending entries. + */ + public int pendingSize() { + return pendingEntries.sizex(); + } + /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>>"); @@ -150,6 +163,46 @@ } /** + * @param cctx Cache context. + * @param key1 Left key to compare. + * @param key2 Right key to compare. + * @return Comparison result. + */ + private static int compareKeys(GridCacheContext cctx, CacheObject key1, CacheObject key2) { + int key1Hash = key1.hashCode(); + int key2Hash = key2.hashCode(); + + int res = Integer.compare(key1Hash, key2Hash); + + if (res == 0) { + key1 = (CacheObject)cctx.unwrapTemporary(key1); + key2 = (CacheObject)cctx.unwrapTemporary(key2); + + try { + byte[] key1ValBytes = key1.valueBytes(cctx.cacheObjectContext()); + byte[] key2ValBytes = key2.valueBytes(cctx.cacheObjectContext()); + + // Must not do fair array comparison. + res = Integer.compare(key1ValBytes.length, key2ValBytes.length); + + if (res == 0) { + for (int i = 0; i < key1ValBytes.length; i++) { + res = Byte.compare(key1ValBytes[i], key2ValBytes[i]); + + if (res != 0) + break; + } + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + return res; + } + + /** * Entry wrapper. */ private static class EntryWrapper implements Comparable { @@ -174,9 +227,13 @@ @Override public int compareTo(EntryWrapper o) { int res = Long.compare(expireTime, o.expireTime); - if (res == 0) - res = Long.compare(entry.startVersion(), o.entry.startVersion()); + if (res == 0) { + // Must compare entries of the same cache. + assert entry.context() == o.entry.context(); + res = compareKeys(entry.context(), entry.key(), o.entry.key()); + } + return res; } @@ -190,7 +247,7 @@ EntryWrapper that = (EntryWrapper)o; - return expireTime == that.expireTime && entry.startVersion() == that.entry.startVersion(); + return expireTime == that.expireTime && compareKeys(entry.context(), entry.key(), that.entry.key()) == 0; } @@ -198,11 +255,16 @@ @Override public int hashCode() { int res = (int)(expireTime ^ (expireTime >>> 32)); - res = 31 * res + (int)(entry.startVersion() ^ (entry.startVersion() >>> 32)); + res = 31 * res + entry.key().hashCode(); return res; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(EntryWrapper.class, this); - } + } + } /** * Provides additional method {@code #sizex()}. NOTE: Only the following methods supports this addition: @@ -230,7 +292,7 @@ @Override public boolean add(EntryWrapper e) { boolean res = super.add(e); - assert res; + assert res : "Failed to add entry wrapper:" + e; size.increment(); \ No newline at end of file Index: incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java (date 1455107523000) +++ incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java (revision ) @@ -166,7 +166,7 @@ if (isNew() || !valid(topVer)) { // Version does not change for load ops. - update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version()); + update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true); if (cctx.deferredDelete() && !isNew() && !isInternal()) { boolean deleted = val == null; @@ -402,7 +402,7 @@ if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) { primaryNode(primaryNodeId, topVer); - update(val, expireTime, ttl, ver); + update(val, expireTime, ttl, ver, true); if (cctx.deferredDelete() && !isInternal()) { boolean deleted = val == null;