From eb1307604b6b29e2f39ed157016d3cd0e03442f3 Mon Sep 17 00:00:00 2001
From: "Ma,Gang"
Date: Mon, 4 Jul 2016 21:24:22 +0800
Subject: [PATCH] KYLIN-1872 Make query visible and interruptible, improve server's stability

---
 .../apache/kylin/metadata/query/QueryManager.java  |  92 +++++++++
 .../org/apache/kylin/metadata/query/QueryStep.java | 220 +++++++++++++++++++++
 .../apache/kylin/metadata/query/RunningQuery.java  | 138 +++++++++++++
 .../storage/cache/CacheFledgedStaticQuery.java     |   9 +-
 .../query/relnode/OLAPToEnumerableConverter.java   |   6 +-
 .../kylin/rest/controller/QueryController.java     |  16 ++
 .../apache/kylin/rest/service/QueryService.java    |   8 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        |  86 +++++++-
 .../storage/hbase/cube/v2/CubeStorageQuery.java    |   4 +-
 9 files changed, 565 insertions(+), 14 deletions(-)
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryManager.java
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryStep.java
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/metadata/query/RunningQuery.java

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryManager.java
new file mode 100644
index 0000000..f5fc455
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryManager.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.query;
+
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+
+public class QueryManager {
+    private static final Logger logger = LoggerFactory.getLogger(QueryManager.class);
+    private static QueryManager instance = new QueryManager();
+
+    private final ConcurrentMap<Thread, RunningQuery> threadQueryMap = Maps.newConcurrentMap();
+    private final ConcurrentMap<String, RunningQuery> idQueryMap = Maps.newConcurrentMap();
+
+    public static QueryManager getInstance() {
+        return instance;
+    }
+
+    public String startQuery(String project, String sql) {
+        String queryId = UUID.randomUUID().toString();
+        Thread currThread = Thread.currentThread();
+        RunningQuery query = new RunningQuery(project, queryId, sql, currThread);
+        threadQueryMap.put(currThread, query);
+        idQueryMap.put(queryId, query);
+        return queryId;
+    }
+
+    public void endQuery(String queryId) {
+        if (queryId == null) {
+            logger.warn("query id is null");
+            return;
+        }
+        cleanQuery(queryId);
+    }
+
+    public void stopQuery(String queryId, String info) {
+        RunningQuery query = idQueryMap.get(queryId);
+        if (query != null) {
+            query.stop(info);
+            cleanQuery(queryId);
+        } else {
+            logger.info("query {} does not exist", queryId);
+        }
+    }
+
+    private void cleanQuery(String queryId) {
+        RunningQuery query = idQueryMap.remove(queryId);
+        if (query != null) {
+            threadQueryMap.remove(query.getExecThread());
+        }
+    }
+
+    public RunningQuery getCurrentRunningQuery() {
+        Thread thread = Thread.currentThread();
+        return getRunningQuery(thread);
+    }
+
+    public RunningQuery getRunningQuery(Thread thread) {
+        return threadQueryMap.get(thread);
+    }
+
+    public List<RunningQuery> getAllRunningQueries() {
+        TreeSet<RunningQuery> queriesSet = new TreeSet<>();
+        for (RunningQuery runningQuery : idQueryMap.values()) {
+            queriesSet.add(runningQuery);
+        }
+        return new ArrayList<>(queriesSet);
+    }
+}
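
Note: for orientation, the sketch below shows how a caller is expected to drive the QueryManager lifecycle introduced above. The project name and SQL text are made-up values, and the surrounding service wiring is assumed; only the QueryManager/RunningQuery calls come from this patch.

    import org.apache.kylin.metadata.query.QueryManager;
    import org.apache.kylin.metadata.query.RunningQuery;

    public class QueryLifecycleSketch {
        public static void main(String[] args) {
            QueryManager manager = QueryManager.getInstance();

            // Register the query so it becomes visible to getAllRunningQueries().
            String queryId = manager.startQuery("learn_kylin", "select count(*) from kylin_sales");
            try {
                // ... run the query on this thread; lower layers can look it up
                // again via getCurrentRunningQuery() to record their steps ...
                RunningQuery current = manager.getCurrentRunningQuery();
                System.out.println("running query: " + current.getQueryId());

                // An administrator (or a watchdog) could abort it from another thread:
                // manager.stopQuery(queryId, "stopped by admin");
            } finally {
                // Always unregister, otherwise the maps inside QueryManager would leak entries.
                manager.endQuery(queryId);
            }
        }
    }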
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryStep.java b/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryStep.java
new file mode 100644
index 0000000..00f8fe3
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryStep.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.query;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class QueryStep {
+    public static final String LINE_SEP = System.getProperty("line.separator");
+
+    public enum Status {
+        RUNNING, STOPPED, SUCCEED
+    }
+
+    @JsonProperty("name")
+    private String name;
+
+    @JsonProperty("start_time")
+    private long startTime;
+
+    @JsonProperty("end_time")
+    private long endTime;
+
+    @JsonProperty("status")
+    private volatile Status status = Status.RUNNING;
+
+    @JsonProperty("attributes")
+    private Map<String, String> attributes = new HashMap<>();
+
+    @JsonProperty("sub_steps")
+    private List<QueryStep> subSteps = new CopyOnWriteArrayList<>();
+
+    private Thread execThread;
+
+    public QueryStep(String name) {
+        this.name = name;
+    }
+
+    public QueryStep startStep(String stepName) {
+        return startStep(stepName, Thread.currentThread());
+    }
+
+    public QueryStep startStep(String stepName, Thread execThread) {
+        QueryStep step = new QueryStep(stepName);
+        step.setStartTime(System.currentTimeMillis());
+        step.setExecThread(execThread);
+        addSubStep(step);
+
+        return step;
+    }
+
+    public void finishStep(String stepName) {
+        finishStep(stepName, Thread.currentThread());
+    }
+
+    public void finishStep(String stepName, Thread thread) {
+        QueryStep step = getStepByName(stepName, thread);
+        if (step == null) {
+            throw new IllegalStateException("step " + stepName + " does not exist");
+        }
+        finishStep(step);
+    }
+
+    public void finishStep(QueryStep step) {
+        step.setEndTime(System.currentTimeMillis());
+        step.status = Status.SUCCEED;
+    }
+
+    public boolean isRunning() {
+        return this.status == Status.RUNNING;
+    }
+
+    public QueryStep addAttribute(String key, String value) {
+        attributes.put(key, value);
+        return this;
+    }
+
+    public QueryStep getRunningStep(Thread thread) {
+        for (QueryStep step : getSubSteps()) {
+            if (step.isRunning() && thread.equals(step.getExecThread())) {
+                return step;
+            }
+        }
+        return null;
+    }
+
+    private void addSubStep(QueryStep step) {
+        subSteps.add(step);
+    }
+
+    @JsonProperty("thread")
+    public ThreadInfo getThread() {
+        String stackTraceStr = null;
+        if (isRunning()) {
+            int maxStackTraceDepth = 50;
+            int current = 0;
+
+            StackTraceElement[] stackTrace = execThread.getStackTrace();
+            StringBuilder buf = new StringBuilder();
+            buf.append(LINE_SEP);
+            for (StackTraceElement e : stackTrace) {
+                if (++current > maxStackTraceDepth) {
+                    break;
+                }
+                buf.append("\t").append("at ").append(e.toString()).append(LINE_SEP);
+            }
+            stackTraceStr = buf.toString();
+        }
+
+        ThreadInfo threadInfo = new ThreadInfo();
+        threadInfo.threadName = execThread.getName();
+        threadInfo.stackTrace = stackTraceStr;
+        return threadInfo;
+    }
+
+    /**
+     * @return the sub-step with the given name run by the given thread, or null if no such step exists
+     */
+    private QueryStep getStepByName(String name, Thread thread) {
+        for (QueryStep step : subSteps) {
+            if (step.name.equals(name) && step.execThread.equals(thread)) {
+                return step;
+            }
+        }
+        return null;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public Thread getExecThread() {
+        return execThread;
+    }
+
+    public void setExecThread(Thread execThread) {
+        this.execThread = execThread;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+    public void setStatus(Status status) {
+        this.status = status;
+    }
+
+    public List<QueryStep> getSubSteps() {
+        return subSteps;
+    }
+
+    @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+    public static class ThreadInfo {
+        @JsonProperty("name")
+        public String threadName;
+        @JsonProperty("stack_trace")
+        public String stackTrace;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        QueryStep queryStep = (QueryStep) o;
+
+        if (!name.equals(queryStep.name)) return false;
+        return execThread.equals(queryStep.execThread);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name.hashCode();
+        result = 31 * result + execThread.hashCode();
+        return result;
+    }
+}
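
Note: QueryStep is the building block the rest of the patch uses to record per-phase timings. The sketch below is a rough illustration of the nesting it supports; the step names, attribute values and sleep are illustrative only (the patch itself uses names such as "parse_sql", "segment_query" and "coprocessor_request").

    import org.apache.kylin.metadata.query.QueryStep;

    public class QueryStepSketch {
        public static void main(String[] args) throws InterruptedException {
            QueryStep root = new QueryStep("main_query");

            // Each phase is wrapped in start/finish calls; a step remembers its
            // executing thread so a stack trace can be captured while it is RUNNING.
            QueryStep parse = root.startStep("parse_sql");
            Thread.sleep(10); // stand-in for real work
            root.finishStep(parse);

            // Attributes attach free-form diagnostics to a step.
            root.startStep("segment_scan").addAttribute("segment", "seg_1");
            root.finishStep("segment_scan");

            for (QueryStep step : root.getSubSteps()) {
                System.out.println(step.getName() + " took " + (step.getEndTime() - step.getStartTime()) + " ms");
            }
        }
    }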
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/query/RunningQuery.java b/core-metadata/src/main/java/org/apache/kylin/metadata/query/RunningQuery.java
new file mode 100644
index 0000000..b6b23e9
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/query/RunningQuery.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.query;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class RunningQuery extends QueryStep implements Comparable<RunningQuery> {
+    public static final String SQL_PARSE_STEP = "parse_sql";
+    public static final String CUBE_PLAN_STEP = "cube_plan";
+
+    @JsonProperty("project")
+    private String projectName;
+    @JsonProperty("query_id")
+    private String queryId;
+    @JsonProperty("sql")
+    private String sql;
+
+    private String stopReason = "";
+
+    private AtomicInteger numIncomingRecords = new AtomicInteger(0);
+
+    private List<QueryStopListener> stopListeners = new ArrayList<>();
+
+    public RunningQuery(String projectName, String queryId, String sql, Thread mainQueryThread) {
+        super("main_query");
+        setExecThread(mainQueryThread);
+        setStartTime(System.currentTimeMillis());
+        this.projectName = projectName;
+        this.queryId = queryId;
+        this.sql = sql;
+    }
+
+    public void startSqlParse() {
+        startStep(SQL_PARSE_STEP);
+    }
+
+    public void finishSqlParse() {
+        finishStep(SQL_PARSE_STEP);
+    }
+
+    public void startCubePlan() {
+        startStep(CUBE_PLAN_STEP);
+    }
+
+    public void finishCubePlan() {
+        finishStep(CUBE_PLAN_STEP);
+    }
+
+    /**
+     * stop the query
+     */
+    public void stop(String reason) {
+        if (isStopped()) {
+            return;
+        }
+        setStatus(Status.STOPPED);
+        setStopReason(reason);
+        for (QueryStopListener stopListener : stopListeners) {
+            stopListener.stop(this);
+        }
+    }
+
+    public boolean isStopped() {
+        return getStatus() == Status.STOPPED;
+    }
+
+    public String getStopReason() {
+        return stopReason;
+    }
+
+    public void setStopReason(String stopReason) {
+        this.stopReason = stopReason;
+    }
+
+    public String getProjectName() {
+        return projectName;
+    }
+
+    public String getQueryId() {
+        return queryId;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public int incAndGetIncomingRecords(int recordCnt) {
+        return numIncomingRecords.addAndGet(recordCnt);
+    }
+
+    public void addQueryStopListener(QueryStopListener listener) {
+        this.stopListeners.add(listener);
+    }
+
+    public String toString() {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        for (QueryStep step : getSubSteps()) {
+            pw.print(step.getName() + ":");
+        }
+
+        return sw.toString();
+    }
+
+    @Override
+    public int compareTo(RunningQuery q) {
+        return (int) (this.getStartTime() - q.getStartTime());
+    }
+
+    public interface QueryStopListener {
+        void stop(RunningQuery query);
+    }
+}
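
Note: RunningQuery's stop listeners are the hook through which storage code reacts to an aborted query; the HBase RPC change later in this patch registers one that interrupts its worker threads. Below is a minimal, hedged sketch of that pattern; the worker thread and the query id/SQL are illustrative, only the RunningQuery API comes from the patch.

    import org.apache.kylin.metadata.query.RunningQuery;

    public class StopListenerSketch {
        public static void main(String[] args) {
            final Thread worker = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(60000); // stand-in for a long-running scan
                    } catch (InterruptedException e) {
                        System.out.println("worker interrupted, giving up the scan");
                    }
                }
            });

            RunningQuery query = new RunningQuery("learn_kylin", "q-1", "select 1", Thread.currentThread());
            query.addQueryStopListener(new RunningQuery.QueryStopListener() {
                @Override
                public void stop(RunningQuery q) {
                    // Interrupt the thread doing the heavy lifting so it can bail out early.
                    worker.interrupt();
                }
            });

            worker.start();
            query.stop("stopped by admin"); // fires the listener and marks the query STOPPED
            System.out.println("stopped: " + query.isStopped() + ", reason: " + query.getStopReason());
        }
    }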
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
index 2bc3bb9..a9de916 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
@@ -22,6 +22,9 @@ import java.util.List;
 
 import net.sf.ehcache.Element;
 
+import org.apache.kylin.metadata.query.QueryManager;
+import org.apache.kylin.metadata.query.QueryStep;
+import org.apache.kylin.metadata.query.RunningQuery;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.realization.StreamSQLDigest;
 import org.apache.kylin.metadata.tuple.ITuple;
@@ -45,17 +48,21 @@ public class CacheFledgedStaticQuery extends AbstractCacheFledgedQuery {
 
     @Override
     public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
-
+        RunningQuery currQuery = QueryManager.getInstance().getCurrentRunningQuery();
+        QueryStep cacheStep = currQuery.startStep("query_cache");
         streamSQLDigest = new StreamSQLDigest(sqlDigest, null);
         StreamSQLResult cachedResult = getStreamSQLResult(streamSQLDigest);
+        currQuery.finishStep("query_cache");
 
         ITupleIterator ret;
         if (cachedResult != null) {
             logger.info("using existing cache");
+            cacheStep.addAttribute("cache_hit", Boolean.TRUE.toString());
             context.setReusedPeriod(Ranges.<Long> all());
             return new SimpleTupleIterator(cachedResult.reuse(Ranges.<Long> all()));
         } else {
             logger.info("no existing cache to use");
+            cacheStep.addAttribute("cache_hit", Boolean.FALSE.toString());
             ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
 
             //use another nested ITupleIterator to deal with cache

diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
index 7053694..4ede7ff 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
@@ -37,6 +37,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterImpl;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.kylin.metadata.query.QueryManager;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.query.routing.NoRealizationFoundException;
 import org.apache.kylin.query.routing.QueryRouter;
@@ -102,8 +103,9 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements EnumerableRel {
             System.out.println("EXECUTION PLAN AFTER REWRITE");
             System.out.println(dumpPlan);
         }
-
-        return impl.visitChild(this, 0, inputAsEnum, pref);
+        Result result = impl.visitChild(this, 0, inputAsEnum, pref);
+        QueryManager.getInstance().getCurrentRunningQuery().finishSqlParse();
+        return result;
     }
 
     private Result buildHiveResult(EnumerableRelImplementor enumImplementor, Prefer pref, OLAPContext context) {
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index f60894e..59987d3 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -34,6 +34,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.metadata.query.QueryManager;
+import org.apache.kylin.metadata.query.RunningQuery;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.model.Query;
@@ -166,6 +168,20 @@
         }
     }
 
+    @RequestMapping(value = "/query/runningQueries", method = RequestMethod.GET)
+    @ResponseBody
+    public List<RunningQuery> getRunningQueries() {
+        return QueryManager.getInstance().getAllRunningQueries();
+    }
+
+    @RequestMapping(value = "/query/{queryId}/stop", method = RequestMethod.PUT)
+    @ResponseBody
+    public void stopQuery(@PathVariable String queryId) {
+        final String user = SecurityContextHolder.getContext().getAuthentication().getName();
+        logger.info("{} stops the query: {}", new Object[] { user, queryId });
+        QueryManager.getInstance().stopQuery(queryId, "stopped by " + user);
+    }
+
     private SQLResponse doQueryWithCache(SQLRequest sqlRequest) {
         try {
             BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles());

diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index bf371be..383db26 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -53,6 +53,7 @@ import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.query.QueryManager;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.model.ColumnMeta;
@@ -112,13 +113,16 @@ public class QueryService extends BasicService {
     }
 
     public SQLResponse query(SQLRequest sqlRequest) throws Exception {
+        QueryManager queryManager = QueryManager.getInstance();
+        String queryId = null;
         try {
             badQueryDetector.queryStart(Thread.currentThread(), sqlRequest);
-
+            queryId = queryManager.startQuery(sqlRequest.getProject(), sqlRequest.getSql());
             return queryWithSqlMassage(sqlRequest);
 
         } finally {
             badQueryDetector.queryEnd(Thread.currentThread());
+            queryManager.endQuery(queryId);
         }
     }
 
@@ -347,7 +351,7 @@ public class QueryService extends BasicService {
 
         try {
             conn = cacheService.getOLAPDataSource(sqlRequest.getProject()).getConnection();
-
+            QueryManager.getInstance().getCurrentRunningQuery().startSqlParse();
             if (sqlRequest instanceof PrepareSqlRequest) {
 
                 PreparedStatement preparedState = conn.prepareStatement(sql);
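
Note: the two controller methods above expose running-query visibility and cancellation over REST. The client sketch below is an assumption-laden illustration: the /kylin/api prefix, port 7070 and the ADMIN:KYLIN credentials are the usual Kylin defaults rather than something this patch defines, and the query id is a placeholder.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.Base64;

    public class RunningQueriesClientSketch {
        public static void main(String[] args) throws Exception {
            String auth = Base64.getEncoder().encodeToString("ADMIN:KYLIN".getBytes("UTF-8"));

            // List running queries (GET /kylin/api/query/runningQueries).
            HttpURLConnection list = (HttpURLConnection) new URL("http://localhost:7070/kylin/api/query/runningQueries").openConnection();
            list.setRequestProperty("Authorization", "Basic " + auth);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(list.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line); // JSON array of running queries with their steps
                }
            }

            // Stop one of them (PUT /kylin/api/query/{queryId}/stop); "some-query-id" is hypothetical.
            HttpURLConnection stop = (HttpURLConnection) new URL("http://localhost:7070/kylin/api/query/some-query-id/stop").openConnection();
            stop.setRequestMethod("PUT");
            stop.setRequestProperty("Authorization", "Basic " + auth);
            System.out.println("stop returned HTTP " + stop.getResponseCode());
        }
    }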
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index d6ef16c..8817b97 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSerializer;
@@ -52,6 +53,9 @@ import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.query.QueryManager;
+import org.apache.kylin.metadata.query.QueryStep;
+import org.apache.kylin.metadata.query.RunningQuery;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
@@ -77,9 +81,12 @@
         int expectedSize;
         int current = 0;
         BlockingQueue<byte[]> queue;
+        RunningQuery runningQuery;
 
-        public ExpectedSizeIterator(int expectedSize) {
+
+        public ExpectedSizeIterator(RunningQuery runningQuery, int expectedSize) {
             this.expectedSize = expectedSize;
+            this.runningQuery = runningQuery;
             this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
         }
 
@@ -93,6 +100,7 @@
             if (current >= expectedSize) {
                 throw new IllegalStateException("Won't have more data");
             }
+            checkQueryState();
             try {
                 current++;
                 return queue.poll(1, TimeUnit.HOURS);
@@ -108,11 +116,18 @@
 
         public void append(byte[] data) {
             try {
+                checkQueryState();
                 queue.put(data);
             } catch (InterruptedException e) {
                 throw new RuntimeException("error while waiting on the queue", e);
             }
         }
+
+        private void checkQueryState() {
+            if (runningQuery.isStopped()) {
+                throw new IllegalStateException("the query is stopped: " + runningQuery.getStopReason());
+            }
+        }
     }
 
     static class EndpointResultsAsGTScanner implements IGTScanner {
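
Note: the intent of checkQueryState() above is that both the producer (append) and the consumer (next) fail fast once the query has been marked stopped, instead of blocking on the queue for up to an hour. A stripped-down sketch of that pattern, with the RunningQuery flag replaced by an AtomicBoolean and the queue size made up:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class StoppableQueueSketch {
        private final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(16);
        private final AtomicBoolean stopped = new AtomicBoolean(false); // stands in for RunningQuery.isStopped()

        public void append(byte[] data) throws InterruptedException {
            checkState();          // refuse new results once the query was aborted
            queue.put(data);
        }

        public byte[] next() throws InterruptedException {
            checkState();          // a stopped query fails fast instead of waiting on poll()
            return queue.poll(1, TimeUnit.HOURS);
        }

        private void checkState() {
            if (stopped.get()) {
                throw new IllegalStateException("the query is stopped");
            }
        }
    }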
@@ -249,16 +264,42 @@
             }
         }
 
-        logger.debug("Submitting rpc to {} shards starting from shard {}, scan requests count {}", new Object[] { shardNum, cuboidBaseShard, scanRequests.size() });
-
+        final RunningQuery query = QueryManager.getInstance().getCurrentRunningQuery();
+        final QueryStep segmentQueryStep = query.startStep("segment_query");
+        segmentQueryStep.addAttribute("segment", cubeSeg.getName());
+        segmentQueryStep.addAttribute("htable", cubeSeg.getStorageLocationIdentifier());
+
+        query.addQueryStopListener(new RunningQuery.QueryStopListener() {
+            @Override
+            public void stop(RunningQuery query) {
+                // first interrupt the query thread
+                List<QueryStep> epRangeSteps = segmentQueryStep.getSubSteps();
+                for (QueryStep epRangeStep : epRangeSteps) {
+                    if (epRangeStep.isRunning()) {
+                        Thread epRangeThread = epRangeStep.getExecThread();
+                        epRangeThread.interrupt();
+                    }
+                }
+                if (segmentQueryStep.isRunning()) {
+                    segmentQueryStep.getExecThread().interrupt();
+                }
+            }
+        });
+        logger.debug("Submitting rpc to {} shards starting from shard {}, scan requests count {}, query id {}", new Object[] { shardNum, cuboidBaseShard, scanRequests.size(), query.getQueryId() });
 
         final AtomicInteger totalScannedCount = new AtomicInteger(0);
-        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(scanRequests.size() * shardNum);
+        final AtomicInteger totalResponseCount = new AtomicInteger(0);
+        final int expectedSize = scanRequests.size() * shardNum;
+        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(query, expectedSize);
 
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
+                    QueryStep epRangeStep = segmentQueryStep.startStep("endpoint_range_request");
+                    epRangeStep.addAttribute("ep_range", BytesUtil.toHex(epRange.getFirst()) + "-" + BytesUtil.toHex(epRange.getSecond()));
                     for (int i = 0; i < scanRequests.size(); ++i) {
+                        QueryStep coprocessorRequestStep = epRangeStep.startStep("coprocessor_request:" + i);
+                        int scanIndex = i;
                         CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
                         builder.setGtScanRequest(scanRequestByteStrings.get(scanIndex)).setHbaseRawScan(rawScanByteStrings.get(scanIndex));
@@ -270,20 +311,27 @@
                         Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
                         try {
-                            results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
+                            results = getResults(query, coprocessorRequestStep, builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
                         } catch (Throwable throwable) {
                             throw new RuntimeException("Error when visiting cubes by endpoint:", throwable);
                         }
 
                         for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
                             totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
-                            logger.info(getStatsString(result));
+                            totalResponseCount.incrementAndGet();
+                            logger.info(getStatsString(result, query));
                             try {
                                 epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
                             } catch (IOException | DataFormatException e) {
                                 throw new RuntimeException("Error when decompressing", e);
                             }
                         }
+
+                        epRangeStep.finishStep(coprocessorRequestStep);
+                    }
+                    segmentQueryStep.finishStep(epRangeStep);
+                    if (totalResponseCount.get() == expectedSize) {
+                        query.finishStep(segmentQueryStep);
                     }
                 }
             });
@@ -292,9 +340,10 @@
         return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequests.get(0).getColumns(), totalScannedCount.get());
     }
 
-    private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) {
+    private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result, RunningQuery query) {
         StringBuilder sb = new StringBuilder();
         Stats stats = result.getValue().getStats();
+        sb.append("Query ID: " + query.getQueryId() + ". ");
         sb.append("Endpoint RPC returned from HTable " + cubeSeg.getStorageLocationIdentifier() + " Shard " + BytesUtil.toHex(result.getKey()) + " on host: " + stats.getHostname() + ".");
         sb.append("Total scanned row: " + stats.getScannedRowCount() + ". ");
         sb.append("Total filtered/aggred row: " + stats.getAggregatedRowCount() + ". ");
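
Note: the getResults() hunk that follows adds a result-size guard: each coprocessor response contributes (scanned - aggregated) rows, and once the running total crosses KylinConfig.getScanThreshold() the whole query is stopped. A hedged sketch of just that arithmetic is below; the threshold and row counts are made up, and the real patch keeps the running total in RunningQuery's AtomicInteger.

    public class ScanThresholdGuardSketch {
        private final int scanThreshold;          // e.g. KylinConfig.getInstanceFromEnv().getScanThreshold()
        private int totalIncomingRecords = 0;     // the patch keeps this counter on RunningQuery

        public ScanThresholdGuardSketch(int scanThreshold) {
            this.scanThreshold = scanThreshold;
        }

        /** @return true if the query should be stopped after this response */
        public boolean onResponse(int scannedRowCount, int aggregatedRowCount) {
            // Rows that survive aggregation are what actually travels back to the query server.
            int incoming = scannedRowCount - aggregatedRowCount;
            totalIncomingRecords += incoming;
            return totalIncomingRecords >= scanThreshold;
        }

        public static void main(String[] args) {
            ScanThresholdGuardSketch guard = new ScanThresholdGuardSketch(10000000);
            System.out.println(guard.onResponse(500000, 490000));      // false: only 10k incoming rows so far
            System.out.println(guard.onResponse(20000000, 1000000));   // true: threshold exceeded
        }
    }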
"); @@ -305,9 +354,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } - private Map getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable { + private Map getResults(final RunningQuery query, final QueryStep coprocessorRequestStep, final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable { + final int limitCount = getConfigLimit(); Map results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call() { public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException { + QueryStep regionRPCStep = coprocessorRequestStep.startStep("region_server_rpc"); ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback rpcCallback = new BlockingRpcCallback<>(); rowsService.visitCube(controller, request, rpcCallback); @@ -315,10 +366,29 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { if (controller.failedOnException()) { throw controller.getFailedOn(); } + + int totalIncomingRecords = query.incAndGetIncomingRecords(getIncomingRecordSize(response)); + + if (totalIncomingRecords >= limitCount) { + logger.warn("the query result size {} is too large to return, stop the query", totalIncomingRecords); + query.stop("Scan row count exceeded, please add filter condition to narrow down backend scan range, like where clause."); + throw new RuntimeException("the query is stopped because of too large result"); + } + coprocessorRequestStep.finishStep(regionRPCStep); return response; } }); return results; } + + private int getConfigLimit() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + return config.getScanThreshold(); + } + + private int getIncomingRecordSize(CubeVisitProtos.CubeVisitResponse response) { + Stats stats = response.getStats(); + return stats.getScannedRowCount() - stats.getAggregatedRowCount(); + } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index c820bd0..7ed6171 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -42,6 +42,7 @@ import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.query.QueryManager; import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.metadata.tuple.ITupleIterator; import org.apache.kylin.storage.ICachableStorageQuery; @@ -69,6 +70,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { @Override public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { + QueryManager.getInstance().getCurrentRunningQuery().startCubePlan(); // check whether this is a TopN query checkAndRewriteTopN(sqlDigest); @@ -126,7 +128,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { } scanners.add(scanner); } - + QueryManager.getInstance().getCurrentRunningQuery().finishCubePlan(); if (scanners.isEmpty()) return ITupleIterator.EMPTY_TUPLE_ITERATOR; -- 2.6.4