User: sgrimstad Date: 29 Aug 18 15:15 Revision: dd35f024d77d12badf711bce3644450008e38921 Summary: IGNITE-8913 Query cancelled messages are enriched with details, tests updated TeamCity URL: https://ci.ignite.apache.org/viewModification.html?tab=vcsModificationFiles&modId=829953&personal=false Index: modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java (revision 0c4301cdc0d6108ed5b51173144e19d3ad450e63) +++ modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java (revision dd35f024d77d12badf711bce3644450008e38921) @@ -20,7 +20,7 @@ import org.apache.ignite.IgniteCheckedException; /** - * The exception is thrown if a query was cancelled or timed out while executing. + * The exception is thrown if a qry was cancelled or timed out while executing. */ public class QueryCancelledException extends IgniteCheckedException { /** */ @@ -30,6 +30,11 @@ * Default constructor. */ public QueryCancelledException() { - super("The query was cancelled while executing."); + super("The qry was cancelled while executing"); } + + /** */ + public QueryCancelledException(String msg) { + super(msg); -} \ No newline at end of file + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java (revision 0c4301cdc0d6108ed5b51173144e19d3ad450e63) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java (revision dd35f024d77d12badf711bce3644450008e38921) @@ -98,7 +98,10 @@ // Handle race with cancel and make sure the iterator resources are freed correctly. closeIter(); - throw new CacheException(new QueryCancelledException()); + throw new CacheException(new QueryCancelledException(String.format( + "The query was cancelled while executing. Client node should provide details [reason=%s]", + "Cancelled by client" + ))); } assert iter != null; Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java (revision 0c4301cdc0d6108ed5b51173144e19d3ad450e63) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java (revision dd35f024d77d12badf711bce3644450008e38921) @@ -1231,7 +1231,13 @@ catch (SQLException e) { // Throw special exception. if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED) - throw new QueryCancelledException(); + throw new CacheException(new QueryCancelledException(String.format( + "The query was cancelled while executing. [query=%s, localNodeId=%s, reason=%s, timeout=%s ms]", + stmt, + ctx.localNodeId(), + timeoutMillis>0 ? "Statement with timeout was cancelled" : "Cancelled by client", + timeoutMillis + ))); throw new IgniteCheckedException("Failed to execute SQL query. " + e.getMessage(), e); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java (revision 0c4301cdc0d6108ed5b51173144e19d3ad450e63) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java (revision dd35f024d77d12badf711bce3644450008e38921) @@ -726,7 +726,14 @@ nodeRess.cancelRequest(reqId); - throw new QueryCancelledException(); + throw new QueryCancelledException(String.format( + "The query request (could be more than 1 query) was cancelled while executing. " + + "[reqId=%s, firstQuery=%s, localNodeId=%s, reason=%s]", + reqId, + qrys.isEmpty() ? "no queries" : qrys.iterator().next().query(), + ctx.localNodeId(), + "Cancelled by client" + )); } // Run queries. @@ -811,8 +818,10 @@ else { U.error(log, "Failed to execute local query.", e); - sendError(node, reqId, e); + Exception cancelled = X.cause(e,QueryCancelledException.class); + sendError(node, reqId, (cancelled != null) ? cancelled : e); + if (e instanceof Error) throw (Error)e; } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java (revision 0c4301cdc0d6108ed5b51173144e19d3ad450e63) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java (revision dd35f024d77d12badf711bce3644450008e38921) @@ -281,7 +281,7 @@ if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR) e.addSuppressed(new QueryCancelledException()); - r.state(e, nodeId); + r.state(msg, e, nodeId); } } @@ -360,7 +360,7 @@ * @param nodeId Node ID. */ private void retry(ReduceQueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) { - r.state(retryVer, nodeId); + r.state("Node left the grid", retryVer, nodeId); } /** @@ -751,24 +751,6 @@ boolean retry = false; - // Always enforce join order on map side to have consistent behavior. - int flags = GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER; - - if (distributedJoins) - flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS; - - if (qry.isLocal()) - flags |= GridH2QueryRequest.FLAG_IS_LOCAL; - - if (qry.explain()) - flags |= GridH2QueryRequest.FLAG_EXPLAIN; - - if (isReplicatedOnly) - flags |= GridH2QueryRequest.FLAG_REPLICATED; - - if (lazy && mapQrys.size() == 1) - flags |= GridH2QueryRequest.FLAG_LAZY; - GridH2QueryRequest req = new GridH2QueryRequest() .requestId(qryReqId) .topologyVersion(topVer) @@ -778,32 +760,15 @@ .partitions(convert(partsMap)) .queries(mapQrys) .parameters(params) - .flags(flags) + .flags(prepareFlags(qry, lazy, mapQrys.size())) .timeout(timeoutMillis) .schemaName(schemaName); if (send(nodes, req, parts == null ? null : new ExplicitPartitionsSpecializer(qryMap), false)) { awaitAllReplies(r, nodes, cancel); - if (r.hasError()) { - if (r.cacheException() != null) { - CacheException err = r.cacheException(); - - if (err.getCause() instanceof IgniteClientDisconnectedException) - throw err; - - if (wasCancelled(err)) - throw new QueryCancelledException(); // Throw correct exception. - - throw new CacheException("Failed to run map query remotely." + err.getMessage(), err); - } else { - retry = true; - - // If remote node asks us to retry then we have outdated full partition map. - h2.awaitForReadyTopologyVersion(r.topVersion()); + retry = analyseCurrentRun(r); - } + } - } - } else // Send failed. retry = true; @@ -876,6 +841,14 @@ Throwable disconnectedErr = ((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class); + if ( QueryCancelledException.class.isAssignableFrom(e.getClass()) ) + cause = new QueryCancelledException(String.format( + "The query was cancelled while executing. [query=%s, localNodeId=%s, reason=%s]", + qry.originalSql(), + ctx.localNodeId(), + "Cancelled by client" + )); + if (disconnectedErr != null) cause = disconnectedErr; } @@ -896,6 +869,64 @@ } /** + * Analyse reduce query run to decide if retry is required + * @param r reduce query run to be analysed + * @return true if retry is required, false otherwise + * @throws IgniteCheckedException in case of reduce query run contains exception record + */ + private boolean analyseCurrentRun(ReduceQueryRun r) throws IgniteCheckedException { + if (r.hasError()) { + if (r.cacheException() != null) { + CacheException err = r.cacheException(); + + if (err.getCause() instanceof IgniteClientDisconnectedException) + throw err; + + Exception cause = wasCancelled(err) || X.hasCause(err, QueryCancelledException.class) + ? new QueryCancelledException(r.rootCause()) + : err; + + throw new CacheException("Failed to run map query remotely." + cause.getMessage(), cause); + } else { + // If remote node asks us to retry then we have outdated full partition map. + h2.awaitForReadyTopologyVersion(r.topVersion()); + + return true; + } + } + return false; + } + + /** + * Builds flag out of parameters + * @param qry query parameter holder + * @param lazy if lazy execution + * @param mapQrysSize number of queries + * @return flag + */ + private int prepareFlags(GridCacheTwoStepQuery qry, boolean lazy, int mapQrysSize) { + // Always enforce join order on map side to have consistent behavior. + int flags = GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER; + + if (qry.distributedJoins()) + flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS; + + if (qry.isLocal()) + flags |= GridH2QueryRequest.FLAG_IS_LOCAL; + + if (qry.explain()) + flags |= GridH2QueryRequest.FLAG_EXPLAIN; + + if (qry.isReplicatedOnly()) + flags |= GridH2QueryRequest.FLAG_REPLICATED; + + if (lazy && mapQrysSize == 1) + flags |= GridH2QueryRequest.FLAG_LAZY; + + return flags; + } + + /** * * @param schemaName Schema name. * @param cacheIds Cache ids. Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java (revision 0c4301cdc0d6108ed5b51173144e19d3ad450e63) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java (revision dd35f024d77d12badf711bce3644450008e38921) @@ -83,16 +83,16 @@ * @param o Fail state object. * @param nodeId Node ID. */ - void state(Object o, @Nullable UUID nodeId) { + void state(String msg, Object o, @Nullable UUID nodeId) { assert o != null; assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass(); if ( o instanceof CacheException ) - state(new State((CacheException)o, null, null, nodeId)); + state(new State((CacheException)o, msg, null, nodeId)); else - state(new State(null, null, (AffinityTopologyVersion)o, nodeId)); + state(new State(null, msg, (AffinityTopologyVersion)o, nodeId)); } /** @@ -126,7 +126,7 @@ * @param e Error. */ void disconnected(CacheException e) { - state(e, null); + state("", e, null); } /** Index: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java (revision 0c4301cdc0d6108ed5b51173144e19d3ad450e63) +++ modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java (revision dd35f024d77d12badf711bce3644450008e38921) @@ -35,6 +35,7 @@ import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.processors.GridProcessor; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -46,7 +47,7 @@ */ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends GridCommonAbstractTest { /** Grids count. */ - private static final int GRIDS_CNT = 3; + private static final int GRIDS_CNT = 4; /** IP finder. */ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); @@ -66,6 +67,12 @@ /** */ private static final String QRY_3 = "select a._val from String a"; + /** */ + private static final String CANCELLED_BY_CLIENT = "reason=Cancelled by client"; + + /** */ + private static final String WITH_TIMEOUT_WAS_CANCELLED = "reason=Statement with timeout was cancelled"; + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); @@ -100,82 +107,96 @@ /** */ public void testRemoteQueryExecutionTimeout() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 10, TimeUnit.MILLISECONDS, true, + WITH_TIMEOUT_WAS_CANCELLED); } /** */ - public void testRemoteQueryWithMergeTableTimeout() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true); + public void testRemoteQueryWithMergeTableTimeout0() throws Exception { + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.MILLISECONDS, true, + WITH_TIMEOUT_WAS_CANCELLED); } + /** Query possibly could be executed faster than timeout*/ + public void testRemoteQueryWithMergeTableTimeout1() throws Exception { + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 25, TimeUnit.MILLISECONDS, true, + WITH_TIMEOUT_WAS_CANCELLED); + } + /** */ public void testRemoteQueryExecutionCancel0() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false, + CANCELLED_BY_CLIENT); } /** */ public void testRemoteQueryExecutionCancel1() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 10, TimeUnit.MILLISECONDS, false, + CANCELLED_BY_CLIENT); } /** */ public void testRemoteQueryExecutionCancel2() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false, + CANCELLED_BY_CLIENT); } /** */ public void testRemoteQueryExecutionCancel3() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false, + CANCELLED_BY_CLIENT); } /** */ public void testRemoteQueryWithMergeTableCancel0() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false, + CANCELLED_BY_CLIENT); } - /** */ + /** Query with far less complex sql and expected to be executed faster than timeout*/ public void testRemoteQueryWithMergeTableCancel1() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false, null); } - /** */ + /** Query with far less complex sql and expected to be executed faster than timeout*/ public void testRemoteQueryWithMergeTableCancel2() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false, null); } - /** */ + /** Query with far less complex sql and expected to be executed faster than timeout*/ public void testRemoteQueryWithMergeTableCancel3() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false, null); } - /** */ + /** Query possibly could be executed faster than timeout*/ public void testRemoteQueryWithoutMergeTableCancel0() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false); + testQueryCancel(2*CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false, + CANCELLED_BY_CLIENT); } - /** */ + /** Query with far less complex sql and expected to be executed faster than timeout*/ public void testRemoteQueryWithoutMergeTableCancel1() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false, null); } - /** */ + /** Query with far less complex sql and expected to be executed faster than timeout*/ public void testRemoteQueryWithoutMergeTableCancel2() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false, null); } - /** */ + /** Query with far less complex sql and expected to be executed faster than timeout*/ public void testRemoteQueryWithoutMergeTableCancel3() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false, null); } - /** */ + /** Query with far less complex sql and expected to be executed faster than timeout*/ public void testRemoteQueryAlreadyFinishedStop() throws Exception { - testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false); + testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false, null); } /** */ private void testQueryCancel(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit, - boolean timeout) throws Exception { + boolean timeout, String cause) throws Exception { try (Ignite client = startGrid("client")) { IgniteCache cache = client.cache(DEFAULT_CACHE_NAME); @@ -216,20 +237,27 @@ try(QueryCursor> ignored = cursor) { cursor.iterator(); - fail(); + + if (!F.isEmpty(cause)) + fail("No exception caught"); } catch (CacheException ex) { - log().error("Got expected exception", ex); + log().error("Got exception", ex); + log().error( "Cause of exception", ex.getCause()); + assertTrue("Must throw correct exception", ex.getCause() instanceof QueryCancelledException); - } + assertTrue( "Cause message "+ex.getCause().getMessage(), ex.getCause().getMessage().contains(cause)); + }finally { + - // Give some time to clean up. - Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 3_000); + // Give some time to clean up. + Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 3_000); - checkCleanState(); - } - } + checkCleanState(); + } + } + } /** * Validates clean state on all participating nodes after query cancellation. Index: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java (revision 0c4301cdc0d6108ed5b51173144e19d3ad450e63) +++ modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java (revision dd35f024d77d12badf711bce3644450008e38921) @@ -42,6 +42,12 @@ /** */ private static final String QUERY = "select a._val, b._val from String a, String b"; + /** */ + private static final String CANCELLED_BY_CLIENT = "reason=Cancelled by client"; + + /** */ + private static final String WITH_TIMEOUT_WAS_CANCELLED = "reason=Statement with timeout was cancelled"; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -93,27 +99,27 @@ * Tests cancellation. */ public void testQueryCancel() { - testQuery(false, 1, TimeUnit.SECONDS); + testQuery(false, 1, TimeUnit.SECONDS, CANCELLED_BY_CLIENT); } /** * Tests cancellation with zero timeout. */ public void testQueryCancelZeroTimeout() { - testQuery(false, 1, TimeUnit.MILLISECONDS); + testQuery(false, 1, TimeUnit.MILLISECONDS, CANCELLED_BY_CLIENT); } /** * Tests timeout. */ public void testQueryTimeout() { - testQuery(true, 1, TimeUnit.SECONDS); + testQuery(true, 1, TimeUnit.SECONDS, WITH_TIMEOUT_WAS_CANCELLED); } /** * Tests cancellation. */ - private void testQuery(boolean timeout, int timeoutUnits, TimeUnit timeUnit) { + private void testQuery(boolean timeout, int timeoutUnits, TimeUnit timeUnit, String cause) { Ignite ignite = grid(0); IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME); @@ -143,9 +149,16 @@ fail("Expecting timeout"); } catch (Exception e) { + log().error("Got exception", e); + + log().error( "Cause of exception", e.getCause()); + assertTrue("Must throw correct exception", e.getCause() instanceof QueryCancelledException); + + assertTrue( "Cause message "+e.getCause().getMessage(), e.getCause().getMessage().contains(cause)); + } // Test must exit gracefully. } -} \ No newline at end of file +}