From 5279e5eee98b434ae8ee0837fd0cb390099500db Mon Sep 17 00:00:00 2001 From: ivasilinets Date: Wed, 29 Jul 2015 13:00:23 +0300 Subject: [PATCH] # ignite-1161 --- .../rest/AbstractRestProcessorSelfTest.java | 3 + .../rest/JettyRestProcessorAbstractSelfTest.java | 37 ++++++ .../configuration/ConnectorConfiguration.java | 59 +++++++++ .../rest/handlers/query/QueryCommandHandler.java | 132 +++++++++++++-------- 4 files changed, 182 insertions(+), 49 deletions(-) diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java index 8310b0f..9b26bd8 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java @@ -73,6 +73,9 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest { clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml"); + clientCfg.setIdleQueryCursorTimeout(5000); + clientCfg.setQueryCheckFrequency(5000); + cfg.setConnectorConfiguration(clientCfg); TcpDiscoverySpi disco = new TcpDiscoverySpi(); diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index 8ce070f..29ca521 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; import java.io.*; @@ -1194,6 +1195,42 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro assertFalse(queryCursorFound()); } + /** + * @throws Exception If failed. + */ + public void testQueryDelay() throws Exception { + String qry = "salary > ? and salary <= ?"; + + Map params = new HashMap<>(); + params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key()); + params.put("type", "Person"); + params.put("psz", "1"); + params.put("cacheName", "person"); + params.put("qry", URLEncoder.encode(qry)); + params.put("arg1", "1000"); + params.put("arg2", "2000"); + + String ret = null; + + for (int i = 0; i < 10; ++i) + ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + JSONObject json = JSONObject.fromObject(ret); + + List items = (List)((Map)json.get("response")).get("items"); + + assertEquals(1, items.size()); + + assertTrue(queryCursorFound()); + + U.sleep(10000); + + assertFalse(queryCursorFound()); + } + protected abstract String signature() throws Exception; /** diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java index 98753e2..e607767 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java @@ -61,6 +61,12 @@ public class ConnectorConfiguration { /** Default socket send and receive buffer size. */ public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; + /** Default REST idle timeout for query cursor. */ + private static final long DFLT_IDLE_QRY_CUR_TIMEOUT = 10 * 60 * 1000; + + /** Default REST check frequency for query cursor. */ + private static final long DFLT_QRY_CHECK_FRQ = 60 * 1000; + /** Jetty XML configuration path. */ private String jettyPath; @@ -85,6 +91,12 @@ public class ConnectorConfiguration { /** REST TCP receive buffer size. */ private int rcvBufSize = DFLT_SOCK_BUF_SIZE; + /** REST idle timeout for query cursor. */ + private long idleQryCurTimeout = DFLT_IDLE_QRY_CUR_TIMEOUT; + + /** REST idle timeout for query cursor. */ + private long qryCheckFrq = DFLT_QRY_CHECK_FRQ; + /** REST TCP send queue limit. */ private int sndQueueLimit; @@ -148,6 +160,8 @@ public class ConnectorConfiguration { sslClientAuth = cfg.isSslClientAuth(); sslCtxFactory = cfg.getSslContextFactory(); sslEnabled = cfg.isSslEnabled(); + idleQryCurTimeout = cfg.getIdleQueryCursorTimeout(); + qryCheckFrq = cfg.getQueryCheckFrequency(); } /** @@ -547,4 +561,49 @@ public class ConnectorConfiguration { public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) { msgInterceptor = interceptor; } + + /** + * Sets idle query cursors timeout. + * + * @param idleQryCurTimeout Idle query cursors timeout in milliseconds. + * @see #getIdleQueryCursorTimeout() + */ + public void setIdleQueryCursorTimeout(long idleQryCurTimeout) { + this.idleQryCurTimeout = idleQryCurTimeout; + } + + /** + * Gets idle query cursors timeout in milliseconds. + *

+ * This setting is used to reject open query cursors that is not used. If no fetch query request + * come within idle timeout, it will be removed on next check for old query cursors + * (see {@link #getQueryCheckFrequency()}). + * + * @return Idle query cursors timeout in milliseconds + */ + public long getIdleQueryCursorTimeout() { + return idleQryCurTimeout; + } + + /** + * Sets query check frequency. + * + * @param qryCheckFrq Idle query check frequency in milliseconds. + * @see #getQueryCheckFrequency() + */ + public void setQueryCheckFrequency(long qryCheckFrq) { + this.qryCheckFrq = qryCheckFrq; + } + + /** + * Gets query cursors check frequency. + * This setting is used to reject open query cursors that is not used. + *

+ * Scheduler tries with specified period to close queries' cursors that are overtime. + * + * @return Query check frequency in milliseconds. + */ + public long getQueryCheckFrequency() { + return qryCheckFrq; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index 59f95c9..f845456 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -26,8 +26,8 @@ import org.apache.ignite.internal.processors.rest.*; import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.processors.rest.request.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; import java.util.*; import java.util.concurrent.*; @@ -49,13 +49,36 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { private static final AtomicLong qryIdGen = new AtomicLong(); /** Current queries cursors. */ - private final ConcurrentHashMap> qryCurs = new ConcurrentHashMap<>(); + private final static ConcurrentHashMap> qryCurs = + new ConcurrentHashMap<>(); /** * @param ctx Context. */ public QueryCommandHandler(GridKernalContext ctx) { super(ctx); + + final long idleQryCurTimeout = ctx.config().getConnectorConfiguration().getIdleQueryCursorTimeout(); + + long qryCheckFrq = ctx.config().getConnectorConfiguration().getQueryCheckFrequency(); + + ctx.timeout().schedule(new Runnable() { + @Override public void run() { + long time = System.currentTimeMillis(); + + for (Map.Entry> e : qryCurs.entrySet()) { + synchronized (e.getValue()) { + long createTime = e.getValue().get3(); + + if (createTime + idleQryCurTimeout > time) { + e.getValue().get1().close(); + + qryCurs.remove(e.getKey()); + } + } + } + } + }, qryCheckFrq, qryCheckFrq); } /** {@inheritDoc} */ @@ -74,17 +97,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { case EXECUTE_SQL_QUERY: case EXECUTE_SQL_FIELDS_QUERY: { return ctx.closure().callLocalSafe( - new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, qryCurs), false); + new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req), false); } case FETCH_SQL_QUERY: { return ctx.closure().callLocalSafe( - new FetchQueryCallable((RestSqlQueryRequest)req, qryCurs), false); + new FetchQueryCallable((RestSqlQueryRequest)req), false); } case CLOSE_SQL_QUERY: { return ctx.closure().callLocalSafe( - new CloseQueryCallable((RestSqlQueryRequest)req, qryCurs), false); + new CloseQueryCallable((RestSqlQueryRequest)req), false); } } @@ -101,24 +124,18 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private ConcurrentHashMap> qryCurs; - /** * @param ctx Kernal context. * @param req Execute query request. - * @param qryCurs Queries cursors. */ - public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req, - ConcurrentHashMap> qryCurs) { + public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) { this.ctx = ctx; this.req = req; - this.qryCurs = qryCurs; } /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { - long qryId = qryIdGen.getAndIncrement(); + final long qryId = qryIdGen.getAndIncrement(); try { Query qry; @@ -140,22 +157,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { return new GridRestResponse(GridRestResponse.STATUS_FAILED, "No cache with name [cacheName=" + req.cacheName() + "]"); - QueryCursor qryCur = cache.query(qry); + final QueryCursor qryCur = cache.query(qry); Iterator cur = qryCur.iterator(); - qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur)); + GridTuple3 val = + new GridTuple3<>(qryCur, cur, System.currentTimeMillis()); + + synchronized (val) { + qryCurs.put(qryId, val); - CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId); + CacheQueryResult res = createQueryResult(cur, req, qryId); - List fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); + List fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); - res.setFieldsMetadata(convertMetadata(fieldsMeta)); + res.setFieldsMetadata(convertMetadata(fieldsMeta)); - return new GridRestResponse(res); + return new GridRestResponse(res); + } } catch (Exception e) { - qryCurs.remove(qryId); + removeQueryCursor(qryId); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -184,36 +206,34 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private final ConcurrentHashMap> qryCurs; - /** * @param req Execute query request. - * @param qryCurs Queries cursors. */ - public CloseQueryCallable(RestSqlQueryRequest req, - ConcurrentHashMap> qryCurs) { + public CloseQueryCallable(RestSqlQueryRequest req) { this.req = req; - this.qryCurs = qryCurs; } /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - QueryCursor cur = qryCurs.get(req.queryId()).get1(); + GridTuple3 val = qryCurs.get(req.queryId()); - if (cur == null) + if (val == null) return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Cannot find query [qryId=" + req.queryId() + "]"); - cur.close(); + synchronized (val) { + QueryCursor cur = val.get1(); + + cur.close(); - qryCurs.remove(req.queryId()); + qryCurs.remove(req.queryId()); + } return new GridRestResponse(true); } catch (Exception e) { - qryCurs.remove(req.queryId()); + removeQueryCursor(req.queryId()); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -227,34 +247,34 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private final ConcurrentHashMap> qryCurs; - /** * @param req Execute query request. - * @param qryCurs Queries cursors. */ - public FetchQueryCallable(RestSqlQueryRequest req, - ConcurrentHashMap> qryCurs) { + public FetchQueryCallable(RestSqlQueryRequest req) { this.req = req; - this.qryCurs = qryCurs; } /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - Iterator cur = qryCurs.get(req.queryId()).get2(); + GridTuple3 t = qryCurs.get(req.queryId()); - if (cur == null) + if (t == null) return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Cannot find query [qryId=" + req.queryId() + "]"); - CacheQueryResult res = createQueryResult(qryCurs, cur, req, req.queryId()); + synchronized (t) { + t.set3(System.currentTimeMillis()); + + Iterator cur = t.get2(); - return new GridRestResponse(res); + CacheQueryResult res = createQueryResult(cur, req, req.queryId()); + + return new GridRestResponse(res); + } } catch (Exception e) { - qryCurs.remove(req.queryId()); + removeQueryCursor(req.queryId()); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -262,15 +282,12 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { } /** - * @param qryCurs Query cursors. * @param cur Current cursor. * @param req Sql request. * @param qryId Query id. * @return Query result with items. */ - private static CacheQueryResult createQueryResult( - ConcurrentHashMap> qryCurs, - Iterator cur, RestSqlQueryRequest req, Long qryId) { + private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId) { CacheQueryResult res = new CacheQueryResult(); List items = new ArrayList<>(); @@ -285,8 +302,25 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { res.setQueryId(qryId); if (!cur.hasNext()) - qryCurs.remove(qryId); + removeQueryCursor(qryId); return res; } + + /** + * Removes query cursor. + * + * @param qryId Query id. + */ + private static void removeQueryCursor(Long qryId) { + GridTuple3 t = qryCurs.get(qryId); + + if (t != null) { + synchronized (t) { + t.get1().close(); + + qryCurs.remove(qryId); + } + } + } } -- 1.9.5.msysgit.0