From 8e50c49e202f82e56e175a056357119271e69e80 Mon Sep 17 00:00:00 2001 From: sboikov Date: Thu, 27 Aug 2015 15:22:40 +0300 Subject: [PATCH] # ignite-1124 --- .../processors/cache/GridCacheAdapter.java | 4 + .../processors/cache/GridCacheAtomicFuture.java | 5 - .../processors/cache/GridCacheMvccManager.java | 62 +- .../distributed/dht/atomic/GridDhtAtomicCache.java | 10 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 7 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 1235 ++++++++++---------- .../internal/util/future/GridCompoundFuture.java | 3 +- .../org/apache/ignite/internal/util/typedef/X.java | 1 + ...niteCacheEntryListenerEagerTtlDisabledTest.java | 4 +- .../CacheAsyncOperationsFailoverAbstractTest.java | 329 ++++++ .../CacheAsyncOperationsFailoverAtomicTest.java | 32 + .../CacheAsyncOperationsFailoverTxTest.java | 32 + .../dht/IgniteCachePutRetryAbstractSelfTest.java | 282 ++++- .../dht/IgniteCachePutRetryAtomicSelfTest.java | 77 +- .../IgniteCachePutRetryTransactionalSelfTest.java | 14 +- ...teCacheAtomicReplicatedNodeRestartSelfTest.java | 5 - .../tcp/IgniteCacheSslStartStopSelfTest.java | 9 +- .../testsuites/IgniteCacheFailoverTestSuite2.java | 4 + 18 files changed, 1353 insertions(+), 762 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 54d33e0..c3bbbe4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4135,6 +4135,10 @@ public abstract class GridCacheAdapter implements IgniteInternalCache f = new GridEmbeddedFuture(fut, new IgniteOutClosure() { @Override public IgniteInternalFuture apply() { + if (ctx.kernalContext().isStopping()) + return new GridFinishedFuture<>( + new IgniteCheckedException("Operation has been cancelled (node is stopping).")); + return op.op(tx0).chain(new CX1, T>() { @Override public T applyx(IgniteInternalFuture tFut) throws IgniteCheckedException { try { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index 8724d3a..e64a9e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -27,11 +27,6 @@ import java.util.*; */ public interface GridCacheAtomicFuture extends GridCacheFuture { /** - * @return Future topology version. - */ - public AffinityTopologyVersion topologyVersion(); - - /** * Gets future that will be completed when it is safe when update is finished on the given version of topology. * * @param topVer Topology version to finish. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 6a8c6fe..bbac42b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -196,8 +196,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { cacheFut.onNodeLeft(discoEvt.eventNode().id()); - if (cacheFut.isCancelled() || cacheFut.isDone()) - atomicFuts.remove(cacheFut.futureId(), fut); + if (cacheFut.isCancelled() || cacheFut.isDone()) { + GridCacheVersion futVer = cacheFut.version(); + + if (futVer != null) + atomicFuts.remove(futVer, fut); + } } } } @@ -347,16 +351,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** - * @param fut Future to check. - * @return {@code True} if future is registered. - */ - public boolean hasFuture(GridCacheFuture fut) { - assert fut != null; - - return future(fut.version(), fut.futureId()) != null; - } - - /** * @param futVer Future ID. * @param fut Future. */ @@ -565,6 +559,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @param ver Version. * @return All futures for given lock version. */ + @SuppressWarnings("unchecked") public Collection> futures(GridCacheVersion ver) { Collection c = futs.get(ver); @@ -572,6 +567,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @param cacheCtx Cache context. * @param ver Lock version to check. * @return {@code True} if lock had been removed. */ @@ -580,6 +576,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @param cacheCtx Cache context. * @param ver Obsolete entry version. * @return {@code True} if added. */ @@ -688,27 +685,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** - * @param keys Keys. - * @param base Base version. - * @return Versions that are less than {@code base} whose keys are in the {@code keys} collection. - */ - public Collection localDhtPendingVersions(Collection keys, GridCacheVersion base) { - Collection lessPending = new GridLeanSet<>(5); - - for (GridCacheMvccCandidate cand : dhtLocCands) { - if (cand.version().isLess(base)) { - if (keys.contains(cand.key())) - lessPending.add(cand.version()); - } - else - break; - } - - return lessPending; - } - - /** - * + * @param cacheCtx Cache context. * @param cand Cache lock candidate to add. * @return {@code True} if added as a result of this operation, * {@code false} if was previously added. @@ -924,24 +901,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { X.println(">>> finishFutsSize: " + finishFuts.size()); } - - /** - * @param nodeId Node ID. - * @return Filter. - */ - private IgnitePredicate nodeIdFilter(final UUID nodeId) { - if (nodeId == null) - return F.alwaysTrue(); - - return new P1() { - @Override public boolean apply(GridCacheMvccCandidate c) { - UUID otherId = c.otherNodeId(); - - return c.nodeId().equals(nodeId) || (otherId != null && otherId.equals(nodeId)); - } - }; - } - /** * @param topVer Topology version. * @return Future that signals when all locks for given partitions are released. @@ -994,6 +953,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * * @return Finish update future. */ + @SuppressWarnings("unchecked") public IgniteInternalFuture finishAtomicUpdates(AffinityTopologyVersion topVer) { GridCompoundFuture res = new GridCompoundFuture<>(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 9087d20..0985ae3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -135,6 +135,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { + assert req.writeSynchronizationMode() != FULL_ASYNC : req; + // Always send reply in CLOCK ordering mode. sendNearUpdateReply(res.nodeId(), res); @@ -570,6 +572,10 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { IgniteInternalFuture f = new GridEmbeddedFuture(fut, new IgniteOutClosure() { @Override public IgniteInternalFuture apply() { + if (ctx.kernalContext().isStopping()) + return new GridFinishedFuture<>( + new IgniteCheckedException("Operation has been cancelled (node is stopping).")); + return op.apply(); } }); @@ -2243,6 +2249,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param req Request to remap. */ private void remapToNewPrimary(GridNearAtomicUpdateRequest req) { + assert req.writeSynchronizationMode() == FULL_ASYNC : req; + if (log.isDebugEnabled()) log.debug("Remapping near update request locally: " + req); @@ -2275,7 +2283,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { drRmvVals = null; } else { - assert req.operation() == DELETE; + assert req.operation() == DELETE : req; drRmvVals = req.conflictVersions(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 4b1a58f..04128b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -168,13 +168,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter } /** {@inheritDoc} */ - @Override public AffinityTopologyVersion topologyVersion() { - return updateReq.topologyVersion(); - } - - /** {@inheritDoc} */ @Override public IgniteInternalFuture completeFuture(AffinityTopologyVersion topVer) { - if (waitForExchange && topologyVersion().compareTo(topVer) < 0) + if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0) return this; return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 07ec808..3f22808 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; @@ -36,11 +35,9 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; -import org.jsr166.*; import javax.cache.expiry.*; import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; @@ -64,9 +61,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter /** Cache. */ private GridDhtAtomicCache cache; - /** Future ID. */ - private volatile GridCacheVersion futVer; - /** Update operation. */ private final GridCacheOperation op; @@ -88,55 +82,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection conflictRmvVals; - /** Mappings. */ - @GridToStringInclude - private ConcurrentMap mappings; - - /** Error. */ - private volatile CachePartialUpdateCheckedException err; - - /** Operation result. */ - private volatile GridCacheReturn opRes; - /** Return value require flag. */ private final boolean retval; /** Expiry policy. */ private final ExpiryPolicy expiryPlc; - /** Future map topology version. */ - private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; - - /** Completion future for a particular topology version. */ - private GridFutureAdapter topCompleteFut; - /** Optional filter. */ private final CacheEntryPredicate[] filter; /** Write synchronization mode. */ private final CacheWriteSynchronizationMode syncMode; - /** If this future mapped to single node. */ - private volatile Boolean single; - - /** If this future is mapped to a single node, this field will contain that node ID. */ - private UUID singleNodeId; - - /** Single update request. */ - private GridNearAtomicUpdateRequest singleReq; - /** Raw return value flag. */ private final boolean rawRetval; /** Fast map flag. */ private final boolean fastMap; - /** */ - private boolean fastMapRemap; - - /** */ - private GridCacheVersion updVer; - /** Near cache flag. */ private final boolean nearEnabled; @@ -156,7 +119,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter private final boolean waitTopFut; /** Remap count. */ - private AtomicInteger remapCnt; + private int remapCnt; + + /** State. */ + private final UpdateState state; /** * @param cctx Cache context. @@ -175,6 +141,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param skipStore Skip store flag. + * @param remapCnt Maximum number of retries. + * @param waitTopFut If {@code false} does not wait for affinity change future. */ public GridNearAtomicUpdateFuture( GridCacheContext cctx, @@ -223,8 +191,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); - mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); - fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && cctx.config().getAtomicWriteOrderMode() == CLOCK && !(cctx.writeThrough() && cctx.config().getInterceptor() != null); @@ -234,22 +200,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter if (!waitTopFut) remapCnt = 1; - this.remapCnt = new AtomicInteger(remapCnt); + this.remapCnt = remapCnt; + + state = new UpdateState(); } /** {@inheritDoc} */ @Override public IgniteUuid futureId() { - return futVer.asGridUuid(); + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public GridCacheVersion version() { - return futVer; + return state.futureVersion(); } /** {@inheritDoc} */ @Override public Collection nodes() { - return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull()); + throw new UnsupportedOperationException(); } /** @@ -261,45 +229,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter } /** {@inheritDoc} */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** {@inheritDoc} */ @Override public Collection keys() { return keys; } /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - Boolean single0 = single; - - if (single0 != null && single0) { - if (singleNodeId.equals(nodeId)) { - onDone(addFailedKeys( - singleReq.keys(), - singleReq.topologyVersion(), - new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId))); - - return true; - } - - return false; - } - - GridNearAtomicUpdateRequest req = mappings.get(nodeId); - - if (req != null) { - addFailedKeys(req.keys(), - req.topologyVersion(), - new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)); - - mappings.remove(nodeId); - - checkComplete(); - - return true; - } + state.onNodeLeft(nodeId); return false; } @@ -329,142 +265,34 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); if (topVer == null) - mapOnTopology(null, false, null); + mapOnTopology(); else { topLocked = true; // Cannot remap. - remapCnt.set(1); + remapCnt = 1; - map0(topVer, null, false, null); + state.map(topVer); } } /** {@inheritDoc} */ @Override public IgniteInternalFuture completeFuture(AffinityTopologyVersion topVer) { - if (waitForPartitionExchange() && topologyVersion().compareTo(topVer) < 0) { - GridFutureAdapter fut = null; - - synchronized (this) { - if (this.topVer == AffinityTopologyVersion.ZERO) - return null; + if (waitForPartitionExchange()) { + GridFutureAdapter fut = state.completeFuture(topVer); - if (this.topVer.compareTo(topVer) < 0) { - if (topCompleteFut == null) - topCompleteFut = new GridFutureAdapter<>(); + if (fut != null && isDone()) { + fut.onDone(); - fut = topCompleteFut; - } + return null; } - if (fut != null && isDone()) - fut.onDone(); - return fut; } return null; } - /** - * @param failed Keys to remap. - * @param errTopVer Topology version for failed update. - */ - private void remap(Collection failed, AffinityTopologyVersion errTopVer) { - assert errTopVer != null; - - GridCacheVersion futVer0 = futVer; - - if (futVer0 == null || cctx.mvcc().removeAtomicFuture(futVer0) == null) - return; - - Collection remapKeys = new ArrayList<>(failed.size()); - Collection remapVals = vals != null ? new ArrayList<>(failed.size()) : null; - Collection remapConflictPutVals = conflictPutVals != null ? new ArrayList(failed.size()) : null; - Collection remapConflictRmvVals = conflictRmvVals != null ? new ArrayList(failed.size()) : null; - - Iterator keyIt = keys.iterator(); - Iterator valsIt = vals != null ? vals.iterator() : null; - Iterator conflictPutValsIt = conflictPutVals != null ? conflictPutVals.iterator() : null; - Iterator conflictRmvValsIt = conflictRmvVals != null ? conflictRmvVals.iterator() : null; - - for (Object key : failed) { - while (keyIt.hasNext()) { - Object nextKey = keyIt.next(); - Object nextVal = valsIt != null ? valsIt.next() : null; - GridCacheDrInfo nextConflictPutVal = conflictPutValsIt != null ? conflictPutValsIt.next() : null; - GridCacheVersion nextConflictRmvVal = conflictRmvValsIt != null ? conflictRmvValsIt.next() : null; - - if (F.eq(key, nextKey)) { - remapKeys.add(nextKey); - - if (remapVals != null) - remapVals.add(nextVal); - - if (remapConflictPutVals != null) - remapConflictPutVals.add(nextConflictPutVal); - - if (remapConflictRmvVals != null) - remapConflictRmvVals.add(nextConflictRmvVal); - - break; - } - } - } - - keys = remapKeys; - vals = remapVals; - conflictPutVals = remapConflictPutVals; - conflictRmvVals = remapConflictRmvVals; - - single = null; - futVer = null; - err = null; - opRes = null; - - GridFutureAdapter fut0; - - synchronized (this) { - mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); - - assert topVer != null && topVer.topologyVersion() > 0 : this; - - topVer = AffinityTopologyVersion.ZERO; - - fut0 = topCompleteFut; - - topCompleteFut = null; - } - - if (fut0 != null) - fut0.onDone(); - - singleNodeId = null; - singleReq = null; - fastMapRemap = false; - updVer = null; - topLocked = false; - - IgniteInternalFuture fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1); - - fut.listen(new CI1>() { - @Override public void apply(final IgniteInternalFuture fut) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - try { - fut.get(); - - map(); - } - catch (IgniteCheckedException e) { - onDone(e); - } - } - }); - } - }); - } - /** {@inheritDoc} */ @SuppressWarnings("ConstantConditions") @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { @@ -478,35 +306,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter if (op == TRANSFORM && retval == null) retval = Collections.emptyMap(); - if (err != null && X.hasCause(err, CachePartialUpdateCheckedException.class) && - X.hasCause(err, ClusterTopologyCheckedException.class) && - storeFuture() && - remapCnt.decrementAndGet() > 0) { - ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class); - - if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { - CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class); - - assert cause != null && cause.topologyVersion() != null : err; - - remap(cause.failedKeys(), cause.topologyVersion()); - - return false; - } - } - if (super.onDone(retval, err)) { - if (futVer != null) - cctx.mvcc().removeAtomicFuture(version()); - - GridFutureAdapter fut0; - - synchronized (this) { - fut0 = topCompleteFut; - } + GridCacheVersion futVer = state.onFutureDone(); - if (fut0 != null) - fut0.onDone(); + if (futVer != null) + cctx.mvcc().removeAtomicFuture(futVer); return true; } @@ -521,68 +325,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter * @param res Update response. */ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { - if (res.remapKeys() != null) { - assert !fastMap || cctx.kernalContext().clientNode(); - - Collection remapKeys = fastMap ? null : res.remapKeys(); - - mapOnTopology(remapKeys, true, nodeId); - - return; - } - - GridCacheReturn ret = res.returnValue(); - - Boolean single0 = single; - - if (single0 != null && single0) { - assert singleNodeId.equals(nodeId) : "Invalid response received for single-node mapped future " + - "[singleNodeId=" + singleNodeId + ", nodeId=" + nodeId + ", res=" + res + ']'; - - updateNear(singleReq, res); - - if (res.error() != null) { - onDone(res.failedKeys() != null ? - addFailedKeys(res.failedKeys(), singleReq.topologyVersion(), res.error()) : res.error()); - } - else { - if (op == TRANSFORM) { - if (ret != null) - addInvokeResults(ret); - - onDone(opRes); - } - else { - GridCacheReturn opRes0 = opRes = ret; - - onDone(opRes0); - } - } - } - else { - GridNearAtomicUpdateRequest req = mappings.get(nodeId); - - if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft. - updateNear(req, res); - - if (res.error() != null) - addFailedKeys(req.keys(), req.topologyVersion(), res.error()); - else { - if (op == TRANSFORM) { - assert !req.fastMap(); - - if (ret != null) - addInvokeResults(ret); - } - else if (req.fastMap() && req.hasPrimary()) - opRes = ret; - } - - mappings.remove(nodeId); - } - - checkComplete(); - } + state.onResult(nodeId, res, false); } /** @@ -602,12 +345,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter /** * Maps future on ready topology. - * - * @param keys Keys to map. - * @param remap Boolean flag indicating if this is partial future remap. - * @param oldNodeId Old node ID if remap. */ - private void mapOnTopology(final Collection keys, final boolean remap, final UUID oldNodeId) { + private void mapOnTopology() { cache.topology().readLock(); AffinityTopologyVersion topVer = null; @@ -634,11 +373,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter } else { if (waitTopFut) { + assert !topLocked : this; + fut.listen(new CI1>() { @Override public void apply(IgniteInternalFuture t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - mapOnTopology(keys, remap, oldNodeId); + mapOnTopology(); } }); } @@ -654,232 +395,543 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter cache.topology().readUnlock(); } - map0(topVer, keys, remap, oldNodeId); + state.map(topVer); } /** - * Checks if future is ready to be completed. + * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. */ - private void checkComplete() { - boolean remap = false; + private boolean storeFuture() { + return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; + } - synchronized (this) { - if (topVer != AffinityTopologyVersion.ZERO && - ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty())) { - CachePartialUpdateCheckedException err0 = err; + /** + * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near + * node and send updates in parallel to all participating nodes. + * + * @param key Key to map. + * @param topVer Topology version to map. + * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. + * @return Collection of nodes to which key is mapped. + */ + private Collection mapKey( + KeyCacheObject key, + AffinityTopologyVersion topVer, + boolean fastMap + ) { + GridCacheAffinityManager affMgr = cctx.affinity(); - if (err0 != null) - onDone(err0); - else { - if (fastMapRemap) { - assert cctx.kernalContext().clientNode(); + // If we can send updates in parallel - do it. + return fastMap ? + cctx.topology().nodes(affMgr.partition(key), topVer) : + Collections.singletonList(affMgr.primary(key, topVer)); + } - remap = true; + /** + * Maps future to single node. + * + * @param nodeId Node ID. + * @param req Request. + */ + private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { + if (cctx.localNodeId().equals(nodeId)) { + cache.updateAllAsyncInternal(nodeId, req, + new CI2() { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + onResult(res.nodeId(), res); } - else - onDone(opRes); - } - } + }); } + else { + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - if (remap) - mapOnTopology(null, true, null); - } + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); - /** - * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. - */ - private boolean storeFuture() { - return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; + if (syncMode == FULL_ASYNC) + onDone(new GridCacheReturn(cctx, true, null, true)); + } + catch (IgniteCheckedException e) { + state.onSendError(req, e); + } + } } /** - * @param topVer Topology version. - * @param remapKeys Keys to remap or {@code null} to map all keys. - * @param remap Flag indicating if this is partial remap for this future. - * @param oldNodeId Old node ID if was remap. + * Sends messages to remote nodes and updates local cache. + * + * @param mappings Mappings to send. */ - private void map0( - AffinityTopologyVersion topVer, - @Nullable Collection remapKeys, - boolean remap, - @Nullable UUID oldNodeId) { - assert oldNodeId == null || remap || fastMapRemap; + private void doUpdate(Map mappings) { + UUID locNodeId = cctx.localNodeId(); - Collection topNodes = CU.affinityNodes(cctx, topVer); + GridNearAtomicUpdateRequest locUpdate = null; - if (F.isEmpty(topNodes)) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid).")); + // Send messages to remote nodes first, then run local update. + for (GridNearAtomicUpdateRequest req : mappings.values()) { + if (locNodeId.equals(req.nodeId())) { + assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + + ", req=" + req + ']'; - return; + locUpdate = req; + } + else { + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + state.onSendError(req, e); + } + } } - if (futVer == null) - // Assign future version in topology read lock before first exception may be thrown. - futVer = cctx.versions().next(topVer); + if (locUpdate != null) { + cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, + new CI2() { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + onResult(res.nodeId(), res); + } + }); + } - if (!remap && storeFuture()) - cctx.mvcc().addAtomicFuture(version(), this); + if (syncMode == FULL_ASYNC) + onDone(new GridCacheReturn(cctx, true, null, true)); + } - CacheConfiguration ccfg = cctx.config(); + /** + * + */ + private class UpdateState { + /** Current topology version. */ + private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; - // Assign version on near node in CLOCK ordering mode even if fastMap is false. - if (updVer == null) - updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; + /** */ + private GridCacheVersion updVer; - if (updVer != null && log.isDebugEnabled()) - log.debug("Assigned fast-map version for update on near node: " + updVer); + /** Topology version when got mapping error. */ + private AffinityTopologyVersion mapErrTopVer; - if (keys.size() == 1 && !fastMap && (single == null || single)) { - assert remapKeys == null || remapKeys.size() == 1 : remapKeys; + /** Mappings if operations is mapped to more than one node. */ + @GridToStringInclude + private Map mappings; - Object key = F.first(keys); + /** Error. */ + private CachePartialUpdateCheckedException err; - Object val; - GridCacheVersion conflictVer; - long conflictTtl; - long conflictExpireTime; + /** Future ID. */ + private GridCacheVersion futVer; - if (vals != null) { - // Regular PUT. - val = F.first(vals); - conflictVer = null; - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - } - else if (conflictPutVals != null) { - // Conflict PUT. - GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); + /** Completion future for a particular topology version. */ + private GridFutureAdapter topCompleteFut; - val = conflictPutVal.value(); - conflictVer = conflictPutVal.version(); - conflictTtl = conflictPutVal.ttl(); - conflictExpireTime = conflictPutVal.expireTime(); - } - else if (conflictRmvVals != null) { - // Conflict REMOVE. - val = null; - conflictVer = F.first(conflictRmvVals); - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - } - else { - // Regular REMOVE. - val = null; - conflictVer = null; - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - } + /** Keys to remap. */ + private Collection remapKeys; - // We still can get here if user pass map with single element. - if (key == null) { - NullPointerException err = new NullPointerException("Null key."); + /** Not null is operation is mapped to single node. */ + private GridNearAtomicUpdateRequest singleReq; - onDone(err); + /** Operation result. */ + private GridCacheReturn opRes; - return; - } + /** + * @return Future version. + */ + @Nullable synchronized GridCacheVersion futureVersion() { + return futVer; + } - if (val == null && op != GridCacheOperation.DELETE) { - NullPointerException err = new NullPointerException("Null value."); + /** + * @param nodeId Left node ID. + */ + void onNodeLeft(UUID nodeId) { + GridNearAtomicUpdateResponse res = null; - onDone(err); + synchronized (this) { + GridNearAtomicUpdateRequest req; + + if (singleReq != null) + req = singleReq.nodeId().equals(nodeId) ? singleReq : null; + else + req = mappings != null ? mappings.get(nodeId) : null; + + if (req != null) { + res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion()); + + res.addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before " + + "response is received: " + nodeId)); + } + } + + if (res != null) + onResult(nodeId, res, true); + } + + /** + * @param nodeId Node ID. + * @param res Response. + * @param nodeErr {@code True} if response was created on node failure. + */ + void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + GridNearAtomicUpdateRequest req; + + AffinityTopologyVersion remapTopVer = null; + + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; + + boolean rcvAll; + + GridFutureAdapter fut0 = null; + + synchronized (this) { + if (!res.futureVersion().equals(futVer)) + return; + + if (singleReq != null) { + if (!singleReq.nodeId().equals(nodeId)) + return; + + req = singleReq; + + singleReq = null; + + rcvAll = true; + } + else { + req = mappings != null ? mappings.remove(nodeId) : null; + + if (req != null) + rcvAll = mappings.isEmpty(); + else + return; + } + + assert req != null && req.topologyVersion().equals(topVer) : req; + + if (res.remapKeys() != null) { + assert !fastMap || cctx.kernalContext().clientNode(); + + if (remapKeys == null) + remapKeys = U.newHashSet(res.remapKeys().size()); + + remapKeys.addAll(res.remapKeys()); + + if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) + mapErrTopVer = req.topologyVersion(); + } + else if (res.error() != null) { + if (res.failedKeys() != null) + addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); + } + else { + if (!req.fastMap() || req.hasPrimary()) { + GridCacheReturn ret = res.returnValue(); + + if (op == TRANSFORM) { + if (ret != null) + addInvokeResults(ret); + } + else + opRes = ret; + } + } + + if (rcvAll) { + if (remapKeys != null) { + assert mapErrTopVer != null; + + remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1); + } + else { + if (err != null && + X.hasCause(err, CachePartialUpdateCheckedException.class) && + X.hasCause(err, ClusterTopologyCheckedException.class) && + storeFuture() && + --remapCnt > 0) { + ClusterTopologyCheckedException topErr = + X.cause(err, ClusterTopologyCheckedException.class); + + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = + X.cause(err, CachePartialUpdateCheckedException.class); + + assert cause != null && cause.topologyVersion() != null : err; + + remapTopVer = + new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); + + err = null; + + Collection failedKeys = cause.failedKeys(); + + remapKeys = new ArrayList<>(failedKeys.size()); + + for (Object key : failedKeys) + remapKeys.add(cctx.toCacheKeyObject(key)); + + updVer = null; + } + } + } + + if (remapTopVer == null) { + err0 = err; + opRes0 = opRes; + } + else { + fut0 = topCompleteFut; + + topCompleteFut = null; + + cctx.mvcc().removeAtomicFuture(futVer); + + futVer = null; + topVer = AffinityTopologyVersion.ZERO; + } + } + } + + if (res.error() != null && res.failedKeys() == null) { + onDone(res.error()); return; } - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + if (!nodeErr && res.remapKeys() == null) + updateNear(req, res); - if (op != TRANSFORM) - val = cctx.toCacheObject(val); + if (remapTopVer != null) { + if (fut0 != null) + fut0.onDone(); - ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + if (!waitTopFut) { + onDone(new GridCacheTryPutFailedException()); + + return; + } + + if (topLocked) { + assert !F.isEmpty(remapKeys) : remapKeys; + + CachePartialUpdateCheckedException e = + new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( + "Failed to update keys, topology changed while execute atomic update inside transaction."); + + cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + + e.add(remapKeys, cause); + + onDone(e); + + return; + } + + IgniteInternalFuture fut = cctx.affinity().affinityReadyFuture(remapTopVer); + + fut.listen(new CI1>() { + @Override public void apply(final IgniteInternalFuture fut) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + AffinityTopologyVersion topVer = fut.get(); + + map(topVer); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + }); - if (primary == null) { + return; + } + + if (rcvAll) + onDone(opRes0, err0); + } + + /** + * @param req Request. + * @param e Error. + */ + void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { + synchronized (this) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.nodeId(), + req.futureVersion()); + + res.addFailedKeys(req.keys(), e); + + onResult(req.nodeId(), res, true); + } + } + + /** + * @param topVer Topology version. + */ + void map(AffinityTopologyVersion topVer) { + Collection topNodes = CU.affinityNodes(cctx, topVer); + + if (F.isEmpty(topNodes)) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + "left the grid).")); return; } - GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - primary.id(), - futVer, - fastMap, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - cctx.kernalContext().clientNode()); + Exception err = null; + Map pendingMappings = null; - req.addUpdateEntry(cacheKey, - val, - conflictTtl, - conflictExpireTime, - conflictVer, - true); + int size = keys.size(); synchronized (this) { + assert futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + this.topVer = topVer; - single = true; + futVer = cctx.versions().next(topVer); + + if (storeFuture()) + cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this); + + // Assign version on near node in CLOCK ordering mode even if fastMap is false. + if (updVer == null) + updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; + + if (updVer != null && log.isDebugEnabled()) + log.debug("Assigned fast-map version for update on near node: " + updVer); + + try { + if (size == 1 && !fastMap) { + assert remapKeys == null || remapKeys.size() == 1; + + singleReq = mapSingleUpdate(); + } + else { + pendingMappings = mapUpdate(topNodes); + + if (pendingMappings.size() == 1) + singleReq = F.firstValue(pendingMappings); + else { + if (syncMode == PRIMARY_SYNC) { + mappings = U.newHashMap(pendingMappings.size()); + + for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { + if (req.hasPrimary()) + mappings.put(req.nodeId(), req); + } + } + else + mappings = new HashMap<>(pendingMappings); + + assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; + } + } + + remapKeys = null; + } + catch (Exception e) { + err = e; + } + } + + if (err != null) { + onDone(err); + + return; } // Optimize mapping for single key. - mapSingle(primary.id(), req); + if (singleReq != null) + mapSingle(singleReq.nodeId(), singleReq); + else { + assert pendingMappings != null; - return; + if (size == 0) + onDone(new GridCacheReturn(cctx, true, null, true)); + else + doUpdate(pendingMappings); + } } - Iterator it = null; + /** + * @param topVer Topology version. + * @return Future. + */ + @Nullable synchronized GridFutureAdapter completeFuture(AffinityTopologyVersion topVer) { + if (this.topVer == AffinityTopologyVersion.ZERO) + return null; - if (vals != null) - it = vals.iterator(); + if (this.topVer.compareTo(topVer) < 0) { + if (topCompleteFut == null) + topCompleteFut = new GridFutureAdapter<>(); - Iterator conflictPutValsIt = null; + return topCompleteFut; + } - if (conflictPutVals != null) - conflictPutValsIt = conflictPutVals.iterator(); + return null; + } - Iterator conflictRmvValsIt = null; + /** + * @return Future version. + */ + GridCacheVersion onFutureDone() { + GridCacheVersion ver0; - if (conflictRmvVals != null) - conflictRmvValsIt = conflictRmvVals.iterator(); + GridFutureAdapter fut0; - Map pendingMappings = new HashMap<>(topNodes.size(), 1.0f); + synchronized (this) { + fut0 = topCompleteFut; - // Must do this in synchronized block because we need to atomically remove and add mapping. - // Otherwise checkComplete() may see empty intermediate state. - synchronized (this) { - if (oldNodeId != null) - removeMapping(oldNodeId); + topCompleteFut = null; - // For fastMap mode wait for all responses before remapping. - if (remap && fastMap && !mappings.isEmpty()) { - fastMapRemap = true; + ver0 = futVer; - return; + futVer = null; } - // Create mappings first, then send messages. - for (Object key : keys) { - if (key == null) { - NullPointerException err = new NullPointerException("Null key."); + if (fut0 != null) + fut0.onDone(); - onDone(err); + return ver0; + } - return; - } + /** + * @param topNodes Cache nodes. + * @return Mapping. + * @throws Exception If failed. + */ + private Map mapUpdate(Collection topNodes) throws Exception { + Iterator it = null; + + if (vals != null) + it = vals.iterator(); + + Iterator conflictPutValsIt = null; + + if (conflictPutVals != null) + conflictPutValsIt = conflictPutVals.iterator(); + + Iterator conflictRmvValsIt = null; + + if (conflictRmvVals != null) + conflictRmvValsIt = conflictRmvVals.iterator(); + + Map pendingMappings = U.newHashMap(topNodes.size()); + + // Create mappings first, then send messages. + for (Object key : keys) { + if (key == null) + throw new NullPointerException("Null key."); Object val; GridCacheVersion conflictVer; @@ -892,13 +944,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter conflictTtl = CU.TTL_NOT_CHANGED; conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - if (val == null) { - NullPointerException err = new NullPointerException("Null value."); - - onDone(err); - - return; - } + if (val == null) + throw new NullPointerException("Null value."); } else if (conflictPutVals != null) { GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); @@ -934,22 +981,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter Collection affNodes = mapKey(cacheKey, topVer, fastMap); - if (affNodes.isEmpty()) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid).")); - - return; - } + if (affNodes.isEmpty()) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); int i = 0; for (ClusterNode affNode : affNodes) { - if (affNode == null) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid).")); - - return; - } + if (affNode == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); UUID nodeId = affNode.id(); @@ -976,11 +1017,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter cctx.kernalContext().clientNode()); pendingMappings.put(nodeId, mapped); - - GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped); - - assert old == null || (old != null && remap) : - "Invalid mapping state [old=" + old + ", remap=" + remap + ']'; } mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); @@ -989,187 +1025,140 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter } } - this.topVer = topVer; - - fastMapRemap = false; - } - - if ((single == null || single) && pendingMappings.size() == 1) { - Map.Entry entry = F.first(pendingMappings.entrySet()); - - single = true; - - mapSingle(entry.getKey(), entry.getValue()); - - return; + return pendingMappings; } - else - single = false; - - doUpdate(pendingMappings); - } - - /** - * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near - * node and send updates in parallel to all participating nodes. - * - * @param key Key to map. - * @param topVer Topology version to map. - * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. - * @return Collection of nodes to which key is mapped. - */ - private Collection mapKey( - KeyCacheObject key, - AffinityTopologyVersion topVer, - boolean fastMap - ) { - GridCacheAffinityManager affMgr = cctx.affinity(); - // If we can send updates in parallel - do it. - return fastMap ? - cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primary(key, topVer)); - } - - /** - * Maps future to single node. - * - * @param nodeId Node ID. - * @param req Request. - */ - private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { - singleNodeId = nodeId; - singleReq = req; - - if (cctx.localNodeId().equals(nodeId)) { - cache.updateAllAsyncInternal(nodeId, req, - new CI2() { - @Override public void apply(GridNearAtomicUpdateRequest req, - GridNearAtomicUpdateResponse res) { - assert res.futureVersion().equals(futVer) : futVer; + /** + * @return Request. + * @throws Exception If failed. + */ + private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception { + Object key = F.first(keys); - onResult(res.nodeId(), res); - } - }); - } - else { - try { - if (log.isDebugEnabled()) - log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + Object val; + GridCacheVersion conflictVer; + long conflictTtl; + long conflictExpireTime; - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + if (vals != null) { + // Regular PUT. + val = F.first(vals); + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + } + else if (conflictPutVals != null) { + // Conflict PUT. + GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); - if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) - onDone(new GridCacheReturn(cctx, true, null, true)); + val = conflictPutVal.value(); + conflictVer = conflictPutVal.version(); + conflictTtl = conflictPutVal.ttl(); + conflictExpireTime = conflictPutVal.expireTime(); } - catch (IgniteCheckedException e) { - onDone(addFailedKeys(req.keys(), req.topologyVersion(), e)); + else if (conflictRmvVals != null) { + // Conflict REMOVE. + val = null; + conflictVer = F.first(conflictRmvVals); + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + } + else { + // Regular REMOVE. + val = null; + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; } - } - } - - /** - * Sends messages to remote nodes and updates local cache. - * - * @param mappings Mappings to send. - */ - private void doUpdate(Map mappings) { - UUID locNodeId = cctx.localNodeId(); - GridNearAtomicUpdateRequest locUpdate = null; + // We still can get here if user pass map with single element. + if (key == null) + throw new NullPointerException("Null key."); - // Send messages to remote nodes first, then run local update. - for (GridNearAtomicUpdateRequest req : mappings.values()) { - if (locNodeId.equals(req.nodeId())) { - assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + - ", req=" + req + ']'; + if (val == null && op != GridCacheOperation.DELETE) + throw new NullPointerException("Null value."); - locUpdate = req; - } - else { - try { - if (log.isDebugEnabled()) - log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); - } - catch (IgniteCheckedException e) { - addFailedKeys(req.keys(), req.topologyVersion(), e); + if (op != TRANSFORM) + val = cctx.toCacheObject(val); - removeMapping(req.nodeId()); - } + ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); - if (syncMode == PRIMARY_SYNC && !req.hasPrimary()) - removeMapping(req.nodeId()); - } - } + if (primary == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid)."); - if (syncMode == FULL_ASYNC) - // In FULL_ASYNC mode always return (null, true). - opRes = new GridCacheReturn(cctx, true, null, true); + GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + primary.id(), + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + cctx.kernalContext().clientNode()); - if (locUpdate != null) { - cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2() { - @Override public void apply(GridNearAtomicUpdateRequest req, - GridNearAtomicUpdateResponse res) { - assert res.futureVersion().equals(futVer) : futVer; + req.addUpdateEntry(cacheKey, + val, + conflictTtl, + conflictExpireTime, + conflictVer, + true); - onResult(res.nodeId(), res); - } - }); + return req; } - checkComplete(); - } - - /** - * Removes mapping from future mappings map. - * - * @param nodeId Node ID to remove mapping for. - */ - private void removeMapping(UUID nodeId) { - mappings.remove(nodeId); - } - - /** - * @param ret Result from single node. - */ - @SuppressWarnings("unchecked") - private synchronized void addInvokeResults(GridCacheReturn ret) { - assert op == TRANSFORM : op; - assert ret.value() == null || ret.value() instanceof Map : ret.value(); - - if (ret.value() != null) { - if (opRes != null) - opRes.mergeEntryProcessResults(ret); - else - opRes = ret; + /** + * @param ret Result from single node. + */ + @SuppressWarnings("unchecked") + private void addInvokeResults(GridCacheReturn ret) { + assert op == TRANSFORM : op; + assert ret.value() == null || ret.value() instanceof Map : ret.value(); + + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; + } } - } - /** - * @param failedKeys Failed keys. - * @param topVer Topology version for failed update. - * @param err Error cause. - * @return Root {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException}. - */ - private synchronized IgniteCheckedException addFailedKeys(Collection failedKeys, - AffinityTopologyVersion topVer, - Throwable err) { - CachePartialUpdateCheckedException err0 = this.err; + /** + * @param failedKeys Failed keys. + * @param topVer Topology version for failed update. + * @param err Error cause. + */ + private void addFailedKeys(Collection failedKeys, + AffinityTopologyVersion topVer, + Throwable err) { + CachePartialUpdateCheckedException err0 = this.err; - if (err0 == null) - err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + if (err0 == null) + err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - Collection keys = new ArrayList<>(failedKeys.size()); + Collection keys = new ArrayList<>(failedKeys.size()); - for (KeyCacheObject key : failedKeys) - keys.add(key.value(cctx.cacheObjectContext(), false)); + for (KeyCacheObject key : failedKeys) + keys.add(key.value(cctx.cacheObjectContext(), false)); - err0.add(keys, err, topVer); + err0.add(keys, err, topVer); + } - return err0; + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return S.toString(UpdateState.class, this); + } } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 2064338..d56ed7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -128,7 +128,8 @@ public class GridCompoundFuture extends GridFutureAdapter { /** * @param ignoreChildFailures Flag indicating whether compound future should ignore child futures failures. */ - public void ignoreChildFailures(Class... ignoreChildFailures) { + @SafeVarargs + public final void ignoreChildFailures(Class... ignoreChildFailures) { this.ignoreChildFailures = ignoreChildFailures; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java index d5c5314..fc9dad0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java @@ -421,6 +421,7 @@ public final class X { * @return {@code True} if one of the causing exception is an instance of passed in classes, * {@code false} otherwise. */ + @SafeVarargs public static boolean hasCause(@Nullable Throwable t, @Nullable Class... cls) { if (t == null || F.isEmpty(cls)) return false; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java index f681e59..97590b2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.configuration.*; + /** - * Tests expire events when {@link GridCacheConfiguration#isEagerTtl()} is disabled. + * Tests expire events when {@link CacheConfiguration#isEagerTtl()} is disabled. */ public class IgniteCacheEntryListenerEagerTtlDisabledTest extends IgniteCacheEntryListenerTxTest { /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java new file mode 100644 index 0000000..1669404 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public abstract class CacheAsyncOperationsFailoverAbstractTest extends GridCacheAbstractSelfTest { + /** */ + private static final int NODE_CNT = 4; + + /** */ + private static final long TEST_TIME = 60_000; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return NODE_CNT; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIME + 60_000; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setCacheStoreFactory(null); + ccfg.setReadThrough(false); + ccfg.setWriteThrough(false); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** + * @throws Exception If failed. + */ + public void testPutAllAsyncFailover() throws Exception { + putAllAsyncFailover(5, 10); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllAsyncFailoverManyThreads() throws Exception { + putAllAsyncFailover(ignite(0).configuration().getSystemThreadPoolSize() * 2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testAsyncFailover() throws Exception { + for (int i = 0; i < 3; i++) { + log.info("Iteration: " + i); + + startGrid(NODE_CNT); + + final IgniteCache cache = ignite(0).cache(null).withAsync(); + + int ops = cache.getConfiguration(CacheConfiguration.class).getMaxConcurrentAsyncOperations(); + + log.info("Max concurrent async operations: " + ops); + + assertTrue(ops > 0); + + final List> futs = Collections.synchronizedList(new ArrayList>(ops)); + + final AtomicInteger left = new AtomicInteger(ops); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Object call() throws Exception { + List> futs0 = new ArrayList<>(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (left.getAndDecrement() > 0) { + TreeMap map = new TreeMap<>(); + + int keys = 50; + + for (int k = 0; k < keys; k++) + map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(k)); + + cache.putAll(map); + + IgniteFuture fut = cache.future(); + + assertNotNull(fut); + + futs0.add(fut); + } + + futs.addAll(futs0); + + return null; + } + }, 10, "put-thread"); + + stopGrid(NODE_CNT); + + assertEquals(ops, futs.size()); + + for (IgniteFuture fut : futs) + fut.get(); + } + } + + /** + * @param threads Number of threads. + * @param opsPerThread Number of concurrent async operations per thread. + * @throws Exception If failed. + */ + private void putAllAsyncFailover(final int threads, final int opsPerThread) throws Exception { + log.info("Start test [threads=" + threads + ", opsPerThread=" + opsPerThread + ']'); + + final AtomicBoolean finished = new AtomicBoolean(); + + final long endTime = System.currentTimeMillis() + TEST_TIME; + + IgniteInternalFuture restartFut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("restart-thread"); + + while (!finished.get() && System.currentTimeMillis() < endTime) { + startGrid(NODE_CNT); + + U.sleep(500); + + stopGrid(NODE_CNT); + } + + return null; + } + }); + + try { + final IgniteCache cache = ignite(0).cache(null).withAsync(); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Object call() throws Exception { + int iter = 0; + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + long time; + + long lastInfo = 0; + + while ((time = System.currentTimeMillis()) < endTime) { + if (time - lastInfo > 5000) + log.info("Starting operations [iter=" + iter + ']'); + + List> futs = new ArrayList<>(opsPerThread); + + for (int i = 0; i < opsPerThread; i++) { + TreeMap map = new TreeMap<>(); + + int keys = rnd.nextInt(1, 50); + + for (int k = 0; k < keys; k++) + map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(iter)); + + cache.putAll(map); + + IgniteFuture fut = cache.future(); + + assertNotNull(fut); + + futs.add(fut); + } + + if (time - lastInfo > 5000) { + log.info("Waiting for futures [iter=" + iter + ']'); + + lastInfo = time; + } + + for (IgniteFuture fut : futs) + fut.get(); + + iter++; + } + + return null; + } + }, threads, "update-thread"); + + finished.set(true); + + restartFut.get(); + } + finally { + finished.set(true); + } + } + + /** + * + */ + private static class TestKey implements Serializable, Comparable { + /** */ + private long key; + + /** + * @param key Key. + */ + public TestKey(long key) { + this.key = key; + } + + /** + * @return Key. + */ + public long key() { + return key; + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull TestKey other) { + return ((Long)key).compareTo(other.key); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey other = (TestKey)o; + + return key == other.key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(key ^ (key >>> 32)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestKey.class, this); + } + } + + /** + * + */ + private static class TestValue implements Serializable { + /** */ + private long val; + + /** + * @param val Value. + */ + public TestValue(long val) { + this.val = val; + } + + /** + * @return Value. + */ + public long value() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue other = (TestValue)o; + + return val == other.val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java new file mode 100644 index 0000000..6e01a4a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; + +/** + * + */ +public class CacheAsyncOperationsFailoverAtomicTest extends CacheAsyncOperationsFailoverAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java new file mode 100644 index 0000000..ba3ad7a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; + +/** + * + */ +public class CacheAsyncOperationsFailoverTxTest extends CacheAsyncOperationsFailoverAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index 5d0cacc..caf4699 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -27,12 +27,14 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; +import javax.cache.processor.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; /** * @@ -46,7 +48,9 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr /** * @return Keys count for the test. */ - protected abstract int keysCount(); + private int keysCount() { + return 10_000; + } /** {@inheritDoc} */ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { @@ -54,7 +58,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr cfg.setAtomicWriteOrderMode(writeOrderMode()); cfg.setBackups(1); - cfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cfg.setRebalanceMode(SYNC); return cfg; } @@ -78,25 +82,47 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr protected CacheAtomicWriteOrderMode writeOrderMode() { return CLOCK; } + /** * @throws Exception If failed. */ public void testPut() throws Exception { - checkPut(false); + checkRetry(Test.PUT); + } + + /** + * @throws Exception If failed. + */ + public void testPutAll() throws Exception { + checkRetry(Test.PUT_ALL); } /** * @throws Exception If failed. */ public void testPutAsync() throws Exception { - checkPut(true); + checkRetry(Test.PUT_ASYNC); } /** - * @param async If {@code true} tests asynchronous put. * @throws Exception If failed. */ - private void checkPut(boolean async) throws Exception { + public void testInvoke() throws Exception { + checkRetry(Test.INVOKE); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAll() throws Exception { + checkRetry(Test.INVOKE_ALL); + } + + /** + * @param test Test type. + * @throws Exception If failed. + */ + private void checkRetry(Test test) throws Exception { final AtomicBoolean finished = new AtomicBoolean(); int keysCnt = keysCount(); @@ -115,52 +141,151 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr } }); + IgniteCache cache = ignite(0).cache(null); - IgniteCache cache = ignite(0).cache(null); + int iter = 0; - if (atomicityMode() == ATOMIC) - assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + try { + if (atomicityMode() == ATOMIC) + assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); - int iter = 0; + long stopTime = System.currentTimeMillis() + 60_000; - long stopTime = System.currentTimeMillis() + 60_000; + switch (test) { + case PUT: { + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; - if (async) { - IgniteCache cache0 = cache.withAsync(); + for (int i = 0; i < keysCnt; i++) + cache.put(i, val); - while (System.currentTimeMillis() < stopTime) { - Integer val = ++iter; + for (int i = 0; i < keysCnt; i++) + assertEquals(val, cache.get(i)); + } - for (int i = 0; i < keysCnt; i++) { - cache0.put(i, val); + break; + } + + case PUT_ALL: { + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; + + Map map = new LinkedHashMap<>(); + + for (int i = 0; i < keysCnt; i++) { + map.put(i, val); + + if (map.size() == 100 || i == keysCnt - 1) { + cache.putAll(map); + + map.clear(); + } + } - cache0.future().get(); + for (int i = 0; i < keysCnt; i++) + assertEquals(val, cache.get(i)); + } } - for (int i = 0; i < keysCnt; i++) { - cache0.get(i); + case PUT_ASYNC: { + IgniteCache cache0 = cache.withAsync(); + + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; + + for (int i = 0; i < keysCnt; i++) { + cache0.put(i, val); + + cache0.future().get(); + } + + for (int i = 0; i < keysCnt; i++) { + cache0.get(i); + + assertEquals(val, cache0.future().get()); + } + } - assertEquals(val, cache0.future().get()); + break; } - } - } - else { - while (System.currentTimeMillis() < stopTime) { - Integer val = ++iter; - for (int i = 0; i < keysCnt; i++) - cache.put(i, val); + case INVOKE: { + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; + + Integer expOld = iter - 1; + + for (int i = 0; i < keysCnt; i++) { + Integer old = cache.invoke(i, new SetEntryProcessor(val)); + + assertNotNull(old); + assertTrue(old.equals(expOld) || old.equals(val)); + } + + for (int i = 0; i < keysCnt; i++) + assertEquals(val, cache.get(i)); + } + + break; + } - for (int i = 0; i < keysCnt; i++) - assertEquals(val, cache.get(i)); + case INVOKE_ALL: { + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; + + Integer expOld = iter - 1; + + Set keys = new LinkedHashSet<>(); + + for (int i = 0; i < keysCnt; i++) { + keys.add(i); + + if (keys.size() == 100 || i == keysCnt - 1) { + Map> resMap = + cache.invokeAll(keys, new SetEntryProcessor(val)); + + for (Integer key : keys) { + EntryProcessorResult res = resMap.get(key); + + assertNotNull(res); + + Integer old = res.get(); + + assertTrue(old.equals(expOld) || old.equals(val)); + } + + assertEquals(keys.size(), resMap.size()); + + keys.clear(); + } + } + + for (int i = 0; i < keysCnt; i++) + assertEquals(val, cache.get(i)); + } + + break; + } + + default: + assert false : test; } } - - finished.set(true); - fut.get(); + finally { + finished.set(true); + fut.get(); + } for (int i = 0; i < keysCnt; i++) - assertEquals(iter, cache.get(i)); + assertEquals((Integer)iter, cache.get(i)); + + for (int i = 0; i < gridCount(); i++) { + IgniteKernal ignite = (IgniteKernal)grid(i); + + Collection futs = ignite.context().cache().context().mvcc().atomicFutures(); + + assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty()); + } } /** @@ -201,34 +326,41 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr try { int keysCnt = keysCount(); - boolean eThrown = false; + boolean eThrown = false; - IgniteCache cache = ignite(0).cache(null).withNoRetries(); + IgniteCache cache = ignite(0).cache(null).withNoRetries(); - if (async) - cache = cache.withAsync(); + if (async) + cache = cache.withAsync(); - for (int i = 0; i < keysCnt; i++) { - try { - if (async) { - cache.put(i, i); + long stopTime = System.currentTimeMillis() + 60_000; - cache.future().get(); + while (System.currentTimeMillis() < stopTime) { + for (int i = 0; i < keysCnt; i++) { + try { + if (async) { + cache.put(i, i); + + cache.future().get(); + } + else + cache.put(i, i); + } + catch (Exception e) { + assertTrue("Invalid exception: " + e, + X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class)); + + eThrown = true; + + break; + } } - else - cache.put(i, i); - } - catch (Exception e) { - assertTrue("Invalid exception: " + e, - X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class)); - - eThrown = true; + if (eThrown) break; - } } - assertTrue(eThrown); + assertTrue(eThrown); finished.set(true); @@ -243,4 +375,48 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr @Override protected long getTestTimeout() { return 3 * 60 * 1000; } + + /** + * + */ + enum Test { + /** */ + PUT, + + /** */ + PUT_ALL, + + /** */ + PUT_ASYNC, + + /** */ + INVOKE, + + /** */ + INVOKE_ALL + } + + /** + * + */ + class SetEntryProcessor implements CacheEntryProcessor { + /** */ + private Integer val; + + /** + * @param val Value. + */ + public SetEntryProcessor(Integer val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry e, Object... args) { + Integer old = e.getValue(); + + e.setValue(val); + + return old == null ? 0 : old; + } + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java index e76663a..be442d2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java @@ -16,7 +16,21 @@ */ package org.apache.ignite.internal.processors.cache.distributed.dht; +import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.transactions.*; + +import javax.cache.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; /** * @@ -24,11 +38,66 @@ import org.apache.ignite.cache.*; public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstractSelfTest { /** {@inheritDoc} */ @Override protected CacheAtomicityMode atomicityMode() { - return CacheAtomicityMode.ATOMIC; + return ATOMIC; } - /** {@inheritDoc} */ - @Override protected int keysCount() { - return 60_000; + /** + * @throws Exception If failed. + */ + public void testPutInsideTransaction() throws Exception { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setName("tx-cache"); + ccfg.setAtomicityMode(TRANSACTIONAL); + + try (IgniteCache txCache = ignite(0).getOrCreateCache(ccfg)) { + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + while (!finished.get()) { + stopGrid(3); + + U.sleep(300); + + startGrid(3); + } + + return null; + } + }); + + try { + IgniteTransactions txs = ignite(0).transactions(); + + IgniteCache cache = ignite(0).cache(null); + + long stopTime = System.currentTimeMillis() + 60_000; + + while (System.currentTimeMillis() < stopTime) { + for (int i = 0; i < 10_000; i++) { + try { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + txCache.put(0, 0); + + cache.put(i, i); + + tx.commit(); + } + } + catch (IgniteException | CacheException e) { + log.info("Ignore exception: " + e); + } + } + } + + finished.set(true); + + fut.get(); + } + finally { + finished.set(true); + } + } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 0ab5729..e113fcc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -32,6 +33,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; @@ -44,12 +46,12 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr /** {@inheritDoc} */ @Override protected CacheAtomicityMode atomicityMode() { - return CacheAtomicityMode.TRANSACTIONAL; + return TRANSACTIONAL; } /** {@inheritDoc} */ - @Override protected int keysCount() { - return 20_000; + @Override protected NearCacheConfiguration nearConfiguration() { + return null; } /** @@ -74,7 +76,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr } }); - int keysCnt = keysCount(); + final int keysCnt = 20_000; try { for (int i = 0; i < keysCnt; i++) @@ -90,6 +92,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") public void testExplicitTransactionRetries() throws Exception { final AtomicInteger idx = new AtomicInteger(); int threads = 8; @@ -97,8 +100,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr final AtomicReferenceArray err = new AtomicReferenceArray<>(threads); IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { - @Override - public Object call() throws Exception { + @Override public Object call() throws Exception { int th = idx.getAndIncrement(); int base = th * FACTOR; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java index 68c7fbb..f556023 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java @@ -26,11 +26,6 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*; */ public class IgniteCacheAtomicReplicatedNodeRestartSelfTest extends GridCacheReplicatedNodeRestartSelfTest { /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1124"); - } - - /** {@inheritDoc} */ @Override protected CacheAtomicityMode atomicityMode() { return ATOMIC; } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java index 5b9af4f..5bb1706 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java @@ -22,6 +22,8 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.testframework.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; + /** * */ @@ -37,11 +39,6 @@ public class IgniteCacheSslStartStopSelfTest extends IgniteCachePutRetryAbstract /** {@inheritDoc} */ @Override protected CacheAtomicityMode atomicityMode() { - return CacheAtomicityMode.ATOMIC; - } - - /** {@inheritDoc} */ - @Override protected int keysCount() { - return 60_000; + return ATOMIC; } } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java index f3fac23..ba510f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java @@ -19,6 +19,7 @@ package org.apache.ignite.testsuites; import junit.framework.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -48,6 +49,9 @@ public class IgniteCacheFailoverTestSuite2 { suite.addTestSuite(IgniteCacheCrossCacheTxFailoverTest.class); + suite.addTestSuite(CacheAsyncOperationsFailoverAtomicTest.class); + suite.addTestSuite(CacheAsyncOperationsFailoverTxTest.class); + return suite; } } -- 1.8.1.msysgit.1