diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index cdd5f90..01d8c58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -344,6 +344,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc @Override public Integer call() throws Exception { Integer val; + //REMOVE TR try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheCountDownLatchValue latchVal = latchView.get(key); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 2fd40f6..b28e0aa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -20,14 +20,18 @@ package org.apache.ignite.internal.processors.cache.datastructures; import java.util.Collection; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicReference; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteAtomicStamped; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -38,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.GridTestUtils; @@ -65,6 +70,15 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** */ private static final int TOP_CHANGE_THREAD_CNT = 3; + /** */ + private static final int TOP_CHANGED_ERR_RETRY_CNT = 5; + + /** */ + private static final long TOP_CHANGED_ERR_RETRY_TIMEOUT = 3000; + + /** */ + private static final long READY_FUTURE_WAIT_TIMEOUT = 10_000; + /** {@inheritDoc} */ @Override protected long getTestTimeout() { return TEST_TIMEOUT; @@ -127,13 +141,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - assert g.atomicLong(STRUCTURE_NAME, 10, true).get() == 10; + assertEquals(10, g.atomicLong(STRUCTURE_NAME, 10, false).get()); - assert g.atomicLong(STRUCTURE_NAME, 10, true).addAndGet(10) == 20; + assertEquals(20, g.atomicLong(STRUCTURE_NAME, 10, false).addAndGet(10)); stopGrid(NEW_GRID_NAME); - assert grid(0).atomicLong(STRUCTURE_NAME, 10, true).get() == 20; + assertEquals(20, grid(0).atomicLong(STRUCTURE_NAME, 10, true).get()); } } @@ -141,97 +155,54 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicLongConstantTopologyChange() throws Exception { - try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - long val = s.get(); - - while (!fut.isDone()) { - assert s.get() == val; - - assert s.incrementAndGet() == val + 1; - - val++; - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get()); - } + doTestAtomicLong(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicLongConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); + doTestAtomicLong(new ConstantMultipleTopologyChangeWorker()); + } - Ignite g = startGrid(name); + /** + * Tests IgniteAtomicLong. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicLong(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { + final IgniteInternalFuture fut = topWorker.startChangingTopology(new IgniteClosure() { + @Override public Object apply(Ignite ignite) { + assert ignite.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); - long val = s.get(); + final AtomicLong val = new AtomicLong(s.get()); - while (!fut.isDone()) { - assert s.get() == val; + // Blocks the Thread until run() stops executing. + topWorker.callWithRetryOnTopologyChange(new Runnable() { + @Override public void run() { + while (!fut.isDone()) { + long cVal = val.get(); - assert s.incrementAndGet() == val + 1; + assertEquals(cVal, s.get()); - val++; - } + assertEquals(++cVal, s.incrementAndGet()); + + //Increase only if code above succeeds. + val.incrementAndGet(); + } + } + }); fut.get(); for (Ignite g : G.allGrids()) - assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get()); + assertEquals(val.get(), g.atomicLong(STRUCTURE_NAME, 1, false).get()); } } @@ -242,13 +213,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicReference atomic = grid(0).atomicReference(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - assert g.atomicReference(STRUCTURE_NAME, 10, true).get() == 10; + assertEquals((Integer)10, g.atomicReference(STRUCTURE_NAME, 10, false).get()); - g.atomicReference(STRUCTURE_NAME, 10, true).set(20); + g.atomicReference(STRUCTURE_NAME, 10, false).set(20); stopGrid(NEW_GRID_NAME); - assertEquals(20, (int) grid(0).atomicReference(STRUCTURE_NAME, 10, true).get()); + assertEquals((Integer)20, grid(0).atomicReference(STRUCTURE_NAME, 10, true).get()); } } @@ -256,93 +227,54 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicReferenceConstantTopologyChange() throws Exception { - try (IgniteAtomicReference s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.get(); - - while (!fut.isDone()) { - assert s.get() == val; - - s.set(++val); - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get()); - } + doTestAtomicReference(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception { + doTestAtomicReference(new ConstantMultipleTopologyChangeWorker()); + } + + /** + * Tests atomic reference. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicReference(ConstantTopologyChangeWorker topWorker) throws Exception { try (IgniteAtomicReference s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection names = new GridLeanSet<>(3); + final IgniteInternalFuture fut = topWorker.startChangingTopology(new IgniteClosure() { + @Override public Object apply(Ignite ignite) { + assert ignite.atomicReference(STRUCTURE_NAME, 1, false).get() > 0; - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + return null; + } + }); - names.add(name); + final AtomicInteger val = new AtomicInteger(s.get()); - Ignite g = startGrid(name); + // Blocks the Thread until run() stops executing. + topWorker.callWithRetryOnTopologyChange(new Runnable() { + @Override public void run() { + while (!fut.isDone()) { + int cVal = val.get(); - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.get(); + assertEquals((Integer)cVal, s.get()); - while (!fut.isDone()) { - assert s.get() == val; + s.set(++cVal); - s.set(++val); - } + //Increase only if code above succeeds. + val.incrementAndGet(); + } + } + }); fut.get(); for (Ignite g : G.allGrids()) - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() == val; + assertEquals(val.get(), (int)g.atomicReference(STRUCTURE_NAME, 1, true).get()); } } @@ -353,19 +285,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicStamped atomic = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - IgniteBiTuple t = g.atomicStamped(STRUCTURE_NAME, 10, 10, true).get(); + IgniteBiTuple t = g.atomicStamped(STRUCTURE_NAME, 10, 10, false).get(); - assert t.get1() == 10; - assert t.get2() == 10; + assertEquals((Integer)10, t.get1()); + assertEquals((Integer)10, t.get2()); - g.atomicStamped(STRUCTURE_NAME, 10, 10, true).set(20, 20); + g.atomicStamped(STRUCTURE_NAME, 10, 10, false).set(20, 20); stopGrid(NEW_GRID_NAME); - t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true).get(); + t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, false).get(); - assert t.get1() == 20; - assert t.get2() == 20; + assertEquals((Integer)20, t.get1()); + assertEquals((Integer)20, t.get2()); } } @@ -373,118 +305,65 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicStampedConstantTopologyChange() throws Exception { - try (IgniteAtomicStamped s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - IgniteBiTuple t = - g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); - - assert t.get1() > 0; - assert t.get2() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.value(); - - while (!fut.isDone()) { - IgniteBiTuple t = s.get(); - - assert t.get1() == val; - assert t.get2() == val; - - val++; - - s.set(val, val); - } - - fut.get(); - - for (Ignite g : G.allGrids()) { - IgniteBiTuple t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); - - assert t.get1() == val; - assert t.get2() == val; - } - } + doTestAtomicStamped(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicStampedConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicStamped s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection names = new GridLeanSet<>(3); + doTestAtomicStamped(new ConstantMultipleTopologyChangeWorker()); + } - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + /** + * Tests atomic stamped value. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicStamped(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicStamped s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { + final IgniteInternalFuture fut = topWorker.startChangingTopology(new IgniteClosure() { + @Override public Object apply(Ignite ignite) { + IgniteBiTuple t = ignite.atomicStamped(STRUCTURE_NAME, 1, 1, false).get(); - names.add(name); + assert t.get1() > 0; + assert t.get2() > 0; - Ignite g = startGrid(name); + return null; + } + }); - IgniteBiTuple t = - g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); + final AtomicInteger val = new AtomicInteger(s.value()); - assert t.get1() > 0; - assert t.get2() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + // Blocks the Thread until run() stops executing. + topWorker.callWithRetryOnTopologyChange(new Runnable() { + @Override public void run() { + while (!fut.isDone()) { + IgniteBiTuple t = s.get(); - int val = s.value(); + int cVal = val.get(); - while (!fut.isDone()) { - IgniteBiTuple t = s.get(); + assertEquals((Integer)cVal, t.get1()); + assertEquals((Integer)cVal, t.get2()); - assert t.get1() == val; - assert t.get2() == val; + cVal++; - val++; + s.set(cVal, cVal); - s.set(val, val); - } + //Increase only if code above succeeds. + val.incrementAndGet(); + } + } + }); fut.get(); for (Ignite g : G.allGrids()) { - IgniteBiTuple t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); + IgniteBiTuple t = g.atomicStamped(STRUCTURE_NAME, 1, 1, false).get(); - assert t.get1() == val; - assert t.get2() == val; + assertEquals((Integer)val.get(), t.get1()); + assertEquals((Integer)val.get(), t.get2()); } } } @@ -497,16 +376,16 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try { Ignite g = startGrid(NEW_GRID_NAME); - assert g.countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 20; + assertEquals(20, g.countDownLatch(STRUCTURE_NAME, 20, true, false).count()); - g.countDownLatch(STRUCTURE_NAME, 20, true, true).countDown(10); + g.countDownLatch(STRUCTURE_NAME, 20, true, false).countDown(10); stopGrid(NEW_GRID_NAME); - assert grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 10; + assertEquals(10, grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).count()); } finally { - grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).countDownAll(); + grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).countDownAll(); } } } @@ -515,102 +394,52 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testCountDownLatchConstantTopologyChange() throws Exception { - try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { - try { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.count(); - - while (!fut.isDone()) { - assert s.count() == val; - - assert s.countDown() == val - 1; - - val--; - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count() == val; - } - finally { - grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).countDownAll(); - } - } + doTestCountDownLatch(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testCountDownLatchConstantMultipleTopologyChange() throws Exception { + doTestCountDownLatch(new ConstantMultipleTopologyChangeWorker()); + } + + /** + * Tests distributed count down latch. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestCountDownLatch(ConstantTopologyChangeWorker topWorker) throws Exception { try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { try { - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection names = new GridLeanSet<>(3); + final IgniteInternalFuture fut = topWorker.startChangingTopology( + new IgniteClosure() { + @Override public Object apply(Ignite ignite) { + assert ignite.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count() > 0; - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + return null; + } + }); - names.add(name); + final AtomicInteger val = new AtomicInteger(s.count()); - Ignite g = startGrid(name); + topWorker.callWithRetryOnTopologyChange(new Runnable() { + @Override public void run() { + while (!fut.isDone()) { + assertEquals(val.get(), s.count()); + assertEquals(val.get() - 1, s.countDown()); - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); + //Increase only if code above succeeds. + val.decrementAndGet(); } } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.count(); - - while (!fut.isDone()) { - assert s.count() == val; - - assert s.countDown() == val - 1; - - val--; - } + }); fut.get(); for (Ignite g : G.allGrids()) - assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count()); + assertEquals(val.get(), g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count()); } finally { grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll(); @@ -739,7 +568,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicSequenceTopologyChange() throws Exception { - try (IgniteAtomicSequence s = grid().atomicSequence(STRUCTURE_NAME, 10, true)) { + try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); assert g.atomicSequence(STRUCTURE_NAME, 10, false).get() == 1010; @@ -825,21 +654,21 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig while (!fut.isDone()) { grid(0).compute().call(new IgniteCallable() { - /** */ - @IgniteInstanceResource - private Ignite g; + /** */ + @IgniteInstanceResource + private Ignite g; - @Override public Object call() throws Exception { - IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true); + @Override public Object call() throws Exception { + IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true); - assert seq != null; + assert seq != null; - for (int i = 0; i < 1000; i++) - seq.getAndIncrement(); + for (int i = 0; i < 1000; i++) + seq.getAndIncrement(); - return null; - } - }); + return null; + } + }); } fut.get(); @@ -911,7 +740,6 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try { g.transactions().txStart(); - g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1); assert g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet() == val + 1; @@ -928,4 +756,178 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig assert grid(0).atomicLong(STRUCTURE_NAME, val, false).get() == val + 1; } + + /** + * + */ + private class ConstantTopologyChangeWorker { + /** */ + protected final AtomicBoolean failed = new AtomicBoolean(false); + + /** + * Starts changing cluster's topology. + * + * @return Future. + */ + IgniteInternalFuture startChangingTopology(final IgniteClosure callback) { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + if (failed.get()) + return; + + String name = UUID.randomUUID().toString(); + + try { + Ignite g = startGrid(name); + + int rtrCnt = 0; + + while (true) { + if (failed.get()) + return; + + try { + callback.apply(g); + + break; + } + catch (ClusterTopologyException e) { + log.warning(e.getMessage()); + + if (rtrCnt++ == TOP_CHANGED_ERR_RETRY_CNT) + throw new IgniteCheckedException( + "ClusterTopologyException retries are exhausted", e); + + if (e.retryReadyFuture() != null) + e.retryReadyFuture().get(READY_FUTURE_WAIT_TIMEOUT); + else + Thread.sleep(TOP_CHANGED_ERR_RETRY_TIMEOUT); + } + } + } + finally { + if (i != TOP_CHANGE_CNT - 1) + stopGrid(name); + } + } + } + catch (Exception e) { + failed.set(true); + + throw F.wrap(e); + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + + return fut; + } + + /** + * Executes callback automatically processing cluster topology error. + * + * @param callback Callback. + * @throws Exception If failed. + */ + void callWithRetryOnTopologyChange(final Runnable callback) throws Exception { + int rtrCnt = 0; + + while (true) { + try { + callback.run(); + + break; + } + catch (ClusterTopologyException e) { + log.warning(e.getMessage()); + + if (rtrCnt++ == TOP_CHANGED_ERR_RETRY_CNT) + throw new IgniteCheckedException("ClusterTopologyException retries are exhausted", e); + + if (e.retryReadyFuture() != null) + e.retryReadyFuture().get(READY_FUTURE_WAIT_TIMEOUT); + else + Thread.sleep(TOP_CHANGED_ERR_RETRY_TIMEOUT); + } + } + } + } + + /** + * + */ + private class ConstantMultipleTopologyChangeWorker extends ConstantTopologyChangeWorker { + /** + * Starts changing cluster's topology. + * + * @return Future. + */ + @Override IgniteInternalFuture startChangingTopology(final IgniteClosure callback) { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + if (failed.get()) + return; + + Collection names = new GridLeanSet<>(3); + + try { + for (int j = 0; j < 3; j++) { + if (failed.get()) + return; + + String name = UUID.randomUUID().toString(); + + Ignite g = startGrid(name); + + names.add(name); + + int rtrCnt = 0; + + while (true) { + if (failed.get()) + return; + + try { + callback.apply(g); + + break; + } + catch (ClusterTopologyException e) { + log.warning(e.getMessage()); + + if (rtrCnt++ == TOP_CHANGED_ERR_RETRY_CNT) + throw new IgniteCheckedException( + "ClusterTopologyException retries are exhausted", e); + + if (e.retryReadyFuture() != null) + e.retryReadyFuture().get(READY_FUTURE_WAIT_TIMEOUT); + else + Thread.sleep(TOP_CHANGED_ERR_RETRY_TIMEOUT); + } + } + } + } + finally { + if (i != TOP_CHANGE_CNT - 1) { + + for (String name : names) + stopGrid(name); + } + } + } + } + catch (Exception e) { + failed.set(true); + + throw F.wrap(e); + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + + return fut; + } + } } \ No newline at end of file diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java index 86b763a..a9cd470 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java @@ -34,6 +34,4 @@ public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest extends G @Override protected CacheMemoryMode collectionMemoryMode() { return OFFHEAP_TIERED; } - - } \ No newline at end of file diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java index 69de7cd..902ba44 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java @@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED; public class GridCacheReplicatedDataStructuresFailoverSelfTest extends GridCacheAbstractDataStructuresFailoverSelfTest { /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-801"); - } - - /** {@inheritDoc} */ @Override protected CacheMode collectionCacheMode() { return REPLICATED; }