From 8ffcbf3c0fa25f1b7da8d6bff43c607d94d469f2 Mon Sep 17 00:00:00 2001 From: "Ma,Gang" Date: Wed, 13 Jul 2016 11:51:15 +0800 Subject: [PATCH] KYLIN-1872 Make query visible and interruptible, improve server's stablility --- .../apache/kylin/metadata/query/QueryManager.java | 114 +++++++++++ .../org/apache/kylin/metadata/query/QueryStep.java | 223 +++++++++++++++++++++ .../apache/kylin/metadata/query/RunningQuery.java | 139 +++++++++++++ .../kylin/metadata/query/QueryManagerTest.java | 66 ++++++ .../storage/gtrecord/GTCubeStorageQueryBase.java | 6 + .../query/relnode/OLAPToEnumerableConverter.java | 6 +- .../kylin/rest/controller/QueryController.java | 45 ++++- .../apache/kylin/rest/service/QueryService.java | 4 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 68 ++++++- 9 files changed, 653 insertions(+), 18 deletions(-) create mode 100644 core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryManager.java create mode 100644 core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryStep.java create mode 100644 core-metadata/src/main/java/org/apache/kylin/metadata/query/RunningQuery.java create mode 100644 core-metadata/src/test/java/org/apache/kylin/metadata/query/QueryManagerTest.java diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryManager.java new file mode 100644 index 0000000..75ee1a2 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryManager.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.query; + +import java.util.ArrayList; +import java.util.List; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +public class QueryManager { + private static final Logger logger = LoggerFactory.getLogger(QueryManager.class); + private static QueryManager instance = new QueryManager(); + + private final ConcurrentMap threadQueryMap = Maps.newConcurrentMap(); + private final ConcurrentMap idQueryMap = Maps.newConcurrentMap(); + + public static QueryManager getInstance() { + return instance; + } + + public RunningQuery startQuery(String project, String sql) { + String queryId = genQueryId(); + Thread currThread = Thread.currentThread(); + RunningQuery query = new RunningQuery(project, queryId, sql, currThread); + threadQueryMap.put(currThread, query); + idQueryMap.put(queryId, query); + return query; + } + + private String genQueryId() { + return UUID.randomUUID().toString(); + } + + public void endQuery(String queryId) { + if (queryId == null) { + logger.warn("query id is null"); + return; + } + cleanQuery(queryId); + } + + public void stopQuery(String queryId, String info) { + RunningQuery query = idQueryMap.get(queryId); + if (query != null) { + query.stop(info); + cleanQuery(queryId); + } else { + logger.info("the query:{} is not existed", queryId); + } + } + + private void cleanQuery(String queryId) { + RunningQuery query = idQueryMap.remove(queryId); + if (query != null) { + threadQueryMap.remove(query.getExecThread()); + } + } + + public RunningQuery getCurrentRunningQuery() { + Thread thread = Thread.currentThread(); + return getRunningQuery(thread); + } + + public RunningQuery getRunningQuery(Thread thread) { + return threadQueryMap.get(thread); + } + + public List getAllRunningQueries() { + TreeSet queriesSet = new TreeSet<>(); + for (RunningQuery runningQuery : idQueryMap.values()) { + queriesSet.add(runningQuery); + } + return new ArrayList<>(queriesSet); + } + + /** + * + * @param runningTime in milliseconds + * @return running queries that have run more than specified time + */ + public List getLongRunningQueries(int runningTime) { + List allRunningQueries = getAllRunningQueries(); + long targetStartTime = System.currentTimeMillis() - runningTime; + int i=0; + for (;i targetStartTime){ + break; + } + } + return allRunningQueries.subList(0,i); + } +} diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryStep.java b/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryStep.java new file mode 100644 index 0000000..eb9a9cc --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryStep.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.query; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonProperty; + +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) +public class QueryStep { + public static final String LINE_SEP = System.getProperty("line.separator"); + + public enum Status { + RUNNING, STOPPED, SUCCEED + } + + @JsonProperty("name") + private String name; + + @JsonProperty("start_time") + private long startTime; + + @JsonProperty("end_time") + private long endTime; + + @JsonProperty("status") + private volatile Status status = Status.RUNNING; + + @JsonProperty("attributes") + private Map attributes = new HashMap<>(); + + @JsonProperty("sub_steps") + private List subSteps = new CopyOnWriteArrayList<>(); + + private Thread execThread; + + public QueryStep(String name) { + this.name = name; + } + + public QueryStep startSubStep(String stepName) { + return startSubStep(stepName, Thread.currentThread()); + } + + public QueryStep startSubStep(String stepName, Thread execThread) { + QueryStep step = new QueryStep(stepName); + step.setStartTime(System.currentTimeMillis()); + step.setExecThread(execThread); + addSubStep(step); + + return step; + } + + public void finishSubStep(String stepName) { + finishSubStep(stepName, Thread.currentThread()); + } + + public void finishSubStep(String stepName, Thread thread) { + QueryStep step = getStepByName(stepName, thread, Status.RUNNING); + if (step == null) { + throw new IllegalStateException("the step:" + stepName + " is not exist."); + } + finishSubStep(step); + } + + public void finishSubStep(QueryStep step) { + step.setEndTime(System.currentTimeMillis()); + step.status = Status.SUCCEED; + } + + + + public boolean isRunning(){ + return this.status == Status.RUNNING; + } + + public QueryStep addAttribute(String key, String value) { + attributes.put(key, value); + return this; + } + + public QueryStep getRunningStep(Thread thread) { + for (QueryStep step : getSubSteps()) { + if (step.isRunning() && thread.equals(step.getExecThread())) { + return step; + } + } + return null; + } + + private void addSubStep(QueryStep step) { + subSteps.add(step); + } + + @JsonProperty("thread") + public ThreadInfo getThread(){ + String stackTraceStr = null; + if (isRunning()) { + int maxStackTraceDepth = 50; + int current = 0; + + StackTraceElement[] stackTrace = execThread.getStackTrace(); + StringBuilder buf = new StringBuilder(); + buf.append(LINE_SEP); + for (StackTraceElement e : stackTrace) { + if (++current > maxStackTraceDepth) { + break; + } + buf.append("\t").append("at ").append(e.toString()).append(LINE_SEP); + } + stackTraceStr = buf.toString(); + } + + + ThreadInfo threadInfo = new ThreadInfo(); + threadInfo.threadName = execThread.getName(); + threadInfo.stackTrace = stackTraceStr; + return threadInfo; + } + + /** + * + * @param name + * @param thread + * @param status + * @return null if step not exist + */ + private QueryStep getStepByName(String name, Thread thread, Status status) { + for (QueryStep step : subSteps) { + if (step.name.equals(name) && step.execThread.equals(thread) && step.status == status) { + return step; + } + } + return null; + } + + public String getName() { + return name; + } + + public Thread getExecThread() { + return execThread; + } + + public void setExecThread(Thread execThread) { + this.execThread = execThread; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public Status getStatus() { + return status; + } + + public void setStatus(Status status) { + this.status = status; + } + + public List getSubSteps() { + return subSteps; + } + + @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) + public static class ThreadInfo{ + @JsonProperty("name") + public String threadName; + @JsonProperty("stack_trace") + public String stackTrace; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + QueryStep queryStep = (QueryStep) o; + + if (!name.equals(queryStep.name)) return false; + return execThread.equals(queryStep.execThread); + + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + execThread.hashCode(); + return result; + } +} diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/query/RunningQuery.java b/core-metadata/src/main/java/org/apache/kylin/metadata/query/RunningQuery.java new file mode 100644 index 0000000..fb016b1 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/query/RunningQuery.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.query; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonProperty; + +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) +public class RunningQuery extends QueryStep implements Comparable { + public static final String SQL_PARSE_STEP = "parse_sql"; + public static final String QUERY_PLAN_STEP = "query_plan"; + + @JsonProperty("project") + private String projectName; + @JsonProperty("query_id") + private String queryId; + @JsonProperty("sql") + private String sql; + + private String stopReason = ""; + + @JsonProperty("incoming_count") + private AtomicInteger numIncomingRecords = new AtomicInteger(0); + + private List stopListeners = new ArrayList<>(); + + public RunningQuery(String projectName, String queryId, String sql, Thread mainQueryThread){ + super("main_query"); + setExecThread(mainQueryThread); + setStartTime(System.currentTimeMillis()); + this.projectName = projectName; + this.queryId = queryId; + this.sql = sql; + } + + public QueryStep startSqlParse() { + return startSubStep(SQL_PARSE_STEP); + } + + public void finishSqlParse() { + finishSubStep(SQL_PARSE_STEP); + } + + public QueryStep startQueryPlan() { + return startSubStep(QUERY_PLAN_STEP); + } + + public void finishQueryPlan() { + finishSubStep(QUERY_PLAN_STEP); + } + + + /** + * stop the query + */ + public void stop(String reason) { + if (isStopped()){ + return; + } + setStatus(Status.STOPPED); + setStopReason(reason); + for (QueryStopListener stopListener : stopListeners) { + stopListener.stop(this); + } + } + + public boolean isStopped() { + return getStatus() == Status.STOPPED; + } + + public String getStopReason() { + return stopReason; + } + + public void setStopReason(String stopReason) { + this.stopReason = stopReason; + } + + public String getProjectName() { + return projectName; + } + + public String getQueryId() { + return queryId; + } + + public String getSql() { + return sql; + } + + public int incAndGetIncomingRecords(int recordCnt) { + return numIncomingRecords.addAndGet(recordCnt); + } + + public void addQueryStopListener(QueryStopListener listener) { + this.stopListeners.add(listener); + } + + public String toString(){ + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + for (QueryStep step : getSubSteps()) { + pw.print(step.getName() + ":"); + } + + return sw.toString(); + } + + @Override + public int compareTo(RunningQuery q) { + return (int)(this.getStartTime() - q.getStartTime()); + } + + public interface QueryStopListener { + void stop(RunningQuery query); + } +} diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/query/QueryManagerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/query/QueryManagerTest.java new file mode 100644 index 0000000..c99bd36 --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/metadata/query/QueryManagerTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.query; + + +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +import java.util.List; + +public class QueryManagerTest { + private QueryManager queryManager; + + @Before + public void setup(){ + queryManager = QueryManager.getInstance(); + } + + @Test + public void testGetLongRunningQueries() throws Exception{ + List queries = queryManager.getLongRunningQueries(10000); + assertEquals(0,queries.size()); + + queryManager.startQuery("test","select * from test"); + Thread.sleep(1000); + queries = queryManager.getLongRunningQueries(1000); + assertEquals(1,queries.size()); + } + + @Test + public void testGetAllRunningQueries() throws Exception{ + List queries = queryManager.getAllRunningQueries(); + assertEquals(0,queries.size()); + + RunningQuery query= queryManager.startQuery("test","select * from test"); + Thread t = new Thread(){ + public void run(){ + List queries = queryManager.getAllRunningQueries(); + assertEquals(1,queries.size()); + } + }; + + t.start(); + t.join(); + queryManager.endQuery(query.getQueryId()); + + assertEquals(0, queryManager.getAllRunningQueries().size()); + } +} diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index e58e74a..ff12f6a 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -43,6 +43,8 @@ import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.PartitionDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.query.QueryManager; +import org.apache.kylin.metadata.query.QueryStep; import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.metadata.tuple.ITupleIterator; import org.apache.kylin.metadata.tuple.TupleInfo; @@ -69,6 +71,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { @Override public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { + QueryStep queryPlanStep = QueryManager.getInstance().getCurrentRunningQuery().startQueryPlan(); // allow custom measures hack notifyBeforeStorageQuery(sqlDigest); @@ -96,6 +99,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { dimensionsD.addAll(otherDimsD); Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics); context.setCuboid(cuboid); + queryPlanStep.addAttribute("input_cuboid_id", String.valueOf(cuboid.getInputID())); + queryPlanStep.addAttribute("cuboid_id", String.valueOf(cuboid.getId())); // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine Set singleValuesD = findSingleValueColumns(filter); @@ -118,6 +123,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { scanners.add(scanner); } + QueryManager.getInstance().getCurrentRunningQuery().finishQueryPlan(); if (scanners.isEmpty()) return ITupleIterator.EMPTY_TUPLE_ITERATOR; diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java index de7e7e2..8fd81c6 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java @@ -38,6 +38,7 @@ import org.apache.calcite.rel.convert.ConverterImpl; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.kylin.metadata.query.QueryManager; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.query.routing.NoRealizationFoundException; import org.apache.kylin.query.routing.QueryRouter; @@ -104,8 +105,9 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab System.out.println("EXECUTION PLAN AFTER REWRITE"); System.out.println(dumpPlan); } - - return impl.visitChild(this, 0, inputAsEnum, pref); + Result result= impl.visitChild(this, 0, inputAsEnum, pref); + QueryManager.getInstance().getCurrentRunningQuery().finishSqlParse(); + return result; } private Result buildHiveResult(EnumerableRelImplementor enumImplementor, Prefer pref, OLAPContext context) { diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java index 93b71ad..93d4cc6 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java @@ -30,6 +30,9 @@ import org.apache.commons.io.IOUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.metadata.query.QueryManager; +import org.apache.kylin.metadata.query.QueryStep; +import org.apache.kylin.metadata.query.RunningQuery; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.model.Query; @@ -49,11 +52,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.AccessDeniedException; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.*; import org.supercsv.io.CsvListWriter; import org.supercsv.io.ICsvListWriter; import org.supercsv.prefs.CsvPreference; @@ -165,7 +164,32 @@ public class QueryController extends BasicController { } } + /** + * + * @param runTimeMoreThan in seconds + * @return + */ + @RequestMapping(value = "/query/runningQueries", method = RequestMethod.GET) + @ResponseBody + public List getRunningQueries(@RequestParam(value = "runTimeMoreThan", required = false, defaultValue = "-1") int runTimeMoreThan) { + if (runTimeMoreThan == -1) { + return QueryManager.getInstance().getAllRunningQueries(); + }else { + return QueryManager.getInstance().getLongRunningQueries(runTimeMoreThan * 1000); + } + } + + @RequestMapping(value = "/query/{queryId}/stop", method = RequestMethod.PUT) + @ResponseBody + public void stopQuery(@PathVariable String queryId) { + final String user = SecurityContextHolder.getContext().getAuthentication().getName(); + logger.info("{} stop the query: {}", new Object[]{user,queryId}); + QueryManager.getInstance().stopQuery(queryId, "stopped by " + user); + } + private SQLResponse doQueryWithCache(SQLRequest sqlRequest) { + QueryManager queryManager = QueryManager.getInstance(); + RunningQuery query = queryManager.startQuery(sqlRequest.getProject(),sqlRequest.getSql());; try { BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles()); @@ -183,10 +207,9 @@ public class QueryController extends BasicController { logger.debug("Directly return exception as not supported"); throw new InternalErrorException("Not Supported SQL."); } - long startTime = System.currentTimeMillis(); - SQLResponse sqlResponse = searchQueryInCache(sqlRequest); + SQLResponse sqlResponse = searchQueryInCache(sqlRequest,query); try { if (null == sqlResponse) { sqlResponse = queryService.query(sqlRequest); @@ -227,10 +250,13 @@ public class QueryController extends BasicController { } finally { BackdoorToggles.cleanToggles(); + queryManager.endQuery(query.getQueryId()); } } - private SQLResponse searchQueryInCache(SQLRequest sqlRequest) { + private SQLResponse searchQueryInCache(SQLRequest sqlRequest, RunningQuery query) { + QueryStep cacheStep = query.startSubStep("query_cache"); + SQLResponse response = null; Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); Cache successCache = cacheManager.getCache(SUCCESS_QUERY_CACHE); @@ -241,14 +267,17 @@ public class QueryController extends BasicController { Element element = exceptionCache.get(sqlRequest); response = (SQLResponse) element.getObjectValue(); response.setHitExceptionCache(true); + cacheStep.addAttribute("hit_cache","exception_cache"); } else if (successCache.get(sqlRequest) != null) { logger.info("The sqlResponse is found in SUCCESS_QUERY_CACHE"); Element element = successCache.get(sqlRequest); response = (SQLResponse) element.getObjectValue(); response.setStorageCacheUsed(true); + cacheStep.addAttribute("hit_cache","success_cache"); } } + query.finishSubStep(cacheStep); return response; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 84a5c67..dcedfce 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -54,6 +54,7 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.metadata.query.QueryManager; import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.model.ColumnMeta; @@ -117,9 +118,7 @@ public class QueryService extends BasicService { public SQLResponse query(SQLRequest sqlRequest) throws Exception { try { badQueryDetector.queryStart(Thread.currentThread(), sqlRequest); - return queryWithSqlMassage(sqlRequest); - } finally { badQueryDetector.queryEnd(Thread.currentThread()); } @@ -352,6 +351,7 @@ public class QueryService extends BasicService { try { conn = cacheService.getOLAPDataSource(sqlRequest.getProject()).getConnection(); + QueryManager.getInstance().getCurrentRunningQuery().startSqlParse(); if (sqlRequest instanceof PrepareSqlRequest) { PreparedStatement preparedState = conn.prepareStatement(sql); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 78ad18d..eb6be70 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -56,6 +56,9 @@ import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.metadata.query.QueryManager; +import org.apache.kylin.metadata.query.QueryStep; +import org.apache.kylin.metadata.query.RunningQuery; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; @@ -88,10 +91,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { long timeout; long timeoutTS; volatile Throwable coprocException; + RunningQuery runningQuery; - public ExpectedSizeIterator(int expectedSize) { + public ExpectedSizeIterator(RunningQuery runningQuery, int expectedSize) { this.expectedSize = expectedSize; this.queue = new ArrayBlockingQueue(expectedSize); + this.runningQuery = runningQuery; Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); this.timeout = hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5) * hconf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60000); @@ -123,6 +128,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { current++; byte[] ret = null; + checkQueryState(); while (ret == null && coprocException == null && timeoutTS - System.currentTimeMillis() > 0) { ret = queue.poll(5000, TimeUnit.MILLISECONDS); } @@ -135,6 +141,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { return ret; } } catch (InterruptedException e) { + checkQueryState(); throw new RuntimeException("Error when waiting queue", e); } } @@ -146,12 +153,19 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { public void append(byte[] data) { try { + checkQueryState(); queue.put(data); } catch (InterruptedException e) { throw new RuntimeException("error when waiting queue", e); } } + private void checkQueryState() { + if (runningQuery.isStopped()) { + throw new IllegalStateException("the query is stopped: " + runningQuery.getStopReason()); + } + } + public long getTimeout() { return timeout; } @@ -268,7 +282,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { logger.debug("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle); Pair shardNumAndBaseShard = getShardNumAndBaseShard(); - short shardNum = shardNumAndBaseShard.getFirst(); + final short shardNum = shardNumAndBaseShard.getFirst(); short cuboidBaseShard = shardNumAndBaseShard.getSecond(); int totalShards = cubeSeg.getTotalShards(); @@ -328,10 +342,25 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { logScan(rs, cubeSeg.getStorageLocationIdentifier()); } - logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, cuboidBaseShard, rawScans.size()); + final RunningQuery query = QueryManager.getInstance().getCurrentRunningQuery(); + final QueryStep segmentQueryStep = query.startSubStep("segment_query"); + segmentQueryStep.addAttribute("segment", cubeSeg.getName()); + segmentQueryStep.addAttribute("htable", cubeSeg.getStorageLocationIdentifier()); + + query.addQueryStopListener(new RunningQuery.QueryStopListener() { + @Override + public void stop(RunningQuery query) { + if (segmentQueryStep.isRunning()) { + segmentQueryStep.getExecThread().interrupt(); + } + } + }); + + logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}, query id {}", shardNum, cuboidBaseShard, rawScans.size(), query.getQueryId()); final AtomicInteger totalScannedCount = new AtomicInteger(0); - final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum); + final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(query,shardNum); + final AtomicInteger totalResponseCount = new AtomicInteger(0); // KylinConfig: use env instance instead of CubeSegment, because KylinConfig will share among queries // for different cubes until redeployment of coprocessor jar. @@ -348,10 +377,13 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { builder.setTimeout(epResultItr.getTimeout()); builder.setKylinProperties(kylinConfig.getConfigAsString()); + final int limitCount = getConfigLimit(); for (final Pair epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) { executorService.submit(new Runnable() { @Override public void run() { + final QueryStep epRangeStep = segmentQueryStep.startSubStep("endpoint_range_request"); + epRangeStep.addAttribute("ep_range", BytesUtil.toHex(epRange.getFirst()) + "-" + BytesUtil.toHex(epRange.getSecond())); final String logHeader = " "; final boolean[] abnormalFinish = new boolean[1]; @@ -366,6 +398,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { table.coprocessorService(CubeVisitService.class, startKey, endKey, // new Batch.Call() { public CubeVisitResponse call(CubeVisitService rowsService) throws IOException { + QueryStep regionRPCStep = epRangeStep.startSubStep("region_server_rpc"); ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback rpcCallback = new BlockingRpcCallback<>(); rowsService.visitCube(controller, request, rpcCallback); @@ -373,6 +406,13 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { if (controller.failedOnException()) { throw controller.getFailedOn(); } + int totalIncomingRecords = query.incAndGetIncomingRecords(getIncomingRecordSize(response)); + if (totalIncomingRecords >= limitCount) { + logger.warn("the query result size {} is too large to return, stop the query", totalIncomingRecords); + query.stop("Scan row count exceeded, please add filter condition to narrow down backend scan range, like where clause."); + throw new RuntimeException("the query is stopped because of too large result"); + } + epRangeStep.finishSubStep(regionRPCStep); return response; } }, new Batch.Callback() { @@ -382,12 +422,13 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { return; totalScannedCount.addAndGet(result.getStats().getScannedRowCount()); - logger.info(logHeader + getStatsString(region, result)); + logger.info(logHeader + getStatsString(region, result, query)); if (result.getStats().getNormalComplete() != 1) { abnormalFinish[0] = true; return; } + totalResponseCount.incrementAndGet(); try { if (compressionResult) { epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()))); @@ -400,6 +441,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } }); + segmentQueryStep.finishSubStep(epRangeStep); + if (totalResponseCount.get() == shardNum) { + query.finishSubStep(segmentQueryStep); + } } catch (Throwable ex) { logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout epResultItr.notifyCoprocException(ex); @@ -419,9 +464,20 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get()); } - private String getStatsString(byte[] region, CubeVisitResponse result) { + private int getConfigLimit() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + return config.getScanThreshold(); + } + + private int getIncomingRecordSize(CubeVisitProtos.CubeVisitResponse response) { + Stats stats = response.getStats(); + return stats.getScannedRowCount() - stats.getAggregatedRowCount(); + } + + private String getStatsString(byte[] region, CubeVisitResponse result, RunningQuery query) { StringBuilder sb = new StringBuilder(); Stats stats = result.getStats(); + sb.append("Query ID: " + query.getQueryId() + ". "); sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append("."); sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". "); sb.append("Total filtered/aggred row: ").append(stats.getAggregatedRowCount()).append(". "); -- 2.6.4