diff --git common/src/java/org/apache/hadoop/hive/conf/Constants.java common/src/java/org/apache/hadoop/hive/conf/Constants.java index 15397392e3..7a9439506d 100644 --- common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -67,6 +67,8 @@ public static final String JDBC_LOW_BOUND = JDBC_CONFIG_PREFIX + ".lowerBound"; public static final String JDBC_UPPER_BOUND = JDBC_CONFIG_PREFIX + ".upperBound"; + public static final String JDBC_TERADATA_QUERY_BAND = JDBC_CONFIG_PREFIX + ".td.queryBand"; + public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD"; public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD"; public static final String HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG = "hadoop.security.credential.provider.path"; diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java index 2d00755be8..d044678e6c 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java @@ -148,11 +148,15 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { try { classesToLoad.add(Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")); } catch (Exception e) { - } + } // Adding db2 jdbc driver if exists try { classesToLoad.add(Class.forName("com.ibm.db2.jcc.DB2Driver")); } catch (Exception e) { - } // Adding db2 jdbc driver if exists + } // Adding teradata jdbc driver if exists + try { + classesToLoad.add(Class.forName("com.teradata.jdbc.TeraDriver")); + } catch (Exception e) { + } try { JarUtils.addDependencyJars(conf, classesToLoad); } catch (IOException e) { diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java 
jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java index bdcc3f33d8..afd2515d09 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java @@ -23,5 +23,6 @@ POSTGRES, MSSQL, METASTORE, - JETHRO_DATA + JETHRO_DATA, + TERADATA } diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java index e531ecc17a..bf12ae4030 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java @@ -19,6 +19,8 @@ import org.apache.hive.storage.jdbc.conf.DatabaseType; import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig; +import static org.apache.hadoop.hive.conf.Constants.JDBC_TERADATA_QUERY_BAND; + /** * Factory for creating the correct DatabaseAccessor class for the job */ @@ -28,7 +30,7 @@ private DatabaseAccessorFactory() { } - public static DatabaseAccessor getAccessor(DatabaseType dbType) { + public static DatabaseAccessor getAccessor(DatabaseType dbType, Configuration conf) { DatabaseAccessor accessor = null; switch (dbType) { @@ -55,6 +57,10 @@ public static DatabaseAccessor getAccessor(DatabaseType dbType) { accessor = new DB2DatabaseAccessor(); break; + case TERADATA: + accessor = new TeradataDatabaseAccessor(conf.get(JDBC_TERADATA_QUERY_BAND)); + break; + default: accessor = new GenericJdbcDatabaseAccessor(); break; @@ -67,7 +73,7 @@ public static DatabaseAccessor getAccessor(DatabaseType dbType) { public static DatabaseAccessor getAccessor(Configuration conf) { DatabaseType dbType = DatabaseType.valueOf( conf.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName()).toUpperCase()); - return getAccessor(dbType); + return getAccessor(dbType, conf); } } diff --git 
jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java index c2e7473e40..8fc0324664 100644 --- jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java @@ -73,6 +73,7 @@ public GenericJdbcDatabaseAccessor() { LOGGER.debug("Query to execute is [{}]", metadataQuery); conn = dbcpDataSource.getConnection(); + executeSetupQuery(conn); ps = conn.prepareStatement(metadataQuery); rs = ps.executeQuery(); @@ -113,6 +114,7 @@ public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAc LOGGER.info("Query to execute is [{}]", countQuery); conn = dbcpDataSource.getConnection(); + executeSetupQuery(conn); ps = conn.prepareStatement(countQuery); rs = ps.executeQuery(); if (rs.next()) { @@ -159,6 +161,7 @@ public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAc LOGGER.info("Query to execute is [{}]", partitionQuery); conn = dbcpDataSource.getConnection(); + executeSetupQuery(conn); ps = conn.prepareStatement(partitionQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ps.setFetchSize(getFetchSize(conf)); rs = ps.executeQuery(); @@ -366,6 +369,7 @@ protected int getFetchSize(Configuration conf) { LOGGER.debug("MIN/MAX Query to execute is [{}]", countQuery); conn = dbcpDataSource.getConnection(); + executeSetupQuery(conn); ps = conn.prepareStatement(countQuery); rs = ps.executeQuery(); String lower = null, upper = null; @@ -408,4 +412,11 @@ private String quote() { public boolean needColumnQuote() { return true; } + + /* + * Allows for implementations to run setup queries without results prior to the main query. + * E.g. 
"SET ...;" + */ + protected void executeSetupQuery(Connection conn) throws SQLException { + } } diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/TeradataDatabaseAccessor.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/TeradataDatabaseAccessor.java new file mode 100644 index 0000000000..cd91a59d55 --- /dev/null +++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/TeradataDatabaseAccessor.java @@ -0,0 +1,79 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.storage.jdbc.dao; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * TD specific data accessor. This is needed because TD JDBC drivers do not support generic LIMIT and OFFSET + * escape functions. + */ +public class TeradataDatabaseAccessor extends GenericJdbcDatabaseAccessor { + private String queryBandQuery = ""; + + TeradataDatabaseAccessor(String queryBand) { + if(queryBand != null && !queryBand.isEmpty()) { + this.queryBandQuery = "SET QUERY_BAND = '" + queryBand + "' FOR SESSION;"; + } + } + + private static final String UNSUPPORTED_RECORD_SPLIT_MESSAGE = "TeradataDatabaseAccessor does not support " + + "splits over number of records. 
Please partition by a column instead, or use only one partition."; + + @Override + public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException { + throw new HiveJdbcDatabaseAccessException(UNSUPPORTED_RECORD_SPLIT_MESSAGE); + } + + @Override + protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) { + // TD cannot grab arbitrary offsets, so only allow the query if offset is 0. + if (offset == 0) { + return addLimitToQuery(sql, limit); + } else { + throw new UnsupportedOperationException(UNSUPPORTED_RECORD_SPLIT_MESSAGE); + } + } + + @Override + protected String addLimitToQuery(String sql, int limit) { + if (limit != -1) { + // Use TOP in preference to SAMPLE as TOP causes less load in TD. + return "SELECT TOP " + limit + " * FROM (" + sql + ") tmptable"; + } else { + return sql; + } + } + + @Override + protected String addBoundaryToQuery(String tableName, String sql, String partitionColumn, String lowerBound, + String upperBound) { + return super.addBoundaryToQuery(tableName, sql, partitionColumn, lowerBound, upperBound); + } + + @Override + public void executeSetupQuery(Connection conn) throws SQLException { + if (queryBandQuery != null && !queryBandQuery.isEmpty()) { + try (PreparedStatement ps = conn.prepareStatement(queryBandQuery)) { + ps.executeUpdate(); + } + } + } +}