From 5db4b7910ecae08df544022aa91f5137247c8e68 Mon Sep 17 00:00:00 2001 From: Wang Ken Date: Fri, 1 Dec 2017 16:53:14 +0800 Subject: [PATCH 1/2] APACHE-KYLIN-2902: add query pending request limit to control thread consumption for each project Signed-off-by: Zhong --- .../org/apache/kylin/common/KylinConfigBase.java | 4 ++ .../kylin/metadata/project/ProjectInstance.java | 12 +++- .../kylin/metadata/project/ProjectManager.java | 2 +- .../java/org/apache/kylin/rest/msg/Message.java | 4 ++ .../apache/kylin/rest/service/QueryService.java | 33 +++++++-- .../org/apache/kylin/rest/util/RequestUtil.java | 84 ++++++++++++++++++++++ 6 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index e1a10a8..9a89bc5 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1097,6 +1097,10 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000")); } + public int getQueryConcurrentRunningThresholdForProject() { + return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "20")); + } + public long getQueryMaxScanBytes() { long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0")); return value > 0 ? value : Long.MAX_VALUE; diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java index 1f54416..a7d37e7 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java @@ -27,6 +27,8 @@ import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.metadata.realization.RealizationType; @@ -84,6 +86,8 @@ public class ProjectInstance extends RootPersistentEntity { @JsonInclude(JsonInclude.Include.NON_NULL) private LinkedHashMap overrideKylinProps; + private KylinConfigExt config; + public String getResourcePath() { return concatResourcePath(name); } @@ -304,7 +308,7 @@ public class ProjectInstance extends RootPersistentEntity { this.overrideKylinProps = overrideKylinProps; } - public void init() { + public void init(KylinConfig config) { if (name == null) name = ProjectInstance.DEFAULT_PROJECT_NAME; @@ -321,6 +325,12 @@ public class ProjectInstance extends RootPersistentEntity { if (StringUtils.isBlank(this.name)) throw new IllegalStateException("Project name must not be blank"); + + this.config = KylinConfigExt.createInstance(config, overrideKylinProps); + } + + public KylinConfig getConfig() { + return config; } @Override diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java index b4431b4..4f2678f 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java @@ -155,7 +155,7 @@ public class ProjectManager { return null; } - projectInstance.init(); + projectInstance.init(config); projectMap.putLocal(projectInstance.getName(), projectInstance); clearL2Cache(); diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java index a881c86..02e4020 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java +++ b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java @@ -334,6 +334,10 @@ public class Message { return "Not Supported SQL."; } + public String getQUERY_TOO_MANY_RUNNING() { + return "Too many concurrent query requests."; + } + public String getTABLE_META_INCONSISTENT() { return "Table metadata inconsistent with JDBC meta."; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 9c3d34f..2cdcafb 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -98,6 +98,7 @@ import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.AclEvaluate; +import org.apache.kylin.rest.util.RequestUtil; import org.apache.kylin.rest.util.TableauInterceptor; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -392,6 +393,10 @@ public class QueryService extends BasicService { if (StringUtils.isBlank(sqlRequest.getProject())) { throw new BadRequestException(msg.getEMPTY_PROJECT_NAME()); } + ProjectInstance projectInstance = getProjectManager().getProject(sqlRequest.getProject()); + if (projectInstance == null) { + throw new BadRequestException(msg.getPROJECT_NOT_FOUND()); + } if (sqlRequest.getBackdoorToggles() != null) BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles()); @@ -405,6 +410,19 @@ public class QueryService extends BasicService { logger.info("The original query: " + sql); final boolean isSelect = QueryUtil.isSelectStatement(sql); + final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled() + && kylinConfig.isPushDownUpdateEnabled(); + + if (!isSelect && !isPushDownUpdateEnabled) { + logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled"); + throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); + } + + if (!RequestUtil.openQueryRequest(projectInstance.getName(), + projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject())) { + logger.warn("Directly return exception as too many concurrent query requests for project:" + project); + throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); + } long startTime = System.currentTimeMillis(); @@ -417,19 +435,20 @@ public class QueryService extends BasicService { checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); if (queryCacheEnabled) { - sqlResponse = searchQueryInCache(sqlRequest); + try { // to deal with the case that cache searching throws exception + sqlResponse = searchQueryInCache(sqlRequest); + } catch (Throwable e) { + RequestUtil.closeQueryRequest(projectInstance.getName()); + throw e; + } } try { if (null == sqlResponse) { if (isSelect) { sqlResponse = query(sqlRequest); - } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) { + } else if (isPushDownUpdateEnabled) { sqlResponse = update(sqlRequest); - } else { - logger.debug( - "Directly return exception as the sql is unsupported, and query pushdown is disabled"); - throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); } long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); @@ -481,6 +500,8 @@ public class QueryService extends BasicService { Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse)); } + } finally { + RequestUtil.closeQueryRequest(projectInstance.getName()); } logQuery(sqlRequest, sqlResponse); diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java new file mode 100644 index 0000000..0155306 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +public class RequestUtil { + private static final Logger logger = LoggerFactory.getLogger(RequestUtil.class); + + private static LoadingCache queryRequestMap = CacheBuilder.newBuilder() + .removalListener(new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + logger.info("Current running query number " + notification.getValue().get() + " for project " + + notification.getKey() + " is removed due to " + notification.getCause()); + } + }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader() { + @Override + public AtomicInteger load(String s) throws Exception { + return new AtomicInteger(0); + } + }); + + public static boolean openQueryRequest(String project, int maxConcurrentQuery) { + try { + AtomicInteger nRunningQueries = queryRequestMap.get(project); + for (;;) { + int nRunning = nRunningQueries.get(); + if (nRunning < maxConcurrentQuery) { + if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) { + return true; + } + } else { + return false; + } + } + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + public static void closeQueryRequest(String project) { + AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + if (nRunningQueries != null) { + nRunningQueries.decrementAndGet(); + } + } + + public static Integer getCurrentRunningQuery(String project) { + AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + if (nRunningQueries != null) { + return nRunningQueries.get(); + } else { + return null; + } + } +} -- 2.5.4 (Apple Git-61) From 063a6d8bc2f9bd2d41e4bf4063501c03cdecd487 Mon Sep 17 00:00:00 2001 From: Zhong Date: Sun, 3 Dec 2017 19:21:38 +0800 Subject: [PATCH 2/2] APACHE-KYLIN-2902: add unit test --- .../apache/kylin/rest/util/RequestUtilTest.java | 69 ++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java new file mode 100644 index 0000000..5445a86 --- /dev/null +++ b/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +public class RequestUtilTest { + + @Test + public void testOpenAndCloseQueryRequest() { + int nThread = 5; + + final Integer maxConcurrentQuery = 2; + final String project = "test"; + + final AtomicInteger nQueryFailed = new AtomicInteger(0); + + Thread[] threads = new Thread[nThread]; + final CountDownLatch lock = new CountDownLatch(nThread); + for (int i = 0; i < nThread; i++) { + final int j = i; + threads[j] = new Thread(new Runnable() { + @Override + public void run() { + try { + boolean ifOpen = RequestUtil.openQueryRequest(project, maxConcurrentQuery); + lock.countDown(); + if (ifOpen) { + lock.await(); + RequestUtil.closeQueryRequest(project); + } else { + nQueryFailed.incrementAndGet(); + } + } catch (InterruptedException e) { + } + } + }); + threads[j].start(); + } + for (int i = 0; i < nThread; i++) { + try { + threads[i].join(); + } catch (InterruptedException e) { + } + } + Assert.assertEquals(new Integer(0), RequestUtil.getCurrentRunningQuery(project)); + Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get()); + } +} -- 2.5.4 (Apple Git-61)