diff --git a/examples/schema-import/pom.xml b/examples/schema-import/pom.xml index 5bea512..32ce869 100644 --- a/examples/schema-import/pom.xml +++ b/examples/schema-import/pom.xml @@ -20,7 +20,10 @@ - + 4.0.0 @@ -88,14 +91,6 @@ 1.7 - - - org.apache.maven.plugins - maven-deploy-plugin - - true - - diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 6aba211..fb2efe2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -102,6 +102,7 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.plugin.segmentation.SegmentationPolicy; +import org.apache.ignite.spi.IgniteSpi; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.apache.ignite.spi.discovery.DiscoverySpi; @@ -1767,6 +1768,17 @@ public class GridDiscoveryManager extends GridManagerAdapter { } /** + * Failure detection timeout used by discovery SPI. If the timeout is disabled then a value of the + * network timeout is returned. + * + * @return . + */ + public long failureDetectionTimeout() { + return getSpi().failureDetectionTimeoutEnabled() ? ctx.config().getFailureDetectionTimeout() : + ctx.config().getNetworkTimeout(); + } + + /** * Updates topology version if current version is smaller than updated. * * @param updated Updated topology version. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index ee1f4a1..f84054a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1788,19 +1788,16 @@ public class GridCacheUtils { return c.call(); } catch (IgniteCheckedException e) { - if (X.hasCause(e, ClusterTopologyCheckedException.class) || - X.hasCause(e, IgniteTxRollbackCheckedException.class) || - X.hasCause(e, CachePartialUpdateCheckedException.class)) { - if (i < retries - 1) { - err = e; - - U.sleep(1); + if (i == retries) + throw e; - continue; - } + if (X.hasCause(e, ClusterTopologyCheckedException.class)) { + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); - throw e; + topErr.retryReadyFuture().get(); } + else if (X.hasCause(e, IgniteTxRollbackCheckedException.class)) + U.sleep(1); else throw e; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 1ff4575..ac90efc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.datastructures.GridTransactionalCac import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -770,4 +771,4 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { return "RemoveSetCallable [setId=" + setId + ']'; } } -} \ No newline at end of file +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index a68e834..e0e8f6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -710,8 +710,9 @@ public class GridPartitionedGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture implements GridCacheAtomicRef * @return Callable for execution in async and sync mode. */ private Callable internalSet(final T val) { - return new Callable() { + return retryTopologySafe(new Callable() { @Override public Boolean call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue ref = atomicView.get(key); @@ -252,7 +253,7 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef throw e; } } - }; + }); } /** @@ -265,7 +266,8 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef */ private Callable internalCompareAndSet(final IgnitePredicate expValPred, final IgniteClosure newValClos) { - return new Callable() { + + return retryTopologySafe(new Callable() { @Override public Boolean call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue ref = atomicView.get(key); @@ -295,7 +297,7 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef throw e; } } - }; + }); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 2667938..c984ab3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -342,20 +342,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc private class GetCountCallable implements Callable { /** {@inheritDoc} */ @Override public Integer call() throws Exception { - Integer val; + GridCacheCountDownLatchValue latchVal = latchView.get(key); - try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheCountDownLatchValue latchVal = latchView.get(key); - - if (latchVal == null) - return 0; - - val = latchVal.get(); - - tx.rollback(); - } - - return val; + return latchVal == null ? 0 : latchVal.get(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index 0e4aebc..0843eac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -169,14 +169,22 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp @SuppressWarnings("unchecked") @Nullable @Override public T peek() throws IgniteException { try { - GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); + while (true) { + GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); - checkRemoved(hdr); + checkRemoved(hdr); - if (hdr.empty()) - return null; + if (hdr.empty()) + return null; + + T val = (T)cache.get(itemKey(hdr.head())); - return (T)cache.get(itemKey(hdr.head())); + if (val == null) + // Header might have been polled. Retry. + continue; + + return val; + } } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -416,8 +424,7 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp long startIdx, long endIdx, int batchSize) - throws IgniteCheckedException - { + throws IgniteCheckedException { Set keys = new HashSet<>(batchSize > 0 ? batchSize : 10); for (long idx = startIdx; idx < endIdx; idx++) { @@ -435,8 +442,7 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp } /** - * Checks result of closure modifying queue header, throws {@link IllegalStateException} - * if queue was removed. + * Checks result of closure modifying queue header, throws {@link IllegalStateException} if queue was removed. * * @param idx Result of closure execution. */ @@ -529,7 +535,6 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp */ protected abstract void removeItem(long rmvIdx) throws IgniteCheckedException; - /** * @param idx Item index. * @return Item key. @@ -816,7 +821,8 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp } next++; - } while (next != hdr.tail()); + } + while (next != hdr.tail()); GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), hdr.capacity(), @@ -1036,7 +1042,7 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp if (o == null || getClass() != o.getClass()) return false; - GridCacheQueueAdapter that = (GridCacheQueueAdapter) o; + GridCacheQueueAdapter that = (GridCacheQueueAdapter)o; return id.equals(that.id); @@ -1051,4 +1057,4 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp @Override public String toString() { return S.toString(GridCacheQueueAdapter.class, this); } -} \ No newline at end of file +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java index c7750a6..b14bb5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java @@ -244,4 +244,4 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter throw U.convertException(e); } } -} \ No newline at end of file +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 612c1f1..baa26d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -164,4 +164,11 @@ public interface DiscoverySpi extends IgniteSpi { * @throws IllegalStateException If discovery SPI has not started. */ public boolean isClientMode() throws IllegalStateException; -} \ No newline at end of file + + /** + * Checks whether failure detection timeout is enabled for the discovery SPI. + * + * @return {@code true} if enabled, {@code false} otherwise. + */ + public boolean failureDetectionTimeoutEnabled(); +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 2fd40f6..6e91107 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.datastructures; import java.util.Collection; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicReference; @@ -38,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.GridTestUtils; @@ -127,13 +130,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - assert g.atomicLong(STRUCTURE_NAME, 10, true).get() == 10; + assertEquals(10, g.atomicLong(STRUCTURE_NAME, 10, false).get()); - assert g.atomicLong(STRUCTURE_NAME, 10, true).addAndGet(10) == 20; + assertEquals(20, g.atomicLong(STRUCTURE_NAME, 10, false).addAndGet(10)); stopGrid(NEW_GRID_NAME); - assert grid(0).atomicLong(STRUCTURE_NAME, 10, true).get() == 20; + assertEquals(20, grid(0).atomicLong(STRUCTURE_NAME, 10, true).get()); } } @@ -141,97 +144,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicLongConstantTopologyChange() throws Exception { - try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - long val = s.get(); - - while (!fut.isDone()) { - assert s.get() == val; - - assert s.incrementAndGet() == val + 1; - - val++; - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get()); - } + doTestAtomicLong(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicLongConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); + doTestAtomicLong(new ConstantMultipleTopologyChangeWorker()); + } - Ignite g = startGrid(name); + /** + * Tests IgniteAtomicLong. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicLong(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { + IgniteInternalFuture fut = topWorker.startChangingTopology(new IgniteClosure() { + @Override public Object apply(Ignite ignite) { + assert ignite.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); long val = s.get(); while (!fut.isDone()) { - assert s.get() == val; - - assert s.incrementAndGet() == val + 1; + assertEquals(val, s.get()); - val++; + assertEquals(++val, s.incrementAndGet()); } fut.get(); for (Ignite g : G.allGrids()) - assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get()); + assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, false).get()); } } @@ -242,13 +192,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicReference atomic = grid(0).atomicReference(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - assert g.atomicReference(STRUCTURE_NAME, 10, true).get() == 10; + assertEquals((Integer)10, g.atomicReference(STRUCTURE_NAME, 10, false).get()); - g.atomicReference(STRUCTURE_NAME, 10, true).set(20); + g.atomicReference(STRUCTURE_NAME, 10, false).set(20); stopGrid(NEW_GRID_NAME); - assertEquals(20, (int) grid(0).atomicReference(STRUCTURE_NAME, 10, true).get()); + assertEquals((Integer)20, grid(0).atomicReference(STRUCTURE_NAME, 10, true).get()); } } @@ -256,85 +206,36 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicReferenceConstantTopologyChange() throws Exception { - try (IgniteAtomicReference s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.get(); - - while (!fut.isDone()) { - assert s.get() == val; - - s.set(++val); - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get()); - } + doTestAtomicReference(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicReference s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); + doTestAtomicReference(new ConstantMultipleTopologyChangeWorker()); + } - Ignite g = startGrid(name); + /** + * Tests atomic reference. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicReference(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicReference s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { + IgniteInternalFuture fut = topWorker.startChangingTopology(new IgniteClosure() { + @Override public Object apply(Ignite ignite) { + assert ignite.atomicReference(STRUCTURE_NAME, 1, false).get() > 0; - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.get(); while (!fut.isDone()) { - assert s.get() == val; + assertEquals(val, (int)s.get()); s.set(++val); } @@ -342,7 +243,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig fut.get(); for (Ignite g : G.allGrids()) - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() == val; + assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get()); } } @@ -353,19 +254,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicStamped atomic = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - IgniteBiTuple t = g.atomicStamped(STRUCTURE_NAME, 10, 10, true).get(); + IgniteBiTuple t = g.atomicStamped(STRUCTURE_NAME, 10, 10, false).get(); - assert t.get1() == 10; - assert t.get2() == 10; + assertEquals((Integer)10, t.get1()); + assertEquals((Integer)10, t.get2()); - g.atomicStamped(STRUCTURE_NAME, 10, 10, true).set(20, 20); + g.atomicStamped(STRUCTURE_NAME, 10, 10, false).set(20, 20); stopGrid(NEW_GRID_NAME); - t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true).get(); + t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, false).get(); - assert t.get1() == 20; - assert t.get2() == 20; + assertEquals((Integer)20, t.get1()); + assertEquals((Integer)20, t.get2()); } } @@ -373,107 +274,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicStampedConstantTopologyChange() throws Exception { - try (IgniteAtomicStamped s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - IgniteBiTuple t = - g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); - - assert t.get1() > 0; - assert t.get2() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.value(); - - while (!fut.isDone()) { - IgniteBiTuple t = s.get(); - - assert t.get1() == val; - assert t.get2() == val; - - val++; - - s.set(val, val); - } - - fut.get(); - - for (Ignite g : G.allGrids()) { - IgniteBiTuple t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); - - assert t.get1() == val; - assert t.get2() == val; - } - } + doTestAtomicStamped(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicStampedConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicStamped s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); + doTestAtomicStamped(new ConstantMultipleTopologyChangeWorker()); + } - Ignite g = startGrid(name); + /** + * Tests atomic stamped value. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicStamped(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicStamped s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { + IgniteInternalFuture fut = topWorker.startChangingTopology(new IgniteClosure() { + @Override public Object apply(Ignite ignite) { + IgniteBiTuple t = ignite.atomicStamped(STRUCTURE_NAME, 1, 1, false).get(); - IgniteBiTuple t = - g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); + assert t.get1() > 0; + assert t.get2() > 0; - assert t.get1() > 0; - assert t.get2() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.value(); while (!fut.isDone()) { IgniteBiTuple t = s.get(); - assert t.get1() == val; - assert t.get2() == val; + assertEquals(val, (int)t.get1()); + assertEquals(val, (int)t.get2()); - val++; + ++val; s.set(val, val); } @@ -481,10 +319,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig fut.get(); for (Ignite g : G.allGrids()) { - IgniteBiTuple t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); + IgniteBiTuple t = g.atomicStamped(STRUCTURE_NAME, 1, 1, false).get(); - assert t.get1() == val; - assert t.get2() == val; + assertEquals(val, (int)t.get1()); + assertEquals(val, (int)t.get2()); } } } @@ -497,16 +335,16 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try { Ignite g = startGrid(NEW_GRID_NAME); - assert g.countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 20; + assertEquals(20, g.countDownLatch(STRUCTURE_NAME, 20, true, false).count()); - g.countDownLatch(STRUCTURE_NAME, 20, true, true).countDown(10); + g.countDownLatch(STRUCTURE_NAME, 20, true, false).countDown(10); stopGrid(NEW_GRID_NAME); - assert grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 10; + assertEquals(10, grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).count()); } finally { - grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).countDownAll(); + grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).countDownAll(); } } } @@ -515,102 +353,45 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testCountDownLatchConstantTopologyChange() throws Exception { - try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { - try { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.count(); - - while (!fut.isDone()) { - assert s.count() == val; - - assert s.countDown() == val - 1; - - val--; - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count() == val; - } - finally { - grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).countDownAll(); - } - } + doTestCountDownLatch(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testCountDownLatchConstantMultipleTopologyChange() throws Exception { + doTestCountDownLatch(new ConstantMultipleTopologyChangeWorker()); + } + + /** + * Tests distributed count down latch. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestCountDownLatch(ConstantTopologyChangeWorker topWorker) throws Exception { try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { try { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection names = new GridLeanSet<>(3); + IgniteInternalFuture fut = topWorker.startChangingTopology( + new IgniteClosure() { + @Override public Object apply(Ignite ignite) { + assert ignite.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count() > 0; - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); - - Ignite g = startGrid(name); - - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } + return null; } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.count(); while (!fut.isDone()) { - assert s.count() == val; - - assert s.countDown() == val - 1; - - val--; + assertEquals(val, s.count()); + assertEquals(--val, s.countDown()); } fut.get(); for (Ignite g : G.allGrids()) - assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count()); + assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count()); } finally { grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll(); @@ -627,13 +408,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig Ignite g = startGrid(NEW_GRID_NAME); - assert g.queue(STRUCTURE_NAME, 0, null).poll() == 10; + assertEquals(10, (int)g.queue(STRUCTURE_NAME, 0, null).poll()); g.queue(STRUCTURE_NAME, 0, null).put(20); stopGrid(NEW_GRID_NAME); - assert grid(0).queue(STRUCTURE_NAME, 0, null).peek() == 20; + assertEquals(20, (int)grid(0).queue(STRUCTURE_NAME, 0, null).peek()); } finally { grid(0).queue(STRUCTURE_NAME, 0, null).close(); @@ -644,153 +425,74 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testQueueConstantTopologyChange() throws Exception { - try (IgniteQueue s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) { - s.put(1); - - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.queue(STRUCTURE_NAME, 0, null).peek() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.peek(); - - int origVal = val; - - while (!fut.isDone()) - s.put(++val); - - fut.get(); - - for (Ignite g : G.allGrids()) - assert g.queue(STRUCTURE_NAME, 0, null).peek() == origVal; - } + doTestQueue(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testQueueConstantMultipleTopologyChange() throws Exception { + doTestQueue(new ConstantMultipleTopologyChangeWorker()); + } + + /** + * Tests the queue. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestQueue(ConstantTopologyChangeWorker topWorker) throws Exception { + int queueMaxSize = 100; + try (IgniteQueue s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) { s.put(1); - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection names = new GridLeanSet<>(3); + IgniteInternalFuture fut = topWorker.startChangingTopology(new IgniteClosure() { + @Override public Object apply(Ignite ignite) { + IgniteQueue queue = ignite.queue(STRUCTURE_NAME, 0, null); - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + assertNotNull(queue); - names.add(name); + Integer val = queue.peek(); - Ignite g = startGrid(name); + assertNotNull(val); - assert g.queue(STRUCTURE_NAME, 0, null).peek() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + assert val > 0; + + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.peek(); - int origVal = val; - - while (!fut.isDone()) - s.put(++val); - - fut.get(); - - for (Ignite g : G.allGrids()) - assert g.queue(STRUCTURE_NAME, 0, null).peek() == origVal; - } - } - - /** - * @throws Exception If failed. - */ - public void testAtomicSequenceTopologyChange() throws Exception { - try (IgniteAtomicSequence s = grid().atomicSequence(STRUCTURE_NAME, 10, true)) { - Ignite g = startGrid(NEW_GRID_NAME); - - assert g.atomicSequence(STRUCTURE_NAME, 10, false).get() == 1010; + while (!fut.isDone()) { + if (s.size() == queueMaxSize) { + int last = 0; - assert g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10) == 1020; + for (int i = 0, size = s.size() - 1; i < size; i++) { + int cur = s.poll(); - stopGrid(NEW_GRID_NAME); - } - } + if (i == 0) { + last = cur; - /** - * @throws Exception If failed. - */ - public void testAtomicSequenceConstantTopologyChange() throws Exception { - try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - String name = UUID.randomUUID().toString(); + continue; + } - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - try { - Ignite g = startGrid(name); + assertEquals(last, cur - 1); - assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0); - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); + last = cur; } } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - long old = s.get(); - - while (!fut.isDone()) { - assertEquals(old, s.get()); - - long val = s.incrementAndGet(); - assertTrue(val > old); - - old = val; + s.put(++val); } fut.get(); + + val = s.peek(); + + for (Ignite g : G.allGrids()) + assertEquals(val, (int)g.queue(STRUCTURE_NAME, 0, null).peek()); } } @@ -825,21 +527,21 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig while (!fut.isDone()) { grid(0).compute().call(new IgniteCallable() { - /** */ - @IgniteInstanceResource - private Ignite g; + /** */ + @IgniteInstanceResource + private Ignite g; - @Override public Object call() throws Exception { - IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true); + @Override public Object call() throws Exception { + IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true); - assert seq != null; + assert seq != null; - for (int i = 0; i < 1000; i++) - seq.getAndIncrement(); + for (int i = 0; i < 1000; i++) + seq.getAndIncrement(); - return null; - } - }); + return null; + } + }); } fut.get(); @@ -848,37 +550,47 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** * @throws Exception If failed. */ - public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection names = new GridLeanSet<>(3); + public void testAtomicSequenceTopologyChange() throws Exception { + try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 10, true)) { + Ignite g = startGrid(NEW_GRID_NAME); - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + assertEquals(1010, g.atomicSequence(STRUCTURE_NAME, 10, false).get()); - names.add(name); + assertEquals(1020, g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10)); - Ignite g = startGrid(name); + stopGrid(NEW_GRID_NAME); + } + } - assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0); - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + /** + * @throws Exception If failed. + */ + public void testAtomicSequenceConstantTopologyChange() throws Exception { + doTestAtomicSequence(new ConstantTopologyChangeWorker()); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception { + doTestAtomicSequence(new ConstantMultipleTopologyChangeWorker()); + } + + /** + * Tests atomic sequence. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicSequence(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { + IgniteInternalFuture fut = topWorker.startChangingTopology(new IgniteClosure() { + @Override public Object apply(Ignite ignite) { + assertTrue(ignite.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0); + + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); long old = s.get(); @@ -911,10 +623,9 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try { g.transactions().txStart(); - g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1); - assert g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet() == val + 1; + assertEquals(val + 1, g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet()); } finally { stopGrid(NEW_GRID_NAME); @@ -926,6 +637,103 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()])); - assert grid(0).atomicLong(STRUCTURE_NAME, val, false).get() == val + 1; + assertEquals(val + 1, grid(0).atomicLong(STRUCTURE_NAME, val, false).get()); + } + + /** + * + */ + private class ConstantTopologyChangeWorker { + /** */ + protected final AtomicBoolean failed = new AtomicBoolean(false); + + /** + * Starts changing cluster's topology. + * + * @return Future. + */ + IgniteInternalFuture startChangingTopology(final IgniteClosure callback) { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + if (failed.get()) + return; + + String name = UUID.randomUUID().toString(); + + try { + Ignite g = startGrid(name); + + callback.apply(g); + } + finally { + if (i != TOP_CHANGE_CNT - 1) + stopGrid(name); + } + } + } + catch (Exception e) { + if (failed.compareAndSet(false, true)) + throw F.wrap(e); + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + + return fut; + } + } + + /** + * + */ + private class ConstantMultipleTopologyChangeWorker extends ConstantTopologyChangeWorker { + /** + * Starts changing cluster's topology. + * + * @return Future. + */ + @Override IgniteInternalFuture startChangingTopology(final IgniteClosure callback) { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + if (failed.get()) + return; + + Collection names = new GridLeanSet<>(3); + + try { + for (int j = 0; j < 3; j++) { + if (failed.get()) + return; + + String name = UUID.randomUUID().toString(); + + Ignite g = startGrid(name); + + names.add(name); + + callback.apply(g); + } + } + finally { + if (i != TOP_CHANGE_CNT - 1) { + + for (String name : names) + stopGrid(name); + } + } + } + } + catch (Exception e) { + if (failed.compareAndSet(false, true)) + throw F.wrap(e); + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + + return fut; + } } -} \ No newline at end of file +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java index 86b763a..a9cd470 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java @@ -34,6 +34,4 @@ public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest extends G @Override protected CacheMemoryMode collectionMemoryMode() { return OFFHEAP_TIERED; } - - } \ No newline at end of file diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java index 69de7cd..902ba44 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java @@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED; public class GridCacheReplicatedDataStructuresFailoverSelfTest extends GridCacheAbstractDataStructuresFailoverSelfTest { /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-801"); - } - - /** {@inheritDoc} */ @Override protected CacheMode collectionCacheMode() { return REPLICATED; }