diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 30e3bc7..31cfa1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1862,7 +1862,7 @@ public abstract class GridCacheAdapter implements GridCache, } } - if (!skipVals && misses != null && readThrough && ctx.readThrough()) { + if (!skipVals && misses != null && readThrough && ctx.readThrough() && !ctx.skipStore()) { final Map loadKeys = misses; final IgniteTxLocalAdapter tx0 = tx; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index b78e7e0..8e689f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -153,10 +153,9 @@ public abstract class GridDistributedCacheAdapter extends GridCacheAdapter // Send job to all data nodes. Collection nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); - if (!nodes.isEmpty()) { + if (!nodes.isEmpty()) ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer), nodes, true).get(); - } + new GlobalRemoveAllCallable<>(name(), topVer, ctx.skipStore()), nodes, true).get(); } while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0); } @@ -186,7 +185,7 @@ public abstract class GridDistributedCacheAdapter extends GridCacheAdapter if (!nodes.isEmpty()) { IgniteInternalFuture rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer), nodes, true); + new GlobalRemoveAllCallable<>(name(), topVer, ctx.skipStore()), nodes, true); rmvFut.listen(new IgniteInClosure>() { @Override public void apply(IgniteInternalFuture fut) { @@ -241,6 +240,9 @@ public abstract class GridDistributedCacheAdapter extends GridCacheAdapter /** Topology version. */ private AffinityTopologyVersion topVer; + /** Skip store flag. */ + private boolean skipStore; + /** Injected grid instance. */ @IgniteInstanceResource private Ignite ignite; @@ -256,9 +258,10 @@ public abstract class GridDistributedCacheAdapter extends GridCacheAdapter * @param cacheName Cache name. * @param topVer Topology version. */ - private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer) { + private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { this.cacheName = cacheName; this.topVer = topVer; + this.skipStore = skipStore; } /** @@ -291,6 +294,8 @@ public abstract class GridDistributedCacheAdapter extends GridCacheAdapter (DataStreamerImpl)ignite.dataStreamer(cacheName)) { ((DataStreamerImpl)dataLdr).maxRemapCount(0); + dataLdr.skipStore(skipStore); + dataLdr.receiver(DataStreamerCacheUpdaters.batched()); for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) { @@ -333,12 +338,14 @@ public abstract class GridDistributedCacheAdapter extends GridCacheAdapter @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, cacheName); out.writeObject(topVer); + out.writeBoolean(skipStore); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { cacheName = U.readString(in); topVer = (AffinityTopologyVersion)in.readObject(); + skipStore = in.readBoolean(); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index 37b34e7..51eec3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -88,6 +88,12 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { private boolean partLock; /** + * Additional flags. + * Bit 1 - for skipStore flag value. + */ + private byte flags; + + /** * Empty constructor. */ public GridDistributedLockRequest() { @@ -110,6 +116,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { * @param grpLockKey Group lock key if this is a group-lock transaction. * @param partLock {@code True} if this is a group-lock transaction request and whole partition is * locked. + * @param skipStore Skip store flag. */ public GridDistributedLockRequest( int cacheId, @@ -126,7 +133,8 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { int keyCnt, int txSize, @Nullable IgniteTxKey grpLockKey, - boolean partLock + boolean partLock, + boolean skipStore ) { super(lockVer, keyCnt); @@ -149,6 +157,8 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { this.partLock = partLock; retVals = new boolean[keyCnt]; + + skipStore(skipStore); } /** @@ -218,6 +228,20 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { } /** + * Sets skip store flag value. + * + * @param skipStore Skip store flag. + */ + private void skipStore(boolean skipStore){ + flags = skipStore ? (byte)(flags | 0x1) : (byte)(flags & 0xFE); + } + + /** + * @return Skip store flag. + */ + public boolean skipStore() { return (flags & 0x1) == 1; }; + + /** * @return Transaction isolation or null if not in transaction. */ public TransactionIsolation isolation() { @@ -334,84 +358,90 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { switch (writer.state()) { case 8: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 9: - if (!writer.writeMessage("grpLockKey", grpLockKey)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 10: - if (!writer.writeBoolean("isInTx", isInTx)) + if (!writer.writeMessage("grpLockKey", grpLockKey)) return false; writer.incrementState(); case 11: - if (!writer.writeBoolean("isInvalidate", isInvalidate)) + if (!writer.writeBoolean("isInTx", isInTx)) return false; writer.incrementState(); case 12: - if (!writer.writeBoolean("isRead", isRead)) + if (!writer.writeBoolean("isInvalidate", isInvalidate)) return false; writer.incrementState(); case 13: - if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) + if (!writer.writeBoolean("isRead", isRead)) return false; writer.incrementState(); case 14: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) return false; writer.incrementState(); case 15: - if (!writer.writeMessage("nearXidVer", nearXidVer)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 16: - if (!writer.writeUuid("nodeId", nodeId)) + if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); case 17: - if (!writer.writeBoolean("partLock", partLock)) + if (!writer.writeUuid("nodeId", nodeId)) return false; writer.incrementState(); case 18: - if (!writer.writeBooleanArray("retVals", retVals)) + if (!writer.writeBoolean("partLock", partLock)) return false; writer.incrementState(); case 19: - if (!writer.writeLong("threadId", threadId)) + if (!writer.writeBooleanArray("retVals", retVals)) return false; writer.incrementState(); case 20: - if (!writer.writeLong("timeout", timeout)) + if (!writer.writeLong("threadId", threadId)) return false; writer.incrementState(); case 21: + if (!writer.writeLong("timeout", timeout)) + return false; + + writer.incrementState(); + + case 22: if (!writer.writeInt("txSize", txSize)) return false; @@ -434,7 +464,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { switch (reader.state()) { case 8: - futId = reader.readIgniteUuid("futId"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -442,7 +472,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); case 9: - grpLockKey = reader.readMessage("grpLockKey"); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -450,7 +480,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); case 10: - isInTx = reader.readBoolean("isInTx"); + grpLockKey = reader.readMessage("grpLockKey"); if (!reader.isLastRead()) return false; @@ -458,7 +488,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); case 11: - isInvalidate = reader.readBoolean("isInvalidate"); + isInTx = reader.readBoolean("isInTx"); if (!reader.isLastRead()) return false; @@ -466,7 +496,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); case 12: - isRead = reader.readBoolean("isRead"); + isInvalidate = reader.readBoolean("isInvalidate"); if (!reader.isLastRead()) return false; @@ -474,6 +504,14 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); case 13: + isRead = reader.readBoolean("isRead"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: byte isolationOrd; isolationOrd = reader.readByte("isolation"); @@ -485,7 +523,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 14: + case 15: keys = reader.readCollection("keys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -493,7 +531,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 15: + case 16: nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) @@ -501,7 +539,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 16: + case 17: nodeId = reader.readUuid("nodeId"); if (!reader.isLastRead()) @@ -509,7 +547,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 17: + case 18: partLock = reader.readBoolean("partLock"); if (!reader.isLastRead()) @@ -517,7 +555,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 18: + case 19: retVals = reader.readBooleanArray("retVals"); if (!reader.isLastRead()) @@ -525,7 +563,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 19: + case 20: threadId = reader.readLong("threadId"); if (!reader.isLastRead()) @@ -533,7 +571,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 20: + case 21: timeout = reader.readLong("timeout"); if (!reader.isLastRead()) @@ -541,7 +579,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 21: + case 22: txSize = reader.readInt("txSize"); if (!reader.isLastRead()) @@ -561,7 +599,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 22; + return 23; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 82e7f83..f247c0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -830,7 +830,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture it = dhtMapping.listIterator(); it.hasNext();) { @@ -956,7 +957,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture extends GridDhtCach txKey, null, null, - req.accessTtl()); + req.accessTtl(), + req.skipStore()); if (req.groupLock()) tx.groupLockKey(txKey); @@ -840,7 +841,8 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach req.messageId(), req.txRead(), req.needReturnValue(), - req.accessTtl()); + req.accessTtl(), + req.skipStore()); final GridDhtTxLocal t = tx; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 8b70f59..1bfc3d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -530,6 +530,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param msgId Message ID. * @param read Read flag. * @param accessTtl TTL for read operation. + * @param skipStore Skip store flag. * @return Lock future. */ @SuppressWarnings("ForLoopReplaceableByForEach") @@ -540,7 +541,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { long msgId, final boolean read, final boolean needRetVal, - long accessTtl + long accessTtl, + boolean skipStore ) { try { checkValid(); @@ -593,6 +595,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { -1L, null); + txEntry.skipStore(skipStore); + if (read) txEntry.ttl(accessTtl); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index ffbc0a2..2673ed5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -286,13 +286,15 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { * @param val Value. * @param entryProcessors Entry processors. * @param ttl TTL. + * @param skipStore Skip store flag. */ public void addWrite(GridCacheContext cacheCtx, GridCacheOperation op, IgniteTxKey key, @Nullable CacheObject val, @Nullable Collection, Object[]>> entryProcessors, - long ttl) { + long ttl, + boolean skipStore) { checkInternal(key); if (isSystemInvalidate()) @@ -307,7 +309,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { ttl, -1L, cached, - null); + null, + skipStore); txEntry.entryProcessors(entryProcessors); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index da2105d..ec58075 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -372,7 +372,8 @@ public class GridPartitionedGetFuture extends GridCompoundIdentityFuture extends GridDhtCacheAdapter { GridCacheReturn retVal = null; + //TODO: support skipStore for putAll if (keys.size() > 1 && // Several keys ... - writeThrough() && // and store is enabled ... + writeThrough() && !req.skipStore() && // and store is enabled ... !ctx.store().isLocal() && // and this is not local store ... !ctx.dr().receiveEnabled() // and no DR. ) { @@ -1703,7 +1704,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { op, writeVal, req.invokeArguments(), - primary && writeThrough(), + primary && writeThrough() && !req.skipStore(), req.returnValue(), expiry, true, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ac4ae2c2..547d156 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -585,7 +585,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter invokeArgs, filter, subjId, - taskNameHash); + taskNameHash, + cctx.skipStore()); req.addUpdateEntry(cacheKey, val, @@ -707,7 +708,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter invokeArgs, filter, subjId, - taskNameHash); + taskNameHash, + cctx.skipStore()); pendingMappings.put(nodeId, mapped); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 3f68a46..e5229bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -126,6 +126,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** Task name hash. */ private int taskNameHash; + /** Skip write-through to a persistent storage. */ + private boolean skipStore; + /** * Empty constructor required by {@link Externalizable}. */ @@ -150,6 +153,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param filter Optional filter for atomic check. * @param subjId Subject ID. * @param taskNameHash Task name hash code. + * @param skipStore Skip write-through to a persistent storage. */ public GridNearAtomicUpdateRequest( int cacheId, @@ -165,7 +169,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @Nullable Object[] invokeArgs, @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, - int taskNameHash + int taskNameHash, + boolean skipStore ) { this.cacheId = cacheId; this.nodeId = nodeId; @@ -182,6 +187,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.filter = filter; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.skipStore = skipStore; keys = new ArrayList<>(); } @@ -276,6 +282,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return Skip write-through to a persistent storage. + */ + public boolean skipStore() {return skipStore;} + + /** * @param key Key to add. * @param val Optional update value. * @param conflictTtl Conflict TTL (optional). @@ -627,36 +638,42 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 16: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 17: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 18: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 19: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 21: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 22: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -787,7 +804,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 16: - subjId = reader.readUuid("subjId"); + skipStore = reader.readBoolean("skipStore"); if (!reader.isLastRead()) return false; @@ -795,6 +812,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 17: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -806,7 +831,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 18: + case 19: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -814,7 +839,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 19: + case 20: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -822,7 +847,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 20: + case 21: updateVer = reader.readMessage("updateVer"); if (!reader.isLastRead()) @@ -830,7 +855,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 21: + case 22: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 7b05065..bb464d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -732,7 +732,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity inTx() && tx.partitionLock(), inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, - read ? accessTtl : -1L); + read ? accessTtl : -1L, + cctx.skipStore()); mapping.request(req); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index b1017d8..ece64a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -372,7 +372,8 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture 0; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index bfd0177..7071597 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -311,7 +311,12 @@ public class GridNearTransactionalCache extends GridNearCacheAdapter "(transaction has been completed): " + req.version()); } - tx.addEntry(ctx, txKey, GridCacheOperation.NOOP, /*Value.*/null, /*dr version*/null); + tx.addEntry(ctx, + txKey, + GridCacheOperation.NOOP, + null /*Value.*/, + null /*dr version*/, + req.skipStore()); } // Add remote candidate before reordering. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 8c597e2..b6b6017 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -326,6 +326,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { * @param key Key to add to read set. * @param val Value. * @param drVer Data center replication version. + * @param skipStore Skip store flag. * @throws IgniteCheckedException If failed. * @return {@code True} if entry has been enlisted. */ @@ -334,7 +335,8 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { IgniteTxKey key, GridCacheOperation op, CacheObject val, - @Nullable GridCacheVersion drVer + @Nullable GridCacheVersion drVer, + boolean skipStore ) throws IgniteCheckedException { checkInternal(key); @@ -366,7 +368,8 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { -1L, -1L, cached, - drVer); + drVer, + skipStore); writeMap.put(key, txEntry); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 95d3527..05d660c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -124,7 +124,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { @GridDirectTransient private AtomicBoolean prepared = new AtomicBoolean(); - /** Lock flag for colocated cache. */ + /** Lock flag for collocated cache. */ @GridDirectTransient private transient boolean locked; @@ -151,6 +151,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private byte[] expiryPlcBytes; /** + * Additional flags. + * Bit 1 - for skipStore flag value. + */ + private byte flags; + + /** * Required by {@link Externalizable} */ public IgniteTxEntry() { @@ -168,6 +174,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @param conflictExpireTime DR expire time. * @param entry Cache entry. * @param conflictVer Data center replication version. + * @param skipStore Skip store flag. */ public IgniteTxEntry(GridCacheContext ctx, IgniteInternalTx tx, @@ -176,7 +183,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { long ttl, long conflictExpireTime, GridCacheEntryEx entry, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + boolean skipStore) { assert ctx != null; assert tx != null; assert op != null; @@ -190,12 +198,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { this.conflictExpireTime = conflictExpireTime; this.conflictVer = conflictVer; + skipStore(skipStore); + key = entry.key(); cacheId = entry.context().cacheId(); } - /** + /** * This constructor is meant for local transactions. * * @param ctx Cache registry. @@ -208,6 +218,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @param entry Cache entry. * @param filters Put filters. * @param conflictVer Data center replication version. + * @param skipStore Skip store flag. */ public IgniteTxEntry(GridCacheContext ctx, IgniteInternalTx tx, @@ -218,7 +229,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { long ttl, GridCacheEntryEx entry, CacheEntryPredicate[] filters, - GridCacheVersion conflictVer) { + GridCacheVersion conflictVer, + boolean skipStore) { assert ctx != null; assert tx != null; assert op != null; @@ -232,6 +244,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { this.filters = filters; this.conflictVer = conflictVer; + skipStore(skipStore); + if (entryProcessor != null) addEntryProcessor(entryProcessor, invokeArgs); @@ -404,6 +418,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** + * Sets skip store flag value. + * + * @param skipStore Skip store flag. + */ + public void skipStore(boolean skipStore){ + flags = skipStore ? (byte)(flags | 0x1) : (byte)(flags & 0xFE); + } + + /** + * @return Skip store flag. + */ + public boolean skipStore() { return (flags & 0x1) == 1; }; + + /** * @return Tx key. */ public IgniteTxKey txKey() { @@ -813,30 +841,36 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { writer.incrementState(); case 6: - if (!writer.writeBoolean("grpLock", grpLock)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 7: - if (!writer.writeMessage("key", key)) + if (!writer.writeBoolean("grpLock", grpLock)) return false; writer.incrementState(); case 8: - if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) + if (!writer.writeMessage("key", key)) return false; writer.incrementState(); case 9: - if (!writer.writeLong("ttl", ttl)) + if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) return false; writer.incrementState(); case 10: + if (!writer.writeLong("ttl", ttl)) + return false; + + writer.incrementState(); + + case 11: if (!writer.writeMessage("val", val)) return false; @@ -902,9 +936,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { return false; reader.incrementState(); - + case 6: - grpLock = reader.readBoolean("grpLock"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -912,7 +946,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 7: - key = reader.readMessage("key"); + grpLock = reader.readBoolean("grpLock"); if (!reader.isLastRead()) return false; @@ -920,7 +954,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 8: - transformClosBytes = reader.readByteArray("transformClosBytes"); + key = reader.readMessage("key"); if (!reader.isLastRead()) return false; @@ -928,7 +962,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 9: - ttl = reader.readLong("ttl"); + transformClosBytes = reader.readByteArray("transformClosBytes"); if (!reader.isLastRead()) return false; @@ -936,6 +970,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 10: + ttl = reader.readLong("ttl"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: val = reader.readMessage("val"); if (!reader.isLastRead()) @@ -955,7 +997,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 29eabf3..8a1f8bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -506,7 +506,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter boolean skipNear = near() && store.isWriteToStoreFromDht(); for (IgniteTxEntry e : writeEntries) { - if (skipNear && e.cached().isNear()) + if ((skipNear && e.cached().isNear()) || e.skipStore()) continue; boolean intercept = e.context().config().getInterceptor() != null; @@ -1709,6 +1709,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter deserializePortable, false); } + else if (txEntry.skipStore()) { + missed.remove(cacheKey); + } // Even though we bring the value back from lock acquisition, // we still need to recheck primary node for consistent values @@ -3229,7 +3232,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter hasDrTtl ? drTtl : -1L, entry, filter, - drVer); + drVer, + entry.context().skipStore()); txEntry.conflictExpireTime(drExpireTime); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 30c2515..c48321e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -4267,9 +4267,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. */ public void testWithSkipStore() throws Exception { - if (gridCount() > 1) // TODO IGNITE-656 (test primary/backup/near keys with multiple nodes). - return; - IgniteCache cache = grid(0).cache(null); IgniteCache cacheSkipStore = cache.withSkipStore(); @@ -4293,6 +4290,106 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertNotNull(cache.get(key)); } + cache.removeAll(new HashSet<>(keys)); + + for (String key : keys) + assertNull(cache.get(key)); + + // Put/remove data from multiple nodes. + + keys = new ArrayList<>(1000); + + for (int i = 0; i < 1000; i++) + keys.add("key_" + i); + + for (int i = 0; i < keys.size(); ++i) + cache.put(keys.get(i), i); + + for (String key : keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + for (String key : keys) { + cacheSkipStore.remove(key); + + assertNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + for (String key : keys) { + cache.remove(key); + + assertNull(cacheSkipStore.get(key)); + assertNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + assertFalse(cacheSkipStore.iterator().hasNext()); + assertTrue(map.size() == 0); + assertTrue(cache.size(CachePeekMode.ALL) == 0); + + // putAll/removeAll from multiple nodes. + + Map data = new HashMap<>(); + + for (int i = 0; i < keys.size(); i++) + data.put(keys.get(i), i); + + cacheSkipStore.putAll(data); + + for (String key : keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + cache.putAll(data); + + for (String key : keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + cacheSkipStore.removeAll(data.keySet()); + + for (String key : keys) { + assertNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + cacheSkipStore.putAll(data); + + for (String key : keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + cacheSkipStore.removeAll(data.keySet()); + + for (String key : keys) { + assertNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + cache.removeAll(data.keySet()); + + for (String key : keys) { + assertNull(cacheSkipStore.get(key)); + assertNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + assertTrue(map.size() == 0); + + // Miscellaneous checks + String newKey = "New key"; assertFalse(map.containsKey(newKey)); @@ -4311,7 +4408,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract Cache.Entry entry = it.next(); - String rmvKey = entry.getKey(); + String rmvKey = entry.getKey(); assertTrue(map.containsKey(rmvKey)); @@ -4320,6 +4417,241 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertNull(cacheSkipStore.get(rmvKey)); assertTrue(map.containsKey(rmvKey)); + + assertTrue(cache.size(CachePeekMode.ALL) == 0); + assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0); + + cache.remove(rmvKey); + + assertTrue(map.size() == 0); + + if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) { + checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, TransactionConcurrency.OPTIMISTIC, + TransactionIsolation.READ_COMMITTED); + checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, TransactionConcurrency.OPTIMISTIC, + TransactionIsolation.REPEATABLE_READ); + checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, TransactionConcurrency.OPTIMISTIC, + TransactionIsolation.SERIALIZABLE); + + checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, TransactionConcurrency.PESSIMISTIC, + TransactionIsolation.READ_COMMITTED); + checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, TransactionConcurrency.PESSIMISTIC, + TransactionIsolation.REPEATABLE_READ); + checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, TransactionConcurrency.PESSIMISTIC, + TransactionIsolation.SERIALIZABLE); + } + } + + /** + * @throws Exception If failed. + */ + public void testWithSkipStoreRemoveAll() throws Exception { + if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL || + (atomicityMode() == CacheAtomicityMode.ATOMIC && nearEnabled())) + fail("https://issues.apache.org/jira/browse/IGNITE-373"); + + IgniteCache cache = grid(0).cache(null); + + IgniteCache cacheSkipStore = cache.withSkipStore(); + + Map data = new HashMap<>(); + + for (int i = 0; i < 100; i++) + data.put("key_" + i, i); + + cache.putAll(data); + + for (String key : data.keySet()) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + cacheSkipStore.removeAll(); + + for (String key : data.keySet()) { + assertNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + cache.removeAll(); + + for (String key : data.keySet()) { + assertNull(cacheSkipStore.get(key)); + assertNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + } + + /** + * @param cache Cache instance. + * @param cacheSkipStore Cache skip store projection. + * @param data Data set. + * @param keys Keys list. + * @param txConcurrency Concurrency mode. + * @param txIsolation Isolation mode. + * + * @throws Exception If failed. + */ + private void checkSkipStoreWithTransaction(IgniteCache cache, + IgniteCache cacheSkipStore, + Map data, + List keys, + TransactionConcurrency txConcurrency, + TransactionIsolation txIsolation) throws Exception { + + cache.removeAll(data.keySet()); + checkEmpty(cache, cacheSkipStore); + + IgniteTransactions txs = grid(0).transactions(); + + // Several put check + Transaction tx = txs.txStart(txConcurrency, txIsolation); + + for (int i = 0; i < keys.size(); i++) + cacheSkipStore.put(keys.get(i), i); + + for (String key: keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + tx.commit(); + + // cacheSkipStore putAll(..)/removeAll(..) check + tx = txs.txStart(txConcurrency, txIsolation); + + cacheSkipStore.putAll(data); + + for (String key: keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + cacheSkipStore.removeAll(data.keySet()); + + for (String key: keys) { + assertNull(cacheSkipStore.get(key)); + assertNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + tx.commit(); + + assertTrue(map.size() == 0); + + // cache putAll(..)/removeAll(..) check + tx = txs.txStart(txConcurrency, txIsolation); + + cache.putAll(data); + + for (String key: keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + cache.removeAll(data.keySet()); + + for (String key: keys) { + assertNull(cacheSkipStore.get(key)); + assertNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + tx.commit(); + + assertTrue(map.size() == 0); + + // putAll(..) from both cacheSkipStore and cache + tx = txs.txStart(txConcurrency, txIsolation); + + Map subMap = new HashMap<>(); + + for (int i = 0; i < keys.size() / 2; i++) + subMap.put(keys.get(i), i); + + cacheSkipStore.putAll(subMap); + + subMap.clear(); + for (int i = keys.size() / 2; i < keys.size(); i++) + subMap.put(keys.get(i), i); + + cache.putAll(subMap); + + for (String key: keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + tx.commit(); + + for (int i = 0; i < keys.size() / 2; i++) { + String key = keys.get(i); + + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + for (int i = keys.size() / 2; i < keys.size(); i++) { + String key = keys.get(i); + + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + cache.removeAll(data.keySet()); + + for (String key: keys) { + assertNull(cacheSkipStore.get(key)); + assertNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + // Check that read-through is disabled when cacheSkipStore is used + for (int i = 0; i < keys.size(); i++) + putToStore(keys.get(i), i); + + assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0); + assertTrue(cache.size(CachePeekMode.ALL) == 0); + assertTrue(map.size() != 0); + + tx = txs.txStart(txConcurrency, txIsolation); + + assertTrue(cacheSkipStore.getAll(data.keySet()).size() == 0); + + for (String key : keys) { + assertNull(cacheSkipStore.get(key)); + + if (txIsolation == READ_COMMITTED) { + assertNotNull(cache.get(key)); + assertNotNull(cacheSkipStore.get(key)); + } + } + + tx.commit(); + + cache.removeAll(data.keySet()); + checkEmpty(cache, cacheSkipStore); + } + + /** + * @param cache Cache instance. + * @param cacheSkipStore Cache skip store projection. + * + * @throws Exception If failed. + */ + private void checkEmpty(IgniteCache cache, IgniteCache cacheSkipStore) + throws Exception { + assertTrue(cache.size(CachePeekMode.ALL) == 0); + assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0); + assertTrue(map.size() == 0); } /**