Index: modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageTest.java (date 1534155635134) +++ modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageTest.java (date 1534155635134) @@ -0,0 +1,560 @@ +package org.apache.ignite.internal.processors.query.h2.twostep; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceArray; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.cache.distributed.dht.MockGridDhtLocalPartition; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; + +/** + * Test for 6 retry cases + */ +public class RetryCauseMessageTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES_COUNT = 2; + + /** */ + private static final int ORG_COUNT = NODES_COUNT; + + /** */ + private static final int PERSON_PER_ORG_COUNT = 50; + /** */ + private static final String JOIN_SQL = "select * from Person, \"org\".Organization as org " + + "where Person.orgId = org.id " + + "and lower(org.name) = lower(?)"; + /** */ + private static final String ORG_SQL = "select * from Organization"; + /** */ + private static final String ORG = "org"; + /** */ + private IgniteCache personCache; + /** */ + private IgniteCache orgCache; + /** */ + private IgniteH2Indexing h2Idx; + + /** */ + @Override protected long getTestTimeout() { + return 600 * 1000; + } + + /** + * Failed to reserve partitions for query (cache is not found on local node) + */ + public void testCacheWasNotFoundMessage() { + GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + GridH2QueryRequest qryReq = (GridH2QueryRequest)msg; + qryReq.caches().add(Integer.MAX_VALUE); + startedExecutor.onMessage(nodeId, msg); + qryReq.caches().remove(qryReq.caches().size() - 1); + } + else + startedExecutor.onMessage(nodeId, msg); + + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery qry = new SqlQuery(Person.class, JOIN_SQL).setArgs("Organization #0"); + qry.setDistributedJoins(true); + try { + personCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage().contains("Failed to reserve partitions for query (cache is not found on local node) [")); + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** + * Failed to reserve partitions for query (group reservation failed) + */ + public void testGrpReservationFailureMessage() { + GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + ConcurrentMap reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations"); + + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + final MapReservationKey grpKey = new MapReservationKey(ORG, null); + reservations.put(grpKey, new GridReservable() { + @Override public boolean reserve() { + return false; + } + @Override public void release() {} + + }); + } + startedExecutor.onMessage(nodeId, msg); + + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery qry = new SqlQuery(Person.class, JOIN_SQL).setArgs("Organization #0"); + qry.setDistributedJoins(true); + try { + personCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage().contains("Failed to reserve partitions for query (group reservation failed) [")); + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** + * Failed to reserve partitions for query (partition of REPLICATED cache is not in OWNING state) + */ + public void testReplicatedCacheReserveFailureMessage() { + GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + final GridKernalContext ctx = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "ctx"); + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + GridH2QueryRequest qryReq = (GridH2QueryRequest)msg; + GridCacheContext cctx = ctx.cache().context().cacheContext(qryReq.caches().get(0)); + GridDhtLocalPartition part = cctx.topology().localPartition(0, NONE, false); + AtomicLong aState = GridTestUtils.getFieldValue(part, GridDhtLocalPartition.class, "state"); + long stateVal = aState.getAndSet(2); + startedExecutor.onMessage(nodeId, msg); + aState.getAndSet(stateVal); + } + else + startedExecutor.onMessage(nodeId, msg); + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery qry = new SqlQuery<>(Organization.class, ORG_SQL); + qry.setDistributedJoins(true); + try { + orgCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage().contains("Failed to reserve partitions for query (partition of REPLICATED cache is not in OWNING state) [")); + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** + * Failed to reserve partitions for query (partition of PARTITIONED cache cannot be reserved) + */ + public void testPartitionedCacheReserveFailureMessage() { + GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + final GridKernalContext ctx = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "ctx"); + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + GridH2QueryRequest qryReq = (GridH2QueryRequest)msg; + GridCacheContext cctx = ctx.cache().context().cacheContext(qryReq.caches().get(0)); + GridDhtLocalPartition part = cctx.topology().localPartition(0, NONE, false); + AtomicLong aState = GridTestUtils.getFieldValue(part, GridDhtLocalPartition.class, "state"); + long stateVal = aState.getAndSet(2); + startedExecutor.onMessage(nodeId, msg); + aState.getAndSet(stateVal); + } + else + startedExecutor.onMessage(nodeId, msg); + + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery qry = new SqlQuery(Person.class, JOIN_SQL).setArgs("Organization #0"); + qry.setDistributedJoins(true); + try { + personCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage().contains("Failed to reserve partitions for query (partition of PARTITIONED cache cannot be reserved) [")); + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** + * Failed to reserve partitions for query (partition of PARTITIONED cache is not in OWNING state after reservation) + */ + public void testPartitionStateChangedMessage() { + GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + ConcurrentMap reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations"); + final GridKernalContext ctx = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "ctx"); + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + GridH2QueryRequest qryReq = (GridH2QueryRequest)msg; + final MapReservationKey grpKey = new MapReservationKey(ORG, null); + reservations.put(grpKey, new GridReservable() { + @Override public boolean reserve() { + return true; + } + + @Override public void release() { + + } + }); + GridCacheContext cctx = ctx.cache().context().cacheContext(qryReq.caches().get(0)); + + if (GridDhtPartitionTopologyImpl.class.isAssignableFrom(cctx.topology().getClass())) { + GridDhtPartitionTopologyImpl tpg = (GridDhtPartitionTopologyImpl)(cctx.topology()); + AtomicReferenceArray locParts = GridTestUtils.getFieldValue(tpg, GridDhtPartitionTopologyImpl.class, "locParts"); + GridDhtLocalPartition part = locParts.get(0); + MockGridDhtLocalPartition mockPart = new MockGridDhtLocalPartition( + GridTestUtils.getFieldValue(part, GridDhtLocalPartition.class, "ctx"), + GridTestUtils.getFieldValue(part, GridDhtLocalPartition.class, "grp"), + part) { + volatile private boolean preReserved = true; + + @Override public boolean reserve() { + preReserved = false; + return super.reserve(); + } + + @Override public GridDhtPartitionState state() { + if (preReserved) + return super.state(); + else + return MOVING; + } + }; + locParts.set(0, mockPart); + startedExecutor.onMessage(nodeId, msg); + locParts.set(0, part); + } + else + startedExecutor.onMessage(nodeId, msg); + + } + else + startedExecutor.onMessage(nodeId, msg); + + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery qry = new SqlQuery(Person.class, JOIN_SQL).setArgs("Organization #0"); + qry.setDistributedJoins(true); + try { + personCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage().contains("Failed to reserve partitions for query (partition of PARTITIONED cache is not in OWNING state after reservation) [")); + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** + * Failed to execute non-collocated query (will retry) + */ + public void testNonCollocatedFailureMessage() { + GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + ConcurrentMap reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations"); + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + final MapReservationKey grpKey = new MapReservationKey(ORG, null); + reservations.put(grpKey, new GridReservable() { + @Override public boolean reserve() { + throw new GridH2RetryException("test retry exception"); + } + + @Override public void release() { + + } + }); + } + startedExecutor.onMessage(nodeId, msg); + + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery qry = new SqlQuery(Person.class, JOIN_SQL).setArgs("Organization #0"); + qry.setDistributedJoins(true); + try { + personCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage().contains("Failed to execute non-collocated query (will retry) [")); + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + disco.setForceServerMode(true); + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Ignite ignite = startGridsMultiThreaded(NODES_COUNT, false); + GridQueryProcessor qryProc = grid(ignite.name()).context().query(); + + h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx"); + + personCache = ignite(0).getOrCreateCache(new CacheConfiguration("pers") + .setIndexedTypes(String.class, Person.class)); + orgCache = ignite(0).getOrCreateCache(new CacheConfiguration(ORG) + .setCacheMode(CacheMode.REPLICATED) + .setIndexedTypes(String.class, Organization.class) + ); + + awaitPartitionMapExchange(); + + populateDataIntoOrg(orgCache); + populateDataIntoPerson(personCache); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Populate organization cache with test data + * @param cache @{IgniteCache} + */ + private void populateDataIntoOrg(IgniteCache cache) { + for (int i = 0; i < ORG_COUNT; i++) { + Organization org = new Organization(); + org.setId("org" + i); + org.setName("Organization #" + i); + cache.put(org.getId(), org); + } + } + + /** + * Populate person cache with test data + * @param cache @{IgniteCache} + */ + private void populateDataIntoPerson(IgniteCache cache) { + int personId = 0; + for (int i = 0; i < ORG_COUNT; i++) { + Organization org = new Organization(); + org.setId("org" + i); + org.setName("Organization #" + i); + + for (int j = 0; j < PERSON_PER_ORG_COUNT; j++) { + Person prsn = new Person(); + prsn.setId("pers" + personId); + prsn.setOrgId(org.getId()); + prsn.setName("Person name #" + personId); + + cache.put(prsn.getId(), prsn); + + personId++; + } + } + } + + /** + * + */ + private static class Person { + /** */ + @QuerySqlField(index = true) + private String id; + + /** */ + @QuerySqlField(index = true) + private String orgId; + + /** */ + @QuerySqlField(index = true) + private String name; + + /** */ + public String getId() { + return id; + } + + /** */ + public void setId(String id) { + this.id = id; + } + + /** */ + public String getOrgId() { + return orgId; + } + + /** */ + public void setOrgId(String orgId) { + this.orgId = orgId; + } + + /** */ + public String getName() { + return name; + } + + /** */ + public void setName(String name) { + this.name = name; + } + } + + /** + * + */ + private static class Organization { + /** */ + @QuerySqlField(index = true) + private String id; + + /** */ + @QuerySqlField(index = true) + private String name; + + /** */ + public void setId(String id) { + this.id = id; + } + + /** */ + public String getId() { + return id; + } + + /** */ + public String getName() { + return name; + } + + /** */ + public void setName(String name) { + this.name = name; + } + } + + /** + * Wrapper around @{GridMapQueryExecutor} + */ + private abstract static class MockGridMapQueryExecutor extends GridMapQueryExecutor { + + /** + * Wrapped executor + */ + GridMapQueryExecutor startedExecutor; + + /** */ + MockGridMapQueryExecutor insertRealExecutor(GridMapQueryExecutor realExecutor) { + this.startedExecutor = realExecutor; + return this; + } + + /** + * @param busyLock Busy lock. + */ + MockGridMapQueryExecutor(GridSpinBusyLock busyLock) { + super(busyLock); + } + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + + startedExecutor.onMessage(nodeId, msg); + } + + /** {@inheritDoc} */ + @Override public void cancelLazyWorkers() { + + startedExecutor.cancelLazyWorkers(); + } + + /** {@inheritDoc} */ + @Override GridSpinBusyLock busyLock() { + + return startedExecutor.busyLock(); + + } + + /** {@inheritDoc} */ + @Override public void onCacheStop(String cacheName) { + + startedExecutor.onCacheStop(cacheName); + } + + /** {@inheritDoc} */ + @Override public void stopAndUnregisterCurrentLazyWorker() { + startedExecutor.stopAndUnregisterCurrentLazyWorker(); + } + + /** {@inheritDoc} */ + @Override public void unregisterLazyWorker(MapQueryLazyWorker worker) { + startedExecutor.unregisterLazyWorker(worker); + } + + /** {@inheritDoc} */ + @Override public int registeredLazyWorkers() { + + return startedExecutor.registeredLazyWorkers(); + } + } + +} Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java (revision d04d76440ce86873de7aebc8c03d39484cd1e3e5) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java (date 1534156032094) @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -296,10 +297,10 @@ * @param reserved Reserved list. * @param nodeId Node ID. * @param reqId Request ID. - * @return {@code true} If all the needed partitions successfully reserved. + * @return Optional which is empty in case of success or with causeMessage if failed * @throws IgniteCheckedException If failed. */ - private boolean reservePartitions( + private Optional reservePartitions( @Nullable List cacheIds, AffinityTopologyVersion topVer, final int[] explicitParts, @@ -310,7 +311,7 @@ assert topVer != null; if (F.isEmpty(cacheIds)) - return true; + return Optional.empty(); Collection partIds = wrap(explicitParts); @@ -319,11 +320,10 @@ // Cache was not found, probably was not deployed yet. if (cctx == null) { - logRetry("Failed to reserve partitions for query (cache is not found on local node) [" + - "rmtNodeId=" + nodeId + ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + - cacheIds.get(i) + "]"); - - return false; + final Optional rslt = Optional.of(String.format("Failed to reserve partitions for query (cache is not found on local node) [" + + "rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]", nodeId, reqId, topVer, cacheIds.get(i))); + logRetry(rslt.get()); + return rslt; } if (cctx.isLocal() || !cctx.rebalanceEnabled()) @@ -337,11 +337,10 @@ if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. if (r != MapReplicatedReservation.INSTANCE) { if (!r.reserve()) { - logRetry("Failed to reserve partitions for query (group reservation failed) [" + - "rmtNodeId=" + nodeId + ", reqId=" + reqId + ", affTopVer=" + topVer + - ", cacheId=" + cacheIds.get(i) + ", cacheName=" + cctx.name() + "]"); - - return false; // We need explicit partitions here -> retry. + final Optional rslt = Optional.of(String.format("Failed to reserve partitions for query (group reservation failed) [" + + "rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, cacheName=%s]", nodeId, reqId, topVer, cacheIds.get(i), cctx.name())); + logRetry(rslt.get()); + return rslt; } reserved.add(r); @@ -359,13 +358,20 @@ GridDhtPartitionState partState = part != null ? part.state() : null; if (partState != OWNING) { - logRetry("Failed to reserve partitions for query (partition of " + - "REPLICATED cache is not in OWNING state) [rmtNodeId=" + nodeId + - ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) + - ", cacheName=" + cctx.name() + ", part=" + p + ", partFound=" + (part != null) + - ", partState=" + partState + "]"); - - return false; + final Optional rslt = Optional.of(String.format("Failed to reserve partitions for query " + + "(partition of REPLICATED cache is not in OWNING state) [" + + "rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, cacheName=%s, part=%s, partFound=%s, partState=%s]", + nodeId, + reqId, + topVer, + cacheIds.get(i), + cctx.name(), + p, + (part != null), + partState + )); + logRetry(rslt.get()); + return rslt; } } @@ -383,13 +389,20 @@ GridDhtPartitionState partState = part != null ? part.state() : null; if (partState != OWNING || !part.reserve()) { - logRetry("Failed to reserve partitions for query (partition of " + - "PARTITIONED cache cannot be reserved) [rmtNodeId=" + nodeId + ", reqId=" + reqId + - ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) + - ", cacheName=" + cctx.name() + ", part=" + partId + ", partFound=" + (part != null) + - ", partState=" + partState + "]"); - - return false; + final Optional rslt = Optional.of(String.format("Failed to reserve partitions for query " + + "(partition of PARTITIONED cache cannot be reserved) [" + + "rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, cacheName=%s, part=%s, partFound=%s, partState=%s]", + nodeId, + reqId, + topVer, + cacheIds.get(i), + cctx.name(), + partId, + (part != null), + partState + )); + logRetry(rslt.get()); + return rslt; } reserved.add(part); @@ -398,12 +411,19 @@ partState = part.state(); if (part.state() != OWNING) { - logRetry("Failed to reserve partitions for query (partition of " + - "PARTITIONED cache is not in OWNING state after reservation) [rmtNodeId=" + nodeId + - ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) + - ", cacheName=" + cctx.name() + ", part=" + partId + ", partState=" + partState + "]"); - - return false; + final Optional rslt = Optional.of(String.format("Failed to reserve partitions for query " + + "(partition of PARTITIONED cache is not in OWNING state after reservation) [" + + "rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, cacheName=%s, part=%s, partState=%s]", + nodeId, + reqId, + topVer, + cacheIds.get(i), + cctx.name(), + partId, + partState + )); + logRetry(rslt.get()); + return rslt; } } @@ -426,7 +446,7 @@ } } - return true; + return Optional.empty(); } /** @@ -673,12 +693,13 @@ try { if (topVer != null) { // Reserve primary for topology version or explicit partitions. - if (!reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId)) { + Optional reservationError = reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId); + if (reservationError.isPresent()) { // Unregister lazy worker because re-try may never reach this node again. if (lazy) stopAndUnregisterCurrentLazyWorker(); - sendRetry(node, reqId, segmentId); + sendRetry(node, reqId, segmentId, reservationError.get()); return; } @@ -793,10 +814,9 @@ GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class); if (retryErr != null) { - logRetry("Failed to execute non-collocated query (will retry) [nodeId=" + node.id() + - ", reqId=" + reqId + ", errMsg=" + retryErr.getMessage() + ']'); - - sendRetry(node, reqId, segmentId); + final String retryCause = String.format("Failed to execute non-collocated query (will retry) [nodeId=%s, reqId=%s, errMsg=%s]",node.id(),reqId,retryErr.getMessage()); + logRetry(retryCause); + sendRetry(node, reqId, segmentId,retryCause); } else { U.error(log, "Failed to execute local query.", e); @@ -845,7 +865,7 @@ List reserved = new ArrayList<>(); - if (!reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId)) { + if (reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId).isPresent()) { U.error(log, "Failed to reserve partitions for DML request. [localNodeId=" + ctx.localNodeId() + ", nodeId=" + node.id() + ", reqId=" + req.requestId() + ", cacheIds=" + cacheIds + ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); @@ -1081,7 +1101,7 @@ * @param reqId Request ID. * @param segmentId Index segment ID. */ - private void sendRetry(ClusterNode node, long reqId, int segmentId) { + private void sendRetry(ClusterNode node, long reqId, int segmentId, String retryCause) { try { boolean loc = node.isLocal(); @@ -1092,6 +1112,7 @@ false); msg.retry(h2.readyTopologyVersion()); + msg.retryCause(retryCause); if (loc) h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java (revision d04d76440ce86873de7aebc8c03d39484cd1e3e5) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java (date 1534155635124) @@ -67,6 +67,9 @@ /** */ private AffinityTopologyVersion retry; + /** Retry cause description*/ + private String retryCause; + /** Last page flag. */ private boolean last; @@ -228,6 +231,12 @@ case 8: if (!writer.writeBoolean("last", last)) return false; + + writer.incrementState(); + + case 9: + if (!writer.writeString("retryCause", retryCause)) + return false; writer.incrementState(); } @@ -313,6 +322,14 @@ if (!reader.isLastRead()) return false; + reader.incrementState(); + + case 9: + retryCause = reader.readString("retryCause"); + + if (!reader.isLastRead()) + return false; + reader.incrementState(); } @@ -326,7 +343,7 @@ /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 10; } /** @@ -343,6 +360,18 @@ this.retry = retry; } + /** + * @return Retry Ccause message. + */ + public String retryCause() { return retryCause; } + + /** + * @param retryCause Retry Ccause message. + */ + public void retryCause(String retryCause){ + this.retryCause = retryCause; + } + /** * @return Last page flag. */ Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java (revision d04d76440ce86873de7aebc8c03d39484cd1e3e5) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java (date 1534155635124) @@ -17,10 +17,12 @@ package org.apache.ignite.internal.processors.query.h2.twostep; +import java.util.Optional; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; +import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; import org.h2.jdbc.JdbcConnection; import org.jetbrains.annotations.Nullable; @@ -56,6 +58,9 @@ /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */ private final AtomicReference state = new AtomicReference<>(); + /** Holder of root cause description*/ + private final AtomicReference rootCause = new AtomicReference<>(); + /** * Constructor. * @@ -99,6 +104,16 @@ idx.fail(nodeId, e); } + /** + * @param msg corresponding response message + * @param nodeId Node ID. + */ + void stateWithMsg(GridQueryNextPageResponse msg, @Nullable UUID nodeId) { + assert msg != null; + rootCause.compareAndSet(null, msg.retryCause()); + state(msg.retry(), nodeId); + } + /** * @param e Error. */ @@ -134,6 +149,11 @@ return state.get(); } + /** + * @return Root Cause. + */ + Optional rootCause() { return Optional.ofNullable(rootCause.get()); } + /** * @return Indexes. */ Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java (revision d04d76440ce86873de7aebc8c03d39484cd1e3e5) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java (date 1534155635124) @@ -31,6 +31,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -38,6 +39,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.cache.CacheException; @@ -289,7 +291,7 @@ * @param node Node. * @param msg Message. */ - private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) { + private void onNextPage(final ClusterNode node, final GridQueryNextPageResponse msg) { final long qryReqId = msg.queryRequestId(); final int qry = msg.query(); final int seg = msg.segmentId(); @@ -316,7 +318,8 @@ if (err0 != null && err0.getCause() instanceof IgniteClientDisconnectedException) throw err0; - CacheException e = new CacheException("Failed to fetch data from node: " + node.id()); + CacheException e = new CacheException( + (msg.retryCause()!=null) ? msg.retryCause() : "Failed to fetch data from node: " + node.id()); if (err0 != null) e.addSuppressed(err0); @@ -349,7 +352,7 @@ idx.addPage(page); if (msg.retry() != null) - retry(r, msg.retry(), node.id()); + r.stateWithMsg(msg, node.id()); else if (msg.page() == 0) // Do count down on each first page received. r.latch().countDown(); } @@ -572,10 +575,12 @@ final long startTime = U.currentTimeMillis(); + final AtomicReference rootCause = new AtomicReference<>(); + for (int attempt = 0;; attempt++) { - if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout)) - throw new CacheException("Failed to map SQL query to topology."); - + if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout)) { + throw new CacheException(Optional.ofNullable(rootCause.get()).orElse("Failed to map SQL query to topology.")); + } if (attempt != 0) { try { Thread.sleep(attempt * 10); // Wait for exchange. @@ -843,6 +848,8 @@ } if (retry) { + assert r != null; + r.rootCause().ifPresent( val -> rootCause.compareAndSet(null, val)); if (Thread.currentThread().isInterrupted()) throw new IgniteInterruptedCheckedException("Query was interrupted."); Index: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/MockGridDhtLocalPartition.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/MockGridDhtLocalPartition.java (date 1534155635134) +++ modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/MockGridDhtLocalPartition.java (date 1534155635134) @@ -0,0 +1,310 @@ +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.lang.IgniteInClosure; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Wrapper around GridDhtLocalPartition to be extended in test cases + */ +public abstract class MockGridDhtLocalPartition extends GridDhtLocalPartition{ + /** + * fake id generator in order to bypass construction stage failure (if invoked with real id) + */ + private static AtomicInteger cntr = new AtomicInteger(Integer.MAX_VALUE); + + /** + * Real object + */ + private GridDhtLocalPartition internal; + + /** + * @param ctx Context. + * @param grp Cache group. + * @param id Partition ID. + */ + private MockGridDhtLocalPartition(GridCacheSharedContext ctx, + CacheGroupContext grp, int id) { + super(ctx, grp, id); + } + + /** */ + protected MockGridDhtLocalPartition(GridCacheSharedContext ctx, + CacheGroupContext grp, GridDhtLocalPartition internal){ + this(ctx, grp, cntr.getAndDecrement()); + this.internal = internal; + } + + /** */ + protected GridDhtLocalPartition getInternal(){ + return internal; + } + + /** {@inheritDoc} */ + @Override public int internalSize() { + return internal.internalSize(); + } + + /** {@inheritDoc} */ + @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) { + return internal.entriesMap(cctx); + } + + /** {@inheritDoc} */ + @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) { + return internal.entriesMapIfExists(cacheId); + } + + /** {@inheritDoc} */ + @Override public IgniteCacheOffheapManager.CacheDataStore dataStore() { + return internal.dataStore(); + } + + /** {@inheritDoc} */ + @Override public boolean addReservation(GridDhtPartitionsReservation r) { + return internal.addReservation(r); + } + + /** {@inheritDoc} */ + @Override public void removeReservation(GridDhtPartitionsReservation r) { + internal.removeReservation(r); + } + + /** {@inheritDoc} */ + @Override public int id() { + return internal.id(); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionState state() { + return internal.state(); + } + + /** {@inheritDoc} */ + @Override public int reservations() { + return internal.reservations(); + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + return internal.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public boolean valid() { + return internal.valid(); + } + + /** {@inheritDoc} */ + @Override public void cleanupRemoveQueue() { + internal.cleanupRemoveQueue(); + } + + /** {@inheritDoc} */ + @Override public void onDeferredDelete(int cacheId, KeyCacheObject key, GridCacheVersion ver) { + internal.onDeferredDelete(cacheId,key,ver); + } + + /** {@inheritDoc} */ + @Override public void lock() { + internal.lock(); + } + + /** {@inheritDoc} */ + @Override public void unlock() { + internal.unlock(); + } + + /** {@inheritDoc} */ + @Override public boolean reserve() { + return internal.reserve(); + } + + /** {@inheritDoc} */ + @Override public void release() { + internal.release(); + } + + /** {@inheritDoc} */ + @Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) { + internal.release(); + } + + /** {@inheritDoc} */ + @Override public void restoreState(GridDhtPartitionState stateToRestore) { + internal.restoreState(stateToRestore); + } + + /** {@inheritDoc} */ + @Override public void moving() { + + internal.moving(); + } + + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture rent(boolean updateSeq) { + return internal.rent(updateSeq); + } + + /** {@inheritDoc} */ + @Override public void clearAsync() { + internal.clearAsync(); + } + + /** {@inheritDoc} */ + @Override public boolean markForDestroy() { + return internal.markForDestroy(); + } + + /** {@inheritDoc} */ + @Override public void destroy() { + internal.destroy(); + } + + + /** {@inheritDoc} */ + @Override public void awaitDestroy() { + internal.awaitDestroy(); + } + + + /** {@inheritDoc} */ + @Override public void onClearFinished(IgniteInClosure> lsnr) { + internal.onClearFinished(lsnr); + } + + + /** {@inheritDoc} */ + @Override public boolean isClearing() { + return internal.isClearing(); + } + + + /** {@inheritDoc} */ + @Override public boolean tryClear(EvictionContext evictionCtx) throws NodeStoppingException { + return internal.tryClear(evictionCtx); + } + + /** {@inheritDoc} */ + @Override public boolean primary(AffinityTopologyVersion topVer) { + return internal.primary(topVer); + } + + + /** {@inheritDoc} */ + @Override public boolean backup(AffinityTopologyVersion topVer) { + return internal.backup(topVer); + } + + /** {@inheritDoc} */ + @Override public long initialUpdateCounter() { + return internal.initialUpdateCounter(); + } + + + /** {@inheritDoc} */ + @Override public void updateCounter(long val) { + + internal.updateCounter(val); + } + + + /** {@inheritDoc} */ + @Override public void initialUpdateCounter(long val) { + + internal.initialUpdateCounter(val); + } + + + /** {@inheritDoc} */ + @Override public long fullSize() { + + return internal.fullSize(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + + return internal.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + + return internal.equals(obj); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull GridDhtLocalPartition part) { + return internal.compareTo(part); + } + + /** {@inheritDoc} */ + @Override public String toString() { + + return internal.toString(); + } + + /** {@inheritDoc} */ + @Override public int publicSize(int cacheId) { + + return internal.publicSize(cacheId); + } + + /** {@inheritDoc} */ + @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { + internal.incrementPublicSize(hld,e); + } + + /** {@inheritDoc} */ + @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { + internal.decrementPublicSize(hld,e); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) { + return internal.getEntry(ctx,key); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent( + GridCacheContext ctx, + final AffinityTopologyVersion topVer, + KeyCacheObject key, + final boolean create, + final boolean touch) { + return internal.putEntryIfObsoleteOrAbsent(ctx, topVer, key, create, touch); + } + + /** {@inheritDoc} */ + @Override public boolean removeEntry(final GridCacheEntryEx entry) { + + return internal.removeEntry(entry); + } + + /** {@inheritDoc} */ + @Override public Collection entries(int cacheId, final CacheEntryPredicate... filter) { + return internal.entries(cacheId, filter); + } + + /** {@inheritDoc} */ + @Override public Set entrySet(int cacheId, final CacheEntryPredicate... filter) { + return internal.entrySet(cacheId, filter); + } +}