From 01c02465ff6924842644bf5b3447d324966cc5f9 Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Thu, 30 Jul 2015 13:50:40 +0300 Subject: [PATCH 01/15] ignite-946: introduced VersionedEntry, supported versioned entry for Cache.invoke/randomEntry/localEntries methods --- .../ignite/cache/version/VersionedEntry.java | 73 +++++++++++++++ .../apache/ignite/cache/version/package-info.java | 21 +++++ .../internal/processors/cache/CacheEntryImpl.java | 20 ++++ .../processors/cache/CacheInvokeEntry.java | 24 ++++- .../processors/cache/CacheVersionedEntryImpl.java | 80 ---------------- .../processors/cache/GridCacheAdapter.java | 13 ++- .../processors/cache/GridCacheMapEntry.java | 42 ++++++--- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../distributed/dht/atomic/GridDhtAtomicCache.java | 3 +- .../cache/local/atomic/GridLocalAtomicCache.java | 3 +- .../cache/transactions/IgniteTxAdapter.java | 2 +- .../cache/transactions/IgniteTxEntry.java | 3 +- .../cache/transactions/IgniteTxLocalAdapter.java | 3 +- .../cache/version/CacheVersionedEntryImpl.java | 102 +++++++++++++++++++++ .../main/resources/META-INF/classnames.properties | 2 +- 15 files changed, 287 insertions(+), 106 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java create mode 100644 modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryImpl.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java diff --git a/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java new file mode 100644 index 0000000..6f9d8f6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.version; + +import javax.cache.*; +import java.util.*; + +/** + * Cache entry along with version information. + */ +public interface VersionedEntry extends Cache.Entry { + /** + * Versions comparator. + */ + public static final Comparator VERSIONS_COMPARATOR = new Comparator() { + @Override public int compare(VersionedEntry o1, VersionedEntry o2) { + int res = Integer.compare(o1.topologyVersion(), o2.topologyVersion()); + + if (res != 0) + return res; + + res = Long.compare(o1.order(), o2.order()); + + if (res != 0) + return res; + + return Integer.compare(o1.nodeOrder(), o2.nodeOrder()); + } + }; + + /** + * Gets entry's topology version. + * + * @return Topology version plus number of seconds from the start time of the first grid node. + */ + public int topologyVersion(); + + /** + * Gets entry's order. + * + * @return Version order. + */ + public long order(); + + /** + * Gets entry's node order. + * + * @return Node order on which this version was assigned. + */ + public int nodeOrder(); + + /** + * Gets entry's global time. + * + * @return Adjusted time. + */ + public long globalTime(); +} diff --git a/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java b/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java new file mode 100644 index 0000000..9aeaba2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains cache version based implementations. + */ +package org.apache.ignite.cache.version; \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java index 3bd7ef4..98f3616 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.cache.version.*; +import org.apache.ignite.internal.processors.cache.version.*; + import javax.cache.*; import java.io.*; @@ -33,6 +36,9 @@ public class CacheEntryImpl implements Cache.Entry, Externalizable { /** */ private V val; + /** Entry version. */ + private GridCacheVersion ver; + /** * Required by {@link Externalizable}. */ @@ -49,6 +55,17 @@ public class CacheEntryImpl implements Cache.Entry, Externalizable { this.val = val; } + /** + * @param key Key. + * @param val Value. + * @param ver Entry version. + */ + public CacheEntryImpl(K key, V val, GridCacheVersion ver) { + this.key = key; + this.val = val; + this.ver = ver; + } + /** {@inheritDoc} */ @Override public K getKey() { return key; @@ -65,6 +82,9 @@ public class CacheEntryImpl implements Cache.Entry, Externalizable { if(cls.isAssignableFrom(getClass())) return cls.cast(this); + if (ver != null && cls.isAssignableFrom(VersionedEntry.class)) + return (T)new CacheVersionedEntryImpl<>(key, val, ver); + throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java index 2817748..e6f8d4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.cache.version.*; +import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -35,17 +37,23 @@ public class CacheInvokeEntry extends CacheLazyEntry implements Muta /** */ private V oldVal; + /** Entry version. */ + private GridCacheVersion ver; + /** * @param cctx Cache context. * @param keyObj Key cache object. * @param valObj Cache object value. + * @param ver Entry version. */ public CacheInvokeEntry(GridCacheContext cctx, KeyCacheObject keyObj, - @Nullable CacheObject valObj) { + @Nullable CacheObject valObj, + GridCacheVersion ver) { super(cctx, keyObj, valObj); this.hadVal = valObj != null; + this.ver = ver; } /** @@ -54,15 +62,18 @@ public class CacheInvokeEntry extends CacheLazyEntry implements Muta * @param key Key value. * @param valObj Value cache object. * @param val Value. + * @param ver Entry version. */ public CacheInvokeEntry(GridCacheContext ctx, KeyCacheObject keyObj, @Nullable K key, @Nullable CacheObject valObj, - @Nullable V val) { + @Nullable V val, + GridCacheVersion ver) { super(ctx, keyObj, key, valObj, val); this.hadVal = valObj != null || val != null; + this.ver = ver; } /** {@inheritDoc} */ @@ -108,6 +119,15 @@ public class CacheInvokeEntry extends CacheLazyEntry implements Muta } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public T unwrap(Class cls) { + if (cls.isAssignableFrom(VersionedEntry.class)) + return (T)new CacheVersionedEntryImpl<>(getKey(), getValue(), ver); + + return super.unwrap(cls); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheInvokeEntry.class, this); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryImpl.java deleted file mode 100644 index 59394f5..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryImpl.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * - */ -public class CacheVersionedEntryImpl extends CacheEntryImpl { - /** */ - private static final long serialVersionUID = 0L; - - /** Version. */ - private Object ver; - - /** - * Required by {@link Externalizable}. - */ - public CacheVersionedEntryImpl() { - // No-op. - } - - /** - * @param key Key. - * @param val Value (always null). - * @param ver Version. - */ - public CacheVersionedEntryImpl(K key, V val, Object ver) { - super(key, val); - - assert val == null; - - this.ver = ver; - } - - /** - * @return Version. - */ - @Nullable public Object version() { - return ver; - } - - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeObject(ver); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - ver = in.readObject(); - } - - /** {@inheritDoc} */ - public String toString() { - return "VersionedEntry [key=" + getKey() + ", val=" + getValue() + ", ver=" + ver + ']'; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 94bcc93..d125382 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3691,7 +3691,16 @@ public abstract class GridCacheAdapter implements IgniteInternalCache(lazyEntry.getKey(), val); + GridCacheVersion ver = null; + + try { + ver = lazyEntry.unwrap(GridCacheVersion.class); + } + catch (IllegalArgumentException e) { + log.error("Failed to unwrap entry version information", e); + } + + return new CacheEntryImpl<>(lazyEntry.getKey(), val, ver); } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -4614,7 +4623,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache((K)key0, (V)val0); + return new CacheEntryImpl<>((K)key0, (V)val0, entry.version()); } catch (GridCacheFilterFailedException ignore) { assert false; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f85a18b..45ff619 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -609,16 +609,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable IgniteCacheExpiryPolicy expirePlc) throws IgniteCheckedException, GridCacheEntryRemovedException { return innerGet0(tx, - readSwap, - readThrough, - evt, - unmarshal, - updateMetrics, - tmp, - subjId, - transformClo, - taskName, - expirePlc); + readSwap, + readThrough, + evt, + unmarshal, + updateMetrics, + tmp, + subjId, + transformClo, + taskName, + expirePlc); } /** {@inheritDoc} */ @@ -1385,7 +1385,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert entryProcessor != null; - CacheInvokeEntry entry = new CacheInvokeEntry<>(cctx, key, old); + CacheInvokeEntry entry = new CacheInvokeEntry<>(cctx, key, old, this.ver); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -1653,7 +1653,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme oldVal = rawGetOrUnmarshalUnlocked(true); - CacheInvokeEntry entry = new CacheInvokeEntry(cctx, key, oldVal); + CacheInvokeEntry entry = new CacheInvokeEntry(cctx, key, oldVal, this.ver); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -1878,7 +1878,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme EntryProcessor entryProcessor = (EntryProcessor)writeObj; - CacheInvokeEntry entry = new CacheInvokeEntry(cctx, key, oldVal); + CacheInvokeEntry entry = new CacheInvokeEntry(cctx, key, oldVal, this.ver); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -3531,7 +3531,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme val = rawGetOrUnmarshal(false); return new CacheEntryImpl<>(key.value(cctx.cacheObjectContext(), false), - CU.value(val, cctx, false)); + CU.value(val, cctx, false), ver); } catch (GridCacheFilterFailedException ignored) { throw new IgniteException("Should never happen."); @@ -3593,6 +3593,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return new CacheVersionedEntryImpl<>(key.value(cctx.cacheObjectContext(), false), null, ver); } + /** + * @return Entry which holds key, value and version. + */ + private synchronized CacheVersionedEntryImpl wrapVersionedWithValue() { + V val = this.val == null ? null : this.val.value(cctx.cacheObjectContext(), false); + + return new CacheVersionedEntryImpl<>(key.value(cctx.cacheObjectContext(), false), val, ver); + } + /** {@inheritDoc} */ @Override public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { @@ -4020,7 +4029,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return (T)wrapEviction(); if (cls.isAssignableFrom(CacheVersionedEntryImpl.class)) - return (T)wrapVersioned(); + return cls == CacheVersionedEntryImpl.class ? (T)wrapVersioned() : (T)wrapVersionedWithValue(); + + if (cls.isAssignableFrom(GridCacheVersion.class)) + return (T)ver; if (cls.isAssignableFrom(GridCacheMapEntry.this.getClass())) return (T)GridCacheMapEntry.this; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index fbc8c84..9bd5de2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -324,7 +324,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture, Object[]> t : txEntry.entryProcessors()) { try { CacheInvokeEntry invokeEntry = - new CacheInvokeEntry<>(txEntry.context(), key, val); + new CacheInvokeEntry<>(txEntry.context(), key, val, txEntry.cached().version()); EntryProcessor processor = t.get1(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 0a21979..5dff4ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1313,7 +1313,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { Object oldVal = null; Object updatedVal = null; - CacheInvokeEntry invokeEntry = new CacheInvokeEntry(ctx, entry.key(), old); + CacheInvokeEntry invokeEntry = new CacheInvokeEntry(ctx, entry.key(), old, + entry.version()); CacheObject updated; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index bcbdec4..8dd3276 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1057,7 +1057,8 @@ public class GridLocalAtomicCache extends GridCacheAdapter { Object oldVal = null; - CacheInvokeEntry invokeEntry = new CacheInvokeEntry<>(ctx, entry.key(), old); + CacheInvokeEntry invokeEntry = new CacheInvokeEntry<>(ctx, entry.key(), old, + entry.version()); CacheObject updated; Object updatedVal = null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 7190249..0d14012 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1230,7 +1230,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter for (T2, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry invokeEntry = new CacheInvokeEntry(txEntry.context(), - txEntry.key(), key, cacheVal, val); + txEntry.key(), key, cacheVal, val, txEntry.cached().version()); try { EntryProcessor processor = t.get1(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 247d350..7f06380 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -573,7 +573,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { for (T2, Object[]> t : entryProcessors()) { try { - CacheInvokeEntry invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val); + CacheInvokeEntry invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val, + entry.version()); EntryProcessor processor = t.get1(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 0a61b1a..d8797fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2522,7 +2522,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter for (T2, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry invokeEntry = - new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0); + new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, + txEntry.cached().version()); EntryProcessor entryProcessor = t.get1(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java new file mode 100644 index 0000000..6d1e0c9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.version.*; +import org.apache.ignite.internal.processors.cache.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * + */ +public class CacheVersionedEntryImpl extends CacheEntryImpl implements VersionedEntry { + /** */ + private static final long serialVersionUID = 0L; + + /** Version. */ + private GridCacheVersion ver; + + /** + * Required by {@link Externalizable}. + */ + public CacheVersionedEntryImpl() { + // No-op. + } + + /** + * @param key Key. + * @param val Value (always null). + * @param ver Version. + */ + public CacheVersionedEntryImpl(K key, V val, GridCacheVersion ver) { + super(key, val); + + assert val == null; + + this.ver = ver; + } + + /** + * @return Version. + */ + @Nullable public GridCacheVersion version() { + return ver; + } + + /** {@inheritDoc} */ + @Override public int topologyVersion() { + return ver.topologyVersion(); + } + + /** {@inheritDoc} */ + @Override public int nodeOrder() { + return ver.nodeOrder(); + } + + /** {@inheritDoc} */ + @Override public long order() { + return ver.order(); + } + + /** {@inheritDoc} */ + @Override public long globalTime() { + return ver.globalTime(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeObject(ver); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + ver = (GridCacheVersion)in.readObject(); + } + + /** {@inheritDoc} */ + public String toString() { + return "VersionedEntry [key=" + getKey() + ", val=" + getValue() + ", topVer=" + ver.topologyVersion() + + ", nodeOrder=" + ver.nodeOrder() + ", order=" + ver.order() + ", globalTime=" + ver.globalTime() + ']'; + } +} diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index b3eed46..ff75b02 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -289,7 +289,7 @@ org.apache.ignite.internal.processors.cache.CacheOperationContext org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException org.apache.ignite.internal.processors.cache.CacheType -org.apache.ignite.internal.processors.cache.CacheVersionedEntryImpl +org.apache.ignite.internal.processors.cache.version.CacheVersionedEntryImpl org.apache.ignite.internal.processors.cache.CacheWeakQueryIteratorsHolder$WeakQueryFutureIterator org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest -- 1.9.5.msysgit.0 From 325d06d3127e73554cfcf40a549d2ae9343dca50 Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Thu, 30 Jul 2015 16:03:18 +0300 Subject: [PATCH 02/15] ignite-946: added tests --- .../cache/version/CacheVersionedEntryImpl.java | 2 - .../version/CacheVersionedEntryAbstractTest.java | 184 +++++++++++++++++++++ .../CacheVersionedEntryLocalAtomicSelfTest.java | 40 +++++ ...heVersionedEntryLocalTransactionalSelfTest.java | 40 +++++ ...ionedEntryPartitionedAtomicOffHeapSelfTest.java | 35 ++++ ...cheVersionedEntryPartitionedAtomicSelfTest.java | 35 ++++ ...tryPartitionedTransactionalOffHeapSelfTest.java | 36 ++++ ...ionedEntryPartitionedTransactionalSelfTest.java | 35 ++++ ...sionedEntryReplicatedAtomicOffHeapSelfTest.java | 35 ++++ ...acheVersionedEntryReplicatedAtomicSelfTest.java | 35 ++++ ...ntryReplicatedTransactionalOffHeapSelfTest.java | 36 ++++ ...sionedEntryReplicatedTransactionalSelfTest.java | 35 ++++ .../ignite/testsuites/IgniteCacheTestSuite4.java | 13 ++ 13 files changed, 559 insertions(+), 2 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalTransactionalSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedAtomicOffHeapSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedAtomicSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedTransactionalOffHeapSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedTransactionalSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedAtomicOffHeapSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedAtomicSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedTransactionalSelfTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java index 6d1e0c9..924eff9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java @@ -48,8 +48,6 @@ public class CacheVersionedEntryImpl extends CacheEntryImpl implemen public CacheVersionedEntryImpl(K key, V val, GridCacheVersion ver) { super(key, val); - assert val == null; - this.ver = ver; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java new file mode 100644 index 0000000..951d05a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.version.*; +import org.apache.ignite.internal.processors.cache.*; + +import javax.cache.*; +import javax.cache.processor.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Versioned entry abstract test. + */ +public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractSelfTest { + /** Entries number to store in a cache. */ + private static final int ENTRIES_NUM = 1000; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + Cache cache = grid(0).cache(null); + + for (int i = 0 ; i < ENTRIES_NUM; i++) + cache.put(i, "value_" + i); + } + + /** + * @throws Exception If failed. + */ + public void testInvoke() throws Exception { + Cache cache = grid(0).cache(null); + + final AtomicBoolean invoked = new AtomicBoolean(false); + + cache.invoke(100, new EntryProcessor() { + @Override public Object process(MutableEntry entry, Object... arguments) + throws EntryProcessorException { + + invoked.set(true); + + VersionedEntry verEntry = entry.unwrap(VersionedEntry.class); + + checkVersionedEntry(verEntry); + + return entry; + } + }); + + assertTrue(invoked.get()); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAll() throws Exception { + Cache cache = grid(0).cache(null); + + Set keys = new HashSet<>(); + + for (int i = 0; i < ENTRIES_NUM; i++) + keys.add(i); + + final AtomicInteger invoked = new AtomicInteger(); + + cache.invokeAll(keys, new EntryProcessor() { + @Override public Object process(MutableEntry entry, Object... arguments) + throws EntryProcessorException { + + invoked.incrementAndGet(); + + VersionedEntry verEntry = entry.unwrap(VersionedEntry.class); + + checkVersionedEntry(verEntry); + + return null; + } + }); + + assert invoked.get() > 0; + } + + /** + * @throws Exception If failed. + */ + public void testRandomEntry() throws Exception { + IgniteCache cache = grid(0).cache(null); + + for (int i = 0; i < 5; i++) + checkVersionedEntry(cache.randomEntry().unwrap(VersionedEntry.class)); + } + + /** + * @throws Exception If failed. + */ + public void testIterator() throws Exception { + IgniteCache cache = grid(0).cache(null); + + Iterator> entries = cache.iterator(); + + while (entries.hasNext()) + checkVersionedEntry(entries.next().unwrap(VersionedEntry.class)); + } + + /** + * @throws Exception If failed. + */ + public void testLocalPeek() throws Exception { + IgniteCache cache = grid(0).cache(null); + + Iterable> entries = offheapTiered(cache) ? + cache.localEntries(CachePeekMode.SWAP, CachePeekMode.OFFHEAP) : + cache.localEntries(CachePeekMode.ONHEAP); + + for (Cache.Entry entry : entries) + checkVersionedEntry(entry.unwrap(VersionedEntry.class)); + } + + /** + * @throws Exception If failed. + */ + public void testVersionComparision() throws Exception { + IgniteCache cache = grid(0).cache(null); + + VersionedEntry ver1 = cache.invoke(100, + new EntryProcessor>() { + @Override public VersionedEntry process(MutableEntry entry, + Object... arguments) throws EntryProcessorException { + return entry.unwrap(VersionedEntry.class); + } + }); + + cache.put(100, "new value 100"); + + VersionedEntry ver2 = cache.invoke(100, + new EntryProcessor>() { + @Override public VersionedEntry process(MutableEntry entry, + Object... arguments) throws EntryProcessorException { + return entry.unwrap(VersionedEntry.class); + } + }); + + assert VersionedEntry.VERSIONS_COMPARATOR.compare(ver1, ver2) < 0; + } + + /** + * @param entry Versioned entry. + */ + private void checkVersionedEntry(VersionedEntry entry) { + assertNotNull(entry); + + assert entry.topologyVersion() > 0; + assert entry.order() > 0; + assert entry.nodeOrder() > 0; + assert entry.globalTime() > 0; + + assertNotNull(entry.getKey()); + assertNotNull(entry.getValue()); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSelfTest.java new file mode 100644 index 0000000..a340413 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSelfTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class CacheVersionedEntryLocalAtomicSelfTest extends CacheVersionedEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.LOCAL; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalTransactionalSelfTest.java new file mode 100644 index 0000000..4833c2c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalTransactionalSelfTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class CacheVersionedEntryLocalTransactionalSelfTest extends CacheVersionedEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.LOCAL; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedAtomicOffHeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedAtomicOffHeapSelfTest.java new file mode 100644 index 0000000..2f33d84 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedAtomicOffHeapSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; + +/** + * + */ +public class CacheVersionedEntryPartitionedAtomicOffHeapSelfTest extends CacheVersionedEntryPartitionedAtomicSelfTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); + + return cfg; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedAtomicSelfTest.java new file mode 100644 index 0000000..f2197ad --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedAtomicSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class CacheVersionedEntryPartitionedAtomicSelfTest extends CacheVersionedEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedTransactionalOffHeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedTransactionalOffHeapSelfTest.java new file mode 100644 index 0000000..7494690 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedTransactionalOffHeapSelfTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; + +/** + * + */ +public class CacheVersionedEntryPartitionedTransactionalOffHeapSelfTest extends + CacheVersionedEntryPartitionedTransactionalSelfTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); + + return cfg; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedTransactionalSelfTest.java new file mode 100644 index 0000000..95a63c8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryPartitionedTransactionalSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class CacheVersionedEntryPartitionedTransactionalSelfTest extends CacheVersionedEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedAtomicOffHeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedAtomicOffHeapSelfTest.java new file mode 100644 index 0000000..dd3fd0c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedAtomicOffHeapSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; + +/** + * + */ +public class CacheVersionedEntryReplicatedAtomicOffHeapSelfTest extends CacheVersionedEntryReplicatedAtomicSelfTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); + + return cfg; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedAtomicSelfTest.java new file mode 100644 index 0000000..fd9617d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedAtomicSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class CacheVersionedEntryReplicatedAtomicSelfTest extends CacheVersionedEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.java new file mode 100644 index 0000000..d1bc5c3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; + +/** + * + */ +public class CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest extends + CacheVersionedEntryReplicatedTransactionalSelfTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); + + return cfg; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedTransactionalSelfTest.java new file mode 100644 index 0000000..8d37e7b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryReplicatedTransactionalSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class CacheVersionedEntryReplicatedTransactionalSelfTest extends CacheVersionedEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 18b2409..dd9c799 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.integration.*; +import org.apache.ignite.internal.processors.cache.version.*; /** * Test suite. @@ -153,6 +154,18 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheReadThroughLocalAtomicRestartSelfTest.class); suite.addTestSuite(CacheReadThroughAtomicRestartSelfTest.class); + // Versioned entry tests + suite.addTestSuite(CacheVersionedEntryLocalAtomicSelfTest.class); + suite.addTestSuite(CacheVersionedEntryLocalTransactionalSelfTest.class); + suite.addTestSuite(CacheVersionedEntryPartitionedAtomicSelfTest.class); + suite.addTestSuite(CacheVersionedEntryPartitionedTransactionalSelfTest.class); + suite.addTestSuite(CacheVersionedEntryPartitionedAtomicOffHeapSelfTest.class); + suite.addTestSuite(CacheVersionedEntryPartitionedTransactionalOffHeapSelfTest.class); + suite.addTestSuite(CacheVersionedEntryReplicatedAtomicSelfTest.class); + suite.addTestSuite(CacheVersionedEntryReplicatedTransactionalSelfTest.class); + suite.addTestSuite(CacheVersionedEntryReplicatedAtomicOffHeapSelfTest.class); + suite.addTestSuite(CacheVersionedEntryPartitionedTransactionalOffHeapSelfTest.class); + return suite; } } -- 1.9.5.msysgit.0 From 4b9f4bad4191c357418996abc88748208461f18a Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Fri, 31 Jul 2015 10:19:46 +0300 Subject: [PATCH 03/15] ignite-946: improving tests --- .../cache/query/GridCacheQueryManager.java | 1 + .../version/CacheVersionedEntryAbstractTest.java | 24 ++++++------ .../CacheVersionedEntryLocalAtomicSelfTest.java | 40 ------------------- ...sionedEntryLocalAtomicSwapDisabledSelfTest.java | 45 ++++++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite4.java | 2 +- 5 files changed, 60 insertions(+), 52 deletions(-) delete mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSwapDisabledSelfTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 5d3f6a3..400d997 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -845,6 +845,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte try { val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc); + } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java index 951d05a..4cfacb7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java @@ -55,23 +55,25 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS public void testInvoke() throws Exception { Cache cache = grid(0).cache(null); - final AtomicBoolean invoked = new AtomicBoolean(false); + final AtomicInteger invoked = new AtomicInteger(); - cache.invoke(100, new EntryProcessor() { - @Override public Object process(MutableEntry entry, Object... arguments) - throws EntryProcessorException { + for (int i = 0; i < ENTRIES_NUM; i++) { + cache.invoke(i, new EntryProcessor() { + @Override public Object process(MutableEntry entry, Object... arguments) + throws EntryProcessorException { - invoked.set(true); + invoked.incrementAndGet(); - VersionedEntry verEntry = entry.unwrap(VersionedEntry.class); + VersionedEntry verEntry = entry.unwrap(VersionedEntry.class); - checkVersionedEntry(verEntry); + checkVersionedEntry(verEntry); - return entry; - } - }); + return entry; + } + }); + } - assertTrue(invoked.get()); + assert invoked.get() > 0; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSelfTest.java deleted file mode 100644 index a340413..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSelfTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.version; - -import org.apache.ignite.cache.*; - -/** - * - */ -public class CacheVersionedEntryLocalAtomicSelfTest extends CacheVersionedEntryAbstractTest { - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 1; - } - - /** {@inheritDoc} */ - @Override protected CacheMode cacheMode() { - return CacheMode.LOCAL; - } - - /** {@inheritDoc} */ - @Override protected CacheAtomicityMode atomicityMode() { - return CacheAtomicityMode.ATOMIC; - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSwapDisabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSwapDisabledSelfTest.java new file mode 100644 index 0000000..b0035d1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryLocalAtomicSwapDisabledSelfTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class CacheVersionedEntryLocalAtomicSwapDisabledSelfTest extends CacheVersionedEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.LOCAL; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean swapEnabled() { + return false; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index dd9c799..3ac7879 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -155,7 +155,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheReadThroughAtomicRestartSelfTest.class); // Versioned entry tests - suite.addTestSuite(CacheVersionedEntryLocalAtomicSelfTest.class); + suite.addTestSuite(CacheVersionedEntryLocalAtomicSwapDisabledSelfTest.class); suite.addTestSuite(CacheVersionedEntryLocalTransactionalSelfTest.class); suite.addTestSuite(CacheVersionedEntryPartitionedAtomicSelfTest.class); suite.addTestSuite(CacheVersionedEntryPartitionedTransactionalSelfTest.class); -- 1.9.5.msysgit.0 From 719161f2d02dd1c589155800f60e02f86c469de4 Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Fri, 31 Jul 2015 10:20:22 +0300 Subject: [PATCH 04/15] ignite-946: improving tests --- .../ignite/internal/processors/cache/query/GridCacheQueryManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 400d997..5d3f6a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -845,7 +845,6 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte try { val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc); - } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) -- 1.9.5.msysgit.0 From 2d200a31b9903a165c9d70ec84b687e7bcc55c44 Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Fri, 31 Jul 2015 11:39:47 +0300 Subject: [PATCH 05/15] ignite-946: supported VersionedEntry for IgniteCache.localEntries() when OFF_HEAP mode is used --- .../internal/processors/cache/CacheEntryImpl0.java | 5 ++++ .../processors/cache/GridCacheSwapManager.java | 8 ++++- .../cache/version/GridVersionedMapEntry.java | 33 +++++++++++++++++++++ .../version/CacheVersionedEntryAbstractTest.java | 34 +++++++--------------- 4 files changed, 55 insertions(+), 25 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridVersionedMapEntry.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java index d2b1923..a5e27d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.cache.version.*; +import org.apache.ignite.internal.processors.cache.version.*; + import javax.cache.*; import java.util.*; @@ -49,6 +52,8 @@ public class CacheEntryImpl0 implements Cache.Entry { @Override public T unwrap(Class cls) { if(cls.isAssignableFrom(getClass())) return cls.cast(this); + else if (cls.isAssignableFrom(VersionedEntry.class) && e instanceof GridVersionedMapEntry) + return (T)new CacheVersionedEntryImpl<>(e.getKey(), e.getValue(), ((GridVersionedMapEntry)e).version()); throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 9e9c958..0530c19 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1513,7 +1513,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { @Override protected Map.Entry onNext() { final Map.Entry cur0 = it.next(); - cur = new Map.Entry() { + cur = new GridVersionedMapEntry() { @Override public K getKey() { try { KeyCacheObject key = cctx.toCacheKeyObject(cur0.getKey()); @@ -1538,6 +1538,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } } + @Override public GridCacheVersion version() { + GridCacheSwapEntry e = unmarshalSwapEntry(cur0.getValue()); + + return e.version(); + } + @Override public V setValue(V val) { throw new UnsupportedOperationException(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridVersionedMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridVersionedMapEntry.java new file mode 100644 index 0000000..f653fac --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridVersionedMapEntry.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import java.util.*; + +/** + * This interface extends {@link java.util.Map.Entry} by adding the method that returns entry's + * {@link GridCacheVersion}. + */ +public interface GridVersionedMapEntry extends Map.Entry { + /** + * Gets entry version. + * + * @return Entry version. + */ + public GridCacheVersion version(); +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java index 4cfacb7..25a2a42 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java @@ -32,7 +32,7 @@ import java.util.concurrent.atomic.*; */ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractSelfTest { /** Entries number to store in a cache. */ - private static final int ENTRIES_NUM = 1000; + private static final int ENTRIES_NUM = 500; /** {@inheritDoc} */ @Override protected int gridCount() { @@ -57,21 +57,19 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS final AtomicInteger invoked = new AtomicInteger(); - for (int i = 0; i < ENTRIES_NUM; i++) { - cache.invoke(i, new EntryProcessor() { - @Override public Object process(MutableEntry entry, Object... arguments) - throws EntryProcessorException { + cache.invoke(100, new EntryProcessor() { + @Override public Object process(MutableEntry entry, Object... arguments) + throws EntryProcessorException { - invoked.incrementAndGet(); + invoked.incrementAndGet(); - VersionedEntry verEntry = entry.unwrap(VersionedEntry.class); + VersionedEntry verEntry = entry.unwrap(VersionedEntry.class); - checkVersionedEntry(verEntry); + checkVersionedEntry(verEntry); - return entry; - } - }); - } + return entry; + } + }); assert invoked.get() > 0; } @@ -119,18 +117,6 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS /** * @throws Exception If failed. */ - public void testIterator() throws Exception { - IgniteCache cache = grid(0).cache(null); - - Iterator> entries = cache.iterator(); - - while (entries.hasNext()) - checkVersionedEntry(entries.next().unwrap(VersionedEntry.class)); - } - - /** - * @throws Exception If failed. - */ public void testLocalPeek() throws Exception { IgniteCache cache = grid(0).cache(null); -- 1.9.5.msysgit.0 From 4928d99d80a837843ce733eab546a3ea6aa3c2d3 Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Fri, 31 Jul 2015 12:14:54 +0300 Subject: [PATCH 06/15] ignite-946: added documentation --- .../ignite/cache/version/VersionedEntry.java | 31 +++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java index 6f9d8f6..e669f15 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java @@ -17,11 +17,40 @@ package org.apache.ignite.cache.version; +import org.apache.ignite.*; + import javax.cache.*; +import javax.cache.processor.*; import java.util.*; /** - * Cache entry along with version information. + * Cache entry that stores entry's version related information. + * + * To get a {@code VersionedEntry} of an {@link Cache.Entry} use {@link Cache.Entry#unwrap(Class)} by passing + * {@code VersionedEntry} class to it as the argument. + *

+ * {@code VersionedEntry} is supported only for {@link Cache.Entry} returned by one of the following methods: + *

    + *
  • {@link Cache#invoke(Object, EntryProcessor, Object...)}
  • + *
  • {@link Cache#invokeAll(Set, EntryProcessor, Object...)}
  • + *
  • invoke and invokeAll methods of {@link IgniteCache}
  • + *
  • {@link IgniteCache#randomEntry()}
  • + *
+ *

+ * {@code VersionedEntry} is not supported for {@link Cache#iterator()} because of performance reasons. + * {@link Cache#iterator()} loads entries from all the cluster nodes and to speed up the load version information + * is excluded from responses. + *

Java Example

+ *
+ * Cache cache = grid(0).cache(null);
+ *
+ *  cache.invoke(100, new EntryProcessor() {
+ *      public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException {
+ *          VersionedEntry verEntry = entry.unwrap(VersionedEntry.class);
+ *          return entry;
+ *       }
+ *   });
+ * 
*/ public interface VersionedEntry extends Cache.Entry { /** -- 1.9.5.msysgit.0 From 26144dc4f845af7533cc645d898202b80c0a29f6 Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Fri, 31 Jul 2015 12:18:58 +0300 Subject: [PATCH 07/15] ignite-946: formatting fixes --- .../internal/processors/cache/GridCacheMapEntry.java | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 45ff619..ebcb908 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -609,16 +609,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable IgniteCacheExpiryPolicy expirePlc) throws IgniteCheckedException, GridCacheEntryRemovedException { return innerGet0(tx, - readSwap, - readThrough, - evt, - unmarshal, - updateMetrics, - tmp, - subjId, - transformClo, - taskName, - expirePlc); + readSwap, + readThrough, + evt, + unmarshal, + updateMetrics, + tmp, + subjId, + transformClo, + taskName, + expirePlc); } /** {@inheritDoc} */ -- 1.9.5.msysgit.0 From f0fe0769751b7957a64fc5dd56989e54f1223ab1 Mon Sep 17 00:00:00 2001 From: nikolay_tikhonov Date: Fri, 31 Jul 2015 13:00:09 +0300 Subject: [PATCH 08/15] IGNITE-946 Added test to Suite. --- .../test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 3ac7879..228be92 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -164,7 +164,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheVersionedEntryReplicatedAtomicSelfTest.class); suite.addTestSuite(CacheVersionedEntryReplicatedTransactionalSelfTest.class); suite.addTestSuite(CacheVersionedEntryReplicatedAtomicOffHeapSelfTest.class); - suite.addTestSuite(CacheVersionedEntryPartitionedTransactionalOffHeapSelfTest.class); + suite.addTestSuite(CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.class); return suite; } -- 1.9.5.msysgit.0 From 2e7799d446653bba379cc231628ba2b02c993e5e Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Fri, 31 Jul 2015 15:32:45 +0300 Subject: [PATCH 09/15] ignite-946: fixing version retrieval for transactions --- .../processors/cache/CacheInvokeEntry.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 4 +- .../cache/transactions/IgniteTxAdapter.java | 14 +++- .../cache/transactions/IgniteTxEntry.java | 11 ++- .../cache/transactions/IgniteTxLocalAdapter.java | 90 +++++++++++++++++----- 5 files changed, 98 insertions(+), 23 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java index e6f8d4e..2d8f738 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java @@ -121,7 +121,7 @@ public class CacheInvokeEntry extends CacheLazyEntry implements Muta /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public T unwrap(Class cls) { - if (cls.isAssignableFrom(VersionedEntry.class)) + if (cls.isAssignableFrom(VersionedEntry.class) && ver != null) return (T)new CacheVersionedEntryImpl<>(getKey(), getValue(), ver); return super.unwrap(cls); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index ebcb908..43cf2fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1653,7 +1653,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme oldVal = rawGetOrUnmarshalUnlocked(true); - CacheInvokeEntry entry = new CacheInvokeEntry(cctx, key, oldVal, this.ver); + CacheInvokeEntry entry = new CacheInvokeEntry(cctx, key, oldVal, version()); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -1878,7 +1878,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme EntryProcessor entryProcessor = (EntryProcessor)writeObj; - CacheInvokeEntry entry = new CacheInvokeEntry(cctx, key, oldVal, this.ver); + CacheInvokeEntry entry = new CacheInvokeEntry(cctx, key, oldVal, version()); try { Object computed = entryProcessor.process(entry, invokeArgs); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 0d14012..797f75e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1228,9 +1228,21 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter Object key = null; + GridCacheVersion ver; + + try { + ver = txEntry.cached().version(); + } + catch (GridCacheEntryRemovedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + + ver = null; + } + for (T2, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry invokeEntry = new CacheInvokeEntry(txEntry.context(), - txEntry.key(), key, cacheVal, val, txEntry.cached().version()); + txEntry.key(), key, cacheVal, val, ver); try { EntryProcessor processor = t.get1(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 7f06380..ed57bf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -571,10 +571,19 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { Object val = null; Object keyVal = null; + GridCacheVersion ver; + + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException e) { + ver = null; + } + for (T2, Object[]> t : entryProcessors()) { try { CacheInvokeEntry invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val, - entry.version()); + ver); EntryProcessor processor = t.get1(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d8797fe..7f171c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1938,13 +1938,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter Map drMap ) { return this.putAllAsync0(cacheCtx, - null, - null, - null, - drMap, - false, - null, - null); + null, + null, + null, + drMap, + false, + null, + null); } /** {@inheritDoc} */ @@ -2229,8 +2229,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (retval && !transform) ret.set(cacheCtx, old, true); else { - if (txEntry.op() == TRANSFORM) - addInvokeResult(txEntry, old, ret); + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; + + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException ex) { + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + + ex.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(txEntry, old, ret, ver); + } else ret.success(true); } @@ -2290,8 +2304,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter enlisted.add(cacheKey); - if (txEntry.op() == TRANSFORM) - addInvokeResult(txEntry, txEntry.value(), ret); + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; + + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(txEntry, txEntry.value(), ret, ver); + } } if (!pessimistic()) { @@ -2328,8 +2355,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CacheObject cacheVal = cacheCtx.toCacheObject(val); - if (e.op() == TRANSFORM) - addInvokeResult(e, cacheVal, ret); + if (e.op() == TRANSFORM) { + GridCacheVersion ver; + + try { + ver = e.cached().version(); + } + catch (GridCacheEntryRemovedException ex) { + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(e, cacheVal, ret, ver); + } else ret.set(cacheCtx, cacheVal, true); } @@ -2442,8 +2482,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (txEntry.op() == TRANSFORM) { - if (computeInvoke) - addInvokeResult(txEntry, v, ret); + if (computeInvoke) { + GridCacheVersion ver; + + try { + ver = cached.version(); + } + catch (GridCacheEntryRemovedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(txEntry, v, ret, ver); + } } else ret.value(cacheCtx, v); @@ -2510,8 +2563,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param txEntry Entry. * @param cacheVal Value. * @param ret Return value to update. + * @param ver Entry version. */ - private void addInvokeResult(IgniteTxEntry txEntry, CacheObject cacheVal, GridCacheReturn ret) { + private void addInvokeResult(IgniteTxEntry txEntry, CacheObject cacheVal, GridCacheReturn ret, + GridCacheVersion ver) { GridCacheContext ctx = txEntry.context(); Object key0 = null; @@ -2522,8 +2577,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter for (T2, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry invokeEntry = - new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, - txEntry.cached().version()); + new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, ver); EntryProcessor entryProcessor = t.get1(); -- 1.9.5.msysgit.0 From fe2be79c8e4eccf8f823abeb32521f0fca8ddf42 Mon Sep 17 00:00:00 2001 From: Yakov Zhdanov Date: Tue, 4 Aug 2015 17:49:48 +0300 Subject: [PATCH 10/15] review --- .../apache/ignite/cache/version/package-info.java | 4 ++-- .../internal/processors/cache/CacheInvokeEntry.java | 5 +++-- .../processors/cache/transactions/IgniteTxEntry.java | 4 +++- .../cache/transactions/IgniteTxLocalAdapter.java | 20 ++++++++++---------- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java b/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java index 9aeaba2..50ceb13 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java @@ -16,6 +16,6 @@ */ /** - * Contains cache version based implementations. + * Contains cache versioned entry interface. */ -package org.apache.ignite.cache.version; \ No newline at end of file +package org.apache.ignite.cache.version; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java index 2d8f738..515a4c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java @@ -49,14 +49,15 @@ public class CacheInvokeEntry extends CacheLazyEntry implements Muta public CacheInvokeEntry(GridCacheContext cctx, KeyCacheObject keyObj, @Nullable CacheObject valObj, - GridCacheVersion ver) { + GridCacheVersion ver + ) { super(cctx, keyObj, valObj); this.hadVal = valObj != null; this.ver = ver; } - /** + /** * @param ctx Cache context. * @param keyObj Key cache object. * @param key Key value. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index ed57bf2..73b9975 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -577,6 +577,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { ver = entry.version(); } catch (GridCacheEntryRemovedException e) { + assert tx.optimistic() : tx; + ver = null; } @@ -924,7 +926,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { return false; reader.incrementState(); - + case 6: flags = reader.readByte("flags"); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 7f171c2..e03f34d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1937,14 +1937,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter GridCacheContext cacheCtx, Map drMap ) { - return this.putAllAsync0(cacheCtx, - null, - null, - null, - drMap, - false, - null, - null); + return putAllAsync0(cacheCtx, + null, + null, + null, + drMap, + false, + null, + null); } /** {@inheritDoc} */ @@ -2237,8 +2237,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } catch (GridCacheEntryRemovedException ex) { if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + - ex.getMessage() + ']'); + log.debug("Failed to get entry version " + + "[err=" + ex.getMessage() + ']'); ver = null; } -- 1.9.5.msysgit.0 From 53557e327721fee683b9b37217d530db44166aec Mon Sep 17 00:00:00 2001 From: Yakov Zhdanov Date: Wed, 5 Aug 2015 10:15:52 +0300 Subject: [PATCH 11/15] review --- .../internal/processors/cache/transactions/IgniteTxLocalAdapter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index e03f34d..a209780 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1937,7 +1937,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter GridCacheContext cacheCtx, Map drMap ) { - return putAllAsync0(cacheCtx, + return this.putAllAsync0(cacheCtx, null, null, null, -- 1.9.5.msysgit.0 From 911ffbb5de261d662a915677b99cae4a160407eb Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Mon, 10 Aug 2015 11:39:44 +0300 Subject: [PATCH 12/15] ignite-946: fixed review notes --- .../ignite/cache/version/VersionedEntry.java | 2 +- .../internal/processors/cache/CacheEntryImpl0.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 2 +- .../processors/cache/GridCacheSwapManager.java | 88 +++++++++++++--------- .../cache/transactions/IgniteTxAdapter.java | 2 + .../cache/transactions/IgniteTxEntry.java | 2 +- .../cache/transactions/IgniteTxLocalAdapter.java | 8 ++ .../cache/version/GridCacheVersionAware.java | 30 ++++++++ .../cache/version/GridVersionedMapEntry.java | 33 -------- .../version/CacheVersionedEntryAbstractTest.java | 2 +- 10 files changed, 99 insertions(+), 74 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAware.java delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridVersionedMapEntry.java diff --git a/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java index e669f15..2c0879b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java @@ -56,7 +56,7 @@ public interface VersionedEntry extends Cache.Entry { /** * Versions comparator. */ - public static final Comparator VERSIONS_COMPARATOR = new Comparator() { + public static final Comparator VER_COMP = new Comparator() { @Override public int compare(VersionedEntry o1, VersionedEntry o2) { int res = Integer.compare(o1.topologyVersion(), o2.topologyVersion()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java index a5e27d6..987fbd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java @@ -52,8 +52,8 @@ public class CacheEntryImpl0 implements Cache.Entry { @Override public T unwrap(Class cls) { if(cls.isAssignableFrom(getClass())) return cls.cast(this); - else if (cls.isAssignableFrom(VersionedEntry.class) && e instanceof GridVersionedMapEntry) - return (T)new CacheVersionedEntryImpl<>(e.getKey(), e.getValue(), ((GridVersionedMapEntry)e).version()); + else if (cls.isAssignableFrom(VersionedEntry.class) && e instanceof GridCacheVersionAware) + return (T)new CacheVersionedEntryImpl<>(e.getKey(), e.getValue(), ((GridCacheVersionAware)e).version()); throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 43cf2fe..298f7a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1385,7 +1385,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert entryProcessor != null; - CacheInvokeEntry entry = new CacheInvokeEntry<>(cctx, key, old, this.ver); + CacheInvokeEntry entry = new CacheInvokeEntry<>(cctx, key, old, version()); try { Object computed = entryProcessor.process(entry, invokeArgs); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index ffe6169..ea9b0fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1513,41 +1513,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { @Override protected Map.Entry onNext() { final Map.Entry cur0 = it.next(); - cur = new GridVersionedMapEntry() { - @Override public K getKey() { - try { - KeyCacheObject key = cctx.toCacheKeyObject(cur0.getKey()); - - return key.value(cctx.cacheObjectContext(), false); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - @Override public V getValue() { - try { - GridCacheSwapEntry e = unmarshalSwapEntry(cur0.getValue()); - - swapEntry(e); - - return e.value().value(cctx.cacheObjectContext(), false); - } - catch (IgniteCheckedException ex) { - throw new IgniteException(ex); - } - } - - @Override public GridCacheVersion version() { - GridCacheSwapEntry e = unmarshalSwapEntry(cur0.getValue()); - - return e.version(); - } - - @Override public V setValue(V val) { - throw new UnsupportedOperationException(); - } - }; + cur = new GridVersionedMapEntry(cur0); return cur; } @@ -2330,4 +2296,56 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { */ abstract protected GridCloseableIterator partitionIterator(int part) throws IgniteCheckedException; } + + private class GridVersionedMapEntry implements Map.Entry, GridCacheVersionAware { + /** */ + private Map.Entry entry; + + /** + * Constructor. + * + * @param entry Entry. + */ + public GridVersionedMapEntry(Map.Entry entry) { + this.entry = entry; + } + + /** {@inheritDoc} */ + @Override public K getKey() { + try { + KeyCacheObject key = cctx.toCacheKeyObject(entry.getKey()); + + return key.value(cctx.cacheObjectContext(), false); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public V getValue() { + try { + GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue()); + + swapEntry(e); + + return e.value().value(cctx.cacheObjectContext(), false); + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex); + } + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue()); + + return e.version(); + } + + /** {@inheritDoc} */ + @Override public V setValue(V val) { + throw new UnsupportedOperationException(); + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 797f75e..e9fdd22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1234,6 +1234,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter ver = txEntry.cached().version(); } catch (GridCacheEntryRemovedException e) { + assert optimistic() : txEntry; + if (log.isDebugEnabled()) log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 73b9975..3c792f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -577,7 +577,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { ver = entry.version(); } catch (GridCacheEntryRemovedException e) { - assert tx.optimistic() : tx; + assert tx == null || tx.optimistic() : tx; ver = null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index a209780..b354fed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2236,6 +2236,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ver = entry.version(); } catch (GridCacheEntryRemovedException ex) { + assert optimistic() : txEntry; + if (log.isDebugEnabled()) log.debug("Failed to get entry version " + "[err=" + ex.getMessage() + ']'); @@ -2311,6 +2313,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ver = entry.version(); } catch (GridCacheEntryRemovedException e) { + assert optimistic() : txEntry; + if (log.isDebugEnabled()) log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); @@ -2362,6 +2366,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ver = e.cached().version(); } catch (GridCacheEntryRemovedException ex) { + assert optimistic() : e; + if (log.isDebugEnabled()) log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); @@ -2489,6 +2495,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ver = cached.version(); } catch (GridCacheEntryRemovedException e) { + assert optimistic() : txEntry; + if (log.isDebugEnabled()) log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAware.java new file mode 100644 index 0000000..0c0a270 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAware.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +/** + * Interface implemented by classes that holds version related information. + */ +public interface GridCacheVersionAware { + /** + * Gets version. + * + * @return Cache entry version. + */ + public GridCacheVersion version(); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridVersionedMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridVersionedMapEntry.java deleted file mode 100644 index f653fac..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridVersionedMapEntry.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.version; - -import java.util.*; - -/** - * This interface extends {@link java.util.Map.Entry} by adding the method that returns entry's - * {@link GridCacheVersion}. - */ -public interface GridVersionedMapEntry extends Map.Entry { - /** - * Gets entry version. - * - * @return Entry version. - */ - public GridCacheVersion version(); -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java index 25a2a42..b121995 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java @@ -152,7 +152,7 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS } }); - assert VersionedEntry.VERSIONS_COMPARATOR.compare(ver1, ver2) < 0; + assert VersionedEntry.VER_COMP.compare(ver1, ver2) < 0; } /** -- 1.9.5.msysgit.0 From c1582fc32884cdf9494df32fb33207d64ed1230f Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Mon, 10 Aug 2015 12:44:31 +0300 Subject: [PATCH 13/15] ignite-946: fixed comments and bugs --- .../org/apache/ignite/cache/version/VersionedEntry.java | 17 +++++++++-------- .../cache/version/CacheVersionedEntryImpl.java | 4 ++-- .../processors/clock/GridClockSyncProcessor.java | 2 +- .../cache/version/CacheVersionedEntryAbstractTest.java | 2 +- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java index 2c0879b..1aac68a 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java @@ -73,30 +73,31 @@ public interface VersionedEntry extends Cache.Entry { }; /** - * Gets entry's topology version. + * Gets the topology version at the time when the entry with a given pair of key and value has been created. * * @return Topology version plus number of seconds from the start time of the first grid node. */ public int topologyVersion(); /** - * Gets entry's order. + * Gets versioned entry unique order. + * Each time a cache entry for a given key is updated a new {@code VersionedEntry} with increased order is created. * - * @return Version order. + * @return Versioned entry unique order. */ public long order(); /** - * Gets entry's node order. + * Gets local node order at the time when the entry with a given pair of key and value has been created. * - * @return Node order on which this version was assigned. + * @return Local node order on which this version has been assigned. */ public int nodeOrder(); /** - * Gets entry's global time. + * Gets the time when the entry with a given pair of key and value has been created. * - * @return Adjusted time. + * @return Time in milliseconds. */ - public long globalTime(); + public long creationTime(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java index 924eff9..74e4a9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java @@ -74,7 +74,7 @@ public class CacheVersionedEntryImpl extends CacheEntryImpl implemen } /** {@inheritDoc} */ - @Override public long globalTime() { + @Override public long creationTime() { return ver.globalTime(); } @@ -95,6 +95,6 @@ public class CacheVersionedEntryImpl extends CacheEntryImpl implemen /** {@inheritDoc} */ public String toString() { return "VersionedEntry [key=" + getKey() + ", val=" + getValue() + ", topVer=" + ver.topologyVersion() + - ", nodeOrder=" + ver.nodeOrder() + ", order=" + ver.order() + ", globalTime=" + ver.globalTime() + ']'; + ", nodeOrder=" + ver.nodeOrder() + ", order=" + ver.order() + ", creationTime=" + ver.globalTime() + ']'; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java index 3ac44f2..69b07b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java @@ -264,7 +264,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter { long now = clockSrc.currentTimeMillis(); if (snap == null) - return System.currentTimeMillis(); + return now; Long delta = snap.deltas().get(ctx.localNodeId()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java index b121995..0ad8038 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java @@ -164,7 +164,7 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS assert entry.topologyVersion() > 0; assert entry.order() > 0; assert entry.nodeOrder() > 0; - assert entry.globalTime() > 0; + assert entry.creationTime() > 0; assertNotNull(entry.getKey()); assertNotNull(entry.getValue()); -- 1.9.5.msysgit.0 From dd3cecf6a68deec93c084af2f7d7e1be9de9f877 Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Tue, 11 Aug 2015 09:02:00 +0300 Subject: [PATCH 14/15] ignite-946: renamed topologyVersion to topologyOrder for GridCacheVersion --- .../cache/GridCacheAtomicVersionComparator.java | 4 ++-- .../processors/cache/GridCacheMapEntry.java | 2 +- .../internal/processors/cache/GridCacheUtils.java | 2 +- .../cache/transactions/IgniteTxManager.java | 2 +- .../cache/version/CacheVersionedEntryImpl.java | 4 ++-- .../version/GridCachePlainVersionedEntry.java | 2 +- .../cache/version/GridCacheRawVersionedEntry.java | 2 +- .../processors/cache/version/GridCacheVersion.java | 28 +++++++++++----------- .../cache/version/GridCacheVersionManager.java | 5 ++-- .../apache/ignite/internal/util/IgniteUtils.java | 4 ++-- .../cache/GridCacheEntryVersionSelfTest.java | 4 ++-- 11 files changed, 29 insertions(+), 30 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicVersionComparator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicVersionComparator.java index 3a06100..45288d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicVersionComparator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicVersionComparator.java @@ -32,8 +32,8 @@ public class GridCacheAtomicVersionComparator { * @return Comparison value. */ public int compare(GridCacheVersion one, GridCacheVersion other, boolean ignoreTime) { - int topVer = one.topologyVersion(); - int otherTopVer = other.topologyVersion(); + int topVer = one.topologyOrder(); + int otherTopVer = other.topologyOrder(); if (topVer == otherTopVer) { long globalTime = one.globalTime(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 298f7a6..33c42c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1927,7 +1927,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Incorporate conflict version into new version if needed. if (conflictVer != null && conflictVer != newVer) - newVer = new GridCacheVersionEx(newVer.topologyVersion(), + newVer = new GridCacheVersionEx(newVer.topologyOrder(), newVer.globalTime(), newVer.order(), newVer.nodeOrder(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index a313e3d..75f1dee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1060,7 +1060,7 @@ public class GridCacheUtils { byte[] bytes = new byte[28]; - U.intToBytes(ver.topologyVersion(), bytes, 0); + U.intToBytes(ver.topologyOrder(), bytes, 0); U.longToBytes(ver.globalTime(), bytes, 4); U.longToBytes(ver.order(), bytes, 12); U.intToBytes(ver.nodeOrderAndDrIdRaw(), bytes, 20); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 630330e..446002e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2098,7 +2098,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param nearVer Near transaction version. */ private CommittedVersion(GridCacheVersion ver, GridCacheVersion nearVer) { - super(ver.topologyVersion(), ver.globalTime(), ver.order(), ver.nodeOrder(), ver.dataCenterId()); + super(ver.topologyOrder(), ver.globalTime(), ver.order(), ver.nodeOrder(), ver.dataCenterId()); assert nearVer != null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java index 74e4a9a..22b699a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java @@ -60,7 +60,7 @@ public class CacheVersionedEntryImpl extends CacheEntryImpl implemen /** {@inheritDoc} */ @Override public int topologyVersion() { - return ver.topologyVersion(); + return ver.topologyOrder(); } /** {@inheritDoc} */ @@ -94,7 +94,7 @@ public class CacheVersionedEntryImpl extends CacheEntryImpl implemen /** {@inheritDoc} */ public String toString() { - return "VersionedEntry [key=" + getKey() + ", val=" + getValue() + ", topVer=" + ver.topologyVersion() + + return "VersionedEntry [key=" + getKey() + ", val=" + getValue() + ", topVer=" + ver.topologyOrder() + ", nodeOrder=" + ver.nodeOrder() + ", order=" + ver.order() + ", creationTime=" + ver.globalTime() + ']'; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java index 4f13ae7..102805c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java @@ -101,7 +101,7 @@ public class GridCachePlainVersionedEntry implements GridCacheVersionedEnt /** {@inheritDoc} */ @Override public int topologyVersion() { - return ver.topologyVersion(); + return ver.topologyOrder(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java index 87fe976..f8d33f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java @@ -155,7 +155,7 @@ public class GridCacheRawVersionedEntry extends DataStreamerEntry implemen /** {@inheritDoc} */ @Override public int topologyVersion() { - return ver.topologyVersion(); + return ver.topologyOrder(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java index 64eef99..7adfa2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java @@ -42,7 +42,7 @@ public class GridCacheVersion implements Message, Comparable, private static final int DR_ID_MASK = 0x1F; /** Topology version. */ - private int topVer; + private int topOrder; /** Node order (used as global order) and DR ID. */ private int nodeOrderDrId; @@ -76,7 +76,7 @@ public class GridCacheVersion implements Message, Comparable, if (nodeOrder > NODE_ORDER_MASK) throw new IllegalArgumentException("Node order overflow: " + nodeOrder); - this.topVer = topVer; + this.topOrder = topVer; this.globalTime = globalTime; this.order = order; @@ -85,13 +85,13 @@ public class GridCacheVersion implements Message, Comparable, /** - * @param topVer Topology version. + * @param topVer Topology version plus number of seconds from the start time of the first grid node. * @param nodeOrderDrId Node order and DR ID. * @param globalTime Globally adjusted time. * @param order Version order. */ public GridCacheVersion(int topVer, int nodeOrderDrId, long globalTime, long order) { - this.topVer = topVer; + this.topOrder = topVer; this.nodeOrderDrId = nodeOrderDrId; this.globalTime = globalTime; this.order = order; @@ -100,8 +100,8 @@ public class GridCacheVersion implements Message, Comparable, /** * @return Topology version plus number of seconds from the start time of the first grid node.. */ - public int topologyVersion() { - return topVer; + public int topologyOrder() { + return topOrder; } /** @@ -184,12 +184,12 @@ public class GridCacheVersion implements Message, Comparable, * @return Version represented as {@code GridUuid} */ public IgniteUuid asGridUuid() { - return new IgniteUuid(new UUID(((long)topVer << 32) | nodeOrderDrId, globalTime), order); + return new IgniteUuid(new UUID(((long)topOrder << 32) | nodeOrderDrId, globalTime), order); } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(topVer); + out.writeInt(topOrder); out.writeLong(globalTime); out.writeLong(order); out.writeInt(nodeOrderDrId); @@ -197,7 +197,7 @@ public class GridCacheVersion implements Message, Comparable, /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException { - topVer = in.readInt(); + topOrder = in.readInt(); globalTime = in.readLong(); order = in.readLong(); nodeOrderDrId = in.readInt(); @@ -213,12 +213,12 @@ public class GridCacheVersion implements Message, Comparable, GridCacheVersion that = (GridCacheVersion)o; - return topVer == that.topVer && order == that.order && nodeOrder() == that.nodeOrder(); + return topOrder == that.topOrder && order == that.order && nodeOrder() == that.nodeOrder(); } /** {@inheritDoc} */ @Override public int hashCode() { - int res = topVer; + int res = topOrder; res = 31 * res + nodeOrder(); @@ -230,7 +230,7 @@ public class GridCacheVersion implements Message, Comparable, /** {@inheritDoc} */ @SuppressWarnings("IfMayBeConditional") @Override public int compareTo(GridCacheVersion other) { - int res = Integer.compare(topologyVersion(), other.topologyVersion()); + int res = Integer.compare(topologyOrder(), other.topologyOrder()); if (res != 0) return res; @@ -274,7 +274,7 @@ public class GridCacheVersion implements Message, Comparable, writer.incrementState(); case 3: - if (!writer.writeInt("topVer", topVer)) + if (!writer.writeInt("topVer", topOrder)) return false; writer.incrementState(); @@ -317,7 +317,7 @@ public class GridCacheVersion implements Message, Comparable, reader.incrementState(); case 3: - topVer = reader.readInt("topVer"); + topOrder = reader.readInt("topVer"); if (!reader.isLastRead()) return false; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 90919c7..d465a0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -197,7 +196,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version for cache store operations. */ public GridCacheVersion nextForLoad(GridCacheVersion ver) { - return next(ver.topologyVersion(), false, true); + return next(ver.topologyOrder(), false, true); } /** @@ -207,7 +206,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version based on given cache version. */ public GridCacheVersion next(GridCacheVersion ver) { - return next(ver.topologyVersion(), false, false); + return next(ver.topologyOrder(), false, false); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index f8c4c7e..a1c40f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -8705,7 +8705,7 @@ public abstract class IgniteUtils { assert drVer != null; - UNSAFE.putInt(arr, off, drVer.topologyVersion()); + UNSAFE.putInt(arr, off, drVer.topologyOrder()); off += 4; @@ -8722,7 +8722,7 @@ public abstract class IgniteUtils { off += 8; } - UNSAFE.putInt(arr, off, ver.topologyVersion()); + UNSAFE.putInt(arr, off, ver.topologyOrder()); off += 4; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java index 28358d4..4e34e16 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java @@ -116,7 +116,7 @@ public class GridCacheEntryVersionSelfTest extends GridCommonAbstractTest { long order = grid.affinity(null).mapKeyToNode(key).order(); // Check topology version. - assertEquals(3, ver.topologyVersion() - + assertEquals(3, ver.topologyOrder() - (grid.context().discovery().gridStartTime() - TOP_VER_BASE_TIME) / 1000); // Check node order. @@ -143,7 +143,7 @@ public class GridCacheEntryVersionSelfTest extends GridCommonAbstractTest { long order = grid.affinity(null).mapKeyToNode(key).order(); // Check topology version. - assertEquals(4, ver.topologyVersion() - + assertEquals(4, ver.topologyOrder() - (grid.context().discovery().gridStartTime() - TOP_VER_BASE_TIME) / 1000); // Check node order. -- 1.9.5.msysgit.0 From b6bef13eef82d9a2e040c01881d07c186ade372e Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Tue, 11 Aug 2015 10:17:24 +0300 Subject: [PATCH 15/15] ignite-946: simplified VersionedEntry interface --- .../ignite/cache/version/VersionedEntry.java | 83 ++++++++++------------ .../cache/version/CacheVersionedEntryImpl.java | 23 +----- .../cache/version/GridCacheVersionManager.java | 4 +- .../version/CacheVersionedEntryAbstractTest.java | 9 ++- 4 files changed, 45 insertions(+), 74 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java index 1aac68a..135d681 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java @@ -24,7 +24,7 @@ import javax.cache.processor.*; import java.util.*; /** - * Cache entry that stores entry's version related information. + * Cache entry that stores entry's version related information along with its data. * * To get a {@code VersionedEntry} of an {@link Cache.Entry} use {@link Cache.Entry#unwrap(Class)} by passing * {@code VersionedEntry} class to it as the argument. @@ -42,62 +42,51 @@ import java.util.*; * is excluded from responses. *

Java Example

*
- * Cache cache = grid(0).cache(null);
+ * IgniteCache cache = grid(0).cache(null);
  *
- *  cache.invoke(100, new EntryProcessor() {
- *      public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException {
- *          VersionedEntry verEntry = entry.unwrap(VersionedEntry.class);
- *          return entry;
- *       }
- *   });
+ * VersionedEntry entry1 = cache.invoke(100,
+ *     new EntryProcessor>() {
+ *          public VersionedEntry process(MutableEntry entry,
+ *              Object... arguments) throws EntryProcessorException {
+ *                  return entry.unwrap(VersionedEntry.class);
+ *          }
+ *     });
+ *
+ * // Cache entry for the given key may be updated at some point later.
+ *
+ * VersionedEntry entry2 = cache.invoke(100,
+ *     new EntryProcessor>() {
+ *          public VersionedEntry process(MutableEntry entry,
+ *              Object... arguments) throws EntryProcessorException {
+ *                  return entry.unwrap(VersionedEntry.class);
+ *          }
+ *     });
+ *
+ * if (entry1.version().compareTo(entry2.version()) < 0) {
+ *     // the entry has been updated
+ * }
  * 
*/ public interface VersionedEntry extends Cache.Entry { /** - * Versions comparator. - */ - public static final Comparator VER_COMP = new Comparator() { - @Override public int compare(VersionedEntry o1, VersionedEntry o2) { - int res = Integer.compare(o1.topologyVersion(), o2.topologyVersion()); - - if (res != 0) - return res; - - res = Long.compare(o1.order(), o2.order()); - - if (res != 0) - return res; - - return Integer.compare(o1.nodeOrder(), o2.nodeOrder()); - } - }; - - /** - * Gets the topology version at the time when the entry with a given pair of key and value has been created. - * - * @return Topology version plus number of seconds from the start time of the first grid node. - */ - public int topologyVersion(); - - /** - * Gets versioned entry unique order. - * Each time a cache entry for a given key is updated a new {@code VersionedEntry} with increased order is created. - * - * @return Versioned entry unique order. - */ - public long order(); - - /** - * Gets local node order at the time when the entry with a given pair of key and value has been created. + * Returns a comparable object representing the version of this cache entry. + *

+ * It is valid to compare cache entries' versions for the same key. In this case the latter update will be + * represented by a higher version. The result of versions comparison of cache entries of different keys is + * undefined. * - * @return Local node order on which this version has been assigned. + * @return Version of this cache entry. */ - public int nodeOrder(); + public Comparable version(); /** - * Gets the time when the entry with a given pair of key and value has been created. + * Returns the time when the cache entry for the given key has been updated or initially created. + *

+ * It is valid to compare cache entries' update time for the same key. In this case the latter update will + * be represented by higher update time. The result of update time comparison of cache entries of different keys is + * undefined. * * @return Time in milliseconds. */ - public long creationTime(); + public long updateTime(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java index 22b699a..97b1995 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java @@ -51,30 +51,13 @@ public class CacheVersionedEntryImpl extends CacheEntryImpl implemen this.ver = ver; } - /** - * @return Version. - */ - @Nullable public GridCacheVersion version() { - return ver; - } - - /** {@inheritDoc} */ - @Override public int topologyVersion() { - return ver.topologyOrder(); - } - - /** {@inheritDoc} */ - @Override public int nodeOrder() { - return ver.nodeOrder(); - } - /** {@inheritDoc} */ - @Override public long order() { - return ver.order(); + public GridCacheVersion version() { + return ver; } /** {@inheritDoc} */ - @Override public long creationTime() { + @Override public long updateTime() { return ver.globalTime(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index d465a0c..85b14a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -224,6 +224,8 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { if (topVer == -1) topVer = cctx.kernalContext().discovery().topologyVersion(); + long globalTime = cctx.kernalContext().clockSync().adjustedTime(topVer); + if (addTime) { if (gridStartTime == 0) gridStartTime = cctx.kernalContext().discovery().gridStartTime(); @@ -231,8 +233,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { topVer += (gridStartTime - TOP_VER_BASE_TIME) / 1000; } - long globalTime = cctx.kernalContext().clockSync().adjustedTime(topVer); - int locNodeOrder = (int)cctx.localNode().order(); if (txSerEnabled) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java index 0ad8038..9648b9b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java @@ -152,7 +152,8 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS } }); - assert VersionedEntry.VER_COMP.compare(ver1, ver2) < 0; + assert ver1.version().compareTo(ver2.version()) < 0; + assert ver1.updateTime() < ver2.updateTime(); } /** @@ -161,10 +162,8 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS private void checkVersionedEntry(VersionedEntry entry) { assertNotNull(entry); - assert entry.topologyVersion() > 0; - assert entry.order() > 0; - assert entry.nodeOrder() > 0; - assert entry.creationTime() > 0; + assertNotNull(entry.version()); + assert entry.updateTime() > 0; assertNotNull(entry.getKey()); assertNotNull(entry.getValue()); -- 1.9.5.msysgit.0