From f4d736be1ef4ed4a0763b9efc50c9f5abf546ca9 Mon Sep 17 00:00:00 2001
From: Zhong
Date: Wed, 20 Sep 2017 09:46:44 +0800
Subject: [PATCH 1/2] APACHE-KYLIN-2881: Improve hbase coprocessor exception
 handling at kylin server side

---
 .../java/org/apache/kylin/common/QueryContext.java |   1 +
 .../apache/kylin/rest/service/QueryService.java    |   2 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 252 +++++++++++++--
 .../hbase/cube/v2/ExpectedSizeIterator.java        |  34 +--
 4 files changed, 186 insertions(+), 103 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 5ab3192..9499f56 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -48,6 +48,7 @@ public class QueryContext {
 
     private final String queryId;
     private String username;
+    private AtomicLong scannedRows = new AtomicLong();
     private AtomicLong scannedBytes = new AtomicLong();
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 41e3d28..41529c7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -468,6 +468,8 @@ public class QueryService extends BasicService {
             checkQueryAuth(sqlResponse, project, secureEnabled);
 
         } catch (Throwable e) { // calcite may throw AssertError
+            queryContext.stop(e);
+
             logger.error("Exception while executing query", e);
             String errMsg = makeErrorMsgUserFriendly(e);
 
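Note: both hunks above treat QueryContext as a per-query switchboard: atomic counters for scanned rows/bytes plus a stop flag that any thread can trip and that carries the first failure. The class below is only a minimal sketch of that pattern, not Kylin's actual QueryContext; the field names and listener interface are assumptions inferred from the calls this patch series makes (stop(Throwable), isStopped(), getThrowable(), addQueryStopListener(...), addAndGetScannedRows(...)).

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of a per-query context with stop semantics (not the real Kylin class).
public class QueryContextSketch {
    public interface QueryStopListener {
        void stop(QueryContextSketch query);
    }

    private final AtomicLong scannedRows = new AtomicLong();
    private final AtomicLong scannedBytes = new AtomicLong();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicReference<Throwable> throwable = new AtomicReference<>();
    private final List<QueryStopListener> stopListeners = new CopyOnWriteArrayList<>();

    public long addAndGetScannedRows(long rows) {
        return scannedRows.addAndGet(rows);
    }

    public long addAndGetScannedBytes(long bytes) {
        return scannedBytes.addAndGet(bytes);
    }

    public void addQueryStopListener(QueryStopListener listener) {
        stopListeners.add(listener);
        if (stopped.get()) {
            listener.stop(this); // late registrations still get notified
        }
    }

    // First caller wins; all registered listeners are told to abort their work.
    public void stop(Throwable cause) {
        if (stopped.compareAndSet(false, true)) {
            throwable.compareAndSet(null, cause);
            for (QueryStopListener listener : stopListeners) {
                listener.stop(this);
            }
        }
    }

    public boolean isStopped() {
        return stopped.get();
    }

    public Throwable getThrowable() {
        return throwable.get();
    }
}

With something shaped like this, the queryContext.stop(e) call in QueryService records the root cause once and fans the cancellation out to every worker that registered a listener.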
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 96f8f06..87b9ca2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -19,20 +19,23 @@
 package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.DataFormatException;
 
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
 import org.apache.kylin.common.util.Bytes;
@@ -52,7 +55,6 @@ import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
 import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
-import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
@@ -103,6 +105,16 @@
         return Pair.newPair(cubeSeg.getCuboidShardNum(cuboid.getId()), cubeSeg.getCuboidBaseShard(cuboid.getId()));
     }
 
+    static Field channelRowField = null;
+    static {
+        try {
+            channelRowField = RegionCoprocessorRpcChannel.class.getDeclaredField("row");
+            channelRowField.setAccessible(true);
+        } catch (Throwable t) {
+            logger.warn("error when get row field from RegionCoprocessorRpcChannel class", t);
+        }
+    }
+
     @SuppressWarnings("checkstyle:methodlength")
     @Override
     public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
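Note: channelRowField is resolved once in a static initializer so that each coprocessor call can cheaply read the private "row" field of RegionCoprocessorRpcChannel and work out which region server it is about to hit; if the field is ever missing, the lookup degrades to logging a warning instead of failing the query. Below is a self-contained sketch of that cache-a-private-field pattern only; the Target class is a hypothetical stand-in, not an HBase type.

import java.lang.reflect.Field;

// Sketch: cache a private field handle once, read it per call,
// and fall back gracefully when the field cannot be resolved.
public class PrivateFieldReader {
    static class Target { // hypothetical stand-in for RegionCoprocessorRpcChannel
        private final byte[] row = new byte[] { 1, 2, 3 };
    }

    private static Field rowField;
    static {
        try {
            rowField = Target.class.getDeclaredField("row");
            rowField.setAccessible(true);
        } catch (Throwable t) {
            rowField = null; // degrade: callers must handle the "unknown" case
        }
    }

    static byte[] readRow(Target target) {
        if (rowField == null) {
            return null;
        }
        try {
            return (byte[]) rowField.get(target);
        } catch (IllegalAccessException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        byte[] row = readRow(new Target());
        System.out.println(row == null ? "UNKNOWN" : row.length + " bytes");
    }
}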
@@ -135,7 +147,7 @@
         scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
         scanRequestByteString = serializeGTScanReq(scanRequest);
 
-        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout);
+        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(queryContext, shardNum, coprocessorTimeout);
 
         logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
@@ -165,97 +177,14 @@
         builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes());
         builder.setIsExactAggregate(storageContext.isExactAggregation());
 
+        final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryContext.getQueryId(),
+                Integer.toHexString(System.identityHashCode(scanRequest)));
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
-
-                    final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest)));
-                    final AtomicReference<RuntimeException> regionErrorHolder = new AtomicReference<>();
-
-                    try {
-                        Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());
-
-                        final CubeVisitRequest request = builder.build();
-                        final byte[] startKey = epRange.getFirst();
-                        final byte[] endKey = epRange.getSecond();
-
-                        table.coprocessorService(CubeVisitService.class, startKey, endKey, //
-                                new Batch.Call<CubeVisitService, CubeVisitResponse>() {
-                                    public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
-                                        ServerRpcController controller = new ServerRpcController();
-                                        BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
-                                        rowsService.visitCube(controller, request, rpcCallback);
-                                        CubeVisitResponse response = rpcCallback.get();
-                                        if (controller.failedOnException()) {
-                                            throw controller.getFailedOn();
-                                        }
-                                        return response;
-                                    }
-                                }, new Batch.Callback<CubeVisitResponse>() {
-                                    @Override
-                                    public void update(byte[] region, byte[] row, CubeVisitResponse result) {
-                                        if (region == null) {
-                                            return;
-                                        }
-
-                                        logger.info(logHeader + getStatsString(region, result));
-
-                                        Stats stats = result.getStats();
-                                        queryContext.addAndGetScannedRows(stats.getScannedRowCount());
-                                        queryContext.addAndGetScannedBytes(stats.getScannedBytes());
-
-                                        RuntimeException rpcException = null;
-                                        if (result.getStats().getNormalComplete() != 1) {
-                                            rpcException = getCoprocessorException(result);
-                                        }
-                                        queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
-                                                cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
-                                                cuboid.getId(), storageContext.getFilterMask(), rpcException,
-                                                stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
-                                                stats.getScannedRowCount(),
-                                                stats.getScannedRowCount() - stats.getAggregatedRowCount()
-                                                        - stats.getFilteredRowCount(),
-                                                stats.getAggregatedRowCount(), stats.getScannedBytes());
-
-                                        // if any other region has responded with error, skip further processing
-                                        if (regionErrorHolder.get() != null) {
-                                            return;
-                                        }
-
-                                        // record coprocessor error if happened
-                                        if (rpcException != null) {
-                                            regionErrorHolder.compareAndSet(null, rpcException);
-                                            return;
-                                        }
-
-                                        if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
-                                            throw new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes());
-                                        }
-
-                                        try {
-                                            if (compressionResult) {
-                                                epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
-                                            } else {
-                                                epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
-                                            }
-                                        } catch (IOException | DataFormatException e) {
-                                            throw new RuntimeException(logHeader + "Error when decompressing", e);
-                                        }
-                                    }
-                                });
-
-                    } catch (Throwable ex) {
-                        logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout
-                        epResultItr.notifyCoprocException(ex);
-                        return;
-                    }
-
-                    if (regionErrorHolder.get() != null) {
-                        RuntimeException exception = regionErrorHolder.get();
-                        logger.error(logHeader + "Error when visiting cubes by endpoint", exception); // double log coz the query thread may already timeout
-                        epResultItr.notifyCoprocException(exception);
-                    }
+                    runEPRange(queryContext, logHeader, compressionResult, builder.build(), conn, epRange.getFirst(),
+                            epRange.getSecond(), epResultItr);
                 }
             });
         }
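Note: getGTScanner still submits one task per endpoint key range, but the per-task error plumbing (the AtomicReference holder and notifyCoprocException calls) is gone; failures are reported once through the shared query context and later tasks check it before doing any work. A minimal, self-contained sketch of that fan-out shape follows; it uses a plain AtomicReference where the real code calls queryContext.stop(...), and the range strings are placeholders.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: one task per shard range; the first failure is recorded in a shared
// holder and later tasks bail out early instead of doing useless RPC work.
public class RangeFanOut {
    public static void main(String[] args) throws InterruptedException {
        final AtomicReference<Throwable> stopCause = new AtomicReference<>();
        List<String> ranges = Arrays.asList("range-0", "range-1", "range-2");
        ExecutorService executor = Executors.newFixedThreadPool(2);

        for (final String range : ranges) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    if (stopCause.get() != null) {
                        return; // another range already failed; skip this one
                    }
                    try {
                        // visit one endpoint range here; simulate a failure on range-1
                        if (range.endsWith("1")) {
                            throw new RuntimeException("endpoint error on " + range);
                        }
                    } catch (Throwable t) {
                        stopCause.compareAndSet(null, t); // single place where failures land
                    }
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("stop cause: " + stopCause.get());
    }
}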
"UNKNOWN" : regionLocation.getHostname(); + logger.info("Query-{}: send request to the init region server {} on table {} ", queryId, + regionServerName, table.getName()); + + queryContext.addQueryStopListener(new QueryContext.QueryStopListener() { + private Thread hConnThread = Thread.currentThread(); + + @Override + public void stop(QueryContext query) { + try { + hConnThread.interrupt(); + } catch (Exception e) { + logger.warn("Exception happens during interrupt thread {} due to {}", + hConnThread.getName(), e); + } + } + }); + + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = new BlockingRpcCallback<>(); + try { + rowsService.visitCube(controller, request, rpcCallback); + CubeVisitResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + return response; + } catch (Exception e) { + throw e; + } finally { + // Reset the interrupted state + Thread.interrupted(); + } + } + + private HRegionLocation getStartRegionLocation(CubeVisitProtos.CubeVisitService rowsService) { + try { + CubeVisitProtos.CubeVisitService.Stub rowsServiceStub = (CubeVisitProtos.CubeVisitService.Stub) rowsService; + RegionCoprocessorRpcChannel channel = (RegionCoprocessorRpcChannel) rowsServiceStub + .getChannel(); + byte[] row = (byte[]) channelRowField.get(channel); + return conn.getRegionLocator(table.getName()).getRegionLocation(row, false); + } catch (Throwable throwable) { + logger.warn("error when get region server name", throwable); + } + return null; + } + }, new Batch.Callback() { + @Override + public void update(byte[] region, byte[] row, CubeVisitResponse result) { + if (result == null) { + return; + } + if (region == null) { + return; + } + + // if the query is stopped, skip further processing + // this may be caused by + // * Any other region has responded with error + // * ServerRpcController.failedOnException + // * ResourceLimitExceededException + // * Exception happened during CompressionUtils.decompress() + // * Outside exceptions, like KylinTimeoutException in SequentialCubeTupleIterator + if (queryContext.isStopped()) { + return; + } + + logger.info(logHeader + getStatsString(region, result)); + + Stats stats = result.getStats(); + queryContext.addAndGetScannedRows(stats.getScannedRowCount()); + queryContext.addAndGetScannedBytes(stats.getScannedBytes()); + + RuntimeException rpcException = null; + if (result.getStats().getNormalComplete() != 1) { + // record coprocessor error if happened + rpcException = getCoprocessorException(result); + } + queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(), + cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(), + cuboid.getId(), storageContext.getFilterMask(), rpcException, + stats.getServiceEndTime() - stats.getServiceStartTime(), 0, + stats.getScannedRowCount(), + stats.getScannedRowCount() - stats.getAggregatedRowCount() + - stats.getFilteredRowCount(), + stats.getAggregatedRowCount(), stats.getScannedBytes()); + + if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) { + rpcException = new ResourceLimitExceededException( + "Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + + cubeSeg.getConfig().getQueryMaxScanBytes()); + } + + if (rpcException != null) { + queryContext.stop(rpcException); + return; + } + + try { + if (compressionResult) { + epResultItr.append(CompressionUtils.decompress( + 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index 68075ae..09cc2c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -24,19 +24,21 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.gridtable.GTScanRequest;
 
 import com.google.common.base.Throwables;
 
 class ExpectedSizeIterator implements Iterator<byte[]> {
-    private BlockingQueue<byte[]> queue;
-    private int expectedSize;
+    private final QueryContext queryContext;
+    private final int expectedSize;
+    private final BlockingQueue<byte[]> queue;
+    private final long coprocessorTimeout;
+    private final long deadline;
     private int current = 0;
-    private long coprocessorTimeout;
-    private long deadline;
-    private volatile Throwable coprocException;
 
-    public ExpectedSizeIterator(int expectedSize, long coprocessorTimeout) {
+    public ExpectedSizeIterator(QueryContext queryContext, int expectedSize, long coprocessorTimeout) {
+        this.queryContext = queryContext;
         this.expectedSize = expectedSize;
         this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
 
@@ -59,14 +61,11 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
             current++;
 
             byte[] ret = null;
-            while (ret == null && coprocException == null && deadline > System.currentTimeMillis()) {
+            while (ret == null && deadline > System.currentTimeMillis()) {
+                checkState();
                 ret = queue.poll(1000, TimeUnit.MILLISECONDS);
             }
 
-            if (coprocException != null) {
-                throw Throwables.propagate(coprocException);
-            }
-
             if (ret == null) {
                 throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
                         GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?");
@@ -85,6 +84,8 @@
     }
 
     public void append(byte[] data) {
+        checkState();
+
         try {
             queue.put(data);
         } catch (InterruptedException e) {
@@ -93,7 +94,14 @@
         }
     }
 
-    public void notifyCoprocException(Throwable ex) {
-        coprocException = ex;
+    private void checkState() {
+        if (queryContext.isStopped()) {
+            Throwable throwable = queryContext.getThrowable();
+            if (throwable != null) {
+                throw Throwables.propagate(throwable);
+            } else {
+                throw new IllegalStateException("the query is stopped: " + queryContext.getStopReason());
+            }
+        }
     }
 }
\ No newline at end of file
-- 
2.5.4 (Apple Git-61)
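Note: with notifyCoprocException gone, ExpectedSizeIterator learns about failures by polling: it waits on the queue in one-second slices and re-checks the query state before each poll, so a stop raised by any shard surfaces on the consuming thread within roughly a second rather than only at the coprocessor deadline. The sketch below is a self-contained illustration of that loop; the AtomicReference and local deadline stand in for the QueryContext and the configured coprocessor timeout.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: poll in short slices, re-checking a shared stop cause between polls,
// so a failure from any producer surfaces within ~1s instead of at the deadline.
public class DeadlinePolling {
    private static final BlockingQueue<byte[]> QUEUE = new ArrayBlockingQueue<byte[]>(4);
    private static final AtomicReference<Throwable> STOP_CAUSE = new AtomicReference<>();

    static byte[] waitForResult(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        byte[] ret = null;
        while (ret == null && deadline > System.currentTimeMillis()) {
            Throwable cause = STOP_CAUSE.get();
            if (cause != null) {
                throw new RuntimeException("query stopped", cause);
            }
            ret = QUEUE.poll(1000, TimeUnit.MILLISECONDS);
        }
        if (ret == null) {
            throw new RuntimeException("timeout waiting for coprocessor results");
        }
        return ret;
    }

    public static void main(String[] args) throws InterruptedException {
        // producer: pretend one shard fails shortly after the query starts
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(300);
                } catch (InterruptedException ignored) {
                }
                STOP_CAUSE.compareAndSet(null, new RuntimeException("coprocessor failed"));
            }
        }).start();

        try {
            waitForResult(TimeUnit.SECONDS.toMillis(10));
        } catch (RuntimeException e) {
            System.out.println("consumer saw the failure early: " + e.getCause());
        }
    }
}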
From 9579724b5055092b626aef2cb45f23c6c3234bf2 Mon Sep 17 00:00:00 2001
From: Zhong
Date: Mon, 4 Dec 2017 00:09:23 +0800
Subject: [PATCH 2/2] APACHE-KYLIN-2881: fix ci by controlling QueryContext
 lifecycle

---
 .../org/apache/kylin/query/ITKylinQueryTest.java   |  4 +--
 .../org/apache/kylin/query/ITMassInQueryTest.java  |  4 +--
 .../java/org/apache/kylin/query/KylinTestBase.java | 34 ++++++++++++++++------
 3 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 60633c9..a08e198 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -154,7 +154,7 @@
             String sql = getTextFromFile(sqlFile);
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            executeQuery(kylinConn, queryFileName, sql, true);
+            execQueryUsingKylin(kylinConn, queryFileName, sql, true);
         }
     }
 
     @Ignore
@@ -395,7 +395,7 @@
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName);
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
             String queriedVersion = String.valueOf(kylinTable.getValue(0, "version"));
 
             // compare the result
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
index cca0be6..16395fc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
@@ -118,7 +118,7 @@
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
             printResult(kylinTable);
         }
 
@@ -139,7 +139,7 @@
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
 
             // execute H2
             sql = sql.replace("massin(test_kylin_fact.SELLER_ID,'vip_customers')", "test_kylin_fact.SELLER_ID in ( " + org.apache.commons.lang.StringUtils.join(vipSellers, ",") + ")");
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 0f11996..f3f4192 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -44,6 +44,7 @@ import java.util.logging.LogManager;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
@@ -223,10 +224,19 @@
 
     // ////////////////////////////////////////////////////////////////////////////////////////
     // execute
+    private void initExecQueryUsingKylin(String sql) {
+        QueryContextManager.resetCurrent();
+        QueryContextManager.current();
+    }
 
-    protected ITable executeQuery(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
+    protected ITable execQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
             throws Exception {
+        initExecQueryUsingKylin(sql);
+        return executeQuery(dbConn, queryName, sql, needSort);
+    }
 
+    protected ITable executeQuery(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
+            throws Exception {
         // change join type to match current setting
         sql = changeJoinType(sql, joinType);
 
@@ -246,6 +256,7 @@
     }
 
     protected int executeQuery(String sql, boolean needDisplay) throws Exception {
+        initExecQueryUsingKylin(sql);
 
         // change join type to match current setting
        sql = changeJoinType(sql, joinType);
 
@@ -284,9 +295,14 @@
         }
     }
 
-    protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String queryName, String sql,
+    protected ITable execDynamicQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql,
             List<String> parameters, boolean needSort) throws Exception {
+        initExecQueryUsingKylin(sql);
+        return executeDynamicQuery(dbConn, queryName, sql, parameters, needSort);
+    }
 
+    protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String queryName, String sql,
+            List<String> parameters, boolean needSort) throws Exception {
         // change join type to match current setting
         sql = changeJoinType(sql, joinType);
 
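Note: the execQueryUsingKylin / execDynamicQueryUsingKylin wrappers exist so that every Kylin-side execution in the integration tests starts from a fresh per-thread QueryContext (reset, then re-create), while the H2 comparison path keeps using the plain executeQuery and never touches that state. The sketch below shows that wrapper-plus-reset shape only; the ThreadLocal and Context class are hypothetical stand-ins, not Kylin's QueryContextManager or QueryContext.

import java.util.UUID;

// Sketch: reset a thread-local query context, then delegate to the shared execute path.
public class QueryContextLifecycle {
    static class Context {
        final String queryId = UUID.randomUUID().toString();
    }

    private static final ThreadLocal<Context> CURRENT = new ThreadLocal<Context>() {
        @Override
        protected Context initialValue() {
            return new Context();
        }
    };

    static void resetCurrent() {
        CURRENT.remove(); // drop whatever the previous test query left behind
    }

    static Context current() {
        return CURRENT.get(); // lazily re-created for the new query
    }

    // wrapper used for Kylin queries: fresh context first, then the shared code path
    static String execQueryUsingKylin(String sql) {
        resetCurrent();
        String queryId = current().queryId;
        return executeQuery(sql) + " [queryId=" + queryId + "]";
    }

    // shared path, also used directly for the H2 reference run (no context reset)
    static String executeQuery(String sql) {
        return "result of " + sql;
    }

    public static void main(String[] args) {
        System.out.println(execQueryUsingKylin("select 1"));
        System.out.println(execQueryUsingKylin("select 2")); // gets its own context, nothing leaks
    }
}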
@@ -364,7 +380,7 @@
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
 
             // compare the result
             if (BackdoorToggles.getPrepareOnly())
@@ -408,7 +424,7 @@
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
 
             // compare the result
             assertTableEquals(expectTable, kylinTable);
@@ -431,7 +447,7 @@
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -460,7 +476,7 @@
             // execute Kylin
             logger.info("Query Result from Kylin - " + sql);
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, sql, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, sql, sql, false);
 
             try {
                 // compare the result
@@ -492,7 +508,7 @@
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sqlWithLimit, false);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -543,7 +559,7 @@
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql1, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql1, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -583,7 +599,7 @@
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeDynamicQuery(kylinConn, queryName, sql, parameters, needSort);
+            ITable kylinTable = execDynamicQueryUsingKylin(kylinConn, queryName, sql, parameters, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
-- 
2.5.4 (Apple Git-61)