From 40a6d9ee214ad4281441cd92d0297d8ca3fe5a25 Mon Sep 17 00:00:00 2001 From: murkrishn Date: Mon, 14 Sep 2015 23:51:39 -0700 Subject: [PATCH 1/2] In case Kylin is not able to serve result from cubes, the query is re-run in Hive Code and the result is returned on successful query execution. Apache Spark is used with Hive as the execution engine. Code updated with respect to https://issues.apache.org/jira/browse/KYLIN-742 --- .../java/org/apache/kylin/common/KylinConfig.java | 20 ++++++++ examples/test_case_data/sandbox/kylin.properties | 6 ++- .../apache/kylin/rest/service/QueryService.java | 56 +++++++++++++++++++++- 3 files changed, 79 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java index d3220ee..7271d25 100644 --- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -132,6 +132,12 @@ public class KylinConfig { public static final String HBASE_REGION_COUNT_MIN = "kylin.hbase.region.count.min"; public static final String HBASE_REGION_COUNT_MAX = "kylin.hbase.region.count.max"; + + // properties for JDBC Hive2 connection details + public static final String JDBC_HIVE2_URL = "kylin.jdbc.hive2.url"; + public static final String JDB_HIVE2_URL_DEFAULT = "jdbc:hive2://sandbox:10000"; + public static final String JDBC_HIVE2_USERNAME = "kylin.jdbc.hive2.username"; + public static final String JDBC_HIVE2_PASSWORD = "kylin.jdbc.hive2.password"; // static cached instances private static KylinConfig ENV_INSTANCE = null; @@ -265,6 +271,20 @@ public class KylinConfig { // ============================================================================ + // start: properties for JDBC Hive2 connection details + public String getJdbcHive2Url() { + return getOptional(JDBC_HIVE2_URL, "jdbc:hive2://sandbox:10000"); + } + + public String getJdbcHive2Username() { + return 
getOptional(JDBC_HIVE2_USERNAME, ""); + } + + public String getJdbcHive2Password() { + return getOptional(JDBC_HIVE2_PASSWORD, ""); + } + // end: properties for JDBC Hive2 connection details + public String getStorageUrl() { return storageUrl; } diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index b3ec4d0..8c703b4 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -129,5 +129,7 @@ ext.log.base.dir = /tmp/kylin_log1,/tmp/kylin_log2 #will set to kylin_query_log by default if not config here query.log.parse.result.table = kylin_query_log - - +## config for using hive on spark for querying ## +kylin.jdbc.hive2.url= +kylin.jdbc.hive2.username= +kylin.jdbc.hive2.password= diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java index 6e6d9da..37b4077 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -28,6 +28,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; +import java.sql.DriverManager; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -337,6 +338,8 @@ public class QueryService extends BasicService { List> results = new LinkedList>(); List columnMetas = new LinkedList(); + String driverName = "org.apache.hive.jdbc.HiveDriver"; + try { conn = getOLAPDataSource(sqlRequest.getProject()).getConnection(); @@ -372,7 +375,58 @@ public class QueryService extends BasicService { results.add(new LinkedList(oneRow)); oneRow.clear(); } - } finally { + } catch (SQLException sqlException) { + // unsuccessful statement execution, retry with Hive on Spark. 
Code modification as part of the jira https://issues.apache.org/jira/browse/KYLIN-742 + logger.error("exception in running the query: " + sql); + logger.debug("retrying query in hive"); + + try { + Class.forName(driverName); + } catch (ClassNotFoundException classNotFoundException) { + logger.error("exception in running the query on hive"); + classNotFoundException.printStackTrace(); + throw classNotFoundException; + } + + // configurations for connection to HiveServer2. + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + String hiveServerUrl = kylinConfig.getJdbcHive2Url(); + String hiveUserName = kylinConfig.getJdbcHive2Username(); + String hivePassword = kylinConfig.getJdbcHive2Password(); + logger.info("connecting to : " + hiveServerUrl + " for executing the query"); + + try { + + conn = DriverManager.getConnection(hiveServerUrl, hiveUserName, hivePassword); + Statement stmt = conn.createStatement(); + resultSet = stmt.executeQuery(sql); + + ResultSetMetaData metaData = resultSet.getMetaData(); + int columnCount = metaData.getColumnCount(); + + // Fill in selected column meta + for (int i = 1; i <= columnCount; ++i) { + columnMetas.add(new SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i), false, metaData.isCurrency(i), metaData.isNullable(i), false, metaData.getColumnDisplaySize(i), metaData.getColumnLabel(i), metaData.getColumnName(i), null, null, null, metaData.getPrecision(i), metaData.getScale(i), metaData.getColumnType(i), metaData.getColumnTypeName(i), metaData.isReadOnly(i), false, false)); + } + + List oneRow = new LinkedList(); + + // fill in results. 
+ while (resultSet.next()) { + //logger.debug("resultSet value: " + resultSet.getString(1)); + for (int i = 0; i < columnCount; i++) { + oneRow.add((resultSet.getString(i + 1))); + } + + results.add(new LinkedList(oneRow)); + oneRow.clear(); + } + } catch (SQLException sqlException1) { + logger.error("exception in running the query on hive"); + sqlException1.printStackTrace(); + throw sqlException1; + } + } finally { close(resultSet, stat, conn); } -- 1.9.1 From 09cf4dcd6eda4fd97040f65083219e99b6dd5767 Mon Sep 17 00:00:00 2001 From: murkrishn Date: Sun, 20 Sep 2015 23:09:16 -0700 Subject: [PATCH 2/2] Code updated based on the review comments mentioned in https://issues.apache.org/jira/browse/KYLIN-742 excluding the usage of the embedded hive server URL as default URL --- .../java/org/apache/kylin/common/KylinConfig.java | 33 +++-- examples/test_case_data/sandbox/kylin.properties | 12 +- .../apache/kylin/rest/service/QueryService.java | 77 ++++-------- .../apache/kylin/rest/util/HiveRerouteUtil.java | 133 +++++++++++++++++++++ 4 files changed, 186 insertions(+), 69 deletions(-) create mode 100644 server/src/main/java/org/apache/kylin/rest/util/HiveRerouteUtil.java diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java index 7271d25..72b6a65 100644 --- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -133,11 +133,14 @@ public class KylinConfig { public static final String HBASE_REGION_COUNT_MIN = "kylin.hbase.region.count.min"; public static final String HBASE_REGION_COUNT_MAX = "kylin.hbase.region.count.max"; - // properties for JDBC Hive2 connection details - public static final String JDBC_HIVE2_URL = "kylin.jdbc.hive2.url"; - public static final String JDB_HIVE2_URL_DEFAULT = "jdbc:hive2://sandbox:10000"; - public static final String JDBC_HIVE2_USERNAME = "kylin.jdbc.hive2.username"; - public
static final String JDBC_HIVE2_PASSWORD = "kylin.jdbc.hive2.password"; + // property for Hive query rerouting enablement + public static final String KYLIN_ROUTE_HIVE_ENABLED = "kylin.route.hive.enabled"; + public static final boolean KYLIN_ROUTE_HIVE_ENABLED_DEFAULT = false; + // JDBC Hive connection details for query rerouting + public static final String KYLIN_ROUTE_HIVE_URL = "kylin.route.hive.url"; + public static final String KYLIN_ROUTE_HIVE_URL_DEFAULT = "jdbc:hive2://sandbox:10000"; + public static final String KYLIN_ROUTE_HIVE_USERNAME = "kylin.route.hive.username"; + public static final String KYLIN_ROUTE_HIVE_PASSWORD = "kylin.route.hive.password"; // static cached instances private static KylinConfig ENV_INSTANCE = null; @@ -271,19 +274,23 @@ public class KylinConfig { // ============================================================================ - // start: properties for JDBC Hive2 connection details - public String getJdbcHive2Url() { - return getOptional(JDBC_HIVE2_URL, "jdbc:hive2://sandbox:10000"); + // start: properties for Hive rerouting + public boolean isHiveReroutingEnabled() { + return Boolean.parseBoolean(getOptional(KYLIN_ROUTE_HIVE_ENABLED)); } - public String getJdbcHive2Username() { - return getOptional(JDBC_HIVE2_USERNAME, ""); + public String getHiveRerouteUrl() { + return getOptional(KYLIN_ROUTE_HIVE_URL, KYLIN_ROUTE_HIVE_URL_DEFAULT); } - public String getJdbcHive2Password() { - return getOptional(JDBC_HIVE2_PASSWORD, ""); + public String getHiveRerouteUsername() { + return getOptional(KYLIN_ROUTE_HIVE_USERNAME, ""); } - // end: properties for JDBC Hive2 connection details + + public String getHiveReroutePassword() { + return getOptional(KYLIN_ROUTE_HIVE_PASSWORD, ""); + } + // end: properties for JDBC Hive rerouting public String getStorageUrl() { return storageUrl; diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 8c703b4..2d47ee7 100644 --- 
a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -129,7 +129,11 @@ ext.log.base.dir = /tmp/kylin_log1,/tmp/kylin_log2 #will set to kylin_query_log by default if not config here query.log.parse.result.table = kylin_query_log -## config for using hive on spark for querying ## -kylin.jdbc.hive2.url= -kylin.jdbc.hive2.username= -kylin.jdbc.hive2.password= +##### Config for enabling query rerouting to Hive. Disabled by default. ###### +kylin.route.hive.enabled=false + +# If query rerouting is enabled, provide the hive configurations +# Default value for kylin.route.hive.url is jdbc:hive2://sandbox:10000 (not the embedded server, jdbc:hive2://) +kylin.route.hive.url= +kylin.route.hive.username= +kylin.route.hive.password= diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java index 37b4077..81905c8 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -28,7 +28,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; -import java.sql.DriverManager; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -41,6 +40,9 @@ import java.util.Set; import javax.sql.DataSource; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.calcite.sql.parser.impl.ParseException; +import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.calcite.avatica.ColumnMetaData.Rep; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.client.Get; @@ -63,6 +65,7 @@ import org.apache.kylin.rest.model.TableMeta; import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; +import org.apache.kylin.rest.util.HiveRerouteUtil;
import org.apache.kylin.rest.util.QueryUtil; import org.apache.kylin.rest.util.Serializer; import org.slf4j.Logger; @@ -338,8 +341,6 @@ public class QueryService extends BasicService { List> results = new LinkedList>(); List columnMetas = new LinkedList(); - String driverName = "org.apache.hive.jdbc.HiveDriver"; - try { conn = getOLAPDataSource(sqlRequest.getProject()).getConnection(); @@ -377,56 +378,28 @@ public class QueryService extends BasicService { } } catch (SQLException sqlException) { // unsuccessful statement execution, retry with Hive on Spark. Code modification as part of the jira https://issues.apache.org/jira/browse/KYLIN-742 - logger.error("exception in running the query: " + sql); - logger.debug("retrying query in hive"); - - try { - Class.forName(driverName); - } catch (ClassNotFoundException classNotFoundException) { - logger.error("exception in running the query on hive"); - classNotFoundException.printStackTrace(); - throw classNotFoundException; - } - - // configurations for connection to HiveServer2. 
+ boolean isExpectedCause = (ExceptionUtils.getRootCause(sqlException).getClass().equals(SqlValidatorException.class)) || (ExceptionUtils.getRootCause(sqlException).getClass().equals(ParseException.class)); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - String hiveServerUrl = kylinConfig.getJdbcHive2Url(); - String hiveUserName = kylinConfig.getJdbcHive2Username(); - String hivePassword = kylinConfig.getJdbcHive2Password(); - logger.info("connecting to : " + hiveServerUrl + " for executing the query"); - - try { - - conn = DriverManager.getConnection(hiveServerUrl, hiveUserName, hivePassword); - Statement stmt = conn.createStatement(); - resultSet = stmt.executeQuery(sql); - - ResultSetMetaData metaData = resultSet.getMetaData(); - int columnCount = metaData.getColumnCount(); - - // Fill in selected column meta - for (int i = 1; i <= columnCount; ++i) { - columnMetas.add(new SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i), false, metaData.isCurrency(i), metaData.isNullable(i), false, metaData.getColumnDisplaySize(i), metaData.getColumnLabel(i), metaData.getColumnName(i), null, null, null, metaData.getPrecision(i), metaData.getScale(i), metaData.getColumnType(i), metaData.getColumnTypeName(i), metaData.isReadOnly(i), false, false)); - } - - List oneRow = new LinkedList(); - - // fill in results. 
- while (resultSet.next()) { - //logger.debug("resultSet value: " + resultSet.getString(1)); - for (int i = 0; i < columnCount; i++) { - oneRow.add((resultSet.getString(i + 1))); - } - - results.add(new LinkedList(oneRow)); - oneRow.clear(); - } - } catch (SQLException sqlException1) { - logger.error("exception in running the query on hive"); - sqlException1.printStackTrace(); - throw sqlException1; - } - } finally { + if (isExpectedCause && kylinConfig.isHiveReroutingEnabled()) { + logger.debug("query rerouting option enabled for Kylin"); + // running query to hive + HiveRerouteUtil rerouteUtil = new HiveRerouteUtil(); + try { + conn = rerouteUtil.createConnection(kylinConfig.getHiveRerouteUrl(), kylinConfig.getHiveRerouteUsername(), kylinConfig.getHiveReroutePassword()); + resultSet = rerouteUtil.executQuery(conn, sql); + columnMetas = rerouteUtil.extractColumnMetadata(resultSet, columnMetas); + results = rerouteUtil.extractResults(resultSet, results); + } catch (Exception exception) { + logger.error("exception in re-routing the query to hive", exception); + throw exception; + } finally { + rerouteUtil.closeConnection(conn); + } + } else { + logger.error("exception in running the query: " + sql); + throw sqlException; + } + } finally { close(resultSet, stat, conn); } diff --git a/server/src/main/java/org/apache/kylin/rest/util/HiveRerouteUtil.java b/server/src/main/java/org/apache/kylin/rest/util/HiveRerouteUtil.java new file mode 100644 index 0000000..ca1e048 --- /dev/null +++ b/server/src/main/java/org/apache/kylin/rest/util/HiveRerouteUtil.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.util; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.LinkedList; +import java.util.List; + +import org.apache.kylin.rest.model.SelectedColumnMeta; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** @author murkrishn **/ +public class HiveRerouteUtil { + + private static final Logger logger = LoggerFactory.getLogger(HiveRerouteUtil.class); + public static final String driverName = "org.apache.hive.jdbc.HiveDriver"; + + /** + * Create a connection to the Hive server by passing the required connection parameters. + * @param connectionURL: JDBC URL to connect to the Hive server + * @param username: Username to connect with (optional) + * @param password: Password to connect with (optional) + * @return: Connection object to the Hive server + * @throws Exception + */ + public Connection createConnection (String connectionURL, String username, String password) throws Exception { + logger.info("rerouting to : " + connectionURL + " for executing the query"); + + try { + Class.forName(driverName); + } catch (ClassNotFoundException classNotFoundException) { + throw classNotFoundException; + } + + Connection connection = DriverManager.getConnection(connectionURL, username, password); + return connection; + } + + /** + * Close the connection to the Hive server. 
+ * @param connection: Connection object to be closed + */ + public void closeConnection(Connection connection) { + if (null != connection) { + try { + connection.close(); + } catch (SQLException sqlException) { + logger.error("failed to close connection", sqlException); + } + } + } + + /** + * Execute a query in Hive. + * @param connection: Connection object to the Hive server + * @param query: Query to be executed + * @return: ResultSet object of the query executed + * @throws Exception + */ + public ResultSet executQuery (Connection connection, String query) throws Exception { + Statement statement = null; + ResultSet resultSet = null; + + try { + statement = connection.createStatement(); + resultSet = statement.executeQuery(query); + return resultSet; + } catch (SQLException sqlException) { + throw sqlException; + } + } + + public List extractColumnMetadata (ResultSet resultSet, List columnMetas) throws SQLException { + ResultSetMetaData metaData = null; + int columnCount = 0; + + try { + metaData = resultSet.getMetaData(); + columnCount = metaData.getColumnCount(); + + // fill in selected column meta + for (int i = 1; i <= columnCount; ++i) { + columnMetas.add(new SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i), false, metaData.isCurrency(i), metaData.isNullable(i), false, metaData.getColumnDisplaySize(i), metaData.getColumnLabel(i), metaData.getColumnName(i), null, null, null, metaData.getPrecision(i), metaData.getScale(i), metaData.getColumnType(i), metaData.getColumnTypeName(i), metaData.isReadOnly(i), false, false)); + } + + return columnMetas; + } catch (SQLException sqlException) { + throw sqlException; + } + } + + public List> extractResults (ResultSet resultSet, List> results) throws SQLException { + List oneRow = new LinkedList(); + + try { + while (resultSet.next()) { + //logger.debug("resultSet value: " + resultSet.getString(1)); + for (int i = 0; i < resultSet.getMetaData().getColumnCount(); i++) { + 
oneRow.add((resultSet.getString(i + 1))); + } + + results.add(new LinkedList(oneRow)); + oneRow.clear(); + } + + return results; + } catch (SQLException sqlException) { + throw sqlException; + } + } +} \ No newline at end of file -- 1.9.1