From 9172a5303daa3366be3a2978e75b4bb1d47ad07d Mon Sep 17 00:00:00 2001 From: etherge Date: Wed, 22 Nov 2017 00:26:49 +0800 Subject: [PATCH] KYLIN-3044, support SQLServer as kylin data source --- .../org/apache/kylin/common/KylinConfigBase.java | 31 ++- .../src/main/resources/kylin-defaults.properties | 9 + .../java/org/apache/kylin/job/JoinedFlatTable.java | 45 ++-- pom.xml | 2 +- source-hive/pom.xml | 25 ++ .../org/apache/kylin/source/jdbc/JdbcDialect.java | 26 ++ .../org/apache/kylin/source/jdbc/JdbcExplorer.java | 288 +++++++++------------ .../apache/kylin/source/jdbc/JdbcHiveMRInput.java | 129 +++++++-- .../apache/kylin/source/jdbc/JdbcTableReader.java | 32 ++- .../java/org/apache/kylin/source/jdbc/SqlUtil.java | 140 +++++++--- .../source/jdbc/metadata/DefaultJdbcMetadata.java | 76 ++++++ .../kylin/source/jdbc/metadata/IJdbcMetadata.java | 33 +++ .../source/jdbc/metadata/JdbcMetadataFactory.java | 35 +++ .../source/jdbc/metadata/MySQLJdbcMetadata.java | 69 +++++ .../jdbc/metadata/SQLServerJdbcMetadata.java | 61 +++++ .../apache/kylin/source/jdbc/JdbcExplorerTest.java | 156 +++++++++++ .../org/apache/kylin/source/jdbc/SqlUtilTest.java | 46 ++++ .../jdbc/metadata/DefaultJdbcMetadataTest.java | 126 +++++++++ .../jdbc/metadata/JdbcMetadataFactoryTest.java | 35 +++ .../jdbc/metadata/MySQLJdbcMetadataTest.java | 104 ++++++++ .../jdbc/metadata/SQLServerJdbcMetadataTest.java | 68 +++++ 21 files changed, 1258 insertions(+), 278 deletions(-) create mode 100644 source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java create mode 100644 source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java create mode 100644 source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java create mode 100644 source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java create mode 100644 source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java create mode 100644 
source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java create mode 100644 source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java create mode 100644 source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java create mode 100644 source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java create mode 100644 source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java create mode 100644 source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java create mode 100644 source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadataTest.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 3d67ee3..b2368ce 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -287,7 +287,7 @@ abstract public class KylinConfigBase implements Serializable { r.putAll(getPropertiesByPrefix("kylin.metadata.resource-store-provider.")); // note the naming convention -- http://kylin.apache.org/development/coding_naming_convention.html return r; } - + public String getDataModelImpl() { return getOptional("kylin.metadata.data-model-impl", null); } @@ -295,7 +295,7 @@ abstract public class KylinConfigBase implements Serializable { public String getDataModelManagerImpl() { return getOptional("kylin.metadata.data-model-manager-impl", null); } - + public String[] getRealizationProviders() { return getOptionalStringArray("kylin.metadata.realization-providers", // new String[] { "org.apache.kylin.cube.CubeManager", "org.apache.kylin.storage.hybrid.HybridManager" }); @@ -314,7 +314,7 @@ abstract public class KylinConfigBase implements Serializable { 
"org.apache.kylin.storage.hbase.util.ZookeeperDistributedLock$Factory"); return (DistributedLockFactory) ClassUtil.newInstance(clsName); } - + public String getHBaseMappingAdapter() { return getOptional("kylin.metadata.hbasemapping-adapter"); } @@ -431,11 +431,11 @@ abstract public class KylinConfigBase implements Serializable { // ============================================================================ // Cube Planner // ============================================================================ - + public boolean isCubePlannerEnabled() { return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled", "false")); } - + public boolean isCubePlannerEnabledForExistingCube() { return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled-for-existing-cube", "false")); } @@ -724,23 +724,23 @@ abstract public class KylinConfigBase implements Serializable { // SOURCE.JDBC // ============================================================================ - public String getJdbcConnectionUrl() { + public String getJdbcSourceConnectionUrl() { return getOptional("kylin.source.jdbc.connection-url"); } - public String getJdbcDriver() { + public String getJdbcSourceDriver() { return getOptional("kylin.source.jdbc.driver"); } - public String getJdbcDialect() { + public String getJdbcSourceDialect() { return getOptional("kylin.source.jdbc.dialect"); } - public String getJdbcUser() { + public String getJdbcSourceUser() { return getOptional("kylin.source.jdbc.user"); } - public String getJdbcPass() { + public String getJdbcSourcePass() { return getOptional("kylin.source.jdbc.pass"); } @@ -748,6 +748,14 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.source.jdbc.sqoop-home"); } + public int getSqoopMapperNum() { + return Integer.parseInt(getOptional("kylin.source.jdbc.sqoop-mapper-num", "4")); + } + + public String getFieldDelimiter() { + return getOptional("kylin.source.jdbc.field-delimiter", "|"); + } + // 
============================================================================ // STORAGE.HBASE // ============================================================================ @@ -1008,7 +1016,6 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-uhc-dict", "true")); } - public boolean isBuildDictInReducerEnabled() { return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-dict-in-reducer", "true")); } @@ -1279,7 +1286,7 @@ abstract public class KylinConfigBase implements Serializable { public int getServerUserCacheMaxEntries() { return Integer.valueOf(this.getOptional("kylin.server.auth-user-cache.max-entries", "100")); } - + public String getExternalAclProvider() { return getOptional("kylin.server.external-acl-provider", ""); } diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index 1602087..475deb3 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -290,3 +290,12 @@ kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false #kylin.query.pushdown.jdbc.pool-max-total=8 #kylin.query.pushdown.jdbc.pool-max-idle=8 #kylin.query.pushdown.jdbc.pool-min-idle=0 + +### JDBC Data Source +#kylin.source.jdbc.connection-url= +#kylin.source.jdbc.driver= +#kylin.source.jdbc.dialect= +#kylin.source.jdbc.user= +#kylin.source.jdbc.pass= +#kylin.source.jdbc.sqoop-home= +#kylin.source.jdbc.field-delimiter=| diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 9593718..0ab3d3d 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import 
org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.job.engine.JobEngineConfig; @@ -59,7 +60,7 @@ public class JoinedFlatTable { } public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, - String format) { + String format, String filedDelimiter) { StringBuilder ddl = new StringBuilder(); ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n"); @@ -74,7 +75,7 @@ public class JoinedFlatTable { } ddl.append(")" + "\n"); if ("TEXTFILE".equals(format)) { - ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n"); + ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + filedDelimiter + "'\n"); } ddl.append("STORED AS " + format + "\n"); ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n"); @@ -82,6 +83,11 @@ public class JoinedFlatTable { return ddl.toString(); } + public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, + String format) { + return generateCreateTableStatement(flatDesc, storageDfsDir, format, "|"); + } + public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) { StringBuilder ddl = new StringBuilder(); ddl.append("DROP TABLE IF EXISTS " + flatDesc.getTableName() + ";").append("\n"); @@ -94,7 +100,7 @@ public class JoinedFlatTable { if (null == segment) { kylinConfig = KylinConfig.getInstanceFromEnv(); } else { - kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig(); + kylinConfig = (flatDesc.getSegment()).getConfig(); } if (kylinConfig.isAdvancedFlatTableUsed()) { @@ -210,15 +216,12 @@ public class JoinedFlatTable { private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { final String sep = singleLine ? 
" " : "\n"; - boolean hasCondition = false; StringBuilder whereBuilder = new StringBuilder(); - whereBuilder.append("WHERE"); + whereBuilder.append("WHERE 1=1"); DataModelDesc model = flatDesc.getDataModel(); - - if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) { - whereBuilder.append(" (").append(model.getFilterCondition()).append(") "); - hasCondition = true; + if (StringUtils.isNotEmpty(model.getFilterCondition())) { + whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") "); } if (flatDesc.getSegment() != null) { @@ -227,18 +230,15 @@ public class JoinedFlatTable { SegmentRange segRange = flatDesc.getSegRange(); if (segRange != null && !segRange.isInfinite()) { - whereBuilder.append(hasCondition ? " AND (" : " ("); + whereBuilder.append(" AND ("); whereBuilder.append( partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, segRange)); whereBuilder.append(")" + sep); - hasCondition = true; } } } - if (hasCondition) { - sql.append(whereBuilder.toString()); - } + sql.append(whereBuilder.toString()); } private static String colName(TblColRef col) { @@ -246,10 +246,19 @@ public class JoinedFlatTable { } private static String getHiveDataType(String javaDataType) { - String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType; - hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? 
"int" : hiveDataType; + String originDataType = javaDataType.toLowerCase(); + String hiveDataType; + if (originDataType.startsWith("varchar")) { + hiveDataType = "string"; + } else if (originDataType.startsWith("integer")) { + hiveDataType = "int"; + } else if (originDataType.startsWith("bigint")) { + hiveDataType = "bigint"; + } else { + hiveDataType = originDataType; + } - return hiveDataType.toLowerCase(); + return hiveDataType; } public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) { @@ -267,4 +276,4 @@ public class JoinedFlatTable { return sql.toString(); } -} +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index c85974a..7a40076 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ 9.2.20.v20161216 0.3.1 2.7.14 - + 1.7.0 3.4 diff --git a/source-hive/pom.xml b/source-hive/pom.xml index 9a4d537..b9f87ee 100644 --- a/source-hive/pom.xml +++ b/source-hive/pom.xml @@ -77,6 +77,31 @@ junit test + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + + + org.powermock + powermock-core + ${powermock.version} + test + + + org.powermock + powermock-api-mockito + ${powermock.version} + test + + + org.mockito + mockito-core + + 1.10.19 + test + diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java new file mode 100644 index 0000000..7e5ecee --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.jdbc; + +public class JdbcDialect { + public static final String DIALECT_VERTICA = "vertica"; + public static final String DIALECT_ORACLE = "oracle"; + public static final String DIALECT_MYSQL = "mysql"; + public static final String DIALECT_HIVE = "hive"; + public static final String DIALECT_MSSQL = "mssql"; +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java index 736cf2e..1278128 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -31,74 +32,153 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.DBUtils; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.source.ISampleDataDeployer; import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.hive.DBConnConf; 
+import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata; +import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeployer { private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class); - - public static final String DIALECT_VERTICA="vertica"; - public static final String DIALECT_ORACLE="oracle"; - public static final String DIALECT_MYSQL="mysql"; - public static final String DIALECT_HIVE="hive"; - - public static final String TABLE_TYPE_TABLE="TABLE"; - public static final String TABLE_TYPE_VIEW="VIEW"; - - private KylinConfig config; - private DBConnConf dbconf; - private String dialect; + + private final KylinConfig config; + private final String dialect; + private final DBConnConf dbconf; + private final IJdbcMetadata jdbcMetadataDialect; public JdbcExplorer() { config = KylinConfig.getInstanceFromEnv(); - String connectionUrl = config.getJdbcConnectionUrl(); - String driverClass = config.getJdbcDriver(); - String jdbcUser = config.getJdbcUser(); - String jdbcPass = config.getJdbcPass(); - dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); - this.dialect = config.getJdbcDialect(); + String connectionUrl = config.getJdbcSourceConnectionUrl(); + String driverClass = config.getJdbcSourceDriver(); + String jdbcUser = config.getJdbcSourceUser(); + String jdbcPass = config.getJdbcSourcePass(); + this.dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); + this.dialect = config.getJdbcSourceDialect(); + this.jdbcMetadataDialect = JdbcMetadataFactory.getJdbcMetadata(dialect, dbconf); + } + + @Override + public List listDatabases() throws SQLException { + return jdbcMetadataDialect.listDatabases(); + } + + @Override + public List listTables(String schema) throws SQLException { + return jdbcMetadataDialect.listTables(schema); + } + + @Override + public Pair 
loadTableMetadata(String database, String table, String prj) + throws SQLException { + TableDesc tableDesc = new TableDesc(); + tableDesc.setDatabase(database.toUpperCase()); + tableDesc.setName(table.toUpperCase()); + tableDesc.setUuid(UUID.randomUUID().toString()); + tableDesc.setLastModified(0); + tableDesc.setSourceType(ISourceAware.ID_JDBC); + + Connection con = SqlUtil.getConnection(dbconf); + DatabaseMetaData dbmd = con.getMetaData(); + + try (ResultSet rs = jdbcMetadataDialect.getTable(dbmd, database, table)) { + String tableType = null; + while (rs.next()) { + tableType = rs.getString("TABLE_TYPE"); + } + if (tableType != null) { + tableDesc.setTableType(tableType); + } else { + throw new RuntimeException(String.format("table %s not found in schema:%s", table, database)); + } + } + + List columns = new ArrayList<>(); + try (ResultSet rs = jdbcMetadataDialect.listColumns(dbmd, database, table)) { + while (rs.next()) { + String cname = rs.getString("COLUMN_NAME"); + int type = rs.getInt("DATA_TYPE"); + int csize = rs.getInt("COLUMN_SIZE"); + int digits = rs.getInt("DECIMAL_DIGITS"); + int pos = rs.getInt("ORDINAL_POSITION"); + String remarks = rs.getString("REMARKS"); + + ColumnDesc cdesc = new ColumnDesc(); + cdesc.setName(cname.toUpperCase()); + + String kylinType = SqlUtil.jdbcTypetoKylinDataType(type); + int precision = (SqlUtil.isPrecisionApplicable(kylinType) && csize > 0) ? csize : -1; + int scale = (SqlUtil.isScaleApplicable(kylinType) && digits > 0) ? 
digits : -1; + + cdesc.setDatatype(new DataType(kylinType, precision, scale).toString()); + cdesc.setId(String.valueOf(pos)); + cdesc.setComment(remarks); + columns.add(cdesc); + } + } finally { + DBUtils.closeQuietly(con); + } + + tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()])); + + TableExtDesc tableExtDesc = new TableExtDesc(); + tableExtDesc.setIdentity(tableDesc.getIdentity()); + tableExtDesc.setUuid(UUID.randomUUID().toString()); + tableExtDesc.setLastModified(0); + tableExtDesc.init(prj); + + return Pair.newPair(tableDesc, tableExtDesc); } - + private String getSqlDataType(String javaDataType) { - if (DIALECT_VERTICA.equals(dialect)){ - if (javaDataType.toLowerCase().equals("double")){ + if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MSSQL.equals(dialect)) { + if (javaDataType.toLowerCase().equals("double")) { return "float"; } } return javaDataType.toLowerCase(); } - + @Override public void createSampleDatabase(String database) throws Exception { executeSQL(generateCreateSchemaSql(database)); } - private String generateCreateSchemaSql(String schemaName){ - if (DIALECT_VERTICA.equals(dialect)){ + private String generateCreateSchemaSql(String schemaName) { + if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MYSQL.equals(dialect)) { return String.format("CREATE schema IF NOT EXISTS %s", schemaName); - }else{ + } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) { + return String.format("IF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = N'%s') EXEC('CREATE SCHEMA" + + " [%s] AUTHORIZATION [dbo]')", schemaName, schemaName); + } else { logger.error(String.format("unsupported dialect %s.", dialect)); return null; } } - + @Override public void loadSampleData(String tableName, String tmpDataDir) throws Exception { executeSQL(generateLoadDataSql(tableName, tmpDataDir)); } private String generateLoadDataSql(String tableName, String tableFileDir) { - if (DIALECT_VERTICA.equals(dialect)){ - 
return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, tableName); - }else{ + if (JdbcDialect.DIALECT_VERTICA.equals(dialect)) { + return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, + tableName); + } else if (JdbcDialect.DIALECT_MYSQL.equals(dialect)) { + return String.format("LOAD DATA INFILE '%s/%s.csv' INTO %s FIELDS TERMINATED BY ',';", tableFileDir, + tableName, tableName); + } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) { + return String.format("BULK INSERT %s FROM '%s/%s.csv' WITH(FIELDTERMINATOR = ',')", tableName, tableFileDir, + tableName); + } else { logger.error(String.format("unsupported dialect %s.", dialect)); return null; } @@ -111,7 +191,8 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye private String[] generateCreateTableSql(TableDesc tableDesc) { logger.info(String.format("gen create table sql:%s", tableDesc)); - String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName()).toUpperCase(); + String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName()) + .toUpperCase(); String dropsql = "DROP TABLE IF EXISTS " + tableIdentity; String dropsql2 = "DROP VIEW IF EXISTS " + tableIdentity; @@ -147,157 +228,20 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye return new String[] { dropView, dropTable, createSql }; } - private void executeSQL(String sql) throws CommandNeedRetryException, IOException { + private void executeSQL(String sql) throws CommandNeedRetryException, IOException, SQLException { Connection con = SqlUtil.getConnection(dbconf); logger.info(String.format(sql)); SqlUtil.execUpdateSQL(con, sql); - SqlUtil.closeResources(con, null); + DBUtils.closeQuietly(con); } - private void executeSQL(String[] sqls) throws CommandNeedRetryException, IOException { + private void executeSQL(String[] 
sqls) throws CommandNeedRetryException, IOException, SQLException { Connection con = SqlUtil.getConnection(dbconf); - for (String sql : sqls){ + for (String sql : sqls) { logger.info(String.format(sql)); SqlUtil.execUpdateSQL(con, sql); } - SqlUtil.closeResources(con, null); - } - - @Override - public List listDatabases() throws Exception { - Connection con = SqlUtil.getConnection(dbconf); - DatabaseMetaData dbmd = con.getMetaData(); - ResultSet rs = dbmd.getSchemas(); - List ret = new ArrayList(); - /* - The schema columns are: - - TABLE_SCHEM String => schema name - - TABLE_CATALOG String => catalog name (may be null) - */ - while (rs.next()){ - String schema = rs.getString(1); - String catalog = rs.getString(2); - logger.info(String.format("%s,%s", schema, catalog)); - ret.add(schema); - } - SqlUtil.closeResources(con, null); - return ret; - } - - @Override - public List listTables(String database) throws Exception { - Connection con = SqlUtil.getConnection(dbconf); - DatabaseMetaData dbmd = con.getMetaData(); - ResultSet rs = dbmd.getTables(null, database, null, null); - List ret = new ArrayList(); - /* - - TABLE_CAT String => table catalog (may be null) - - TABLE_SCHEM String => table schema (may be null) - - TABLE_NAME String => table name - - TABLE_TYPE String => table type. Typical types are "TABLE", "VIEW", "SYSTEM TABLE", "GLOBAL - TEMPORARY", "LOCAL TEMPORARY", "ALIAS", "SYNONYM". - - REMARKS String => explanatory comment on the table - - TYPE_CAT String => the types catalog (may be null) - - TYPE_SCHEM String => the types schema (may be null) - - TYPE_NAME String => type name (may be null) - - SELF_REFERENCING_COL_NAME String => name of the designated "identifier" column of a typed - table (may be null) - - REF_GENERATION String => specifies how values in SELF_REFERENCING_COL_NAME are created. - Values are "SYSTEM", "USER", "DERIVED". 
(may be null) - */ - while (rs.next()){ - String catalog = rs.getString(1); - String schema = rs.getString(2); - String name = rs.getString(3); - String type = rs.getString(4); - logger.info(String.format("%s,%s,%s,%s", schema, catalog, name, type)); - ret.add(name); - } - SqlUtil.closeResources(con, null); - return ret; - } - - @Override - public Pair loadTableMetadata(String database, String table, String prj) throws Exception { - - TableDesc tableDesc = new TableDesc(); - tableDesc.setDatabase(database.toUpperCase()); - tableDesc.setName(table.toUpperCase()); - tableDesc.setUuid(UUID.randomUUID().toString()); - tableDesc.setLastModified(0); - - Connection con = SqlUtil.getConnection(dbconf); - DatabaseMetaData dbmd = con.getMetaData(); - ResultSet rs = dbmd.getTables(null, database, table, null); - String tableType=null; - while (rs.next()){ - tableType = rs.getString(4); - } - DBUtils.closeQuietly(rs); - if (tableType!=null){ - tableDesc.setTableType(tableType); - }else{ - logger.error(String.format("table %s not found in schema:%s", table, database)); - } - /* - - 1. TABLE_CAT String => table catalog (may be null) - - 2. TABLE_SCHEM String => table schema (may be null) - - 3. TABLE_NAME String => table name - - 4. COLUMN_NAME String => column name - - 5. DATA_TYPE int => SQL type from java.sql.Types - - 6. TYPE_NAME String => Data source dependent type name, for a UDT the type name is fully qualified - - 7. COLUMN_SIZE int => column size. - - 8. BUFFER_LENGTH is not used. - - 9. DECIMAL_DIGITS int => the number of fractional digits. Null is returned for data types where DECIMAL_DIGITS is not applicable. - - 10.NUM_PREC_RADIX int => Radix (typically either 10 or 2) - - 11.NULLABLE int => is NULL allowed. 
- - columnNoNulls - might not allow NULL values - - columnNullable - definitely allows NULL values - - columnNullableUnknown - nullability unknown - - 12.REMARKS String => comment describing column (may be null) - - 13.COLUMN_DEF String => default value for the column, which should be interpreted as a string when the value is enclosed in single quotes (may be null) - - 14.SQL_DATA_TYPE int => unused - - 15.SQL_DATETIME_SUB int => unused - - 16.CHAR_OCTET_LENGTH int => for char types the maximum number of bytes in the column - - 17.ORDINAL_POSITION int => index of column in table (starting at 1) - - 18.IS_NULLABLE String => ISO rules are used to determine the nullability for a column. - - YES --- if the column can include NULLs - - NO --- if the column cannot include NULLs - - empty string --- if the nullability for the column is unknown - */ - List columns = new ArrayList(); - rs = dbmd.getColumns(null, database, table, null); - while (rs.next()){ - String tname = rs.getString(3); - String cname = rs.getString(4); - int type=rs.getInt(5); - String typeName=rs.getString(6); - int csize=rs.getInt(7); - int digits = rs.getInt(9); - int nullable = rs.getInt(11); - String comment = rs.getString(12); - int pos = rs.getInt(17); - logger.info(String.format("%s,%s,%d,%d,%d,%d,%s,%d", tname, cname, type, csize, digits, nullable, comment, pos)); - - ColumnDesc cdesc = new ColumnDesc(); - cdesc.setName(cname.toUpperCase()); - // use "double" in kylin for "float" - cdesc.setDatatype(typeName); - cdesc.setId(String.valueOf(pos)); - columns.add(cdesc); - } - DBUtils.closeQuietly(rs); DBUtils.closeQuietly(con); - - tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()])); - - TableExtDesc tableExtDesc = new TableExtDesc(); - tableExtDesc.setIdentity(tableDesc.getIdentity()); - tableExtDesc.setUuid(UUID.randomUUID().toString()); - tableExtDesc.setLastModified(0); - tableExtDesc.init(prj); - - return Pair.newPair(tableDesc, tableExtDesc); } @Override diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java index ddd38db..e83518a 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java @@ -18,6 +18,8 @@ package org.apache.kylin.source.jdbc; +import java.util.List; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; @@ -25,21 +27,27 @@ import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats; +import org.apache.kylin.metadata.model.TableRef; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.hive.HiveMRInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class JdbcHiveMRInput extends HiveMRInput { - + private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class); - + public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { return new BatchCubingInputSide(flatDesc); } public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide { - + public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { super(flatDesc); } @@ -49,42 +57,123 @@ public class JdbcHiveMRInput extends HiveMRInput { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); final String jobWorkingDir = getJobWorkingDir(jobFlow); - + jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName)); jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir)); } private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) { final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); - final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, "TEXTFILE"); - + KylinConfig config = KylinConfig.getInstanceFromEnv(); + String filedDelimiter = config.getFieldDelimiter(); + // Sqoop does not yet support exporting SEQUENCEFILE to Hive (see SQOOP-869) + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, + "TEXTFILE", filedDelimiter); + HiveCmdStep step = new HiveCmdStep(); step.setCmd(hiveInitStatements + dropTableHql + createTableHql); return step; } - + + /** + * Choose a better split-by column for sqoop. The strategy is: + * 1. Prefer ClusteredBy column + * 2. Prefer DistributedBy column + * 3. Prefer Partition date column + * 4. Prefer Higher cardinality column + * 5. Prefer numeric column + * 6. 
Pick a column at first glance + * @return A column reference TblColRef for sqoop split-by + */ + private TblColRef determineSplitColumn() { + if (null != flatDesc.getClusterBy()) { + return flatDesc.getClusterBy(); + } + if (null != flatDesc.getDistributedBy()) { + return flatDesc.getDistributedBy(); + } + PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc(); + if (partitionDesc.isPartitioned()) { + return partitionDesc.getPartitionDateColumnRef(); + } + TblColRef splitColumn = null; + TableMetadataManager tblManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); + long maxCardinality = 0; + for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) { + TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc()); + List columnStatses = tableExtDesc.getColumnStats(); + if (!columnStatses.isEmpty()) { + for (TblColRef colRef : tableRef.getColumns()) { + long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex()) + .getCardinality(); + splitColumn = cardinality > maxCardinality ? 
colRef : splitColumn; + } + } + } + if (null == splitColumn) { + for (TblColRef colRef : flatDesc.getAllColumns()) { + if (colRef.getType().isIntegerFamily()) { + return colRef; + } + } + splitColumn = flatDesc.getAllColumns().get(0); + } + + return splitColumn; + } + private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) { - KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig(); - String partCol = flatDesc.getDataModel().getPartitionDesc().getPartitionDateColumn();//tablename.colname + KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName) + .getConfig(); + PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc(); + String partCol = null; + String partitionString = null; + + if (partitionDesc.isPartitioned()) { + partCol = partitionDesc.getPartitionDateColumn();//tablename.colname + partitionString = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc, + flatDesc.getSegRange()); + } + + String splitTable; + String splitColumn; + String splitDatabase; + TblColRef splitColRef = determineSplitColumn(); + splitTable = splitColRef.getTableRef().getTableName(); + splitColumn = splitColRef.getName(); + splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase(); + //using sqoop to extract data from jdbc source and dump them to hive - String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[]{partCol}); + String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol }); String hiveTable = flatDesc.getTableName(); - String connectionUrl = config.getJdbcConnectionUrl(); - String driverClass = config.getJdbcDriver(); - String jdbcUser = config.getJdbcUser(); - String jdbcPass = config.getJdbcPass(); + String connectionUrl = config.getJdbcSourceConnectionUrl(); + String driverClass = config.getJdbcSourceDriver(); + 
String jdbcUser = config.getJdbcSourceUser(); + String jdbcPass = config.getJdbcSourcePass(); String sqoopHome = config.getSqoopHome(); - String cmd= String.format(String.format("%s/sqoop import " - + "--connect %s --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" " - + "--target-dir %s/%s --split-by %s", sqoopHome, connectionUrl, driverClass, jdbcUser, - jdbcPass, selectSql, jobWorkingDir, hiveTable, partCol)); - logger.info(String.format("sqoop cmd:%s", cmd)); + String filedDelimiter = config.getFieldDelimiter(); + int mapperNum = config.getSqoopMapperNum(); + + String bquery = String.format("SELECT min(%s), max(%s) FROM %s.%s", splitColumn, splitColumn, splitDatabase, + splitTable); + if (partitionString != null) { + bquery += " WHERE " + partitionString; + } + + String cmd = String.format(String.format( + "%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true " + + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" " + + "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' " + + "--fields-terminated-by '%s' --num-mappers %d", + sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable, + splitTable, splitColumn, bquery, filedDelimiter, mapperNum)); + logger.debug(String.format("sqoop cmd:%s", cmd)); CmdStep step = new CmdStep(); step.setCmd(cmd); step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); return step; } - + @Override protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) { // skip diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java index b8865d6..e2616b7 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java @@ -23,6 +23,7 @@ import 
java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.source.IReadableTable.TableReader; import org.apache.kylin.source.hive.DBConnConf; @@ -30,23 +31,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * An implementation of TableReader with HCatalog for Hive table. + * An implementation of TableReader with JDBC. */ public class JdbcTableReader implements TableReader { private static final Logger logger = LoggerFactory.getLogger(JdbcTableReader.class); - + private String dbName; private String tableName; private DBConnConf dbconf; - private String dialect; private Connection jdbcCon; private Statement statement; private ResultSet rs; private int colCount; /** - * Constructor for reading whole hive table + * Constructor for reading whole jdbc table * @param dbName * @param tableName * @throws IOException @@ -55,22 +55,20 @@ public class JdbcTableReader implements TableReader { this.dbName = dbName; this.tableName = tableName; KylinConfig config = KylinConfig.getInstanceFromEnv(); - String connectionUrl = config.getJdbcConnectionUrl(); - String driverClass = config.getJdbcDriver(); - String jdbcUser = config.getJdbcUser(); - String jdbcPass = config.getJdbcPass(); + String connectionUrl = config.getJdbcSourceConnectionUrl(); + String driverClass = config.getJdbcSourceDriver(); + String jdbcUser = config.getJdbcSourceUser(); + String jdbcPass = config.getJdbcSourcePass(); dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); - this.dialect = config.getJdbcDialect(); jdbcCon = SqlUtil.getConnection(dbconf); String sql = String.format("select * from %s.%s", dbName, tableName); try { statement = jdbcCon.createStatement(); rs = statement.executeQuery(sql); colCount = rs.getMetaData().getColumnCount(); - }catch(SQLException e){ + } catch (SQLException e) { throw new IOException(String.format("error while exec %s", 
sql), e); } - } @Override @@ -85,11 +83,17 @@ public class JdbcTableReader implements TableReader { @Override public String[] getRow() { String[] ret = new String[colCount]; - for (int i=1; i<=colCount; i++){ + for (int i = 1; i <= colCount; i++) { try { Object o = rs.getObject(i); - ret[i-1] = (o == null? null:o.toString()); - }catch(Exception e){ + String result; + if (null == o || o instanceof byte[]) { + result = null; + } else { + result = o.toString(); + } + ret[i - 1] = result; + } catch (Exception e) { logger.error("", e); } } diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java index a112d87..79fab7d 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java @@ -21,87 +21,145 @@ package org.apache.kylin.source.jdbc; import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; +import java.sql.Types; import java.util.Random; -import javax.sql.DataSource; - -import org.slf4j.LoggerFactory; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.source.hive.DBConnConf; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SqlUtil { private static final Logger logger = LoggerFactory.getLogger(SqlUtil.class); - public static void closeResources(Connection con, Statement statement){ - try{ - if (statement!=null && !statement.isClosed()){ + public static void closeResources(Connection con, Statement statement) { + try { + if (statement != null && !statement.isClosed()) { statement.close(); } - }catch(Exception e){ + } catch (Exception e) { logger.error("", e); } - - try{ - if (con!=null && !con.isClosed()){ + + try { + if (con != null && !con.isClosed()) { con.close(); } - }catch(Exception e){ + } catch (Exception e) { logger.error("", e); } } - - - public static void execUpdateSQL(String sql, DataSource ds){ - 
Connection con = null; - try{ - con = ds.getConnection(); - execUpdateSQL(con, sql); - }catch(Exception e){ - logger.error("", e); - }finally{ - closeResources(con, null); - } - } - - public static void execUpdateSQL(Connection db, String sql){ - Statement statement=null; - try{ + + public static void execUpdateSQL(Connection db, String sql) { + Statement statement = null; + try { statement = db.createStatement(); - statement.executeUpdate(sql); - }catch(Exception e){ + statement.executeUpdate(sql); + } catch (Exception e) { logger.error("", e); - }finally{ + } finally { closeResources(null, statement); } } - - public static int tryTimes=10; - public static Connection getConnection(DBConnConf dbconf){ - if (dbconf.getUrl()==null) + + public static int tryTimes = 5; + + public static Connection getConnection(DBConnConf dbconf) { + if (dbconf.getUrl() == null) return null; Connection con = null; try { Class.forName(dbconf.getDriver()); - }catch(Exception e){ + } catch (Exception e) { logger.error("", e); } - boolean got=false; - int times=0; + boolean got = false; + int times = 0; Random r = new Random(); - while(!got && times listDatabases() throws SQLException { + List ret = new ArrayList<>(); + try (Connection con = SqlUtil.getConnection(dbconf); ResultSet rs = con.getMetaData().getSchemas()) { + while (rs.next()) { + String schema = rs.getString("TABLE_SCHEM"); + String catalog = rs.getString("TABLE_CATALOG"); + logger.info(String.format("%s,%s", schema, catalog)); + ret.add(schema); + } + } + return ret; + } + + @Override + public List listTables(String schema) throws SQLException { + List ret = new ArrayList<>(); + try (Connection con = SqlUtil.getConnection(dbconf); + ResultSet rs = con.getMetaData().getTables(null, schema, null, null)) { + while (rs.next()) { + String name = rs.getString("TABLE_NAME"); + ret.add(name); + } + } + return ret; + } + + @Override + public ResultSet getTable(final DatabaseMetaData dbmd, String schema, String table) throws 
SQLException { + return dbmd.getTables(null, schema, table, null); + } + + @Override + public ResultSet listColumns(final DatabaseMetaData dbmd, String schema, String table) throws SQLException { + return dbmd.getColumns(null, schema, table, null); + } +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java new file mode 100644 index 0000000..169fe60 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.jdbc.metadata; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +public interface IJdbcMetadata { + List listDatabases() throws SQLException; + + List listTables(String database) throws SQLException; + + ResultSet getTable(final DatabaseMetaData dbmd, String database, String table) throws SQLException; + + ResultSet listColumns(final DatabaseMetaData dbmd, String database, String table) throws SQLException; +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java new file mode 100644 index 0000000..4100f79 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.jdbc.metadata; + +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.JdbcDialect; + +public abstract class JdbcMetadataFactory { + public static IJdbcMetadata getJdbcMetadata(String dialect, final DBConnConf dbConnConf) { + String jdbcDialect = (null == dialect) ? "" : dialect.toLowerCase(); + switch (jdbcDialect) { + case (JdbcDialect.DIALECT_MSSQL): + return new SQLServerJdbcMetadata(dbConnConf); + case (JdbcDialect.DIALECT_MYSQL): + return new MySQLJdbcMetadata(dbConnConf); + default: + return new DefaultJdbcMetadata(dbConnConf); + } + } +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java new file mode 100644 index 0000000..6404fd6 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.jdbc.metadata; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.SqlUtil; + +public class MySQLJdbcMetadata extends DefaultJdbcMetadata { + public MySQLJdbcMetadata(DBConnConf dbConnConf) { + super(dbConnConf); + } + + @Override + public List listDatabases() throws SQLException { + List ret = new ArrayList<>(); + try (Connection con = SqlUtil.getConnection(dbconf); ResultSet res = con.getMetaData().getCatalogs()) { + while (res.next()) { + ret.add(res.getString("TABLE_CAT")); + } + } + return ret; + } + + @Override + public List listTables(String catalog) throws SQLException { + List ret = new ArrayList<>(); + try (Connection con = SqlUtil.getConnection(dbconf); + ResultSet res = con.getMetaData().getTables(catalog, null, null, null)) { + String table; + while (res.next()) { + table = res.getString("TABLE_NAME"); + ret.add(table); + } + } + return ret; + } + + @Override + public ResultSet listColumns(final DatabaseMetaData dbmd, String catalog, String table) throws SQLException { + return dbmd.getColumns(catalog, null, table, null); + } + + @Override + public ResultSet getTable(final DatabaseMetaData dbmd, String catalog, String table) throws SQLException { + return dbmd.getTables(catalog, null, table, null); + } +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java new file mode 100644 index 0000000..1a34b37 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.jdbc.metadata; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.SqlUtil; + +import com.google.common.base.Preconditions; + +public class SQLServerJdbcMetadata extends DefaultJdbcMetadata { + public SQLServerJdbcMetadata(DBConnConf dbConnConf) { + super(dbConnConf); + } + + @Override + public List listDatabases() throws SQLException { + List ret = new ArrayList<>(); + try (Connection con = SqlUtil.getConnection(dbconf)) { + + String database = con.getCatalog(); + Preconditions.checkArgument(StringUtils.isNotEmpty(database), + "SQL Server needs a specific database in " + "connection string."); + + try (ResultSet rs = con.getMetaData().getSchemas(database, "%")) { + String schema; + String catalog; + while (rs.next()) { + schema = rs.getString("TABLE_SCHEM"); + catalog = rs.getString("TABLE_CATALOG"); + // Skip system schemas + if (database.equals(catalog)) { + ret.add(schema); + } + } + } + } + return ret; + } +} diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java 
b/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java new file mode 100644 index 0000000..b269329 --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.source.jdbc; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.metadata.DefaultJdbcMetadata; +import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata; +import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ JdbcMetadataFactory.class, SqlUtil.class }) + +public class JdbcExplorerTest extends LocalFileMetadataTestCase { + private JdbcExplorer jdbcExplorer; + private static Connection connection; + private static DatabaseMetaData dbmd; + private IJdbcMetadata jdbcMetadata; + + @BeforeClass + public static void setupClass() throws SQLException { + staticCreateTestMetadata(); + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + kylinConfig.setProperty("kylin.source.jdbc.connection-url", 
"jdbc:vertica://fakehost:1433/database"); + kylinConfig.setProperty("kylin.source.jdbc.driver", "com.vertica.jdbc.Driver"); + kylinConfig.setProperty("kylin.source.jdbc.user", "user"); + kylinConfig.setProperty("kylin.source.jdbc.pass", ""); + kylinConfig.setProperty("kylin.source.jdbc.dialect", "vertica"); + } + + @Before + public void setup() throws SQLException { + connection = mock(Connection.class); + dbmd = mock(DatabaseMetaData.class); + jdbcMetadata = mock(DefaultJdbcMetadata.class); + + PowerMockito.stub(PowerMockito.method(SqlUtil.class, "getConnection")).toReturn(connection); + PowerMockito.mockStatic(JdbcMetadataFactory.class); + + when(JdbcMetadataFactory.getJdbcMetadata(anyString(), any(DBConnConf.class))).thenReturn(jdbcMetadata); + when(connection.getMetaData()).thenReturn(dbmd); + + jdbcExplorer = spy(JdbcExplorer.class); + } + + @Test + public void testListDatabases() throws SQLException { + List databases = new ArrayList<>(); + databases.add("DB1"); + databases.add("DB2"); + when(jdbcMetadata.listDatabases()).thenReturn(databases); + + List result = jdbcExplorer.listDatabases(); + + verify(jdbcMetadata, times(1)).listDatabases(); + Assert.assertEquals(databases, result); + } + + @Test + public void testListTables() throws SQLException { + List tables = new ArrayList<>(); + tables.add("T1"); + tables.add("T2"); + String databaseName = "testDb"; + when(jdbcMetadata.listTables(databaseName)).thenReturn(tables); + + List result = jdbcExplorer.listTables(databaseName); + verify(jdbcMetadata, times(1)).listTables(databaseName); + Assert.assertEquals(tables, result); + } + + @Test + public void testLoadTableMetadata() throws SQLException { + String tableName = "tb1"; + String databaseName = "testdb"; + ResultSet rs1 = mock(ResultSet.class); + when(rs1.next()).thenReturn(true).thenReturn(false); + when(rs1.getString("TABLE_TYPE")).thenReturn("TABLE"); + + ResultSet rs2 = mock(ResultSet.class); + 
when(rs2.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false); + when(rs2.getString("COLUMN_NAME")).thenReturn("COL1").thenReturn("COL2").thenReturn("COL3"); + when(rs2.getInt("DATA_TYPE")).thenReturn(Types.VARCHAR).thenReturn(Types.INTEGER).thenReturn(Types.DECIMAL); + when(rs2.getInt("COLUMN_SIZE")).thenReturn(128).thenReturn(10).thenReturn(19); + when(rs2.getInt("DECIMAL_DIGITS")).thenReturn(0).thenReturn(0).thenReturn(4); + when(rs2.getInt("ORDINAL_POSITION")).thenReturn(1).thenReturn(3).thenReturn(2); + when(rs2.getString("REMARKS")).thenReturn("comment1").thenReturn("comment2").thenReturn("comment3"); + + when(jdbcMetadata.getTable(dbmd, databaseName, tableName)).thenReturn(rs1); + when(jdbcMetadata.listColumns(dbmd, databaseName, tableName)).thenReturn(rs2); + + Pair result = jdbcExplorer.loadTableMetadata(databaseName, tableName, "proj"); + TableDesc tableDesc = result.getFirst(); + ColumnDesc columnDesc = tableDesc.getColumns()[1]; + + Assert.assertEquals(databaseName.toUpperCase(), tableDesc.getDatabase()); + Assert.assertEquals(3, tableDesc.getColumnCount()); + Assert.assertEquals("TABLE", tableDesc.getTableType()); + Assert.assertEquals("COL2", columnDesc.getName()); + Assert.assertEquals("integer", columnDesc.getTypeName()); + Assert.assertEquals("comment2", columnDesc.getComment()); + Assert.assertEquals(databaseName.toUpperCase() + "." + tableName.toUpperCase(), + result.getSecond().getIdentity()); + } + + @AfterClass + public static void clenup() { + staticCleanupTestMetadata(); + } + +} diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java new file mode 100644 index 0000000..d952675 --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.source.jdbc; + +import java.sql.Types; + +import org.junit.Assert; +import org.junit.Test; + +public class SqlUtilTest { + + @Test + public void testJdbcTypetoKylinDataType() { + this.getClass().getClassLoader().toString(); + Assert.assertEquals("double", SqlUtil.jdbcTypetoKylinDataType(Types.FLOAT)); + Assert.assertEquals("varchar", SqlUtil.jdbcTypetoKylinDataType(Types.NVARCHAR)); + Assert.assertEquals("any", SqlUtil.jdbcTypetoKylinDataType(Types.ARRAY)); + } + + @Test + public void testIsPrecisionApplicable() { + Assert.assertFalse(SqlUtil.isPrecisionApplicable("boolean")); + Assert.assertTrue(SqlUtil.isPrecisionApplicable("varchar")); + } + + @Test + public void testIsScaleApplicable() { + Assert.assertFalse(SqlUtil.isScaleApplicable("varchar")); + Assert.assertTrue(SqlUtil.isScaleApplicable("decimal")); + } +} diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java new file mode 100644 index 0000000..43d467d --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.source.jdbc.metadata; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.SqlUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(SqlUtil.class) +public class DefaultJdbcMetadataTest { + protected DBConnConf dbConnConf; + protected Connection connection; + protected DatabaseMetaData dbmd; + protected IJdbcMetadata jdbcMetadata; + + @Before + public void setup() { + dbConnConf = new DBConnConf(); + dbConnConf.setUrl("jdbc:vertica://fakehost:1433/database"); + dbConnConf.setDriver("com.vertica.jdbc.Driver"); + dbConnConf.setUser("user"); + dbConnConf.setPass("pass"); + 
jdbcMetadata = new DefaultJdbcMetadata(dbConnConf); + + setupProperties(); + } + + protected void setupProperties() { + connection = mock(Connection.class); + dbmd = mock(DatabaseMetaData.class); + + PowerMockito.mockStatic(SqlUtil.class); + when(SqlUtil.getConnection(dbConnConf)).thenReturn(connection); + } + + @Test + public void testListDatabases() throws SQLException { + ResultSet rs = mock(ResultSet.class); + when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(false); + when(rs.getString("TABLE_SCHEM")).thenReturn("schema1").thenReturn("schema2"); + when(rs.getString("TABLE_CATALOG")).thenReturn("catalog1").thenReturn("catalog2"); + + when(connection.getMetaData()).thenReturn(dbmd); + when(dbmd.getSchemas()).thenReturn(rs); + + List dbs = jdbcMetadata.listDatabases(); + + Assert.assertEquals(2, dbs.size()); + Assert.assertEquals("schema1", dbs.get(0)); + } + + @Test + public void testListTables() throws SQLException { + ResultSet rs = mock(ResultSet.class); + when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false); + when(rs.getString("TABLE_NAME")).thenReturn("KYLIN_SALES").thenReturn("CAT_DT").thenReturn("KYLIN_CAT"); + + String schema = "testschema"; + when(connection.getMetaData()).thenReturn(dbmd); + when(dbmd.getTables(null, schema, null, null)).thenReturn(rs); + + List tables = jdbcMetadata.listTables(schema); + + Assert.assertEquals(3, tables.size()); + Assert.assertEquals("CAT_DT", tables.get(1)); + } + + @Test + public void testGetTable() throws SQLException { + String schema = "testSchema"; + String table = "testTable"; + ResultSet rs = mock(ResultSet.class); + when(dbmd.getTables(null, schema, table, null)).thenReturn(rs); + + ResultSet result = jdbcMetadata.getTable(dbmd, schema, table); + + verify(dbmd, times(1)).getTables(null, schema, table, null); + Assert.assertEquals(rs, result); + } + + @Test + public void testListColumns() throws SQLException { + String schema = "testSchema"; + String table = 
"testTable"; + ResultSet rs = mock(ResultSet.class); + when(dbmd.getColumns(null, schema, table, null)).thenReturn(rs); + + ResultSet result = jdbcMetadata.listColumns(dbmd, schema, table); + + verify(dbmd, times(1)).getColumns(null, schema, table, null); + Assert.assertEquals(rs, result); + } +} diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java new file mode 100644 index 0000000..d9c7425 --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.source.jdbc.metadata; + +import org.apache.kylin.source.jdbc.JdbcDialect; +import org.junit.Assert; +import org.junit.Test; + +/** Checks which IJdbcMetadata implementation class JdbcMetadataFactory returns for a given dialect constant. */ +public class JdbcMetadataFactoryTest { + + /** Expects SQLServerJdbcMetadata for DIALECT_MSSQL, MySQLJdbcMetadata for DIALECT_MYSQL and DefaultJdbcMetadata for DIALECT_VERTICA; the second argument is passed as null in every call. */ + @Test + public void testGetJdbcMetadata() { + Assert.assertTrue( + JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MSSQL, null) instanceof SQLServerJdbcMetadata); + Assert.assertTrue( + JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MYSQL, null) instanceof MySQLJdbcMetadata); + Assert.assertTrue( + JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_VERTICA, null) instanceof DefaultJdbcMetadata); + } +} diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java new file mode 100644 index 0000000..d0cb6c4 --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.source.jdbc.metadata; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.kylin.source.hive.DBConnConf; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** Tests MySQLJdbcMetadata. Extends DefaultJdbcMetadataTest and redefines its four test methods so that the stubbed and verified DatabaseMetaData calls pass the name in the catalog position instead of the schema position. */ +public class MySQLJdbcMetadataTest extends DefaultJdbcMetadataTest { + + /** Fills dbConnConf with MySQL-style settings, creates the MySQLJdbcMetadata under test and then calls setupProperties(). NOTE(review): the URL uses port 1433 - looks copied from the SQL Server test; confirm whether 3306 was intended. */ + @Before + public void setup() { + dbConnConf = new DBConnConf(); + dbConnConf.setUrl("jdbc:mysql://fakehost:1433/database"); + dbConnConf.setDriver("com.mysql.jdbc.Driver"); + dbConnConf.setUser("user"); + dbConnConf.setPass("pass"); + jdbcMetadata = new MySQLJdbcMetadata(dbConnConf); + + setupProperties(); + } + + /** Stubs dbmd.getCatalogs() with a two-row result set read through the TABLE_CAT column and expects listDatabases() to return two entries, catalog1 first. */ + @Test + public void testListDatabases() throws SQLException { + ResultSet rs = mock(ResultSet.class); + /* next() yields two rows, then reports end of data */ + when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(false); + when(rs.getString("TABLE_CAT")).thenReturn("catalog1").thenReturn("catalog2"); + + when(connection.getMetaData()).thenReturn(dbmd); + when(dbmd.getCatalogs()).thenReturn(rs); + + List dbs = jdbcMetadata.listDatabases(); + + Assert.assertEquals(2, dbs.size()); + Assert.assertEquals("catalog1", dbs.get(0)); + } + + /** Stubs dbmd.getTables(catalog, null, null, null) with a three-row result set and expects listTables(catalog) to return three names with CAT_DT at index 1. */ + @Test + public void testListTables() throws SQLException { + ResultSet rs = mock(ResultSet.class); + /* next() yields three rows, then reports end of data */ + when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false); + when(rs.getString("TABLE_NAME")).thenReturn("KYLIN_SALES").thenReturn("CAT_DT").thenReturn("KYLIN_CAT"); + + String catalog = "testCatalog"; + when(connection.getMetaData()).thenReturn(dbmd); + when(dbmd.getTables(catalog, null, null, null)).thenReturn(rs); + + List tables = jdbcMetadata.listTables(catalog); + + Assert.assertEquals(3, tables.size()); + Assert.assertEquals("CAT_DT", tables.get(1)); + } + + /** Expects getTable(dbmd, catalog, table) to call dbmd.getTables(catalog, null, table, null) exactly once and to return that call's result set. The literal says testSchema, but it is passed as the first (catalog) argument. */ + @Test + public void testGetTable() throws SQLException { + String catalog = 
"testSchema"; + String table = "testTable"; + ResultSet rs = mock(ResultSet.class); + when(dbmd.getTables(catalog, null, table, null)).thenReturn(rs); + + ResultSet result = jdbcMetadata.getTable(dbmd, catalog, table); + + verify(dbmd, times(1)).getTables(catalog, null, table, null); + Assert.assertEquals(rs, result); + } + + /** Expects listColumns(dbmd, catalog, table) to call dbmd.getColumns(catalog, null, table, null) exactly once and to return that call's result set. */ + @Test + public void testListColumns() throws SQLException { + String catalog = "testSchema"; + String table = "testTable"; + ResultSet rs = mock(ResultSet.class); + when(dbmd.getColumns(catalog, null, table, null)).thenReturn(rs); + + ResultSet result = jdbcMetadata.listColumns(dbmd, catalog, table); + + verify(dbmd, times(1)).getColumns(catalog, null, table, null); + Assert.assertEquals(rs, result); + } +} diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadataTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadataTest.java new file mode 100644 index 0000000..a5516ab --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadataTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.source.jdbc.metadata; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.kylin.source.hive.DBConnConf; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** Tests SQLServerJdbcMetadata. Extends DefaultJdbcMetadataTest, redefines testListDatabases and adds a case for an empty current catalog; the other test methods are inherited unchanged. */ +public class SQLServerJdbcMetadataTest extends DefaultJdbcMetadataTest { + + /** Fills dbConnConf with SQL Server-style settings, creates the SQLServerJdbcMetadata under test and then calls setupProperties(). */ + @Before + public void setup() { + dbConnConf = new DBConnConf(); + dbConnConf.setUrl("jdbc:sqlserver://fakehost:1433;database=testdb"); + dbConnConf.setDriver("com.microsoft.sqlserver.jdbc.SQLServerDriver"); + dbConnConf.setUser("user"); + dbConnConf.setPass("pass"); + jdbcMetadata = new SQLServerJdbcMetadata(dbConnConf); + + setupProperties(); + } + + /** With the connection catalog stubbed as testdb and dbmd.getSchemas(testdb, %) returning two rows, expects listDatabases() to return a single entry, schema2 - the row whose TABLE_CATALOG value is testdb. */ + @Test + public void testListDatabases() throws SQLException { + ResultSet rs = mock(ResultSet.class); + /* next() yields two rows, then reports end of data */ + when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(false); + when(rs.getString("TABLE_SCHEM")).thenReturn("schema1").thenReturn("schema2"); + /* first row belongs to catalog1, second row to testdb */ + when(rs.getString("TABLE_CATALOG")).thenReturn("catalog1").thenReturn("testdb"); + + when(connection.getCatalog()).thenReturn("testdb"); + when(connection.getMetaData()).thenReturn(dbmd); + when(dbmd.getSchemas("testdb", "%")).thenReturn(rs); + + List dbs = jdbcMetadata.listDatabases(); + + Assert.assertEquals(1, dbs.size()); + Assert.assertEquals("schema2", dbs.get(0)); + } + + /** Expects listDatabases() to throw IllegalArgumentException when the connection reports an empty catalog name. */ + @Test(expected = IllegalArgumentException.class) + public void testListDatabasesWithoutSpecificDB() throws SQLException { + when(connection.getCatalog()).thenReturn(""); + jdbcMetadata.listDatabases(); + } +} -- 2.9.3 (Apple Git-75)