From 8bd772f2a133f557917ed4d6809e2aaab1579775 Mon Sep 17 00:00:00 2001 From: Wang Ken Date: Mon, 11 Dec 2017 15:37:33 +0800 Subject: [PATCH] APACHE-KYLIN-3098: add a new config Signed-off-by: Zhong --- .../src/main/java/org/apache/kylin/common/KylinConfigBase.java | 4 ++++ .../src/main/java/org/apache/kylin/common/QueryContext.java | 9 +++++++++ .../apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java | 6 ++++++ 3 files changed, 19 insertions(+) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 66805df..8491c62 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1110,6 +1110,10 @@ abstract public class KylinConfigBase implements Serializable { return value > 0 ? value : Long.MAX_VALUE; } + public long getQueryMaxReturnRows() { + return Integer.parseInt(this.getOptional("kylin.query.max-return-rows", "5000000")); + } + public int getTranslatedInClauseMaxSize() { return Integer.parseInt(getOptional("kylin.query.translated-in-clause-max-size", String.valueOf(1024 * 1024))); } diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 9499f56..3f7fe3e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -50,6 +50,7 @@ public class QueryContext { private String username; private AtomicLong scannedRows = new AtomicLong(); + private AtomicLong returnedRows = new AtomicLong(); private AtomicLong scannedBytes = new AtomicLong(); private AtomicBoolean isRunning = new AtomicBoolean(true); @@ -115,6 +116,14 @@ public class QueryContext { return scannedRows.addAndGet(deltaRows); } + public long getReturnedRows() { + return returnedRows.get(); + } + + public long addAndGetReturnedRows(long deltaRows) { + return returnedRows.addAndGet(deltaRows); + } + public long getScannedBytes() { return scannedBytes.get(); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 87b9ca2..f5fdcd2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -286,6 +286,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { Stats stats = result.getStats(); queryContext.addAndGetScannedRows(stats.getScannedRowCount()); queryContext.addAndGetScannedBytes(stats.getScannedBytes()); + queryContext.addAndGetReturnedRows(stats.getScannedRowCount() + - stats.getAggregatedRowCount() - stats.getFilteredRowCount()); RuntimeException rpcException = null; if (result.getStats().getNormalComplete() != 1) { @@ -305,6 +307,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { rpcException = new ResourceLimitExceededException( "Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes()); + } else if (queryContext.getReturnedRows() > cubeSeg.getConfig().getQueryMaxReturnRows()) { + rpcException = new ResourceLimitExceededException( + "Query returned " + queryContext.getReturnedRows() + " rows exceeds threshold " + + cubeSeg.getConfig().getQueryMaxReturnRows()); } if (rpcException != null) { -- 2.5.4 (Apple Git-61)