From 852bbbfb00183ab368c99e2260b9877a4047e473 Mon Sep 17 00:00:00 2001 From: gaodayue Date: Tue, 5 Sep 2017 17:04:30 +0800 Subject: [PATCH] KYLIN-2847 avoid doing useless work by checking query deadline --- .../java/org/apache/kylin/common/QueryContext.java | 33 +++++++++++++++++++++- .../org/apache/kylin/storage/StorageContext.java | 14 --------- .../storage/gtrecord/GTCubeStorageQueryBase.java | 6 ++-- .../gtrecord/SequentialCubeTupleIterator.java | 6 ++-- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +- .../kylin/storage/hbase/cube/v2/CubeHBaseRPC.java | 12 +++++--- .../hbase/cube/v2/ExpectedSizeIterator.java | 4 +-- .../v2/coprocessor/endpoint/CubeVisitService.java | 29 +++++++++++++------ 8 files changed, 71 insertions(+), 35 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 0b8d519e2..a1eedf8ad 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -18,6 +18,8 @@ package org.apache.kylin.common; +import org.apache.kylin.common.exceptions.KylinTimeoutException; + import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -33,13 +35,16 @@ public class QueryContext { } }; + private long queryStartMillis; + private long deadline = Long.MAX_VALUE; + private String queryId; private AtomicLong scannedRows = new AtomicLong(); private AtomicLong scannedBytes = new AtomicLong(); private QueryContext() { // use QueryContext.current() instead - + queryStartMillis = System.currentTimeMillis(); queryId = UUID.randomUUID().toString(); } @@ -51,6 +56,32 @@ public class QueryContext { contexts.remove(); } + public long getQueryStartMillis() { + return queryStartMillis; + } + + public void setDeadline(long timeoutMillis) { + if (timeoutMillis > 0) { + deadline = queryStartMillis + timeoutMillis; + } + } + + public long getDeadline() { + return deadline; + } + + /** + * @return millis before deadline + * @throws KylinTimeoutException if deadline has passed + */ + public long checkMillisBeforeDeadline() { + long remain = deadline - System.currentTimeMillis(); + if (remain <= 0) { + throw new KylinTimeoutException("Query timeout"); + } + return remain; + } + public String getQueryId() { return queryId == null ? "" : queryId; } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 78cf97c37..3d71764b8 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -43,7 +43,6 @@ public class StorageContext { private int finalPushDownLimit = Integer.MAX_VALUE; private boolean hasSort = false; private boolean acceptPartialResult = false; - private long deadline; private boolean exactAggregation = false; private boolean needStorageAggregation = false; @@ -139,19 +138,6 @@ public class StorageContext { return isValidPushDownLimit(finalPushDownLimit); } - public long getDeadline() { - return this.deadline; - } - - public void setDeadline(IRealization realization) { - int timeout = realization.getConfig().getQueryTimeoutSeconds() * 1000; - if (timeout == 0) { - this.deadline = Long.MAX_VALUE; - } else { - this.deadline = timeout + System.currentTimeMillis(); - } - } - public void markSort() { this.hasSort = true; } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index a3af51128..fb1fb56d9 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -154,8 +155,9 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { sqlDigest.aggregations, context); // set whether to aggregate results from multiple partitions enableStreamAggregateIfBeneficial(cuboid, groupsD, context); - // set query deadline - context.setDeadline(cubeInstance); + // set and check query deadline + QueryContext.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000); + QueryContext.current().checkMillisBeforeDeadline(); // push down having clause filter if possible TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations, diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index 3cbb538cc..ede5ff9cc 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; -import org.apache.kylin.common.exceptions.KylinTimeoutException; +import org.apache.kylin.common.QueryContext; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -141,8 +141,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator { @Override public ITuple next() { - if (scanCount++ % 100 == 1 && System.currentTimeMillis() > context.getDeadline()) { - throw new KylinTimeoutException("Query timeout after \"kylin.query.timeout-seconds\" seconds"); + if (scanCount++ % 100 == 1) { + QueryContext.current().checkMillisBeforeDeadline(); } if (++scanCountDelta >= 1000) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 4ac237bc9..c1007a54f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -130,7 +130,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { List rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks); rawScanByteString = serializeRawScans(rawScans); - int coprocessorTimeout = getCoprocessorTimeoutMillis(); + long coprocessorTimeout = getCoprocessorTimeoutMillis(); scanRequest.setTimeout(coprocessorTimeout); scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it scanRequestByteString = serializeGTScanReq(scanRequest); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index db81646ab..2de755d27 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -285,8 +285,8 @@ public abstract class CubeHBaseRPC implements IGTStorage { logger.info(info.toString()); } - protected int getCoprocessorTimeoutMillis() { - int coopTimeout; + protected long getCoprocessorTimeoutMillis() { + long coopTimeout; if (BackdoorToggles.getQueryTimeout() != -1) { coopTimeout = BackdoorToggles.getQueryTimeout(); } else { @@ -305,10 +305,14 @@ public abstract class CubeHBaseRPC implements IGTStorage { // coprocessor timeout is 0 by default if (coopTimeout <= 0) { - coopTimeout = (int) (rpcTimeout * 0.9); + coopTimeout = (long) (rpcTimeout * 0.9); } + + long millisBeforeDeadline = queryContext.checkMillisBeforeDeadline(); + coopTimeout = Math.min(coopTimeout, millisBeforeDeadline); - logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, coopTimeout); + logger.debug("{} = {} ms, {} ms before deadline, use {} ms as timeout for coprocessor", + HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, millisBeforeDeadline, coopTimeout); return coopTimeout; } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java index 59fe9e0ab..60d85b4cb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java @@ -32,11 +32,11 @@ class ExpectedSizeIterator implements Iterator { private BlockingQueue queue; private int expectedSize; private int current = 0; - private int coprocessorTimeout; + private long coprocessorTimeout; private long deadline; private volatile Throwable coprocException; - public ExpectedSizeIterator(int expectedSize, int coprocessorTimeout) { + public ExpectedSizeIterator(int expectedSize, long coprocessorTimeout) { this.expectedSize = expectedSize; this.queue = new ArrayBlockingQueue(expectedSize); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 3791e63a9..3051f4b52 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; @@ -81,6 +82,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement private static final int MEMORY_LIMIT = 500 * 1024 * 1024; private RegionCoprocessorEnvironment env; + private long deadline; abstract static class BaseCellListIterator implements CellListIterator { @Override @@ -141,19 +143,17 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement private final Iterator> delegate; private final long rowCountLimit; private final long bytesLimit; - private final long timeout; private final long deadline; private long rowCount; private long rowBytes; ResourceTrackingCellListIterator(Iterator> delegate, - long rowCountLimit, long bytesLimit, long timeout) { + long rowCountLimit, long bytesLimit, long deadline) { this.delegate = delegate; this.rowCountLimit = rowCountLimit; this.bytesLimit = bytesLimit; - this.timeout = timeout; - this.deadline = System.currentTimeMillis() + timeout; + this.deadline = deadline; } @Override @@ -164,8 +164,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement if (rowBytes > bytesLimit) { throw new ResourceLimitExceededException("scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit); } - if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) { - throw new KylinTimeoutException("coprocessor timeout after " + timeout + " ms"); + if ((rowCount % GTScanRequest.terminateCheckInterval == 0) && System.currentTimeMillis() > deadline) { + throw new KylinTimeoutException("coprocessor timeout after scanning " + rowCount + " rows"); } return delegate.hasNext(); } @@ -216,6 +216,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement sb.append(","); } + private void checkDeadline() throws DoNotRetryIOException { + if (System.currentTimeMillis() > deadline) { + logger.info("Deadline has passed, abort now!"); + throw new DoNotRetryIOException("Coprocessor passed deadline! Maybe server is overloaded"); + } + } + @SuppressWarnings("checkstyle:methodlength") @Override public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request, RpcCallback done) { @@ -229,6 +236,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement CubeVisitProtos.CubeVisitResponse.ErrorInfo errorInfo = null; String queryId = request.hasQueryId() ? request.getQueryId() : "UnknownId"; + logger.info("start query {} in thread {}", queryId, Thread.currentThread().getName()); try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) { final long serviceStartTime = System.currentTimeMillis(); @@ -242,6 +250,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag); final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); + this.deadline = scanReq.getStartTime() + scanReq.getTimeout(); + checkDeadline(); // in case RPC spends too much time in queue + List> hbaseColumnsToGT = Lists.newArrayList(); for (IntList intList : request.getHbaseColumnsToGTList()) { hbaseColumnsToGT.add(intList.getIntsList()); @@ -291,7 +302,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement allCellLists, scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold) !request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client - scanReq.getTimeout()); + deadline); IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(), request.getIsExactAggregate()); @@ -388,6 +399,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement setNormalComplete(errorInfo == null ? 1 : 0).build()) .build()); + } catch (DoNotRetryIOException e) { + ResponseConverter.setControllerException(controller, e); + } catch (IOException ioe) { logger.error(ioe.toString(), ioe); IOException wrapped = new IOException("Error in coprocessor " + debugGitTag, ioe); @@ -400,7 +414,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try { region.closeRegionOperation(); } catch (IOException e) { - e.printStackTrace(); throw new RuntimeException(e); } } -- 2.11.0 (Apple Git-81)