From a1601d79181755b985dab80e02c08bdf05788722 Mon Sep 17 00:00:00 2001 From: ivasilinets Date: Tue, 28 Jul 2015 12:02:27 +0300 Subject: [PATCH] #ignite-1161: add schedule remover for query cursor. --- .../rest/AbstractRestProcessorSelfTest.java | 2 + .../rest/JettyRestProcessorAbstractSelfTest.java | 34 ++++++++ .../configuration/ConnectorConfiguration.java | 23 ++++++ .../rest/handlers/query/QueryCommandHandler.java | 90 +++++++++++++--------- 4 files changed, 113 insertions(+), 36 deletions(-) diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java index 8310b0f..b5b430c 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java @@ -73,6 +73,8 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest { clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml"); + clientCfg.setQueryRemoveDelay(5); + cfg.setConnectorConfiguration(clientCfg); TcpDiscoverySpi disco = new TcpDiscoverySpi(); diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index 8ce070f..91dfa66 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; import java.io.*; @@ -1194,6 +1195,39 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro assertFalse(queryCursorFound()); } + /** + * @throws Exception If failed. + */ + public void testQueryDelay() throws Exception { + String qry = "salary > ? and salary <= ?"; + + Map params = new HashMap<>(); + params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key()); + params.put("type", "Person"); + params.put("psz", "1"); + params.put("cacheName", "person"); + params.put("qry", URLEncoder.encode(qry)); + params.put("arg1", "1000"); + params.put("arg2", "2000"); + + String ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + JSONObject json = JSONObject.fromObject(ret); + + List items = (List)((Map)json.get("response")).get("items"); + + assertEquals(1, items.size()); + + assertTrue(queryCursorFound()); + + U.sleep(12000); + + assertFalse(queryCursorFound()); + } + protected abstract String signature() throws Exception; /** diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java index 98753e2..237f4b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java @@ -61,6 +61,9 @@ public class ConnectorConfiguration { /** Default socket send and receive buffer size. */ public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; + /** Default delay for storing query cursor (10 minutes). */ + private static final int DFLT_QRY_RMV_DELAY = 10 * 60; + /** Jetty XML configuration path. */ private String jettyPath; @@ -85,6 +88,9 @@ public class ConnectorConfiguration { /** REST TCP receive buffer size. */ private int rcvBufSize = DFLT_SOCK_BUF_SIZE; + /** REST delay for storing query cursor. */ + private int qryRmvDelay = DFLT_QRY_RMV_DELAY; + /** REST TCP send queue limit. */ private int sndQueueLimit; @@ -148,6 +154,7 @@ public class ConnectorConfiguration { sslClientAuth = cfg.isSslClientAuth(); sslCtxFactory = cfg.getSslContextFactory(); sslEnabled = cfg.isSslEnabled(); + qryRmvDelay = cfg.getQueryRemoveDelay(); } /** @@ -547,4 +554,20 @@ public class ConnectorConfiguration { public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) { msgInterceptor = interceptor; } + + /** + * Sets delay for removing query cursors that are not used. + * + * @param qryRmvDelay Query remove delay in seconds. + */ + public void setQueryRemoveDelay(int qryRmvDelay) { + this.qryRmvDelay = qryRmvDelay; + } + + /** + * Gets delay for removing query cursors that are not used. + */ + public int getQueryRemoveDelay() { + return qryRmvDelay; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index 59f95c9..18a2ae7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -26,8 +26,9 @@ import org.apache.ignite.internal.processors.rest.*; import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.processors.rest.request.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; import java.util.*; import java.util.concurrent.*; @@ -49,13 +50,22 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { private static final AtomicLong qryIdGen = new AtomicLong(); /** Current queries cursors. */ - private final ConcurrentHashMap> qryCurs = new ConcurrentHashMap<>(); + private final static ConcurrentHashMap> qryCurs = + new ConcurrentHashMap<>(); + + /** Remove delay. */ + private static int rmvDelay = 0; + + /** Scheduler. */ + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1); /** * @param ctx Context. */ public QueryCommandHandler(GridKernalContext ctx) { super(ctx); + + rmvDelay = ctx.config().getConnectorConfiguration().getQueryRemoveDelay(); } /** {@inheritDoc} */ @@ -74,17 +84,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { case EXECUTE_SQL_QUERY: case EXECUTE_SQL_FIELDS_QUERY: { return ctx.closure().callLocalSafe( - new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, qryCurs), false); + new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req), false); } case FETCH_SQL_QUERY: { return ctx.closure().callLocalSafe( - new FetchQueryCallable((RestSqlQueryRequest)req, qryCurs), false); + new FetchQueryCallable((RestSqlQueryRequest)req), false); } case CLOSE_SQL_QUERY: { return ctx.closure().callLocalSafe( - new CloseQueryCallable((RestSqlQueryRequest)req, qryCurs), false); + new CloseQueryCallable((RestSqlQueryRequest)req), false); } } @@ -101,24 +111,18 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private ConcurrentHashMap> qryCurs; - /** * @param ctx Kernal context. * @param req Execute query request. - * @param qryCurs Queries cursors. */ - public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req, - ConcurrentHashMap> qryCurs) { + public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) { this.ctx = ctx; this.req = req; - this.qryCurs = qryCurs; } /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { - long qryId = qryIdGen.getAndIncrement(); + final long qryId = qryIdGen.getAndIncrement(); try { Query qry; @@ -140,13 +144,15 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { return new GridRestResponse(GridRestResponse.STATUS_FAILED, "No cache with name [cacheName=" + req.cacheName() + "]"); - QueryCursor qryCur = cache.query(qry); + final QueryCursor qryCur = cache.query(qry); Iterator cur = qryCur.iterator(); - qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur)); + qryCurs.put(qryId, new GridTuple3<>(qryCur, cur, true)); - CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId); + scheduleRemove(qryId); + + CacheQueryResult res = createQueryResult(cur, req, qryId); List fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); @@ -184,17 +190,11 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private final ConcurrentHashMap> qryCurs; - /** * @param req Execute query request. - * @param qryCurs Queries cursors. */ - public CloseQueryCallable(RestSqlQueryRequest req, - ConcurrentHashMap> qryCurs) { + public CloseQueryCallable(RestSqlQueryRequest req) { this.req = req; - this.qryCurs = qryCurs; } /** {@inheritDoc} */ @@ -227,29 +227,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private final ConcurrentHashMap> qryCurs; - /** * @param req Execute query request. - * @param qryCurs Queries cursors. */ - public FetchQueryCallable(RestSqlQueryRequest req, - ConcurrentHashMap> qryCurs) { + public FetchQueryCallable(RestSqlQueryRequest req) { this.req = req; - this.qryCurs = qryCurs; } /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - Iterator cur = qryCurs.get(req.queryId()).get2(); + GridTuple3 t = qryCurs.get(req.queryId()); + + t.set3(true); + + Iterator cur = t.get2(); if (cur == null) return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Cannot find query [qryId=" + req.queryId() + "]"); - CacheQueryResult res = createQueryResult(qryCurs, cur, req, req.queryId()); + CacheQueryResult res = createQueryResult(cur, req, req.queryId()); return new GridRestResponse(res); } @@ -262,15 +260,12 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { } /** - * @param qryCurs Query cursors. * @param cur Current cursor. * @param req Sql request. * @param qryId Query id. * @return Query result with items. */ - private static CacheQueryResult createQueryResult( - ConcurrentHashMap> qryCurs, - Iterator cur, RestSqlQueryRequest req, Long qryId) { + private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId) { CacheQueryResult res = new CacheQueryResult(); List items = new ArrayList<>(); @@ -289,4 +284,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { return res; } + + /** + * Schedule remove for query cursor. + * + * @param id Query id. + */ + private static void scheduleRemove(final long id) { + SCHEDULER.schedule(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + GridTuple3 t = qryCurs.get(id); + + if (t != null) { + if (t.get3()) { + t.set3(false); + + scheduleRemove(id); + } + else + qryCurs.remove(id); + } + } + }, rmvDelay, TimeUnit.SECONDS); + } } -- 1.9.5.msysgit.0