From acdf2d1fe6a22d969c1a3ccbb2b98c407e29f4aa Mon Sep 17 00:00:00 2001 From: gaodayue Date: Tue, 5 Sep 2017 17:04:30 +0800 Subject: [PATCH] KYLIN-2847 avoid doing useless work by checking query deadline --- .../java/org/apache/kylin/common/QueryContext.java | 33 +++++++++++++++++++++- .../org/apache/kylin/storage/StorageContext.java | 14 --------- .../storage/gtrecord/GTCubeStorageQueryBase.java | 6 ++-- .../gtrecord/SequentialCubeTupleIterator.java | 6 ++-- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +- .../kylin/storage/hbase/cube/v2/CubeHBaseRPC.java | 12 +++++--- .../hbase/cube/v2/ExpectedSizeIterator.java | 4 +-- .../v2/coprocessor/endpoint/CubeVisitService.java | 30 ++++++++++++++------ 8 files changed, 71 insertions(+), 36 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 9e0c33b5a..9b728a746 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -18,6 +18,8 @@ package org.apache.kylin.common; +import org.apache.kylin.common.exceptions.KylinTimeoutException; + import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -33,6 +35,9 @@ public class QueryContext { } }; + private long queryStartMillis; + private long deadline = Long.MAX_VALUE; + private String queryId; private String username; private AtomicLong scannedRows = new AtomicLong(); @@ -40,7 +45,7 @@ public class QueryContext { private QueryContext() { // use QueryContext.current() instead - + queryStartMillis = System.currentTimeMillis(); queryId = UUID.randomUUID().toString(); } @@ -52,6 +57,32 @@ public class QueryContext { contexts.remove(); } + public long getQueryStartMillis() { + return queryStartMillis; + } + + public void setDeadline(long timeoutMillis) { + if (timeoutMillis > 0) { + deadline = queryStartMillis + timeoutMillis; + } + } + + public long getDeadline() { + return deadline; + } + + /** + * @return millis before deadline + * @throws KylinTimeoutException if deadline has passed + */ + public long checkMillisBeforeDeadline() { + long remain = deadline - System.currentTimeMillis(); + if (remain <= 0) { + throw new KylinTimeoutException("Query timeout"); + } + return remain; + } + public String getQueryId() { return queryId == null ? "" : queryId; } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index a2e286948..8d11ea513 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -45,7 +45,6 @@ public class StorageContext { private StorageLimitLevel storageLimitLevel = StorageLimitLevel.NO_LIMIT; private boolean hasSort = false; private boolean acceptPartialResult = false; - private long deadline; private boolean exactAggregation = false; private boolean needStorageAggregation = false; @@ -154,19 +153,6 @@ public class StorageContext { return isValidPushDownLimit(finalPushDownLimit); } - public long getDeadline() { - return this.deadline; - } - - public void setDeadline(IRealization realization) { - int timeout = realization.getConfig().getQueryTimeoutSeconds() * 1000; - if (timeout == 0) { - this.deadline = Long.MAX_VALUE; - } else { - this.deadline = timeout + System.currentTimeMillis(); - } - } - public void markSort() { this.hasSort = true; } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index a6d30bb1e..9fb2924bb 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -156,8 +157,9 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { sqlDigest.aggregations, context); // set whether to aggregate results from multiple partitions enableStreamAggregateIfBeneficial(cuboid, groupsD, context); - // set query deadline - context.setDeadline(cubeInstance); + // set and check query deadline + QueryContext.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000); + QueryContext.current().checkMillisBeforeDeadline(); // push down having clause filter if possible TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations, diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index 3cbb538cc..ede5ff9cc 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; -import org.apache.kylin.common.exceptions.KylinTimeoutException; +import org.apache.kylin.common.QueryContext; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -141,8 +141,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator { @Override public ITuple next() { - if (scanCount++ % 100 == 1 && System.currentTimeMillis() > context.getDeadline()) { - throw new KylinTimeoutException("Query timeout after \"kylin.query.timeout-seconds\" seconds"); + if (scanCount++ % 100 == 1) { + QueryContext.current().checkMillisBeforeDeadline(); } if (++scanCountDelta >= 1000) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 4ac237bc9..c1007a54f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -130,7 +130,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { List rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks); rawScanByteString = serializeRawScans(rawScans); - int coprocessorTimeout = getCoprocessorTimeoutMillis(); + long coprocessorTimeout = getCoprocessorTimeoutMillis(); scanRequest.setTimeout(coprocessorTimeout); scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it scanRequestByteString = serializeGTScanReq(scanRequest); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 188f55405..6b4ac321e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -287,8 +287,8 @@ public abstract class CubeHBaseRPC implements IGTStorage { logger.info(info.toString()); } - protected int getCoprocessorTimeoutMillis() { - int coopTimeout; + protected long getCoprocessorTimeoutMillis() { + long coopTimeout; if (BackdoorToggles.getQueryTimeout() != -1) { coopTimeout = BackdoorToggles.getQueryTimeout(); } else { @@ -307,10 +307,14 @@ public abstract class CubeHBaseRPC implements IGTStorage { // coprocessor timeout is 0 by default if (coopTimeout <= 0) { - coopTimeout = (int) (rpcTimeout * 0.9); + coopTimeout = (long) (rpcTimeout * 0.9); } + + long millisBeforeDeadline = queryContext.checkMillisBeforeDeadline(); + coopTimeout = Math.min(coopTimeout, millisBeforeDeadline); - logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, coopTimeout); + logger.debug("{} = {} ms, {} ms before deadline, use {} ms as timeout for coprocessor", + HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, millisBeforeDeadline, coopTimeout); return coopTimeout; } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java index 59fe9e0ab..60d85b4cb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java @@ -32,11 +32,11 @@ class ExpectedSizeIterator implements Iterator { private BlockingQueue queue; private int expectedSize; private int current = 0; - private int coprocessorTimeout; + private long coprocessorTimeout; private long deadline; private volatile Throwable coprocException; - public ExpectedSizeIterator(int expectedSize, int coprocessorTimeout) { + public ExpectedSizeIterator(int expectedSize, long coprocessorTimeout) { this.expectedSize = expectedSize; this.queue = new ArrayBlockingQueue(expectedSize); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index d94b54745..bac4bafc2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; @@ -142,19 +143,17 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement private final Iterator> delegate; private final long rowCountLimit; private final long bytesLimit; - private final long timeout; private final long deadline; private long rowCount; private long rowBytes; - ResourceTrackingCellListIterator(Iterator> delegate, long rowCountLimit, long bytesLimit, - long timeout) { + ResourceTrackingCellListIterator(Iterator> delegate, + long rowCountLimit, long bytesLimit, long deadline) { this.delegate = delegate; this.rowCountLimit = rowCountLimit; this.bytesLimit = bytesLimit; - this.timeout = timeout; - this.deadline = System.currentTimeMillis() + timeout; + this.deadline = deadline; } @Override @@ -166,8 +165,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement throw new ResourceLimitExceededException( "scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit); } - if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) { - throw new KylinTimeoutException("coprocessor timeout after " + timeout + " ms"); + if ((rowCount % GTScanRequest.terminateCheckInterval == 0) && System.currentTimeMillis() > deadline) { + throw new KylinTimeoutException("coprocessor timeout after scanning " + rowCount + " rows"); } return delegate.hasNext(); } @@ -219,6 +218,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement sb.append(","); } + private void checkDeadline(long deadline) throws DoNotRetryIOException { + if (System.currentTimeMillis() > deadline) { + logger.info("Deadline has passed, abort now!"); + throw new DoNotRetryIOException("Coprocessor passed deadline! Maybe server is overloaded"); + } + } + @SuppressWarnings("checkstyle:methodlength") @Override public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request, @@ -233,6 +239,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement CubeVisitProtos.CubeVisitResponse.ErrorInfo errorInfo = null; String queryId = request.hasQueryId() ? request.getQueryId() : "UnknownId"; + logger.info("start query {} in thread {}", queryId, Thread.currentThread().getName()); try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) { final long serviceStartTime = System.currentTimeMillis(); @@ -247,6 +254,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement final GTScanRequest scanReq = GTScanRequest.serializer .deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); + final long deadline = scanReq.getStartTime() + scanReq.getTimeout(); + checkDeadline(deadline); + List> hbaseColumnsToGT = Lists.newArrayList(); for (IntList intList : request.getHbaseColumnsToGTList()) { hbaseColumnsToGT.add(intList.getIntsList()); @@ -297,7 +307,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(allCellLists, scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold) !request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client - scanReq.getTimeout()); + deadline); IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(), @@ -395,6 +405,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement .setNormalComplete(errorInfo == null ? 1 : 0).build()) .build()); + } catch (DoNotRetryIOException e) { + ResponseConverter.setControllerException(controller, e); + } catch (IOException ioe) { logger.error(ioe.toString(), ioe); IOException wrapped = new IOException("Error in coprocessor " + debugGitTag, ioe); @@ -407,7 +420,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try { region.closeRegionOperation(); } catch (IOException e) { - e.printStackTrace(); throw new RuntimeException(e); } } -- 2.11.0 (Apple Git-81)