diff --git common/src/java/org/apache/hadoop/hive/conf/Constants.java common/src/java/org/apache/hadoop/hive/conf/Constants.java index ff9eb59..3d79eec 100644 --- common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -48,6 +48,7 @@ public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; + public static final String KAFKA_TOPIC = "kafka.topic"; public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; @@ -55,6 +56,11 @@ /* Kafka Ingestion state - valid values - START/STOP/RESET */ public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion"; + public static final String HIVE_JDBC_QUERY = "hive.sql.generated.query"; + public static final String JDBC_QUERY = "hive.sql.query"; + public static final String JDBC_HIVE_STORAGE_HANDLER_ID = + "org.apache.hive.storage.jdbc.JdbcStorageHandler"; + public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD"; public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD"; public static final String HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG = "hadoop.security.credential.provider.path"; diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java index 6def148..caa823f 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java @@ -66,7 +66,7 @@ dbAccessor = DatabaseAccessorFactory.getAccessor(job); } - int numRecords = dbAccessor.getTotalNumberOfRecords(job); + int numRecords = numSplits <=1 ? 
Integer.MAX_VALUE : dbAccessor.getTotalNumberOfRecords(job); if (numRecords < numSplits) { numSplits = numRecords; diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java index 88b2f0a..1da6213 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java @@ -43,7 +43,7 @@ public JdbcRecordReader(JobConf conf, JdbcInputSplit split) { - LOGGER.debug("Initializing JdbcRecordReader"); + LOGGER.trace("Initializing JdbcRecordReader"); this.split = split; this.conf = conf; } @@ -52,14 +52,14 @@ public JdbcRecordReader(JobConf conf, JdbcInputSplit split) { @Override public boolean next(LongWritable key, MapWritable value) throws IOException { try { - LOGGER.debug("JdbcRecordReader.next called"); + LOGGER.trace("JdbcRecordReader.next called"); if (dbAccessor == null) { dbAccessor = DatabaseAccessorFactory.getAccessor(conf); iterator = dbAccessor.getRecordIterator(conf, split.getLimit(), split.getOffset()); } if (iterator.hasNext()) { - LOGGER.debug("JdbcRecordReader has more records to read."); + LOGGER.trace("JdbcRecordReader has more records to read."); key.set(pos); pos++; Map record = iterator.next(); diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java index 3764c8c..eac03d2 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java @@ -15,6 +15,7 @@ package org.apache.hive.storage.jdbc; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -63,25 +64,35 @@ @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { try { - LOGGER.debug("Initializing the SerDe"); + LOGGER.trace("Initializing the SerDe"); if (tbl.containsKey(JdbcStorageConfig.DATABASE_TYPE.getPropertyName())) { + final boolean hiveQueryExecution = tbl.containsKey(Constants.HIVE_JDBC_QUERY); + Configuration tableConfig = JdbcStorageConfigManager.convertPropertiesToConfiguration(tbl); DatabaseAccessor dbAccessor = DatabaseAccessorFactory.getAccessor(tableConfig); columnNames = dbAccessor.getColumnNames(tableConfig); numColumns = columnNames.size(); - - String[] hiveColumnNameArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMNS), ","); - if (numColumns != hiveColumnNameArray.length) { - throw new SerDeException("Expected " + numColumns + " columns. 
Table definition has " - + hiveColumnNameArray.length + " columns"); - } - List hiveColumnNames = Arrays.asList(hiveColumnNameArray); - - hiveColumnTypeArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES), ":"); - if (hiveColumnTypeArray.length == 0) { - throw new SerDeException("Received an empty Hive column type definition"); + List hiveColumnNames; + if (hiveQueryExecution) { + hiveColumnNames = columnNames; + final List columnTypes = dbAccessor.getColumnTypes(tableConfig); + hiveColumnTypeArray = new String[columnTypes.size()]; + hiveColumnTypeArray = columnTypes.toArray(hiveColumnTypeArray); + } else { + + String[] hiveColumnNameArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMNS), ","); + if (numColumns != hiveColumnNameArray.length) { + throw new SerDeException("Expected " + numColumns + " columns. Table definition has " + + hiveColumnNameArray.length + " columns"); + } + hiveColumnNames = Arrays.asList(hiveColumnNameArray); + + hiveColumnTypeArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES), ":"); + if (hiveColumnTypeArray.length == 0) { + throw new SerDeException("Received an empty Hive column type definition"); + } } List fieldInspectors = new ArrayList(numColumns); @@ -115,7 +126,7 @@ public void initialize(Configuration conf, Properties tbl) throws SerDeException @Override public Object deserialize(Writable blob) throws SerDeException { - LOGGER.debug("Deserializing from SerDe"); + LOGGER.trace("Deserializing from SerDe"); if (!(blob instanceof MapWritable)) { throw new SerDeException("Expected MapWritable. Got " + blob.getClass().getName()); } diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java index 4b03285..df55272 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java @@ -15,6 +15,7 @@ package org.apache.hive.storage.jdbc; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; @@ -124,4 +125,9 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { } + @Override + public String toString() { + return Constants.JDBC_HIVE_STORAGE_HANDLER_ID; + } + } diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java index c4e97ba..b8b770f 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java @@ -21,5 +21,6 @@ ORACLE, POSTGRES, MSSQL, - METASTORE + METASTORE, + JETHRO_DATA } diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java index ff6357d..1ccbe08 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java @@ -18,7 +18,8 @@ DATABASE_TYPE("database.type", true), JDBC_URL("jdbc.url", true), JDBC_DRIVER_CLASS("jdbc.driver", true), - QUERY("query", true), + QUERY("query", false), + TABLE("table", false), 
JDBC_FETCH_SIZE("jdbc.fetch.size", false), COLUMN_MAPPING("column.mapping", false); diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java index 350b0c6..55fc0ea 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java @@ -44,13 +44,12 @@ public static final String CONFIG_USERNAME = CONFIG_PREFIX + ".dbcp.username"; private static final EnumSet DEFAULT_REQUIRED_PROPERTIES = EnumSet.of(JdbcStorageConfig.DATABASE_TYPE, - JdbcStorageConfig.JDBC_URL, - JdbcStorageConfig.JDBC_DRIVER_CLASS, - JdbcStorageConfig.QUERY); + JdbcStorageConfig.JDBC_URL, + JdbcStorageConfig.JDBC_DRIVER_CLASS); private static final EnumSet METASTORE_REQUIRED_PROPERTIES = EnumSet.of(JdbcStorageConfig.DATABASE_TYPE, - JdbcStorageConfig.QUERY); + JdbcStorageConfig.QUERY); private JdbcStorageConfigManager() { } @@ -120,6 +119,12 @@ public static String getConfigValue(JdbcStorageConfig key, Configuration config) public static String getQueryToExecute(Configuration config) { String query = config.get(JdbcStorageConfig.QUERY.getPropertyName()); + + if (query == null) { + String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName()); + query = "select * from " + tableName; + } + String hiveFilterCondition = QueryConditionBuilder.getInstance().buildCondition(config); if ((hiveFilterCondition != null) && (!hiveFilterCondition.trim().isEmpty())) { query = query + " WHERE " + hiveFilterCondition; diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java index f50d53e..fdaa794 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java @@ -24,10 +24,10 @@ List getColumnNames(Configuration conf) throws HiveJdbcDatabaseAccessException; + List getColumnTypes(Configuration conf) throws HiveJdbcDatabaseAccessException; int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException; - JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException; diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java index 7dc690f..6d3c8d9 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java @@ -35,6 +35,9 @@ public static DatabaseAccessor getAccessor(DatabaseType dbType) { case MYSQL: accessor = new MySqlDatabaseAccessor(); break; + case JETHRO_DATA: + accessor = new JethroDatabaseAccessor(); + break; default: accessor = new GenericJdbcDatabaseAccessor(); diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java index 178c97d..772bc5d 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java @@ -16,6 +16,7 @@ 
import org.apache.commons.dbcp.BasicDataSourceFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -34,6 +35,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -64,8 +66,7 @@ public GenericJdbcDatabaseAccessor() { try { initializeDatabaseConnection(conf); - String sql = JdbcStorageConfigManager.getQueryToExecute(conf); - String metadataQuery = addLimitToQuery(sql, 1); + String metadataQuery = getMetaDataQuery(conf); LOGGER.debug("Query to execute is [{}]", metadataQuery); conn = dbcpDataSource.getConnection(); @@ -92,6 +93,75 @@ public GenericJdbcDatabaseAccessor() { } + protected String getMetaDataQuery(Configuration conf) { + String sql = JdbcStorageConfigManager.getQueryToExecute(conf); + String metadataQuery = addLimitToQuery(sql, 1); + return metadataQuery; + } + + @Override + public List getColumnTypes(Configuration conf) throws HiveJdbcDatabaseAccessException { + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + + try { + initializeDatabaseConnection(conf); + String metadataQuery = getMetaDataQuery(conf); + LOGGER.debug("Query to execute is [{}]", metadataQuery); + + conn = dbcpDataSource.getConnection(); + ps = conn.prepareStatement(metadataQuery); + rs = ps.executeQuery(); + + ResultSetMetaData metadata = rs.getMetaData(); + int numColumns = metadata.getColumnCount(); + List columnTypes = new ArrayList(numColumns); + for (int i = 0; i < numColumns; i++) { + switch (metadata.getColumnType(i + 1)) { + case Types.CHAR: + columnTypes.add(serdeConstants.STRING_TYPE_NAME); + break; + case Types.INTEGER: + columnTypes.add(serdeConstants.INT_TYPE_NAME); + break; + case Types.BIGINT: + columnTypes.add(serdeConstants.BIGINT_TYPE_NAME); + break; + case Types.DECIMAL: + columnTypes.add(serdeConstants.DECIMAL_TYPE_NAME); + break; + case Types.FLOAT: + case Types.REAL: + columnTypes.add(serdeConstants.FLOAT_TYPE_NAME); + break; + case Types.DOUBLE: + columnTypes.add(serdeConstants.DOUBLE_TYPE_NAME); + break; + case Types.DATE: + columnTypes.add(serdeConstants.DATE_TYPE_NAME); + break; + case Types.TIMESTAMP: + columnTypes.add(serdeConstants.TIMESTAMP_TYPE_NAME); + break; + + default: + columnTypes.add(metadata.getColumnTypeName(i+1)); + break; + } + } + + return columnTypes; + } catch (Exception e) { + LOGGER.error("Error while trying to get column names.", e); + throw new HiveJdbcDatabaseAccessException("Error while trying to get column names: " + e.getMessage(), e); + } finally { + cleanupResources(conn, ps, rs); + } + + } + + @Override public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException { Connection conn = null; diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java new file mode 100644 index 0000000..55a5dde --- /dev/null +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java @@ -0,0 +1,47 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.storage.jdbc.dao; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager; + +/** + * JethroData specific data accessor. This is needed because JethroData JDBC drivers do + * not support generic LIMIT and OFFSET escape functions, and has some special optimization + * for getting the query metadata using limit 0. + */ + +public class JethroDatabaseAccessor extends GenericJdbcDatabaseAccessor { + + @Override + protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) { + if (offset == 0) { + return addLimitToQuery(sql, limit); + } else { + return sql + " LIMIT " + offset + "," + limit; + } + } + + @Override + protected String addLimitToQuery(String sql, int limit) { + return "Select * from (" + sql + ") as \"tmp\" limit " + limit; + } + + @Override + protected String getMetaDataQuery(Configuration conf) { + String sql = JdbcStorageConfigManager.getQueryToExecute(conf); + return addLimitToQuery(sql, 0); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index c8cb8a4..80e3dc9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -5356,6 +5356,7 @@ public static boolean doesTableNeedLocation(Table tbl) { String sh = tbl.getStorageHandler().toString(); retval = !sh.equals("org.apache.hadoop.hive.hbase.HBaseStorageHandler") && !sh.equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID) + && !sh.equals(Constants.JDBC_HIVE_STORAGE_HANDLER_ID) && !sh.equals("org.apache.hadoop.hive.accumulo.AccumuloStorageHandler"); } return retval; diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java index 6e585e5..960ad76 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java @@ -419,18 +419,26 @@ private static RelNode parseQuery(HiveConf conf, String viewQuery) { } private static TableType obtainTableType(Table tabMetaData) { - if (tabMetaData.getStorageHandler() != null && - tabMetaData.getStorageHandler().toString().equals( - Constants.DRUID_HIVE_STORAGE_HANDLER_ID)) { - return TableType.DRUID; + if (tabMetaData.getStorageHandler() != null) { + final String storageHandlerStr = tabMetaData.getStorageHandler().toString(); + if (storageHandlerStr.equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID)) { + return TableType.DRUID; + } + + if (storageHandlerStr.equals(Constants.JDBC_HIVE_STORAGE_HANDLER_ID)) { + return TableType.JDBC; + } + } + return TableType.NATIVE; } //@TODO this seems to be the same as org.apache.hadoop.hive.ql.parse.CalcitePlanner.TableType.DRUID do we really need both private enum TableType { DRUID, - NATIVE + NATIVE, + JDBC } private enum OpType { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java 
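For reference, a sketch of the SQL strings the new JethroDatabaseAccessor produces, derived directly from its overrides; the subclass and the sample query below are illustrative only (they exist just to reach the protected methods) and are not part of the patch:

// Illustrative sketch of JethroDatabaseAccessor's query rewriting.
package org.apache.hive.storage.jdbc.dao;

public class JethroRewriteSketch extends JethroDatabaseAccessor {
  public static void main(String[] args) {
    JethroRewriteSketch accessor = new JethroRewriteSketch();

    // Metadata probe: wrap the query with "limit 0" instead of fetching a sample row.
    System.out.println(accessor.addLimitToQuery("select * from invoices", 0));
    // Select * from (select * from invoices) as "tmp" limit 0

    // Paged split read: Jethro's LIMIT <offset>,<count> form is used when an offset is required.
    System.out.println(accessor.addLimitAndOffsetToQuery("select * from invoices", 10, 20));
    // select * from invoices LIMIT 20,10
  }
}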
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java index c5c17de..615f30d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java @@ -29,6 +29,7 @@ import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlSplittableAggFunction; +import org.apache.calcite.sql.SqlSyntax; import org.apache.calcite.sql.SqlSplittableAggFunction.CountSplitter; import org.apache.calcite.sql.SqlSplittableAggFunction.Registry; import org.apache.calcite.sql.fun.SqlStdOperatorTable; @@ -68,6 +69,11 @@ public boolean isDistinct() { } @Override + public SqlSyntax getSyntax() { + return SqlSyntax.FUNCTION_STAR; + } + + @Override public T unwrap(Class clazz) { if (clazz == SqlSplittableAggFunction.class) { return clazz.cast(new HiveCountSplitter()); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/HiveJdbcConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/HiveJdbcConverter.java new file mode 100644 index 0000000..e69dd38 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/HiveJdbcConverter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc; + +import java.util.List; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.adapter.jdbc.JdbcConvention; +import org.apache.calcite.adapter.jdbc.JdbcImplementor; +import org.apache.calcite.adapter.jdbc.JdbcRel; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelVisitor; +import org.apache.calcite.rel.convert.ConverterImpl; +import org.apache.calcite.sql.SqlDialect; + +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode; + +/** + * This is a designated RelNode that splits the Hive operators and the Jdbc operators, + * every successor of this node will be Jdbc operator. 
+ */ +public class HiveJdbcConverter extends ConverterImpl implements HiveRelNode { + + private final JdbcConvention convention; + + public HiveJdbcConverter(RelOptCluster cluster, RelTraitSet traits, + JdbcRel input, JdbcConvention jc) { + super(cluster, ConventionTraitDef.INSTANCE, traits, input); + convention = jc; + } + + private HiveJdbcConverter(RelOptCluster cluster, RelTraitSet traits, + RelNode input, JdbcConvention jc) { + super(cluster, ConventionTraitDef.INSTANCE, traits, input); + convention = jc; + } + + public JdbcConvention getJdbcConvention() { + return convention; + } + + public SqlDialect getJdbcDialect() { + return convention.dialect; + } + + @Override + public void implement(Implementor implementor) { + + } + + @Override + public RelNode copy( + RelTraitSet traitSet, + List inputs) { + return new HiveJdbcConverter(getCluster(), traitSet, sole(inputs), convention); + } + + public String generateSql() { + SqlDialect dialect = getJdbcDialect(); + final JdbcImplementor jdbcImplementor = + new JdbcImplementor(dialect, + (JavaTypeFactory) getCluster().getTypeFactory()); + final JdbcImplementor.Result result = + jdbcImplementor.visitChild(0, getInput()); + return result.asStatement().toSqlString(dialect).getSql(); + } + + public JdbcHiveTableScan getTableScan() { + final JdbcHiveTableScan[] tmpJdbcHiveTableScan = new JdbcHiveTableScan[1]; + new RelVisitor() { + + public void visit( + RelNode node, + int ordinal, + RelNode parent) { + if (node instanceof JdbcHiveTableScan && tmpJdbcHiveTableScan [0] == null) { + tmpJdbcHiveTableScan [0] = (JdbcHiveTableScan) node; + } else { + super.visit(node, ordinal, parent); + } + } + }.go(this); + + JdbcHiveTableScan jdbcHiveTableScan = tmpJdbcHiveTableScan [0]; + + assert jdbcHiveTableScan != null; + return jdbcHiveTableScan; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/JdbcHiveTableScan.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/JdbcHiveTableScan.java new file mode 100644 index 0000000..5b9a635 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/JdbcHiveTableScan.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc; + +import java.util.List; + +import org.apache.calcite.adapter.jdbc.JdbcConvention; +import org.apache.calcite.adapter.jdbc.JdbcTable; +import org.apache.calcite.adapter.jdbc.JdbcTableScan; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; + +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +/** + * Relational expression representing a scan of a HiveDB collection. + * + *

+ * The scan keeps a reference to the original {@link HiveTableScan}, which stays available
+ * through getHiveTableScan() after the scan has been converted to the JDBC convention.
+ *

+ */ +public class JdbcHiveTableScan extends JdbcTableScan { + + private final HiveTableScan hiveTableScan; + + public JdbcHiveTableScan(RelOptCluster cluster, RelOptTable table, JdbcTable jdbcTable, + JdbcConvention jdbcConvention, HiveTableScan hiveTableScan) { + super(cluster, table, jdbcTable, jdbcConvention); + this.hiveTableScan= hiveTableScan; + } + + @Override public RelNode copy(RelTraitSet traitSet, List inputs) { + assert inputs.isEmpty(); + return new JdbcHiveTableScan( + getCluster(), table, jdbcTable, (JdbcConvention) getConvention(), this.hiveTableScan); + } + + public HiveTableScan getHiveTableScan() { + return hiveTableScan; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelColumnsAlignment.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelColumnsAlignment.java index bccbde5..1d89ddd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelColumnsAlignment.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelColumnsAlignment.java @@ -46,6 +46,7 @@ import org.apache.calcite.util.ReflectUtil; import org.apache.calcite.util.ReflectiveVisitor; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter; import com.google.common.collect.ImmutableList; @@ -89,6 +90,9 @@ public RelNode align(RelNode root) { } protected final RelNode dispatchAlign(RelNode node, List collations) { + if (node instanceof HiveJdbcConverter) { + return node; + } return alignDispatcher.invoke(node, collations); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAbstractSplitFilterRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAbstractSplitFilterRule.java new file mode 100644 index 0000000..20c0bfb --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAbstractSplitFilterRule.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc; + +import java.util.ArrayList; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * JDBCAbstractSplitFilterRule split a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter} into + * two {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter} operators where the lower operator + * could be pushed down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJdbcConverter}} + * operator and therefore could be sent to the external table. + */ + +public abstract class JDBCAbstractSplitFilterRule extends RelOptRule { + private static final Logger LOGGER = LoggerFactory.getLogger(JDBCAbstractSplitFilterRule.class); + + public static final JDBCAbstractSplitFilterRule SPLIT_FILTER_ABOVE_JOIN = new JDBCSplitFilterAboveJoinRule(); + public static final JDBCAbstractSplitFilterRule SPLIT_FILTER_ABOVE_CONVERTER = new JDBCSplitFilterRule(); + + /** + * FilterSupportedFunctionsVisitor traverse all of the Rex call and splits them into + * two lists, one with supported jdbc calls, and one with not supported jdbc calls + */ + public static class FilterSupportedFunctionsVisitor extends RexVisitorImpl { + + private final SqlDialect dialect; + + public FilterSupportedFunctionsVisitor (SqlDialect dialect) { + super (true); + this.dialect = dialect; + } + + private final ArrayList validJdbcNode = new ArrayList (); + private final ArrayList invalidJdbcNode = new ArrayList (); + + public ArrayList getValidJdbcNode() { + return validJdbcNode; + } + + public ArrayList getInvalidJdbcNode() { + return invalidJdbcNode; + } + + @Override + public Void visitCall(RexCall call) { + if (call.getKind() == SqlKind.AND) { + return super.visitCall(call); + } else { + boolean isValidCall = JDBCRexCallValidator.isValidJdbcOperation(call, dialect); + if (isValidCall) { + validJdbcNode.add(call); + } else { + invalidJdbcNode.add(call); + } + } + return null; + } + + public boolean canBeSplit() { + return !validJdbcNode.isEmpty() && !invalidJdbcNode.isEmpty(); + } + } + + protected JDBCAbstractSplitFilterRule(RelOptRuleOperand operand) { + super (operand); + } + + public static boolean canSplitFilter(RexNode cond, SqlDialect dialect) { + FilterSupportedFunctionsVisitor visitor = new FilterSupportedFunctionsVisitor(dialect); + cond.accept(visitor); + return visitor.canBeSplit(); + } + + public boolean matches(RelOptRuleCall call, SqlDialect dialect) { + LOGGER.debug("MySplitFilter.matches has been called"); + + final HiveFilter filter = call.rel(0); + + RexNode cond = filter.getCondition (); + + return canSplitFilter(cond, dialect); + } + + public void onMatch(RelOptRuleCall call, SqlDialect dialect) { + LOGGER.debug("MySplitFilter.onMatch has been called"); + + final HiveFilter filter = call.rel(0); + 
+    RexCall callExpression = (RexCall) filter.getCondition();
+
+    FilterSupportedFunctionsVisitor visitor = new FilterSupportedFunctionsVisitor(dialect);
+    callExpression.accept(visitor);
+
+    ArrayList<RexNode> validJdbcNode = visitor.getValidJdbcNode();
+    ArrayList<RexNode> invalidJdbcNode = visitor.getInvalidJdbcNode();
+
+    assert validJdbcNode.size() != 0 && invalidJdbcNode.size() != 0;
+
+    final RexBuilder rexBuilder = filter.getCluster().getRexBuilder();
+
+    RexNode validCondition;
+    if (validJdbcNode.size() == 1) {
+      validCondition = validJdbcNode.get(0);
+    } else {
+      validCondition = rexBuilder.makeCall(SqlStdOperatorTable.AND, validJdbcNode);
+    }
+
+    HiveFilter newJdbcValidFilter = new HiveFilter(filter.getCluster(), filter.getTraitSet(), filter.getInput(), validCondition);
+
+    RexNode invalidCondition;
+    if (invalidJdbcNode.size() == 1) {
+      invalidCondition = invalidJdbcNode.get(0);
+    } else {
+      invalidCondition = rexBuilder.makeCall(SqlStdOperatorTable.AND, invalidJdbcNode);
+    }
+
+    HiveFilter newJdbcInvalidFilter = new HiveFilter(filter.getCluster(), filter.getTraitSet(),
+        newJdbcValidFilter, invalidCondition);
+
+    call.transformTo(newJdbcInvalidFilter);
+  }
+
+  /**
+   * JDBCSplitFilterAboveJoinRule splits a HiveFilter that sits above a HiveJoin over a
+   * HiveJdbcConverter, so that the supported part of the filter can be pushed through the join.
+   */
+  public static class JDBCSplitFilterAboveJoinRule extends JDBCAbstractSplitFilterRule {
+    public JDBCSplitFilterAboveJoinRule() {
+      super(operand(HiveFilter.class,
+              operand(HiveJoin.class,
+                  operand(HiveJdbcConverter.class, any()))));
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      LOGGER.debug("JDBCSplitFilterAboveJoinRule.matches has been called");
+
+      final HiveJoin join = call.rel(1);
+      final HiveJdbcConverter conv = call.rel(2);
+
+      RexNode joinCond = join.getCondition();
+
+      return super.matches(call) && JDBCRexCallValidator.isValidJdbcOperation(joinCond, conv.getJdbcDialect());
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final HiveJdbcConverter conv = call.rel(2);
+      super.onMatch(call, conv.getJdbcDialect());
+    }
+  }
+
+  /**
+   * JDBCSplitFilterRule splits a HiveFilter that sits directly above a HiveJdbcConverter,
+   * so that the supported part of the filter can be pushed down to the JDBC source.
+   */
+  public static class JDBCSplitFilterRule extends JDBCAbstractSplitFilterRule {
+    public JDBCSplitFilterRule() {
+      super(operand(HiveFilter.class,
+              operand(HiveJdbcConverter.class, any())));
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final HiveJdbcConverter conv = call.rel(1);
+      return super.matches(call, conv.getJdbcDialect());
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final HiveJdbcConverter conv = call.rel(1);
+      super.onMatch(call, conv.getJdbcDialect());
+    }
+  }
+
+};
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAggregationPushDownRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAggregationPushDownRule.java
new file mode 100644
index 0000000..8f96288
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAggregationPushDownRule.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.Arrays;
+
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcAggregate;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcAggregateRule;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.hadoop.hive.ql.optimizer.calcite.functions.HiveSqlCountAggFunction;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBCAggregationPushDownRule converts a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate}
+ * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcAggregate}
+ * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}
+ * operator so it will be sent to the external table.
+ */
+
+public class JDBCAggregationPushDownRule extends RelOptRule {
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCAggregationPushDownRule.class);
+
+  public static final JDBCAggregationPushDownRule INSTANCE = new JDBCAggregationPushDownRule();
+
+  public JDBCAggregationPushDownRule() {
+    super(operand(HiveAggregate.class,
+            operand(HiveJdbcConverter.class, any())));
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final HiveAggregate agg = call.rel(0);
+    final HiveJdbcConverter converter = call.rel(1);
+
+    for (AggregateCall aggCall : agg.getAggCallList()) {
+      SqlAggFunction f = aggCall.getAggregation();
+      if (f instanceof HiveSqlCountAggFunction) {
+        // count distinct with more than one argument is not supported
+        HiveSqlCountAggFunction countAgg = (HiveSqlCountAggFunction) f;
+        if (countAgg.isDistinct() && 1 < aggCall.getArgList().size()) {
+          return false;
+        }
+      }
+      SqlKind kind = f.getKind();
+      if (!converter.getJdbcDialect().supportsAggregateFunction(kind)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    LOG.debug("JDBCAggregationPushDownRule.onMatch has been called");
+
+    final HiveAggregate agg = call.rel(0);
+    final HiveJdbcConverter converter = call.rel(1);
+
+    Aggregate newHiveAggregate = agg.copy(agg.getTraitSet(), converter.getInput(),
+        agg.getIndicatorCount() != 0, agg.getGroupSet(), agg.getGroupSets(), agg.getAggCallList());
+    JdbcAggregate newJdbcAggregate =
+        (JdbcAggregate) new JdbcAggregateRule(converter.getJdbcConvention()).convert(newHiveAggregate);
+    if (newJdbcAggregate != null) {
+      RelNode converterRes = converter.copy(converter.getTraitSet(), Arrays.asList(newJdbcAggregate));
+
+      call.transformTo(converterRes);
+    }
+  }
+
+};
diff --git
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCExtractJoinFilterRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCExtractJoinFilterRule.java new file mode 100644 index 0000000..bdd7193 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCExtractJoinFilterRule.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.rules.AbstractJoinExtractFilterRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; + +/** + * JDBCExtractJoinFilterRule extracts out the + * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter} + * from a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin} operator. + * if the HiveFilter could be replaced by two HiveFilter operators that one of them could be pushed down below the + * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter} + */ + + +public final class JDBCExtractJoinFilterRule extends AbstractJoinExtractFilterRule { + //~ Static fields/initializers --------------------------------------------- + public static final JDBCExtractJoinFilterRule INSTANCE = new JDBCExtractJoinFilterRule(); + + //~ Constructors ----------------------------------------------------------- + + /** + * Creates an JoinExtractFilterRule. 
+ */ + public JDBCExtractJoinFilterRule() { + super(operand(HiveJoin.class, + operand(HiveJdbcConverter.class, any()), + operand(HiveJdbcConverter.class, any())), + HiveRelFactories.HIVE_BUILDER, null); + } + + //~ Methods ---------------------------------------------------------------- + + @Override + public boolean matches(RelOptRuleCall call) { + final Join join = call.rel(0); + final HiveJdbcConverter conv1 = call.rel(1); + final HiveJdbcConverter conv2 = call.rel(2); + if (!conv1.getJdbcDialect().equals(conv2.getJdbcDialect())) { + return false; + } + return JDBCAbstractSplitFilterRule.canSplitFilter(join.getCondition(), conv1.getJdbcDialect()); + } + +} + +// End JoinExtractFilterRule.java diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterJoinRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterJoinRule.java new file mode 100644 index 0000000..a8e6a6d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterJoinRule.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule; + +/** + * Rule that tries to push filter expressions into a join condition and into + * the inputs of the join. 
+ */ + +public class JDBCFilterJoinRule extends HiveFilterJoinRule { + + public static final JDBCFilterJoinRule INSTANCE = new JDBCFilterJoinRule(); + + public JDBCFilterJoinRule() { + super(RelOptRule.operand(HiveFilter.class, + RelOptRule.operand(HiveJoin.class, + RelOptRule.operand(HiveJdbcConverter.class, RelOptRule.any()), + RelOptRule.operand(HiveJdbcConverter.class, RelOptRule.any()))), + "JDBCFilterJoinRule", true, HiveRelFactories.HIVE_BUILDER); + } + + @Override + public boolean matches(RelOptRuleCall call) { + Filter filter = call.rel(0); + Join join = call.rel(1); + HiveJdbcConverter conv1 = call.rel(2); + HiveJdbcConverter conv2 = call.rel(3); + + if (!conv1.getJdbcDialect().equals(conv2.getJdbcDialect())) { + return false; + } + + boolean visitorRes = JDBCRexCallValidator.isValidJdbcOperation(filter.getCondition(), conv1.getJdbcDialect()); + if (visitorRes) { + return JDBCRexCallValidator.isValidJdbcOperation(join.getCondition(), conv1.getJdbcDialect()); + } + return false; + } + + @Override + public void onMatch(RelOptRuleCall call) { + Filter filter = call.rel(0); + Join join = call.rel(1); + super.perform(call, filter, join); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterPushDownRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterPushDownRule.java new file mode 100644 index 0000000..26c8b60 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterPushDownRule.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc; + +import java.util.Arrays; + +import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcFilter; +import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcFilterRule; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JDBCExtractJoinFilterRule extracts out the + * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter} + * from a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin} operator. 
+ * if the HiveFilter could be replaced by two HiveFilter operators that one of them could be pushed down below the + * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter} + */ + +public class JDBCFilterPushDownRule extends RelOptRule { + private static final Logger LOG = LoggerFactory.getLogger(JDBCFilterPushDownRule.class); + + public static final JDBCFilterPushDownRule INSTANCE = new JDBCFilterPushDownRule(); + + public JDBCFilterPushDownRule() { + super(operand(HiveFilter.class, + operand(HiveJdbcConverter.class, any()))); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final HiveFilter filter = call.rel(0); + final HiveJdbcConverter converter = call.rel(1); + + RexNode cond = filter.getCondition(); + + return JDBCRexCallValidator.isValidJdbcOperation(cond, converter.getJdbcDialect()); + } + + @Override + public void onMatch(RelOptRuleCall call) { + LOG.debug("JDBCFilterPushDown has been called"); + + final HiveFilter filter = call.rel(0); + final HiveJdbcConverter converter = call.rel(1); + + Filter newHiveFilter = filter.copy(filter.getTraitSet(), converter.getInput(), filter.getCondition()); + JdbcFilter newJdbcFilter = (JdbcFilter) new JdbcFilterRule(converter.getJdbcConvention()).convert(newHiveFilter); + if (newJdbcFilter != null) { + RelNode converterRes = converter.copy(converter.getTraitSet(), Arrays.asList(newJdbcFilter)); + + call.transformTo(converterRes); + } + } + +}; \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCJoinPushDownRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCJoinPushDownRule.java new file mode 100644 index 0000000..1a9a516 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCJoinPushDownRule.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc; + +import java.util.Arrays; + +import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcJoin; +import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcJoinRule; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JDBCJoinPushDownRule convert a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin} + * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcJoin} + * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}} + * operator so it will be sent to the external table. + */ + +public class JDBCJoinPushDownRule extends RelOptRule { + private static final Logger LOG = LoggerFactory.getLogger(JDBCJoinPushDownRule.class); + + public static final JDBCJoinPushDownRule INSTANCE = new JDBCJoinPushDownRule(); + + public JDBCJoinPushDownRule() { + super(operand(HiveJoin.class, + operand(HiveJdbcConverter.class, any()), + operand(HiveJdbcConverter.class, any()))); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final HiveJoin join = call.rel(0); + final RexNode cond = join.getCondition(); + final HiveJdbcConverter converter1 = call.rel(1); + final HiveJdbcConverter converter2 = call.rel(2); + + //The actual check should be the compare of the connection string of the external tables + /*if (converter1.getJdbcConvention().equals(converter2.getJdbcConvention()) == false) { + return false; + }*/ + + if (converter1.getJdbcConvention().getName().equals(converter2.getJdbcConvention().getName()) == false) { + return false; + } + + if (cond.isAlwaysTrue()) { + //We don't want to push cross join + return false; + } + + boolean visitorRes = JDBCRexCallValidator.isValidJdbcOperation(cond, converter1.getJdbcDialect()); + return visitorRes; + } + + @Override + public void onMatch(RelOptRuleCall call) { + LOG.debug("JDBCJoinPushDownRule has been called"); + + final HiveJoin join = call.rel(0); + final HiveJdbcConverter converter1 = call.rel(1); + final HiveJdbcConverter converter2 = call.rel(2); + + RelNode input1 = converter1.getInput(); + RelNode input2 = converter2.getInput(); + + HiveJoin newHiveJoin = join.copy(join.getTraitSet(), join.getCondition(), input1, input2, join.getJoinType(), + join.isSemiJoinDone()); + JdbcJoin newJdbcJoin = (JdbcJoin) new JdbcJoinRule(converter1.getJdbcConvention()).convert(newHiveJoin, + false); + if (newJdbcJoin != null) { + RelNode ConverterRes = converter1.copy(converter1.getTraitSet(), Arrays.asList(newJdbcJoin)); + if (ConverterRes != null) { + call.transformTo(ConverterRes); + } + } + } + +}; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCProjectPushDownRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCProjectPushDownRule.java new file mode 100644 index 0000000..33d4c5f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCProjectPushDownRule.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc; + +import java.util.Arrays; + +import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcProject; +import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcProjectRule; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JDBCProjectPushDownRule convert a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject} + * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcAggregateRule.JdbcProject} + * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}} + * operator so it will be sent to the external table. + */ + +public class JDBCProjectPushDownRule extends RelOptRule { + private static final Logger LOG = LoggerFactory.getLogger(JDBCProjectPushDownRule.class); + + public static final JDBCProjectPushDownRule INSTANCE = new JDBCProjectPushDownRule(); + + public JDBCProjectPushDownRule() { + super(operand(HiveProject.class, + operand(HiveJdbcConverter.class, any()))); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final HiveProject project = call.rel(0); + final HiveJdbcConverter conv = call.rel(1); + for (RexNode currProject : project.getProjects()) { + if (JDBCRexCallValidator.isValidJdbcOperation(currProject, conv.getJdbcDialect()) == false) { + return false; + } + } + + return true; + } + + @Override + public void onMatch(RelOptRuleCall call) { + LOG.debug("JDBCProjectPushDownRule has been called"); + + final HiveProject project = call.rel(0); + final HiveJdbcConverter converter = call.rel(1); + + Project newHiveProject = project.copy(project.getTraitSet(), converter.getInput(), + project.getProjects(), project.getRowType()); + JdbcProject newJdbcProject = + (JdbcProject) new JdbcProjectRule(converter.getJdbcConvention()).convert(newHiveProject); + if (newJdbcProject != null) { + RelNode converterRes = converter.copy(converter.getTraitSet(), Arrays.asList(newJdbcProject)); + call.transformTo(converterRes); + } + } + +}; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCRexCallValidator.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCRexCallValidator.java new file mode 100644 index 0000000..2512d0c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCRexCallValidator.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCRexCallValidator.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCRexCallValidator.java
new file mode 100644
index 0000000..2512d0c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCRexCallValidator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A utility class that checks whether an expression can be translated to the SQL dialect
+ * of an external JDBC table, i.e. it identifies gaps between Hive functions and the
+ * functions the JDBC dialect supports.
+ */
+
+public class JDBCRexCallValidator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCRexCallValidator.class);
+
+  private static class JdbcRexCallValidatorVisitor extends RexVisitorImpl<Void> {
+    private final SqlDialect dialect;
+    private boolean res = true;
+
+    JdbcRexCallValidatorVisitor(SqlDialect dialect) {
+      super(true);
+      this.dialect = dialect;
+    }
+
+    private boolean validRexCall(RexCall call) {
+      if (call instanceof RexOver) {
+        LOG.debug("RexOver operator push down is not supported for now with the following operator: {}", call);
+        return false;
+      }
+      final SqlOperator operator = call.getOperator();
+      List<RexNode> operands = call.getOperands();
+      RelDataType resType = call.getType();
+      List<RelDataType> paramsListType = new ArrayList<>();
+      for (RexNode currNode : operands) {
+        paramsListType.add(currNode.getType());
+      }
+      return dialect.supportsFunction(operator, resType, paramsListType);
+    }
+
+    @Override
+    public Void visitCall(RexCall call) {
+      if (res) {
+        res = validRexCall(call);
+        if (res) {
+          return super.visitCall(call);
+        }
+      }
+      return null;
+    }
+
+    private boolean go(RexNode cond) {
+      cond.accept(this);
+      return res;
+    }
+  }
+
+  public static boolean isValidJdbcOperation(RexNode cond, SqlDialect dialect) {
+    return new JdbcRexCallValidatorVisitor(dialect).go(cond);
+  }
+
+}
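All of the push-down rules in this package gate on this validator. A minimal sketch of that gate (the names pushable and condition are illustrative only and not part of the patch):

import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlDialect;

// Sketch: a rule only pushes an expression down when every operator in it,
// visited top-down, is supported by the external table's SQL dialect and
// no windowing (RexOver) call is present.
static boolean pushable(RexNode condition, SqlDialect dialect) {
  return JDBCRexCallValidator.isValidJdbcOperation(condition, dialect);
}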
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCSortPushDownRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCSortPushDownRule.java
new file mode 100644
index 0000000..059d7e7
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCSortPushDownRule.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.Arrays;
+
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcSortRule;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcSort;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBCSortPushDownRule converts a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit}
+ * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcSort}
+ * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}
+ * operator so that the sort and limit are executed by the external database.
+ */
+
+public class JDBCSortPushDownRule extends RelOptRule {
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCSortPushDownRule.class);
+
+  public static final JDBCSortPushDownRule INSTANCE = new JDBCSortPushDownRule();
+
+  public JDBCSortPushDownRule() {
+    super(operand(HiveSortLimit.class,
+        operand(HiveJdbcConverter.class, operand(RelNode.class, any()))));
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final Sort sort = (Sort) call.rel(0);
+    final HiveJdbcConverter conv = call.rel(1);
+
+    for (RexNode currCall : sort.getChildExps()) {
+      if (!JDBCRexCallValidator.isValidJdbcOperation(currCall, conv.getJdbcDialect())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    LOG.debug("JDBCSortPushDownRule has been called");
+
+    final HiveSortLimit sort = call.rel(0);
+    final HiveJdbcConverter converter = call.rel(1);
+    final RelNode input = call.rel(2);
+
+    Sort newHiveSort = sort.copy(sort.getTraitSet(), input, sort.getCollation(),
+        sort.getOffsetExpr(), sort.getFetchExpr());
+    JdbcSort newJdbcSort =
+        (JdbcSort) new JdbcSortRule(converter.getJdbcConvention()).convert(newHiveSort, false);
+    if (newJdbcSort != null) {
+      RelNode converterRes = converter.copy(converter.getTraitSet(), Arrays.asList(newJdbcSort));
+
+      call.transformTo(converterRes);
+    }
+  }
+
+}
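Taken together with the filter and project rules, the intent is that a query such as
select * from ext_simple_derby_table where 100 < kkey order by kkey
over the Derby-backed table defined in jdbc_handler.q below ends up with its WHERE and ORDER BY folded into hive.sql.generated.query, roughly SELECT * FROM "SIMPLE_DERBY_TABLE" WHERE 100 < "kkey" ORDER BY "kkey", rather than being evaluated in Hive. The WHERE part matches the q.out below; the ORDER BY rendering is dialect-dependent, so treat it as an illustration rather than guaranteed output.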
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCUnionPushDownRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCUnionPushDownRule.java
new file mode 100644
index 0000000..d4f3b0e
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCUnionPushDownRule.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcUnion;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcUnionRule;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBCUnionPushDownRule converts a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion}
+ * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcUnion}
+ * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}
+ * operator so that the union is executed by the external database.
+ */
+
+public class JDBCUnionPushDownRule extends RelOptRule {
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCUnionPushDownRule.class);
+
+  public static final JDBCUnionPushDownRule INSTANCE = new JDBCUnionPushDownRule();
+
+  public JDBCUnionPushDownRule() {
+    super(operand(HiveUnion.class,
+        operand(HiveJdbcConverter.class, any()),
+        operand(HiveJdbcConverter.class, any())));
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final HiveUnion union = call.rel(0);
+    final HiveJdbcConverter converter1 = call.rel(1);
+    final HiveJdbcConverter converter2 = call.rel(2);
+
+    // Ideally this check would compare the connection strings of the two external tables.
+    /*if (converter1.getJdbcConvention().equals(converter2.getJdbcConvention()) == false) {
+      return false;
+    }*/
+
+    if (!converter1.getJdbcConvention().getName().equals(converter2.getJdbcConvention().getName())) {
+      return false;
+    }
+
+    return union.getInputs().size() == 2;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    LOG.debug("JDBCUnionPushDownRule has been called");
+
+    final HiveUnion union = call.rel(0);
+    final HiveJdbcConverter converter1 = call.rel(1);
+    final HiveJdbcConverter converter2 = call.rel(2);
+
+    final List<RelNode> unionInput = Arrays.asList(converter1.getInput(), converter2.getInput());
+    Union newHiveUnion = (Union) union.copy(union.getTraitSet(), unionInput, union.all);
+    JdbcUnion newJdbcUnion = (JdbcUnion) new JdbcUnionRule(converter1.getJdbcConvention()).convert(newHiveUnion);
+    if (newJdbcUnion != null) {
+      RelNode converterRes = converter1.copy(converter1.getTraitSet(), Arrays.asList(newJdbcUnion));
+
+      call.transformTo(converterRes);
+    }
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
index 79fcfcf..17a48f8 100644
---
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java @@ -33,13 +33,19 @@ import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.JdbcHiveTableScan; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.HiveParser; import org.apache.hadoop.hive.ql.parse.ParseDriver; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ASTBuilder { + public static final Logger LOGGER = LoggerFactory.getLogger(ASTBuilder.class); public static ASTBuilder construct(int tokenType, String text) { ASTBuilder b = new ASTBuilder(); @@ -59,14 +65,20 @@ public static ASTNode destNode() { "TOK_TMP_FILE")).node(); } - public static ASTNode table(RelNode scan) { - HiveTableScan hts; - if (scan instanceof DruidQuery) { - hts = (HiveTableScan) ((DruidQuery)scan).getTableScan(); + public static ASTNode table(final RelNode scan) { + HiveTableScan hts = null; + if (scan instanceof HiveJdbcConverter) { + HiveJdbcConverter jdbcConverter = (HiveJdbcConverter) scan; + JdbcHiveTableScan jdbcHiveTableScan = jdbcConverter.getTableScan(); + + hts = jdbcHiveTableScan.getHiveTableScan(); + } else if (scan instanceof DruidQuery) { + hts = (HiveTableScan) ((DruidQuery) scan).getTableScan(); } else { hts = (HiveTableScan) scan; } + assert hts != null; RelOptHiveTable hTbl = (RelOptHiveTable) hts.getTable(); ASTBuilder b = ASTBuilder.construct(HiveParser.TOK_TABREF, "TOK_TABREF").add( ASTBuilder.construct(HiveParser.TOK_TABNAME, "TOK_TABNAME") @@ -100,13 +112,26 @@ public static ASTNode table(RelNode scan) { propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY") .add(HiveParser.StringLiteral, "\"" + Constants.DRUID_QUERY_TYPE + "\"") .add(HiveParser.StringLiteral, "\"" + dq.getQueryType().getQueryName() + "\"")); + } else if (scan instanceof HiveJdbcConverter) { + HiveJdbcConverter jdbcConverter = (HiveJdbcConverter) scan; + final String query = jdbcConverter.generateSql (); + LOGGER.info("The HiveJdbcConverter generated sql message is: " + System.lineSeparator() + query); + propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY") + .add(HiveParser.StringLiteral, "\"" + Constants.JDBC_QUERY + "\"") + .add(HiveParser.StringLiteral, "\"" + SemanticAnalyzer.escapeSQLString(query) + "\"")); + + propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY") + .add(HiveParser.StringLiteral, "\"" + Constants.HIVE_JDBC_QUERY + "\"") + .add(HiveParser.StringLiteral, "\"" + SemanticAnalyzer.escapeSQLString(query) + "\"")); } + if (hts.isInsideView()) { // We need to carry the insideView information from calcite into the ast. propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY") .add(HiveParser.StringLiteral, "\"insideView\"") .add(HiveParser.StringLiteral, "\"TRUE\"")); } + b.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTIES, "TOK_TABLEPROPERTIES").add(propList)); // NOTE: Calcite considers tbls to be equal if their names are the same. 
Hence diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java index 47c00aa..141ebe5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java @@ -64,9 +64,11 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.JdbcHiveTableScan; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter.HiveToken; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.HiveParser; @@ -344,6 +346,10 @@ private QueryBlockInfo convertSource(RelNode r) throws CalciteSemanticException TableScan f = (TableScan) r; s = new Schema(f); ast = ASTBuilder.table(f); + } else if (r instanceof HiveJdbcConverter) { + HiveJdbcConverter f = (HiveJdbcConverter) r; + s = new Schema(f); + ast = ASTBuilder.table(f); } else if (r instanceof DruidQuery) { DruidQuery f = (DruidQuery) r; s = new Schema(f); @@ -425,7 +431,8 @@ public void handle(TableFunctionScan tableFunctionScan) { public void visit(RelNode node, int ordinal, RelNode parent) { if (node instanceof TableScan || - node instanceof DruidQuery) { + node instanceof DruidQuery || + node instanceof HiveJdbcConverter) { ASTConverter.this.from = node; } else if (node instanceof Filter) { handle((Filter) node); @@ -765,6 +772,15 @@ public QueryBlockInfo(Schema schema, ASTNode ast) { } } + Schema(HiveJdbcConverter scan) { + HiveJdbcConverter jdbcHiveCoverter = scan; + final JdbcHiveTableScan jdbcTableScan = jdbcHiveCoverter.getTableScan(); + String tabName = jdbcTableScan.getHiveTableScan().getTableAlias(); + for (RelDataTypeField field : jdbcHiveCoverter.getRowType().getFieldList()) { + add(new ColumnInfo(tabName, field.getName())); + } + } + Schema(Project select, String alias) { for (RelDataTypeField field : select.getRowType().getFieldList()) { add(new ColumnInfo(alias, field.getName())); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 062df06..bb5bd2f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import javax.sql.DataSource; import com.google.common.collect.Iterables; import org.antlr.runtime.ClassicToken; import org.antlr.runtime.CommonToken; @@ -52,6 +53,9 @@ import org.apache.calcite.adapter.druid.DruidQuery; import org.apache.calcite.adapter.druid.DruidSchema; import org.apache.calcite.adapter.druid.DruidTable; +import org.apache.calcite.adapter.jdbc.JdbcConvention; +import org.apache.calcite.adapter.jdbc.JdbcSchema; +import org.apache.calcite.adapter.jdbc.JdbcTable; import 
org.apache.calcite.config.CalciteConnectionConfig; import org.apache.calcite.config.CalciteConnectionConfigImpl; import org.apache.calcite.config.CalciteConnectionProperty; @@ -106,6 +110,8 @@ import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.SqlDialectFactoryImpl; import org.apache.calcite.sql.SqlExplainLevel; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; @@ -171,6 +177,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode; @@ -179,6 +186,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.JdbcHiveTableScan; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregateJoinTransposeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregateProjectMergeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregatePullUpConstantsRule; @@ -226,10 +234,22 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveUnionMergeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveUnionPullUpConstantsRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveWindowingFixRule; + +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc.JDBCAggregationPushDownRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc.JDBCFilterJoinRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc.JDBCFilterPushDownRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc.JDBCExtractJoinFilterRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc.JDBCJoinPushDownRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc.JDBCProjectPushDownRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc.JDBCSortPushDownRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc.JDBCUnionPushDownRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc.JDBCAbstractSplitFilterRule; + import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAggregateIncrementalRewritingRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveNoAggregateIncrementalRewritingRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.MaterializedViewRewritingRelVisitor; + import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTBuilder; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter; @@ -1830,6 +1850,17 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu ); 
perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Druid transformation rules"); + calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, true, mdProvider.getMetadataProvider(), null, + HepMatchOrder.TOP_DOWN, + JDBCExtractJoinFilterRule.INSTANCE, + JDBCAbstractSplitFilterRule.SPLIT_FILTER_ABOVE_JOIN, + JDBCAbstractSplitFilterRule.SPLIT_FILTER_ABOVE_CONVERTER, + JDBCFilterJoinRule.INSTANCE, + JDBCJoinPushDownRule.INSTANCE, JDBCUnionPushDownRule.INSTANCE, + JDBCFilterPushDownRule.INSTANCE, JDBCProjectPushDownRule.INSTANCE, + JDBCAggregationPushDownRule.INSTANCE, JDBCSortPushDownRule.INSTANCE + ); + // 12. Run rules to aid in translation from Calcite tree to Hive tree if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CBO_RETPATH_HIVEOP)) { perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); @@ -2722,7 +2753,8 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc } // 4. Build operator - if (tableType == TableType.DRUID) { + if (tableType == TableType.DRUID || + (tableType == TableType.JDBC && tabMetaData.getProperty("hive.sql.table") != null)) { // Create case sensitive columns list List originalColumnNames = ((StandardStructObjectInspector)rowObjectInspector).getOriginalColumnNames(); @@ -2742,51 +2774,79 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc fullyQualifiedTabName = tabMetaData.getTableName(); } - // Build Druid query - String address = HiveConf.getVar(conf, - HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS); - String dataSource = tabMetaData.getParameters().get(Constants.DRUID_DATA_SOURCE); - Set metrics = new HashSet<>(); - RexBuilder rexBuilder = cluster.getRexBuilder(); - RelDataTypeFactory dtFactory = rexBuilder.getTypeFactory(); - List druidColTypes = new ArrayList<>(); - List druidColNames = new ArrayList<>(); - //@TODO FIX this, we actually do not need this anymore, - // in addition to that Druid allow numeric dimensions now so this check is not accurate - for (RelDataTypeField field : rowType.getFieldList()) { - if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(field.getName())) { - // Druid's time column is always not null. - druidColTypes.add(dtFactory.createTypeWithNullability(field.getType(), false)); - } else { - druidColTypes.add(field.getType()); - } - druidColNames.add(field.getName()); - if (field.getName().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) { - // timestamp - continue; - } - if (field.getType().getSqlTypeName() == SqlTypeName.VARCHAR) { - // dimension - continue; - } - metrics.add(field.getName()); - } - - List intervals = Arrays.asList(DruidTable.DEFAULT_INTERVAL); - rowType = dtFactory.createStructType(druidColTypes, druidColNames); - DruidTable druidTable = new DruidTable(new DruidSchema(address, address, false), - dataSource, RelDataTypeImpl.proto(rowType), metrics, DruidTable.DEFAULT_TIMESTAMP_COLUMN, - intervals, null, null); RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName, rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf, partitionCache, colStatsCache, noColsMissingStats); - final TableScan scan = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), - optTable, null == tableAlias ? 
tabMetaData.getTableName() : tableAlias, - getAliasId(tableAlias, qb), HiveConf.getBoolVar(conf, - HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP), qb.isInsideView() - || qb.getAliasInsideView().contains(tableAlias.toLowerCase())); - tableRel = DruidQuery.create(cluster, cluster.traitSetOf(BindableConvention.INSTANCE), - optTable, druidTable, ImmutableList.of(scan), DruidSqlOperatorConverter.getDefaultMap()); + + final HiveTableScan hts = new HiveTableScan(cluster, + cluster.traitSetOf(HiveRelNode.CONVENTION), optTable, + null == tableAlias ? tabMetaData.getTableName() : tableAlias, + getAliasId(tableAlias, qb), + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP), + qb.isInsideView() || qb.getAliasInsideView().contains(tableAlias.toLowerCase())); + + if (tableType == TableType.DRUID) { + // Build Druid query + String address = + HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS); + String dataSource = tabMetaData.getParameters().get(Constants.DRUID_DATA_SOURCE); + Set metrics = new HashSet<>(); + RexBuilder rexBuilder = cluster.getRexBuilder(); + RelDataTypeFactory dtFactory = rexBuilder.getTypeFactory(); + List druidColTypes = new ArrayList<>(); + List druidColNames = new ArrayList<>(); + //@TODO FIX this, we actually do not need this anymore, + // in addition to that Druid allow numeric dimensions now so this check is not accurate + for (RelDataTypeField field : rowType.getFieldList()) { + if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(field.getName())) { + // Druid's time column is always not null. + druidColTypes.add(dtFactory.createTypeWithNullability(field.getType(), false)); + } else { + druidColTypes.add(field.getType()); + } + druidColNames.add(field.getName()); + if (field.getName().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) { + // timestamp + continue; + } + if (field.getType().getSqlTypeName() == SqlTypeName.VARCHAR) { + // dimension + continue; + } + metrics.add(field.getName()); + } + + List intervals = Arrays.asList(DruidTable.DEFAULT_INTERVAL); + rowType = dtFactory.createStructType(druidColTypes, druidColNames); + DruidTable druidTable = new DruidTable(new DruidSchema(address, address, false), + dataSource, RelDataTypeImpl.proto(rowType), metrics, DruidTable.DEFAULT_TIMESTAMP_COLUMN, + intervals, null, null); + + tableRel = DruidQuery.create(cluster, cluster.traitSetOf(BindableConvention.INSTANCE), + optTable, druidTable, ImmutableList.of(hts)); + } else if (tableType == TableType.JDBC) { + LOG.debug("JDBC is running"); + final String dataBaseType = tabMetaData.getProperty("hive.sql.database.type"); + final String url = tabMetaData.getProperty("hive.sql.jdbc.url"); + final String driver = tabMetaData.getProperty("hive.sql.jdbc.driver"); + final String user = tabMetaData.getProperty("hive.sql.dbcp.username"); + final String pswd = tabMetaData.getProperty("hive.sql.dbcp.password"); + //final String query = tabMetaData.getProperty("hive.sql.query"); + final String tableName = tabMetaData.getProperty("hive.sql.table"); + + final DataSource ds = JdbcSchema.dataSource(url, driver, user, pswd); + SqlDialect jdbcDialect = JdbcSchema.createDialect(SqlDialectFactoryImpl.INSTANCE, ds); + JdbcConvention jc = JdbcConvention.of(jdbcDialect, null, dataBaseType); + JdbcSchema schema = new JdbcSchema(ds, jc.dialect, jc, null/*catalog */, null/*schema */); + JdbcTable jt = (JdbcTable) schema.getTable(tableName); + if (jt == null) { + throw new SemanticException("Table " + tableName + " was not found in the database"); + } + + JdbcHiveTableScan 
jdbcTableRel = new JdbcHiveTableScan(cluster, optTable, jt, jc, hts); + tableRel = new HiveJdbcConverter(cluster, jdbcTableRel.getTraitSet().replace(HiveRelNode.CONVENTION), + jdbcTableRel, jc); + } } else { // Build row type from field RelDataType rowType = inferNotNullableColumns(tabMetaData, TypeConverter.getType(cluster, rr, null)); @@ -2886,11 +2946,20 @@ private RelDataType inferNotNullableColumns(Table tabMetaData, RelDataType rowTy } private TableType obtainTableType(Table tabMetaData) { - if (tabMetaData.getStorageHandler() != null && - tabMetaData.getStorageHandler().toString().equals( - Constants.DRUID_HIVE_STORAGE_HANDLER_ID)) { - return TableType.DRUID; + if (tabMetaData.getStorageHandler() != null) { + final String storageHandlerStr = tabMetaData.getStorageHandler().toString(); + if (storageHandlerStr + .equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID)) { + return TableType.DRUID; + } + + if (storageHandlerStr + .equals(Constants.JDBC_HIVE_STORAGE_HANDLER_ID)) { + return TableType.JDBC; + } + } + return TableType.NATIVE; } @@ -4975,6 +5044,7 @@ private QBParseInfo getQBParseInfo(QB qb) throws CalciteSemanticException { private enum TableType { DRUID, - NATIVE + NATIVE, + JDBC } } diff --git ql/src/test/queries/clientpositive/jdbc_handler.q ql/src/test/queries/clientpositive/jdbc_handler.q index 61e02a8..4d7effd 100644 --- ql/src/test/queries/clientpositive/jdbc_handler.q +++ ql/src/test/queries/clientpositive/jdbc_handler.q @@ -1,6 +1,45 @@ --! qt:dataset:src set hive.strict.checks.cartesian.product= false; + + +CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput'; + + +FROM src + +SELECT dboutput ( 'jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby_as_external_table_db;create=true','','', +'CREATE TABLE SIMPLE_DERBY_TABLE ("kkey" INTEGER NOT NULL )' ), + +dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby_as_external_table_db;create=true','','', +'INSERT INTO SIMPLE_DERBY_TABLE ("kkey") VALUES (?)','20'), + +dboutput('jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby_as_external_table_db;create=true','','', +'INSERT INTO SIMPLE_DERBY_TABLE ("kkey") VALUES (?)','200') + +limit 1; + +CREATE EXTERNAL TABLE ext_simple_derby_table +( + kkey bigint +) +STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' +TBLPROPERTIES ( + "hive.sql.database.type" = "DERBY", + "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver", + "hive.sql.jdbc.url" = "jdbc:derby:;databaseName=${system:test.tmp.dir}/test_derby_as_external_table_db;create=true;collation=TERRITORY_BASED:PRIMARY", + "hive.sql.dbcp.username" = "APP", + "hive.sql.dbcp.password" = "mine", + "hive.sql.table" = "SIMPLE_DERBY_TABLE", + "hive.sql.dbcp.maxActive" = "1" +); + +select * from ext_simple_derby_table; + +explain select * from ext_simple_derby_table where 100 < ext_simple_derby_table.kkey; + +select * from ext_simple_derby_table where 100 < ext_simple_derby_table.kkey; + CREATE EXTERNAL TABLE tables ( id bigint, @@ -27,6 +66,7 @@ TBLPROPERTIES ( "hive.sql.query" = "SELECT DB_ID, NAME FROM DBS" ); + select tables.name as tn, dbs.NAME as dn, tables.type as t from tables join dbs on (tables.db_id = dbs.DB_ID) WHERE tables.name IN ("src", "dbs", "tables") order by tn, dn, t; diff --git ql/src/test/results/clientpositive/llap/jdbc_handler.q.out ql/src/test/results/clientpositive/llap/jdbc_handler.q.out index 3be32fb..3868b42 100644 --- ql/src/test/results/clientpositive/llap/jdbc_handler.q.out +++ 
ql/src/test/results/clientpositive/llap/jdbc_handler.q.out @@ -1,3 +1,119 @@ +PREHOOK: query: CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput' +PREHOOK: type: CREATEFUNCTION +PREHOOK: Output: dboutput +POSTHOOK: query: CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput' +POSTHOOK: type: CREATEFUNCTION +POSTHOOK: Output: dboutput +PREHOOK: query: FROM src + +#### A masked pattern was here #### +'CREATE TABLE SIMPLE_DERBY_TABLE ("kkey" INTEGER NOT NULL )' ), + +#### A masked pattern was here #### +'INSERT INTO SIMPLE_DERBY_TABLE ("kkey") VALUES (?)','20'), + +#### A masked pattern was here #### +'INSERT INTO SIMPLE_DERBY_TABLE ("kkey") VALUES (?)','200') + +limit 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: FROM src + +#### A masked pattern was here #### +'CREATE TABLE SIMPLE_DERBY_TABLE ("kkey" INTEGER NOT NULL )' ), + +#### A masked pattern was here #### +'INSERT INTO SIMPLE_DERBY_TABLE ("kkey") VALUES (?)','20'), + +#### A masked pattern was here #### +'INSERT INTO SIMPLE_DERBY_TABLE ("kkey") VALUES (?)','200') + +limit 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +0 0 0 +PREHOOK: query: CREATE EXTERNAL TABLE ext_simple_derby_table +( + kkey bigint +) +STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' +TBLPROPERTIES ( + "hive.sql.database.type" = "DERBY", + "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver", +#### A masked pattern was here #### + "hive.sql.dbcp.username" = "APP", + "hive.sql.dbcp.password" = "mine", + "hive.sql.table" = "SIMPLE_DERBY_TABLE", + "hive.sql.dbcp.maxActive" = "1" +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ext_simple_derby_table +POSTHOOK: query: CREATE EXTERNAL TABLE ext_simple_derby_table +( + kkey bigint +) +STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' +TBLPROPERTIES ( + "hive.sql.database.type" = "DERBY", + "hive.sql.jdbc.driver" = "org.apache.derby.jdbc.EmbeddedDriver", +#### A masked pattern was here #### + "hive.sql.dbcp.username" = "APP", + "hive.sql.dbcp.password" = "mine", + "hive.sql.table" = "SIMPLE_DERBY_TABLE", + "hive.sql.dbcp.maxActive" = "1" +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ext_simple_derby_table +PREHOOK: query: select * from ext_simple_derby_table +PREHOOK: type: QUERY +PREHOOK: Input: default@ext_simple_derby_table +#### A masked pattern was here #### +POSTHOOK: query: select * from ext_simple_derby_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ext_simple_derby_table +#### A masked pattern was here #### +20 +200 +PREHOOK: query: explain select * from ext_simple_derby_table where 100 < ext_simple_derby_table.kkey +PREHOOK: type: QUERY +POSTHOOK: query: explain select * from ext_simple_derby_table where 100 < ext_simple_derby_table.kkey +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: ext_simple_derby_table + properties: + hive.sql.generated.query SELECT * +FROM "SIMPLE_DERBY_TABLE" +WHERE 100 < "kkey" + hive.sql.query SELECT * +FROM "SIMPLE_DERBY_TABLE" +WHERE 100 < "kkey" + Select Operator + expressions: kkey (type: int) + outputColumnNames: _col0 + ListSink + +PREHOOK: query: select * from ext_simple_derby_table where 100 < 
ext_simple_derby_table.kkey +PREHOOK: type: QUERY +PREHOOK: Input: default@ext_simple_derby_table +#### A masked pattern was here #### +POSTHOOK: query: select * from ext_simple_derby_table where 100 < ext_simple_derby_table.kkey +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ext_simple_derby_table +#### A masked pattern was here #### +200 PREHOOK: query: CREATE EXTERNAL TABLE tables ( id bigint,