From 8711f661ededf3e81b29c9aca1446d81098f76ec Mon Sep 17 00:00:00 2001 From: ivasilinets Date: Fri, 31 Jul 2015 15:59:15 +0300 Subject: [PATCH] # ignite-1161 --- .../rest/AbstractRestProcessorSelfTest.java | 3 + .../rest/JettyRestProcessorAbstractSelfTest.java | 37 ++++ .../configuration/ConnectorConfiguration.java | 59 ++++++ .../rest/handlers/query/QueryCommandHandler.java | 231 +++++++++++++++++---- 4 files changed, 289 insertions(+), 41 deletions(-) diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java index 8310b0f..e39a7811 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java @@ -73,6 +73,9 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest { clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml"); + clientCfg.setIdleQueryCursorTimeout(5000); + clientCfg.setIdleQueryCursorCheckFrequency(5000); + cfg.setConnectorConfiguration(clientCfg); TcpDiscoverySpi disco = new TcpDiscoverySpi(); diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index 090e030..9e82f13 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; import java.io.*; @@ -1194,6 +1195,42 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro assertFalse(queryCursorFound()); } + /** + * @throws Exception If failed. + */ + public void testQueryDelay() throws Exception { + String qry = "salary > ? and salary <= ?"; + + Map params = new HashMap<>(); + params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key()); + params.put("type", "Person"); + params.put("pageSize", "1"); + params.put("cacheName", "person"); + params.put("qry", URLEncoder.encode(qry)); + params.put("arg1", "1000"); + params.put("arg2", "2000"); + + String ret = null; + + for (int i = 0; i < 10; ++i) + ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + JSONObject json = JSONObject.fromObject(ret); + + List items = (List)((Map)json.get("response")).get("items"); + + assertEquals(1, items.size()); + + assertTrue(queryCursorFound()); + + U.sleep(10000); + + assertFalse(queryCursorFound()); + } + protected abstract String signature() throws Exception; /** diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java index 98753e2..9006440 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java @@ -61,6 +61,12 @@ public class ConnectorConfiguration { /** Default socket send and receive buffer size. */ public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; + /** Default REST idle timeout for query cursor. */ + private static final long DFLT_IDLE_QRY_CUR_TIMEOUT = 10 * 60 * 1000; + + /** Default REST check frequency for idle query cursor. */ + private static final long DFLT_IDLE_QRY_CUR_CHECK_FRQ = 60 * 1000; + /** Jetty XML configuration path. */ private String jettyPath; @@ -85,6 +91,12 @@ public class ConnectorConfiguration { /** REST TCP receive buffer size. */ private int rcvBufSize = DFLT_SOCK_BUF_SIZE; + /** REST idle timeout for query cursor. */ + private long idleQryCurTimeout = DFLT_IDLE_QRY_CUR_TIMEOUT; + + /** REST idle check frequency for query cursor. */ + private long idleQryCurCheckFreq = DFLT_IDLE_QRY_CUR_CHECK_FRQ; + /** REST TCP send queue limit. */ private int sndQueueLimit; @@ -148,6 +160,8 @@ public class ConnectorConfiguration { sslClientAuth = cfg.isSslClientAuth(); sslCtxFactory = cfg.getSslContextFactory(); sslEnabled = cfg.isSslEnabled(); + idleQryCurTimeout = cfg.getIdleQueryCursorTimeout(); + idleQryCurCheckFreq = cfg.getIdleQueryCursorCheckFrequency(); } /** @@ -547,4 +561,49 @@ public class ConnectorConfiguration { public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) { msgInterceptor = interceptor; } + + /** + * Sets idle query cursors timeout. + * + * @param idleQryCurTimeout Idle query cursors timeout in milliseconds. + * @see #getIdleQueryCursorTimeout() + */ + public void setIdleQueryCursorTimeout(long idleQryCurTimeout) { + this.idleQryCurTimeout = idleQryCurTimeout; + } + + /** + * Gets idle query cursors timeout in milliseconds. + *

+ * This setting is used to reject open query cursors that is not used. If no fetch query request + * come within idle timeout, it will be removed on next check for old query cursors + * (see {@link #getIdleQueryCursorCheckFrequency()}). + * + * @return Idle query cursors timeout in milliseconds + */ + public long getIdleQueryCursorTimeout() { + return idleQryCurTimeout; + } + + /** + * Sets idle query cursor check frequency. + * + * @param idleQryCurCheckFreq Idle query check frequency in milliseconds. + * @see #getIdleQueryCursorCheckFrequency() + */ + public void setIdleQueryCursorCheckFrequency(long idleQryCurCheckFreq) { + this.idleQryCurCheckFreq = idleQryCurCheckFreq; + } + + /** + * Gets idle query cursors check frequency. + * This setting is used to reject open query cursors that is not used. + *

+ * Scheduler tries with specified period to close queries' cursors that are overtime. + * + * @return Idle query cursor check frequency in milliseconds. + */ + public long getIdleQueryCursorCheckFrequency() { + return idleQryCurCheckFreq; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index 1712dd4..44d99b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -27,11 +27,11 @@ import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.processors.rest.request.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; @@ -49,13 +49,43 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { private static final AtomicLong qryIdGen = new AtomicLong(); /** Current queries cursors. */ - private final ConcurrentHashMap> qryCurs = new ConcurrentHashMap<>(); + private final ConcurrentHashMap qryCurs = new ConcurrentHashMap<>(); /** * @param ctx Context. */ public QueryCommandHandler(GridKernalContext ctx) { super(ctx); + + final long idleQryCurTimeout = ctx.config().getConnectorConfiguration().getIdleQueryCursorTimeout(); + + long qryCheckFrq = ctx.config().getConnectorConfiguration().getIdleQueryCursorCheckFrequency(); + + ctx.timeout().schedule(new Runnable() { + @Override public void run() { + long time = U.currentTimeMillis(); + + for (Map.Entry e : qryCurs.entrySet()) { + QueryCursorIterator val = e.getValue(); + + long createTime = val.timestamp(); + + if (createTime + idleQryCurTimeout > time) + if (val.lock().tryLock()) { + try { + val.timestamp(-1); + + qryCurs.remove(e.getKey(), val); + + val.close(); + } + finally { + val.lock().unlock(); + } + } + } + } + }, qryCheckFrq, qryCheckFrq); } /** {@inheritDoc} */ @@ -101,16 +131,16 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private ConcurrentHashMap> qryCurs; + /** Current queries cursors. */ + private final ConcurrentHashMap qryCurs; /** * @param ctx Kernal context. * @param req Execute query request. - * @param qryCurs Queries cursors. + * @param qryCurs Query cursors. */ public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req, - ConcurrentHashMap> qryCurs) { + ConcurrentHashMap qryCurs) { this.ctx = ctx; this.req = req; this.qryCurs = qryCurs; @@ -118,7 +148,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { - long qryId = qryIdGen.getAndIncrement(); + final long qryId = qryIdGen.getAndIncrement(); try { Query qry; @@ -140,22 +170,31 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Failed to find cache with name: " + req.cacheName()); - QueryCursor qryCur = cache.query(qry); + final QueryCursor qryCur = cache.query(qry); Iterator cur = qryCur.iterator(); - qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur)); + QueryCursorIterator val = new QueryCursorIterator(qryCur, cur); - CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId); + val.lock().lock(); - List fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); + try { + qryCurs.put(qryId, val); - res.setFieldsMetadata(convertMetadata(fieldsMeta)); + CacheQueryResult res = createQueryResult(cur, req, qryId, qryCurs); - return new GridRestResponse(res); + List fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); + + res.setFieldsMetadata(convertMetadata(fieldsMeta)); + + return new GridRestResponse(res); + } + finally { + val.lock().unlock(); + } } catch (Exception e) { - qryCurs.remove(qryId); + removeQueryCursor(qryId, qryCurs); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -184,15 +223,14 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private final ConcurrentHashMap> qryCurs; + /** Current queries cursors. */ + private final ConcurrentHashMap qryCurs; /** * @param req Execute query request. - * @param qryCurs Queries cursors. + * @param qryCurs Query cursors. */ - public CloseQueryCallable(RestSqlQueryRequest req, - ConcurrentHashMap> qryCurs) { + public CloseQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap qryCurs) { this.req = req; this.qryCurs = qryCurs; } @@ -200,20 +238,29 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - QueryCursor cur = qryCurs.get(req.queryId()).get1(); + QueryCursorIterator val = qryCurs.get(req.queryId()); - if (cur == null) - return new GridRestResponse(GridRestResponse.STATUS_FAILED, - "Failed to find query with ID: " + req.queryId()); + if (val == null) + return new GridRestResponse(true); + + val.lock().lock(); - cur.close(); + try{ + if (val.timestamp() == -1) + return new GridRestResponse(true); - qryCurs.remove(req.queryId()); + val.close(); + + qryCurs.remove(req.queryId()); + } + finally { + val.lock().unlock(); + } return new GridRestResponse(true); } catch (Exception e) { - qryCurs.remove(req.queryId()); + removeQueryCursor(req.queryId(), qryCurs); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -227,15 +274,14 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private final ConcurrentHashMap> qryCurs; + /** Current queries cursors. */ + private final ConcurrentHashMap qryCurs; /** * @param req Execute query request. - * @param qryCurs Queries cursors. + * @param qryCurs Query cursors. */ - public FetchQueryCallable(RestSqlQueryRequest req, - ConcurrentHashMap> qryCurs) { + public FetchQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap qryCurs) { this.req = req; this.qryCurs = qryCurs; } @@ -243,18 +289,33 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - Iterator cur = qryCurs.get(req.queryId()).get2(); + QueryCursorIterator val = qryCurs.get(req.queryId()); - if (cur == null) + if (val == null) return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId()); - CacheQueryResult res = createQueryResult(qryCurs, cur, req, req.queryId()); + val.lock().lock(); + + try { + if (val.timestamp() == -1) + return new GridRestResponse(GridRestResponse.STATUS_FAILED, + "Query is closed by timeout. Restart query with ID: " + req.queryId()); + + val.timestamp(U.currentTimeMillis()); + + Iterator cur = val.iterator(); + + CacheQueryResult res = createQueryResult(cur, req, req.queryId(), qryCurs); - return new GridRestResponse(res); + return new GridRestResponse(res); + } + finally { + val.lock().unlock(); + } } catch (Exception e) { - qryCurs.remove(req.queryId()); + removeQueryCursor(req.queryId(), qryCurs); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -262,15 +323,14 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { } /** - * @param qryCurs Query cursors. * @param cur Current cursor. * @param req Sql request. * @param qryId Query id. + * @param qryCurs Query cursors. * @return Query result with items. */ - private static CacheQueryResult createQueryResult( - ConcurrentHashMap> qryCurs, - Iterator cur, RestSqlQueryRequest req, Long qryId) { + private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId, + ConcurrentHashMap qryCurs) { CacheQueryResult res = new CacheQueryResult(); List items = new ArrayList<>(); @@ -285,8 +345,97 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { res.setQueryId(qryId); if (!cur.hasNext()) - qryCurs.remove(qryId); + removeQueryCursor(qryId, qryCurs); return res; } + + /** + * Removes query cursor. + * + * @param qryId Query id. + * @param qryCurs Query cursors. + */ + private static void removeQueryCursor(Long qryId, ConcurrentHashMap qryCurs) { + QueryCursorIterator t = qryCurs.get(qryId); + + if (t == null) + return; + + t.lock().lock(); + + try{ + if (t.timestamp() == -1) + return; + + t.close(); + + qryCurs.remove(qryId); + } + finally { + t.lock().unlock(); + } + } + + /** + * Query cursor iterator. + */ + private static class QueryCursorIterator { + /** Query cursor. */ + private QueryCursor cur; + + /** Query iterator. */ + private Iterator iter; + + /** Last timestamp. */ + private volatile long timestamp; + + /** Reentrant lock. */ + private final ReentrantLock lock = new ReentrantLock(); + + /** + * @param cur Query cursor. + * @param iter Query iterator. + */ + public QueryCursorIterator(QueryCursor cur, Iterator iter) { + this.cur = cur; + this.iter = iter; + timestamp = U.currentTimeMillis(); + } + + /** + * @return Lock. + */ + public ReentrantLock lock() { + return lock; + } + + /** + * @return Query iterator. + */ + public Iterator iterator() { + return iter; + } + + /** + * @return Timestamp. + */ + public long timestamp() { + return timestamp; + } + + /** + * @param time Current time or -1 if cursor is closed. + */ + public void timestamp(long time) { + timestamp = time; + } + + /** + * Close query cursor. + */ + public void close() { + cur.close(); + } + } } -- 1.9.5.msysgit.0