diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 00ab49a1914..ec34587751f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -963,6 +963,10 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_EVICTION_PERMITS = "IGNITE_EVICTION_PERMITS"; + /** + * Proper partitions co-location check is enabled + */ + public static final String IGNITE_PARTITIONS_CO_LOCATION_CHECK_ENABLED = "IGNITE_PARTITIONS_CO_LOCATION_CHECK_ENABLED"; /** * Enforces singleton. */ diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityDependsOnPreviousState.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityDependsOnPreviousState.java new file mode 100644 index 00000000000..d33125a36e2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityDependsOnPreviousState.java @@ -0,0 +1,15 @@ +package org.apache.ignite.cache.affinity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation marker which identifies affinity function that requires previous affinity state + * in order to calculate new one. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface AffinityDependsOnPreviousState { +} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 2f894e0e241..09d52934b11 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -47,6 +47,8 @@ import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.AffinityDependsOnPreviousState; +import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; @@ -135,6 +137,9 @@ public class GridReduceQueryExecutor { /** */ private static final Set UNMAPPED_PARTS = Collections.emptySet(); + /** */ + private static final boolean CHECK_CO_LOCATION = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PARTITIONS_CO_LOCATION_CHECK_ENABLED); + /** */ private GridKernalContext ctx; @@ -570,6 +575,9 @@ public class GridReduceQueryExecutor { final int[] parts, boolean lazy, MvccQueryTracker mvccTracker) { + if (CHECK_CO_LOCATION) + checkCorrectCoLocation(qry); //throws exception if not valid + assert !qry.mvccEnabled() || mvccTracker != null; if (F.isEmpty(params)) @@ -801,24 +809,6 @@ public class GridReduceQueryExecutor { boolean retry = false; - // Always enforce join order on map side to have consistent behavior. - int flags = GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER; - - if (distributedJoins) - flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS; - - if (qry.isLocal()) - flags |= GridH2QueryRequest.FLAG_IS_LOCAL; - - if (qry.explain()) - flags |= GridH2QueryRequest.FLAG_EXPLAIN; - - if (isReplicatedOnly) - flags |= GridH2QueryRequest.FLAG_REPLICATED; - - if (lazy && mapQrys.size() == 1) - flags |= GridH2QueryRequest.FLAG_LAZY; - GridH2QueryRequest req = new GridH2QueryRequest() .requestId(qryReqId) .topologyVersion(topVer) @@ -828,7 +818,7 @@ public class GridReduceQueryExecutor { .partitions(convert(partsMap)) .queries(mapQrys) .parameters(params) - .flags(flags) + .flags(prepareFlags(qry, lazy, mapQrys.size())) .timeout(timeoutMillis) .schemaName(schemaName); @@ -874,28 +864,7 @@ public class GridReduceQueryExecutor { if (send(nodes, req, spec, false)) { awaitAllReplies(r, nodes, cancel); - if (r.hasErrorOrRetry()) { - CacheException err = r.exception(); - - if (err != null) { - if (err.getCause() instanceof IgniteClientDisconnectedException) - throw err; - - if (wasCancelled(err)) - throw new QueryCancelledException(); // Throw correct exception. - - throw new CacheException("Failed to run map query remotely: " + err.getMessage(), err); - } - else { - retry = true; - - // On-the-fly topology change must not be possible in FOR UPDATE case. - assert sfuFut == null; - - // If remote node asks us to retry then we have outdated full partition map. - h2.awaitForReadyTopologyVersion(r.retryTopologyVersion()); - } - } + retry = analyseCurrentRun(r, sfuFut); } else // Send failed. retry = true; @@ -1015,6 +984,110 @@ public class GridReduceQueryExecutor { } } + /** */ + private void checkCorrectCoLocation(GridCacheTwoStepQuery qry) { + if (qry.distributedJoins()) + return; + + Iterator it = qry.cacheIds().iterator(); + + if (!it.hasNext()) + return; + + int grpId=0; + + AffinityFunction af=null; + + while(it.hasNext()){ + GridCacheContext cctx = cacheContext(it.next()); + + if (!cctx.isReplicated()){ + grpId=cctx.groupId(); + if (U.hasAnnotation(cctx.config().getAffinity(), AffinityDependsOnPreviousState.class)) + throw new CacheException(String.format("Partitioned cache uses stateful affinity function [qry=%s, grpId=%d]", + qry.originalSql(), + grpId + )); + af = cctx.config().getAffinity(); + break; + } + } + + if (af != null) // at least one partitioned cache group + while(it.hasNext()){ + GridCacheContext cctx = cacheContext(it.next()); + + if(!cctx.isReplicated() && cctx.groupId()!=grpId && !af.equals(cctx.config().getAffinity())) + throw new CacheException(String.format("Query has cache groups with different affinity functions " + + "[qry=%s, grpId=%d, other grpId=%d]", + qry.originalSql(), + grpId, + cctx.groupId() + )); + } + } + + /** + * Analyse reduce query run to decide if retry is required + * @param r reduce query run to be analysed + * @return true if retry is required, false otherwise + * @throws IgniteCheckedException in case of reduce query run contains exception record + */ + private boolean analyseCurrentRun(ReduceQueryRun r, GridNearTxSelectForUpdateFuture sfuFut) throws IgniteCheckedException { + if (r.hasErrorOrRetry()) { + CacheException err = r.exception(); + + if (err != null) { + if (err.getCause() instanceof IgniteClientDisconnectedException) + throw err; + + if (wasCancelled(err)) + throw new QueryCancelledException(); // Throw correct exception. + + throw new CacheException("Failed to run map query remotely: " + err.getMessage(), err); + } + else { + // On-the-fly topology change must not be possible in FOR UPDATE case. + assert sfuFut == null; + + // If remote node asks us to retry then we have outdated full partition map. + h2.awaitForReadyTopologyVersion(r.retryTopologyVersion()); + + return true; + } + } + return false; + } + + /** + * Builds flag out of parameters + * @param qry query parameter holder + * @param lazy if lazy execution + * @param mapQrysSize number of queries + * @return flag + */ + private int prepareFlags(GridCacheTwoStepQuery qry, boolean lazy, int mapQrysSize) { + // Always enforce join order on map side to have consistent behavior. + int flags = GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER; + + if (qry.distributedJoins()) + flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS; + + if (qry.isLocal()) + flags |= GridH2QueryRequest.FLAG_IS_LOCAL; + + if (qry.explain()) + flags |= GridH2QueryRequest.FLAG_EXPLAIN; + + if (qry.isReplicatedOnly()) + flags |= GridH2QueryRequest.FLAG_REPLICATED; + + if (lazy && mapQrysSize == 1) + flags |= GridH2QueryRequest.FLAG_LAZY; + + return flags; + } + /** * * @param schemaName Schema name. diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java index fe7821ae297..5175fa4fa2f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java @@ -25,10 +25,10 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField; */ public class JoinSqlTestHelper { /** */ - private static final int ORG_COUNT = 100; + static final int ORG_COUNT = 100; /** */ - private static final int PERSON_PER_ORG_COUNT = 10; + static final int PERSON_PER_ORG_COUNT = 10; /** */ static final String JOIN_SQL = "select * from Person, \"org\".Organization as org " + diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/QueryColocationCheckSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/QueryColocationCheckSelfTest.java new file mode 100644 index 00000000000..a8b2ee23f54 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/QueryColocationCheckSelfTest.java @@ -0,0 +1,412 @@ +package org.apache.ignite.internal.processors.query.h2.twostep; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import javax.cache.Cache; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.AffinityDependsOnPreviousState; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** */ +public class QueryColocationCheckSelfTest extends GridCommonAbstractTest { + /** */ + public static final String REPL_SQL = "select * from Organization"; + + /** */ + public static final String TWO_STATELESS_SQL = "select * from Location, " + + "\"subs\".SubOrganization as sub, \"org\".Organization as org " + + " where Location.orgId=sub.id AND sub.orgId=org.id AND lower(org.name) = lower(?)"; + + /** */ + public static final String STATEFUL_SQL = "select * from Kid, \"org\".Organization as org, " + + "\"pers\".Person as per where Kid.parentId=per.id AND per.orgId = org.id AND lower(org.name) = lower(?)"; + + /** */ + public static final String TWO_DIFFERENT_SQL = "select * from Person, \"subs\".SubOrganization as sub " + + "where Person.orgId = sub.orgId AND lower(sub.name) = lower(?)"; + + /** */ + private static final int KID_PER_PERSON_COUNT = 5; + + /** */ + private static final int SUBS_PER_ORGANISATION_COUNT = 10; + + /** */ + private static final int NODES_COUNT = 1; + + /** */ + private static final String ORG = "org"; + + /** */ + private static IgniteCache personCache; + + /** */ + private static IgniteCache orgCache; + + /** */ + private static IgniteCache kidCache; + + /** */ + private static IgniteCache subOrgCache; + + /** */ + private static IgniteCache locationCache; + + /** */ + public void testPassDistributedJoins(){ + SqlQuery qry = new SqlQuery( + JoinSqlTestHelper.Person.class, TWO_DIFFERENT_SQL).setArgs("sub1"); + + qry.setDistributedJoins(true); + + List> prsns = personCache.query(qry).getAll(); + + assertNotNull(prsns); + } + + /** */ + public void testPassAllReplicated(){ + SqlQuery qry = new SqlQuery( + JoinSqlTestHelper.Organization.class, REPL_SQL); + + List> orgs = orgCache.query(qry).getAll(); + + assertNotNull(orgs); + } + + /** */ + public void testPassReplicatedAndOnePartitioned(){ + SqlQuery qry = new SqlQuery( + JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0"); + + List> prsns = personCache.query(qry).getAll(); + + assertNotNull(prsns); + } + + /** */ + public void testPassReplicatedAndTwoStatelessPartitioned(){ + SqlQuery qry = new SqlQuery( + Location.class, TWO_STATELESS_SQL).setArgs("Organization #0"); + + List> locs = locationCache.query(qry).getAll(); + + assertNotNull(locs); + } + + public void testFailReplicatedAndStatefulPartitioned(){ + SqlQuery qry = new SqlQuery( + Kid.class, STATEFUL_SQL).setArgs("Organization #0"); + + try { + List> kids = kidCache.query(qry).getAll(); + + fail("No exceptions are emitted. Check property is set to "+IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PARTITIONS_CO_LOCATION_CHECK_ENABLED)); + }catch (Exception e){ + log().error("Caught exception", e); + + assertTrue(e.getMessage(), e.getMessage().contains("Partitioned cache uses stateful affinity function [")); + } + } + + public void testFailReplicatedAndTwoDifferentAffinityPartitioned(){ + SqlQuery qry = new SqlQuery( + JoinSqlTestHelper.Person.class, TWO_DIFFERENT_SQL).setArgs("sub1"); + + try { + List> prsns = personCache.query(qry).getAll(); + + fail("No exceptions are emitted. Check property is set to "+IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PARTITIONS_CO_LOCATION_CHECK_ENABLED)); + }catch (Exception e){ + log().error("Caught exception", e); + + assertTrue(e.getMessage(), e.getMessage().contains("Query has cache groups with different affinity functions [")); + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(NODES_COUNT, false); + + final AffinityFunction statefulAffinity = new TestAffinityFunction(); + + final AffinityFunction statelessAffinity = new RendezvousAffinityFunction(); + + personCache = ignite(0).getOrCreateCache(new CacheConfiguration("pers") + .setIndexedTypes(String.class, JoinSqlTestHelper.Person.class).setAffinity(new RendezvousAffinityFunction()) + ); + + orgCache = ignite(0).getOrCreateCache(new CacheConfiguration(ORG) + .setCacheMode(CacheMode.REPLICATED) + .setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class) + ); + + kidCache = ignite(0).getOrCreateCache(new CacheConfiguration("kids") + .setIndexedTypes(String.class, Kid.class).setAffinity(statefulAffinity) + ); + + subOrgCache = ignite(0).getOrCreateCache(new CacheConfiguration("subs") + .setIndexedTypes(String.class, SubOrganization.class).setAffinity(statelessAffinity) + ); + + locationCache = ignite(0).getOrCreateCache(new CacheConfiguration("locs") + .setIndexedTypes(String.class, Location.class).setAffinity(statelessAffinity) + ); + awaitPartitionMapExchange(); + + JoinSqlTestHelper.populateDataIntoOrg(orgCache); + JoinSqlTestHelper.populateDataIntoPerson(personCache); + populateDataIntoKid(kidCache); + populateDataIntoSub(subOrgCache); + populateDataIntoLocation(locationCache); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + // Give some time to clean up. + try { + Thread.sleep(3000); + } + catch (InterruptedException e) { + // no op + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** */ + private void populateDataIntoKid(IgniteCache cache) { + for(int i=0;i cache) { + for(int i=0;i cache) { + for(int i=0;i> assignPartitions(AffinityFunctionContext affCtx) { + List> res = new ArrayList<>(); + + res.add(nodes(0, affCtx.currentTopologySnapshot())); + + return res; + } + + /** {@inheritDoc} */ + public List nodes(int part, Collection nodes) { + return new ArrayList<>(nodes); + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op. + } + } + + /** */ + public class Kid { + /** */ + @QuerySqlField(index = true) + private String id; + + /** */ + @QuerySqlField(index = true) + private String parentId; + + /** */ + @QuerySqlField(index = true) + private String name; + + /** */ + public String getId() { + return id; + } + + /** */ + public void setId(String id) { + this.id = id; + } + + /** */ + public String getparentId() { + return parentId; + } + + /** */ + public void setParentId(String parentId) { + this.parentId = parentId; + } + + /** */ + public String getName() { + return name; + } + + /** */ + public void setName(String name) { + this.name = name; + } + } + + /** */ + public class SubOrganization { + /** */ + @QuerySqlField(index = true) + private String id; + + /** */ + @QuerySqlField(index = true) + private String orgId; + + /** */ + @QuerySqlField(index = true) + private String name; + + /** */ + public String getId() { + return id; + } + + /** */ + public void setId(String id) { + this.id = id; + } + + /** */ + public String getOrgId() { + return orgId; + } + + /** */ + public void setOrgId(String orgId) { + this.orgId = orgId; + } + + /** */ + public String getName() { + return name; + } + + /** */ + public void setName(String name) { + this.name = name; + } + } + + /** */ + public class Location { + /** */ + @QuerySqlField(index = true) + private String id; + + /** */ + @QuerySqlField(index = true) + private String orgId; + + /** */ + @QuerySqlField(index = true) + private String name; + + /** */ + public String getId() { + return id; + } + + /** */ + public void setId(String id) { + this.id = id; + } + + /** */ + public String getOrgId() { + return orgId; + } + + /** */ + public void setOrgId(String orgId) { + this.orgId = orgId; + } + + /** */ + public String getName() { + return name; + } + + /** */ + public void setName(String name) { + this.name = name; + } + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java index 536834cda93..73a29e79aec 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.processors.cache.CacheScanPartitionQueryFallbackSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheCrossCacheJoinRandomTest; import org.apache.ignite.internal.processors.cache.IgniteCacheObjectKeyIndexingSelfTest; @@ -54,6 +55,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLe import org.apache.ignite.internal.processors.query.h2.twostep.DisappearedCacheCauseRetryMessageSelfTest; import org.apache.ignite.internal.processors.query.h2.twostep.DisappearedCacheWasNotFoundMessageSelfTest; import org.apache.ignite.internal.processors.query.h2.twostep.NonCollocatedRetryMessageSelfTest; +import org.apache.ignite.internal.processors.query.h2.twostep.QueryColocationCheckSelfTest; import org.apache.ignite.internal.processors.query.h2.twostep.RetryCauseMessageSelfTest; import org.apache.ignite.testframework.IgniteTestSuite; @@ -115,10 +117,13 @@ public class IgniteCacheQuerySelfTestSuite2 extends TestSuite { suite.addTestSuite(CacheQueryMemoryLeakTest.class); suite.addTestSuite(NonCollocatedRetryMessageSelfTest.class); + suite.addTestSuite(QueryColocationCheckSelfTest.class); suite.addTestSuite(RetryCauseMessageSelfTest.class); suite.addTestSuite(DisappearedCacheCauseRetryMessageSelfTest.class); suite.addTestSuite(DisappearedCacheWasNotFoundMessageSelfTest.class); + System.setProperty(IgniteSystemProperties.IGNITE_PARTITIONS_CO_LOCATION_CHECK_ENABLED, "true"); + return suite; } }