From b72ae5da2fc9239cfebbe8a3b9fbf1ea3019d3aa Mon Sep 17 00:00:00 2001
From: gaodayue
Date: Tue, 11 Oct 2016 15:11:38 +0800
Subject: [PATCH] KYLIN-2079 add explicit configuration knob for coprocessor timeout

---
Review notes (text between the "---" line and the diffstat is not applied):

* kylin.query.coprocessor.timeout.seconds (default "0") is read through
  KylinConfigBase.getQueryCoprocessorTimeoutSeconds(); the old
  kylin.query.cube.visit.timeout.times getter is removed.
* CubeHBaseRPC.getCoprocessorTimeoutMillis() returns the smaller of the
  configured timeout and 0.9 * hbase.rpc.timeout. When the BackdoorToggles
  query timeout is not -1 it is used instead of the configured value, still
  capped at 0.9 * hbase.rpc.timeout.
* ExpectedSizeIterator no longer derives its own timeout; the caller passes the
  coprocessor timeout in and the iterator waits up to 10x that long.
* The seconds-to-millis conversion and the 10x deadline are computed in long
  so that large settings cannot wrap around in int arithmetic, and a
  non-positive configured value is treated as "use default".
* The queue field and class declaration added in ExpectedSizeIterator carry
  <byte[]>, which the "byte[] ret = queue.poll(...)" assignment needs.

 build/conf/kylin.properties                        |  4 ++
 .../org/apache/kylin/common/KylinConfigBase.java   |  8 +--
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        |  6 +-
 .../kylin/storage/hbase/cube/v2/CubeHBaseRPC.java  | 24 +++++++
 .../hbase/cube/v2/ExpectedSizeIterator.java        | 73 +++++++---------------
 5 files changed, 57 insertions(+), 58 deletions(-)

diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index ed86bdb..a254b41 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -140,6 +140,10 @@ kylin.query.mem.budget=3221225472
 
 kylin.query.coprocessor.mem.gb=3
 
+# the default coprocessor timeout is (hbase.rpc.timeout * 0.9) / 1000 seconds,
+# you can set it to a smaller value. 0 means use default.
+# kylin.query.coprocessor.timeout.seconds=0
+
 # Enable/disable ACL check for cube query
 kylin.query.security.enabled=true
 
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 99c3c5a..85924bd 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -481,10 +481,6 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
     }
 
-    public float getCubeVisitTimeoutTimes() {
-        return Float.parseFloat(getOptional("kylin.query.cube.visit.timeout.times", "1"));
-    }
-
     public int getBadQueryStackTraceDepth() {
         return Integer.parseInt(getOptional("kylin.query.badquery.stacktrace.depth", "10"));
     }
@@ -533,6 +529,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Double.parseDouble(this.getOptional("kylin.query.coprocessor.mem.gb", "3.0"));
     }
 
+    public int getQueryCoprocessorTimeoutSeconds() {
+        return Integer.parseInt(this.getOptional("kylin.query.coprocessor.timeout.seconds", "0"));
+    }
+
     public boolean isQuerySecureEnabled() {
         return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "true"));
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index f285153..d234c1e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -127,10 +127,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         List rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
         rawScanByteString = serializeRawScans(rawScans);
 
+        int coprocessorTimeout = getCoprocessorTimeoutMillis();
+        scanRequest.setTimeout(coprocessorTimeout);
         scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
-        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
-        scanRequest.setTimeout(epResultItr.getRpcTimeout());
         scanRequestByteString = serializeGTScanReq(scanRequest);
+
+        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout);
 
         logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index da087c9..05b34c7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -22,11 +22,14 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
@@ -45,6 +48,7 @@ import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.IGTStorage;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -274,4 +278,24 @@ public abstract class CubeHBaseRPC implements IGTStorage {
         logger.info(info.toString());
     }
 
+    protected int getCoprocessorTimeoutMillis() {
+        long configTimeout = cubeSeg.getConfig().getQueryCoprocessorTimeoutSeconds() * 1000L; // long math, a large setting must not wrap
+        if (configTimeout <= 0) { // 0 (or a negative value) means "use default"
+            configTimeout = Integer.MAX_VALUE;
+        }
+
+        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+        int rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+        // final timeout should be smaller than rpc timeout
+        int upper = (int) (rpcTimeout * 0.9);
+
+        int timeout = (int) Math.min(upper, configTimeout); // result never exceeds upper, so the cast is lossless
+        if (BackdoorToggles.getQueryTimeout() != -1) {
+            timeout = Math.min(upper, BackdoorToggles.getQueryTimeout());
+        }
+
+        logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, timeout);
+        return timeout;
+    }
+
 }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index c27e5fc..2d574bd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -24,50 +24,25 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-class ExpectedSizeIterator implements Iterator {
-    private static final Logger logger = LoggerFactory.getLogger(ExpectedSizeIterator.class);
-
-    BlockingQueue queue;
+import com.google.common.base.Throwables;
 
-    int expectedSize;
-    int current = 0;
-    long rpcTimeout;
-    long timeout;
-    long timeoutTS;
-    volatile Throwable coprocException;
-
-    public ExpectedSizeIterator(int expectedSize) {
+class ExpectedSizeIterator implements Iterator<byte[]> {
+    private BlockingQueue<byte[]> queue;
+    private int expectedSize;
+    private int current = 0;
+    private int coprocessorTimeout;
+    private long deadline;
+    private volatile Throwable coprocException;
+
+    public ExpectedSizeIterator(int expectedSize, int coprocessorTimeout) {
         this.expectedSize = expectedSize;
         this.queue = new ArrayBlockingQueue(expectedSize);
 
-        StringBuilder sb = new StringBuilder();
-        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-
-        this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-        this.timeout = this.rpcTimeout * hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-        sb.append("rpc timeout is " + this.rpcTimeout + " and after multiply retry times becomes " + this.timeout);
-
-        this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
-        sb.append(" after multiply kylin.query.cube.visit.timeout.times becomes " + this.timeout);
-
-        logger.info(sb.toString());
-
-        if (BackdoorToggles.getQueryTimeout() != -1) {
-            this.timeout = BackdoorToggles.getQueryTimeout();
-            logger.info("rpc timeout is overwritten to " + this.timeout);
-        }
-
-        this.timeoutTS = System.currentTimeMillis() + 2 * this.timeout;//longer timeout than coprocessor so that query thread will not timeout faster than coprocessor
+        this.coprocessorTimeout = coprocessorTimeout;
+        //longer timeout than coprocessor so that query thread will not timeout faster than coprocessor
+        this.deadline = System.currentTimeMillis() + coprocessorTimeout * 10L;
     }
 
     @Override
@@ -84,22 +59,20 @@ class ExpectedSizeIterator implements Iterator {
             current++;
             byte[] ret = null;
 
-            while (ret == null && coprocException == null && timeoutTS > System.currentTimeMillis()) {
-                ret = queue.poll(10000, TimeUnit.MILLISECONDS);
+            while (ret == null && coprocException == null && deadline > System.currentTimeMillis()) {
+                ret = queue.poll(1000, TimeUnit.MILLISECONDS);
             }
 
             if (coprocException != null) {
-                if (coprocException instanceof GTScanSelfTerminatedException)
-                    throw (GTScanSelfTerminatedException) coprocException;
-                else
-                    throw new RuntimeException("Error in coprocessor", coprocException);
+                throw Throwables.propagate(coprocException);
+            }
 
-            } else if (ret == null) {
+            if (ret == null) {
                 throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
-                        GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + timeout + ") cannot support this many scans?");
-            } else {
-                return ret;
+                        GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?");
             }
+
+            return ret;
         } catch (InterruptedException e) {
             throw new RuntimeException("Error when waiting queue", e);
         }
@@ -118,10 +91,6 @@ class ExpectedSizeIterator implements Iterator {
         }
     }
 
-    public long getRpcTimeout() {
-        return this.timeout;
-    }
-
     public void notifyCoprocException(Throwable ex) {
         coprocException = ex;
     }
-- 
2.8.4 (Apple Git-73)