From 4a003c1b5a1c66af76d10e82561402745cfa594a Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Wed, 12 Aug 2015 16:46:18 +0300 Subject: [PATCH 1/2] ignite-946: reverted renaming topVer to avoid breaking compatibility --- .../cache/query/GridCacheQueryManager.java | 21 +++++- .../CacheScanPartitionQueryFallbackSelfTest.java | 86 ++++++++++++++++++++++ 2 files changed, 103 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 5d3f6a3..9245aab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1260,6 +1260,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte try { boolean loc = qryInfo.local(); + QueryResult res = null; + if (log.isDebugEnabled()) log.debug("Running query: " + qryInfo); @@ -1286,8 +1288,6 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte IgniteSpiCloseableIterator> iter; GridCacheQueryType type; - QueryResult res; - res = loc ? executeQuery(qry, qryInfo.arguments(), loc, qry.subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId())) : @@ -1496,8 +1496,21 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte throw (Error)e; } finally { - if (rmvIter) - removeQueryResult(qryInfo.senderId(), qryInfo.requestId()); + if (rmvIter) { + if (loc) { + if (res != null) { + try { + res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" + + cctx.nodeId() + "]", e); + } + } + } + else + removeQueryResult(qryInfo.senderId(), qryInfo.requestId()); + } } } finally { diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java index 84ceafd..f708959 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -157,6 +158,81 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT } /** + * Scan should activate fallback mechanism when new nodes join topology and rebalancing happens in parallel with + * scan query. + * + * @throws Exception In case of error. + */ + public void testScanFallbackOnRebalancing() throws Exception { + cacheMode = CacheMode.PARTITIONED; + clientMode = false; + backups = 1; + commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory(); + + try { + Ignite ignite = startGrids(GRID_CNT); + + final IgniteCacheProxy cache = fillCache(ignite); + + final AtomicBoolean done = new AtomicBoolean(false); + + final AtomicInteger idx = new AtomicInteger(GRID_CNT); + + IgniteInternalFuture fut1 = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + int id = idx.getAndIncrement(); + + while (!done.get()) { + startGrid(id); + Thread.sleep(3000); + + stopGrid(id); + + if (done.get()) + return null; + + Thread.sleep(3000); + } + + return null; + } + }, GRID_CNT); + + IgniteInternalFuture fut2 = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + while (!done.get()) { + ScanQuery query = new ScanQuery<>(); + + IgniteBiTuple tup = remotePartition(cache.context()); + + int part = tup.get1(); + + query.setPartition(part); + + cache.query(query).getAll(); + + Thread.sleep(100); + } + + return null; + } + }, 1); + + Thread.sleep(2 * 60 * 1000); + + done.set(true); + + fut2.get(); + fut1.get(); + } + finally { + stopAllGrids(); + } + } + + /** * Scan should try first remote node and fallbacks to second remote node. * * @throws Exception If failed. @@ -408,4 +484,14 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT }; } } + + /** + * + */ + private static class TestFallbackOnRebalancingCommunicationSpiFactory implements CommunicationSpiFactory { + /** {@inheritDoc} */ + @Override public TcpCommunicationSpi create() { + return new TcpCommunicationSpi(); + } + } } -- 1.9.5.msysgit.0 From 4d8b4bf342393ed8c64ba171bdf5b38ea4797fdc Mon Sep 17 00:00:00 2001 From: Denis Magda Date: Wed, 12 Aug 2015 16:49:57 +0300 Subject: [PATCH 2/2] ignite-1239: fixed bug, improved test --- .../processors/cache/CacheScanPartitionQueryFallbackSelfTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java index f708959..1f5a305 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java @@ -212,15 +212,13 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT query.setPartition(part); cache.query(query).getAll(); - - Thread.sleep(100); } return null; } }, 1); - Thread.sleep(2 * 60 * 1000); + Thread.sleep(60 * 1000); // Test for one minute done.set(true); -- 1.9.5.msysgit.0