From 5db4b7910ecae08df544022aa91f5137247c8e68 Mon Sep 17 00:00:00 2001 From: Wang Ken Date: Fri, 1 Dec 2017 16:53:14 +0800 Subject: [PATCH 1/3] APACHE-KYLIN-2902: add query pending request limit to control thread consumption for each project Signed-off-by: Zhong --- .../org/apache/kylin/common/KylinConfigBase.java | 4 ++ .../kylin/metadata/project/ProjectInstance.java | 12 +++- .../kylin/metadata/project/ProjectManager.java | 2 +- .../java/org/apache/kylin/rest/msg/Message.java | 4 ++ .../apache/kylin/rest/service/QueryService.java | 33 +++++++-- .../org/apache/kylin/rest/util/RequestUtil.java | 84 ++++++++++++++++++++++ 6 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index e1a10a8..9a89bc5 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1097,6 +1097,10 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000")); } + public int getQueryConcurrentRunningThresholdForProject() { + return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "20")); + } + public long getQueryMaxScanBytes() { long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0")); return value > 0 ? value : Long.MAX_VALUE; diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java index 1f54416..a7d37e7 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java @@ -27,6 +27,8 @@ import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.metadata.realization.RealizationType; @@ -84,6 +86,8 @@ public class ProjectInstance extends RootPersistentEntity { @JsonInclude(JsonInclude.Include.NON_NULL) private LinkedHashMap overrideKylinProps; + private KylinConfigExt config; + public String getResourcePath() { return concatResourcePath(name); } @@ -304,7 +308,7 @@ public class ProjectInstance extends RootPersistentEntity { this.overrideKylinProps = overrideKylinProps; } - public void init() { + public void init(KylinConfig config) { if (name == null) name = ProjectInstance.DEFAULT_PROJECT_NAME; @@ -321,6 +325,12 @@ public class ProjectInstance extends RootPersistentEntity { if (StringUtils.isBlank(this.name)) throw new IllegalStateException("Project name must not be blank"); + + this.config = KylinConfigExt.createInstance(config, overrideKylinProps); + } + + public KylinConfig getConfig() { + return config; } @Override diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java index b4431b4..4f2678f 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java @@ -155,7 +155,7 @@ public class ProjectManager { return null; } - projectInstance.init(); + projectInstance.init(config); projectMap.putLocal(projectInstance.getName(), projectInstance); clearL2Cache(); diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java index a881c86..02e4020 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java +++ b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java @@ -334,6 +334,10 @@ public class Message { return "Not Supported SQL."; } + public String getQUERY_TOO_MANY_RUNNING() { + return "Too many concurrent query requests."; + } + public String getTABLE_META_INCONSISTENT() { return "Table metadata inconsistent with JDBC meta."; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 9c3d34f..2cdcafb 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -98,6 +98,7 @@ import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.AclEvaluate; +import org.apache.kylin.rest.util.RequestUtil; import org.apache.kylin.rest.util.TableauInterceptor; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -392,6 +393,10 @@ public class QueryService extends BasicService { if (StringUtils.isBlank(sqlRequest.getProject())) { throw new BadRequestException(msg.getEMPTY_PROJECT_NAME()); } + ProjectInstance projectInstance = getProjectManager().getProject(sqlRequest.getProject()); + if (projectInstance == null) { + throw new BadRequestException(msg.getPROJECT_NOT_FOUND()); + } if (sqlRequest.getBackdoorToggles() != null) BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles()); @@ -405,6 +410,19 @@ public class QueryService extends BasicService { logger.info("The original query: " + sql); final boolean isSelect = QueryUtil.isSelectStatement(sql); + final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled() + && kylinConfig.isPushDownUpdateEnabled(); + + if (!isSelect && !isPushDownUpdateEnabled) { + logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled"); + throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); + } + + if (!RequestUtil.openQueryRequest(projectInstance.getName(), + projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject())) { + logger.warn("Directly return exception as too many concurrent query requests for project:" + project); + throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); + } long startTime = System.currentTimeMillis(); @@ -417,19 +435,20 @@ public class QueryService extends BasicService { checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); if (queryCacheEnabled) { - sqlResponse = searchQueryInCache(sqlRequest); + try { // to deal with the case that cache searching throws exception + sqlResponse = searchQueryInCache(sqlRequest); + } catch (Throwable e) { + RequestUtil.closeQueryRequest(projectInstance.getName()); + throw e; + } } try { if (null == sqlResponse) { if (isSelect) { sqlResponse = query(sqlRequest); - } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) { + } else if (isPushDownUpdateEnabled) { sqlResponse = update(sqlRequest); - } else { - logger.debug( - "Directly return exception as the sql is unsupported, and query pushdown is disabled"); - throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); } long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); @@ -481,6 +500,8 @@ public class QueryService extends BasicService { Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse)); } + } finally { + RequestUtil.closeQueryRequest(projectInstance.getName()); } logQuery(sqlRequest, sqlResponse); diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java new file mode 100644 index 0000000..0155306 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +public class RequestUtil { + private static final Logger logger = LoggerFactory.getLogger(RequestUtil.class); + + private static LoadingCache queryRequestMap = CacheBuilder.newBuilder() + .removalListener(new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + logger.info("Current running query number " + notification.getValue().get() + " for project " + + notification.getKey() + " is removed due to " + notification.getCause()); + } + }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader() { + @Override + public AtomicInteger load(String s) throws Exception { + return new AtomicInteger(0); + } + }); + + public static boolean openQueryRequest(String project, int maxConcurrentQuery) { + try { + AtomicInteger nRunningQueries = queryRequestMap.get(project); + for (;;) { + int nRunning = nRunningQueries.get(); + if (nRunning < maxConcurrentQuery) { + if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) { + return true; + } + } else { + return false; + } + } + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + public static void closeQueryRequest(String project) { + AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + if (nRunningQueries != null) { + nRunningQueries.decrementAndGet(); + } + } + + public static Integer getCurrentRunningQuery(String project) { + AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + if (nRunningQueries != null) { + return nRunningQueries.get(); + } else { + return null; + } + } +} -- 2.5.4 (Apple Git-61) From 063a6d8bc2f9bd2d41e4bf4063501c03cdecd487 Mon Sep 17 00:00:00 2001 From: Zhong Date: Sun, 3 Dec 2017 19:21:38 +0800 Subject: [PATCH 2/3] APACHE-KYLIN-2902: add unit test --- .../apache/kylin/rest/util/RequestUtilTest.java | 69 ++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java new file mode 100644 index 0000000..5445a86 --- /dev/null +++ b/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +public class RequestUtilTest { + + @Test + public void testOpenAndCloseQueryRequest() { + int nThread = 5; + + final Integer maxConcurrentQuery = 2; + final String project = "test"; + + final AtomicInteger nQueryFailed = new AtomicInteger(0); + + Thread[] threads = new Thread[nThread]; + final CountDownLatch lock = new CountDownLatch(nThread); + for (int i = 0; i < nThread; i++) { + final int j = i; + threads[j] = new Thread(new Runnable() { + @Override + public void run() { + try { + boolean ifOpen = RequestUtil.openQueryRequest(project, maxConcurrentQuery); + lock.countDown(); + if (ifOpen) { + lock.await(); + RequestUtil.closeQueryRequest(project); + } else { + nQueryFailed.incrementAndGet(); + } + } catch (InterruptedException e) { + } + } + }); + threads[j].start(); + } + for (int i = 0; i < nThread; i++) { + try { + threads[i].join(); + } catch (InterruptedException e) { + } + } + Assert.assertEquals(new Integer(0), RequestUtil.getCurrentRunningQuery(project)); + Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get()); + } +} -- 2.5.4 (Apple Git-61) From db2335f0283d64bd67c1897201ece211d58ff8c3 Mon Sep 17 00:00:00 2001 From: Zhong Date: Mon, 11 Dec 2017 19:08:55 +0800 Subject: [PATCH 3/3] APACHE-KYLIN-2902: disable this feature by default --- .../org/apache/kylin/common/KylinConfigBase.java | 3 +- .../apache/kylin/rest/service/QueryService.java | 125 ++++++++++----------- .../apache/kylin/rest/util/QueryRequestUtil.java | 90 +++++++++++++++ .../org/apache/kylin/rest/util/RequestUtil.java | 84 -------------- .../kylin/rest/util/QueryRequestUtilTest.java | 69 ++++++++++++ .../apache/kylin/rest/util/RequestUtilTest.java | 69 ------------ 6 files changed, 222 insertions(+), 218 deletions(-) create mode 100644 server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java delete mode 100644 server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java create mode 100644 server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java delete mode 100644 server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 9a89bc5..acd48d4 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1098,7 +1098,8 @@ abstract public class KylinConfigBase implements Serializable { } public int getQueryConcurrentRunningThresholdForProject() { - return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "20")); + // by default there's no limitation + return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0")); } public long getQueryMaxScanBytes() { diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 2cdcafb..84a184e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -98,7 +98,7 @@ import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.AclEvaluate; -import org.apache.kylin.rest.util.RequestUtil; +import org.apache.kylin.rest.util.QueryRequestUtil; import org.apache.kylin.rest.util.TableauInterceptor; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -418,8 +418,8 @@ public class QueryService extends BasicService { throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); } - if (!RequestUtil.openQueryRequest(projectInstance.getName(), - projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject())) { + int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject(); + if (!QueryRequestUtil.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) { logger.warn("Directly return exception as too many concurrent query requests for project:" + project); throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); } @@ -430,78 +430,75 @@ public class QueryService extends BasicService { OLAPContext.clearThreadLocalContexts(); SQLResponse sqlResponse = null; - boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(), - "query cache disabled in KylinConfig") && // - checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); - - if (queryCacheEnabled) { - try { // to deal with the case that cache searching throws exception + try { // to deal with the case that cache searching throws exception + boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(), + "query cache disabled in KylinConfig") && // + checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); + if (queryCacheEnabled) { sqlResponse = searchQueryInCache(sqlRequest); - } catch (Throwable e) { - RequestUtil.closeQueryRequest(projectInstance.getName()); - throw e; } - } - try { - if (null == sqlResponse) { - if (isSelect) { - sqlResponse = query(sqlRequest); - } else if (isPushDownUpdateEnabled) { - sqlResponse = update(sqlRequest); - } + try { + if (null == sqlResponse) { + if (isSelect) { + sqlResponse = query(sqlRequest); + } else if (isPushDownUpdateEnabled) { + sqlResponse = update(sqlRequest); + } - long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); - long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); - long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold(); - sqlResponse.setDuration(System.currentTimeMillis() - startTime); - logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // - String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), - String.valueOf(sqlResponse.getTotalScanCount())); - if (checkCondition(queryCacheEnabled, "query cache is disabled") // - && checkCondition(!sqlResponse.getIsException(), "query has exception") // - && checkCondition(!(sqlResponse.isPushDown() - && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)), - "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") // - && checkCondition( - sqlResponse.getDuration() > durationThreshold - || sqlResponse.getTotalScanCount() > scanCountThreshold - || sqlResponse.getTotalScanBytes() > scanBytesThreshold, // - "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})", - sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(), - scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold) - && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(), - "query response is too large: {} ({})", sqlResponse.getResults().size(), - kylinConfig.getLargeQueryThreshold())) { - cacheManager.getCache(SUCCESS_QUERY_CACHE) - .put(new Element(sqlRequest.getCacheKey(), sqlResponse)); - } + long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); + long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); + long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold(); + sqlResponse.setDuration(System.currentTimeMillis() - startTime); + logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // + String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), + String.valueOf(sqlResponse.getTotalScanCount())); + if (checkCondition(queryCacheEnabled, "query cache is disabled") // + && checkCondition(!sqlResponse.getIsException(), "query has exception") // + && checkCondition(!(sqlResponse.isPushDown() + && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)), + "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") // + && checkCondition( + sqlResponse.getDuration() > durationThreshold + || sqlResponse.getTotalScanCount() > scanCountThreshold + || sqlResponse.getTotalScanBytes() > scanBytesThreshold, // + "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})", + sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(), + scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold) + && checkCondition( + sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(), + "query response is too large: {} ({})", sqlResponse.getResults().size(), + kylinConfig.getLargeQueryThreshold())) { + cacheManager.getCache(SUCCESS_QUERY_CACHE) + .put(new Element(sqlRequest.getCacheKey(), sqlResponse)); + } - } else { - sqlResponse.setDuration(System.currentTimeMillis() - startTime); - sqlResponse.setTotalScanCount(0); - sqlResponse.setTotalScanBytes(0); - } + } else { + sqlResponse.setDuration(System.currentTimeMillis() - startTime); + sqlResponse.setTotalScanCount(0); + sqlResponse.setTotalScanBytes(0); + } - checkQueryAuth(sqlResponse, project, secureEnabled); + checkQueryAuth(sqlResponse, project, secureEnabled); - } catch (Throwable e) { // calcite may throw AssertError - logger.error("Exception while executing query", e); - String errMsg = makeErrorMsgUserFriendly(e); + } catch (Throwable e) { // calcite may throw AssertError + logger.error("Exception while executing query", e); + String errMsg = makeErrorMsgUserFriendly(e); - sqlResponse = new SQLResponse(null, null, 0, true, errMsg); - sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e)); - sqlResponse.setTotalScanCount(queryContext.getScannedRows()); - sqlResponse.setTotalScanBytes(queryContext.getScannedBytes()); - sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList()); + sqlResponse = new SQLResponse(null, null, 0, true, errMsg); + sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e)); + sqlResponse.setTotalScanCount(queryContext.getScannedRows()); + sqlResponse.setTotalScanBytes(queryContext.getScannedBytes()); + sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList()); - if (queryCacheEnabled && e.getCause() != null - && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) { - Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); - exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse)); + if (queryCacheEnabled && e.getCause() != null + && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) { + Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); + exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse)); + } } } finally { - RequestUtil.closeQueryRequest(projectInstance.getName()); + QueryRequestUtil.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery); } logQuery(sqlRequest, sqlResponse); diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java new file mode 100644 index 0000000..3eb1670 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +public class QueryRequestUtil { + private static final Logger logger = LoggerFactory.getLogger(QueryRequestUtil.class); + + private static LoadingCache queryRequestMap = CacheBuilder.newBuilder() + .removalListener(new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + logger.info("Current running query number " + notification.getValue().get() + " for project " + + notification.getKey() + " is removed due to " + notification.getCause()); + } + }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader() { + @Override + public AtomicInteger load(String s) throws Exception { + return new AtomicInteger(0); + } + }); + + public static boolean openQueryRequest(String project, int maxConcurrentQuery) { + if (maxConcurrentQuery == 0) { + return true; + } + try { + AtomicInteger nRunningQueries = queryRequestMap.get(project); + for (;;) { + int nRunning = nRunningQueries.get(); + if (nRunning < maxConcurrentQuery) { + if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) { + return true; + } + } else { + return false; + } + } + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + public static void closeQueryRequest(String project, int maxConcurrentQuery) { + if (maxConcurrentQuery == 0) { + return; + } + AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + if (nRunningQueries != null) { + nRunningQueries.decrementAndGet(); + } + } + + public static Integer getCurrentRunningQuery(String project) { + AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + if (nRunningQueries != null) { + return nRunningQueries.get(); + } else { + return null; + } + } +} diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java deleted file mode 100644 index 0155306..0000000 --- a/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.rest.util; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - -public class RequestUtil { - private static final Logger logger = LoggerFactory.getLogger(RequestUtil.class); - - private static LoadingCache queryRequestMap = CacheBuilder.newBuilder() - .removalListener(new RemovalListener() { - @Override - public void onRemoval(RemovalNotification notification) { - logger.info("Current running query number " + notification.getValue().get() + " for project " - + notification.getKey() + " is removed due to " + notification.getCause()); - } - }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader() { - @Override - public AtomicInteger load(String s) throws Exception { - return new AtomicInteger(0); - } - }); - - public static boolean openQueryRequest(String project, int maxConcurrentQuery) { - try { - AtomicInteger nRunningQueries = queryRequestMap.get(project); - for (;;) { - int nRunning = nRunningQueries.get(); - if (nRunning < maxConcurrentQuery) { - if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) { - return true; - } - } else { - return false; - } - } - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - public static void closeQueryRequest(String project) { - AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); - if (nRunningQueries != null) { - nRunningQueries.decrementAndGet(); - } - } - - public static Integer getCurrentRunningQuery(String project) { - AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); - if (nRunningQueries != null) { - return nRunningQueries.get(); - } else { - return null; - } - } -} diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java new file mode 100644 index 0000000..fb6d2ff --- /dev/null +++ b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +public class QueryRequestUtilTest { + + @Test + public void testOpenAndCloseQueryRequest() { + int nThread = 5; + + final Integer maxConcurrentQuery = 2; + final String project = "test"; + + final AtomicInteger nQueryFailed = new AtomicInteger(0); + + Thread[] threads = new Thread[nThread]; + final CountDownLatch lock = new CountDownLatch(nThread); + for (int i = 0; i < nThread; i++) { + final int j = i; + threads[j] = new Thread(new Runnable() { + @Override + public void run() { + try { + boolean ifOpen = QueryRequestUtil.openQueryRequest(project, maxConcurrentQuery); + lock.countDown(); + if (ifOpen) { + lock.await(); + QueryRequestUtil.closeQueryRequest(project, maxConcurrentQuery); + } else { + nQueryFailed.incrementAndGet(); + } + } catch (InterruptedException e) { + } + } + }); + threads[j].start(); + } + for (int i = 0; i < nThread; i++) { + try { + threads[i].join(); + } catch (InterruptedException e) { + } + } + Assert.assertEquals(new Integer(0), QueryRequestUtil.getCurrentRunningQuery(project)); + Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get()); + } +} diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java deleted file mode 100644 index 5445a86..0000000 --- a/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.rest.util; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Assert; -import org.junit.Test; - -public class RequestUtilTest { - - @Test - public void testOpenAndCloseQueryRequest() { - int nThread = 5; - - final Integer maxConcurrentQuery = 2; - final String project = "test"; - - final AtomicInteger nQueryFailed = new AtomicInteger(0); - - Thread[] threads = new Thread[nThread]; - final CountDownLatch lock = new CountDownLatch(nThread); - for (int i = 0; i < nThread; i++) { - final int j = i; - threads[j] = new Thread(new Runnable() { - @Override - public void run() { - try { - boolean ifOpen = RequestUtil.openQueryRequest(project, maxConcurrentQuery); - lock.countDown(); - if (ifOpen) { - lock.await(); - RequestUtil.closeQueryRequest(project); - } else { - nQueryFailed.incrementAndGet(); - } - } catch (InterruptedException e) { - } - } - }); - threads[j].start(); - } - for (int i = 0; i < nThread; i++) { - try { - threads[i].join(); - } catch (InterruptedException e) { - } - } - Assert.assertEquals(new Integer(0), RequestUtil.getCurrentRunningQuery(project)); - Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get()); - } -} -- 2.5.4 (Apple Git-61)