commit 3244cd03e18941fe5e615f07bb4a0b482f02b056
Author: Daniel Dai
Date:   Tue Oct 9 21:13:16 2018 -0700

    HIVE-20720: Add partition column option to JDBC handler

diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 4badfa3..c0968b2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -72,6 +72,10 @@
   public static final String JDBC_QUERY_FIELD_NAMES = JDBC_CONFIG_PREFIX + ".query.fieldNames";
   public static final String JDBC_QUERY_FIELD_TYPES = JDBC_CONFIG_PREFIX + ".query.fieldTypes";
   public static final String JDBC_SPLIT_QUERY = JDBC_CONFIG_PREFIX + ".query.split";
+  public static final String JDBC_PARTITION_COLUMN = JDBC_CONFIG_PREFIX + ".partitionColumn";
+  public static final String JDBC_NUM_PARTITIONS = JDBC_CONFIG_PREFIX + ".numPartitions";
+  public static final String JDBC_LOW_BOUND = JDBC_CONFIG_PREFIX + ".lowerBound";
+  public static final String JDBC_UPPER_BOUND = JDBC_CONFIG_PREFIX + ".upperBound";

   public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD";
   public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD";

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
index 74999db..59673a8 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
@@ -15,9 +15,14 @@
 package org.apache.hive.storage.jdbc;

+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -25,6 +30,8 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hive.storage.jdbc.spitter.Splitter;
+import org.apache.hive.storage.jdbc.spitter.SplitterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +39,7 @@
 import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;

 import java.io.IOException;
+import java.util.List;

 public class JdbcInputFormat extends HiveInputFormat<LongWritable, MapWritable> {

@@ -61,44 +69,86 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     try {
-      if (!job.getBoolean(Constants.JDBC_SPLIT_QUERY, true)) {
-        // We will not split this query
+      String partitionColumn = job.get(Constants.JDBC_PARTITION_COLUMN);
+      int numPartitions = job.getInt(Constants.JDBC_NUM_PARTITIONS, -1);
+      String lowerBound = job.get(Constants.JDBC_LOW_BOUND);
+      String upperBound = job.get(Constants.JDBC_UPPER_BOUND);
+
+      InputSplit[] splits;
+
+      if (!job.getBoolean(Constants.JDBC_SPLIT_QUERY, true) || numPartitions <= 1) {
+        // We will not split this query if:
+        // 1. hive.sql.query.split is set to false (either manually or automatically by Calcite)
+        // 2. numPartitions == 1
         LOGGER.debug("Creating 1 input splits");
-        InputSplit[] splits = new InputSplit[1];
+        splits = new InputSplit[1];
         splits[0] = new JdbcInputSplit(FileInputFormat.getInputPaths(job)[0]);
         return splits;
       }
-      // We will split this query into n splits
-      LOGGER.debug("Creating {} input splits", numSplits);
       dbAccessor = DatabaseAccessorFactory.getAccessor(job);
+      Path[] tablePaths = FileInputFormat.getInputPaths(job);

-      int numRecords = numSplits <= 1 ? Integer.MAX_VALUE : dbAccessor.getTotalNumberOfRecords(job);
+      // We will split this query into n splits
+      LOGGER.debug("Creating {} input splits", numPartitions);

-      if (numRecords < numSplits) {
-        numSplits = numRecords;
-      }
+      if (partitionColumn != null) {
+        List<String> columnNames = dbAccessor.getColumnNames(job);
+        if (!columnNames.contains(partitionColumn)) {
+          throw new IOException("Cannot find partitionColumn:" + partitionColumn + " in " + columnNames);
+        }
+        List<TypeInfo> hiveColumnTypesList =
+            TypeInfoUtils.getTypeInfosFromTypeString(job.get(serdeConstants.LIST_COLUMN_TYPES));
+        TypeInfo typeInfo = hiveColumnTypesList.get(columnNames.indexOf(partitionColumn));
+        if (!(typeInfo instanceof PrimitiveTypeInfo)) {
+          throw new IOException(partitionColumn + " is a complex type, only primitive type can be a partition column");
+        }
+        if (lowerBound == null) {
+          lowerBound = dbAccessor.getLowerBound(job, partitionColumn);
+        }
+        if (upperBound == null) {
+          upperBound = dbAccessor.getUpperBound(job, partitionColumn);
+        }
+        if (lowerBound == null) {
+          throw new IOException("lowerBound of " + partitionColumn + " cannot be null");
+        }
+        if (upperBound == null) {
+          throw new IOException("upperBound of " + partitionColumn + " cannot be null");
+        }
+        Splitter splitter = SplitterFactory.newSpitter(typeInfo);
+        List<Pair<String, String>> ranges = splitter.getRanges(lowerBound, upperBound, numPartitions, typeInfo);
+        if (ranges.size() <= 1) {
+          LOGGER.debug("Creating 1 input splits");
+          splits = new InputSplit[1];
+          splits[0] = new JdbcInputSplit(FileInputFormat.getInputPaths(job)[0]);
+          return splits;
+        }
+        splits = new InputSplit[ranges.size()];
+        for (int i = 0; i < ranges.size(); i++) {
+          splits[i] = new JdbcInputSplit(partitionColumn, ranges.get(i).getLeft(), ranges.get(i).getRight());
+        }
+      } else {
+        int numRecords = dbAccessor.getTotalNumberOfRecords(job);

-      if (numSplits <= 0) {
-        numSplits = 1;
-      }
+        if (numRecords < numPartitions) {
+          numPartitions = numRecords;
+        }

-      int numRecordsPerSplit = numRecords / numSplits;
-      int numSplitsWithExtraRecords = numRecords % numSplits;
+        int numRecordsPerSplit = numRecords / numPartitions;
+        int numSplitsWithExtraRecords = numRecords % numPartitions;

-      LOGGER.debug("Num records = {}", numRecords);
-      InputSplit[] splits = new InputSplit[numSplits];
-      Path[] tablePaths = FileInputFormat.getInputPaths(job);
+        LOGGER.debug("Num records = {}", numRecords);
+        splits = new InputSplit[numPartitions];

-      int offset = 0;
-      for (int i = 0; i < numSplits; i++) {
-        int numRecordsInThisSplit = numRecordsPerSplit;
-        if (i < numSplitsWithExtraRecords) {
-          numRecordsInThisSplit++;
-        }
+        int offset = 0;
+        for (int i = 0; i < numPartitions; i++) {
+          int numRecordsInThisSplit = numRecordsPerSplit;
+          if (i < numSplitsWithExtraRecords) {
+            numRecordsInThisSplit++;
+          }

-        splits[i] = new JdbcInputSplit(numRecordsInThisSplit, offset, tablePaths[0]);
-        offset += numRecordsInThisSplit;
+          splits[i] = new JdbcInputSplit(numRecordsInThisSplit, offset, tablePaths[0]);
+          offset += numRecordsInThisSplit;
+        }
       }

       dbAccessor = null;
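Taken together, the four new hive.sql.* keys steer getSplits() between the old LIMIT/OFFSET chunking and the new range partitioning. A minimal configuration sketch in Java follows; the property names come straight from the constants above, while the column name "a" and the expected ranges mirror testRangeSplit_Numeric in the tests further below. This is an illustration only: in the unit tests a mocked DatabaseAccessor supplies the column list, and against a real database the usual hive.sql connection properties (driver, url, query or table, ...) must also be present.

    import org.apache.hadoop.mapred.JobConf;

    JobConf conf = new JobConf();
    conf.set("mapred.input.dir", "/temp");
    conf.set("columns.types", "int");            // serdeConstants.LIST_COLUMN_TYPES
    conf.set("hive.sql.partitionColumn", "a");   // Constants.JDBC_PARTITION_COLUMN
    conf.set("hive.sql.numPartitions", "3");     // Constants.JDBC_NUM_PARTITIONS
    conf.set("hive.sql.lowerBound", "1");        // Constants.JDBC_LOW_BOUND
    conf.set("hive.sql.upperBound", "10");       // Constants.JDBC_UPPER_BOUND
    // getSplits(conf, -1) then yields three range splits over column a:
    // (null, 4), [4, 7), [7, null) -- the outermost bounds are deliberately
    // left open so rows outside the sampled bounds still land in a split.

diff --git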
a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
index 3a6ada8..4d9e1c9 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
@@ -28,6 +28,9 @@
   private int limit = 0;
   private int offset = 0;
+  private String partitionColumn = null;
+  private String lowerBound = null;
+  private String upperBound = null;

   public JdbcInputSplit() {
@@ -54,12 +57,20 @@ public JdbcInputSplit(int limit, int offset) {
     this.offset = offset;
   }

+  public JdbcInputSplit(String partitionColumn, String lowerBound, String upperBound) {
+    super(null, 0, 0, EMPTY_ARRAY);
+    this.partitionColumn = partitionColumn;
+    this.lowerBound = lowerBound;
+    this.upperBound = upperBound;
+  }

   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
     out.writeInt(limit);
     out.writeInt(offset);
+    out.writeUTF(lowerBound);
+    out.writeUTF(upperBound);
   }

@@ -68,6 +79,8 @@
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     limit = in.readInt();
     offset = in.readInt();
+    lowerBound = in.readUTF();
+    upperBound = in.readUTF();
   }

@@ -102,4 +115,15 @@ public void setOffset(int offset) {
     this.offset = offset;
   }

+  public String getPartitionColumn() {
+    return this.partitionColumn;
+  }
+
+  public String getLowerBound() {
+    return this.lowerBound;
+  }
+
+  public String getUpperBound() {
+    return this.upperBound;
+  }
 }

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
index 1da6213..a3248b4 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
@@ -55,7 +55,8 @@ public boolean next(LongWritable key, MapWritable value) throws IOException {
     LOGGER.trace("JdbcRecordReader.next called");
     if (dbAccessor == null) {
       dbAccessor = DatabaseAccessorFactory.getAccessor(conf);
-      iterator = dbAccessor.getRecordIterator(conf, split.getLimit(), split.getOffset());
+      iterator = dbAccessor.getRecordIterator(conf, split.getPartitionColumn(), split.getLowerBound(),
+          split.getUpperBound(), split.getLimit(), split.getOffset());
     }

     if (iterator.hasNext()) {
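A detail worth flagging in the split serialization above: DataOutput.writeUTF() throws NullPointerException on a null argument, yet the first and last range splits are deliberately created with a null lower or upper bound, and partitionColumn is not written or read back at all, so a deserialized split would return null from getPartitionColumn(). A null-safe variant, shown here only as a sketch of the idea and not as part of the patch, would guard the optional fields with a presence flag:

    // Sketch (not part of the patch): guard the optional bounds with a
    // presence flag so open-ended first/last splits survive serialization.
    @Override
    public void write(DataOutput out) throws IOException {
      super.write(out);
      out.writeInt(limit);
      out.writeInt(offset);
      out.writeBoolean(lowerBound != null);
      if (lowerBound != null) {
        out.writeUTF(lowerBound);
      }
      out.writeBoolean(upperBound != null);
      if (upperBound != null) {
        out.writeUTF(upperBound);
      }
      // readFields() would mirror this with readBoolean()/readUTF().
    }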
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
index 5947628..c42e4d0 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
@@ -14,9 +14,7 @@
  */
 package org.apache.hive.storage.jdbc;

-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -35,13 +33,13 @@
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
-import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
-import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
-import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;

 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -73,42 +71,24 @@ public void initialize(Configuration conf, Properties properties) throws SerDeEx
     if (properties.containsKey(JdbcStorageConfig.DATABASE_TYPE.getPropertyName())) {
       Configuration tableConfig = JdbcStorageConfigManager.convertPropertiesToConfiguration(properties);
       DatabaseAccessor dbAccessor = DatabaseAccessorFactory.getAccessor(tableConfig);
-
-      // Extract information from properties
-      String[] jdbcColumnNamesArray;
-      List<TypeInfo> hiveColumnTypesArray;
-      if (properties.containsKey(Constants.JDBC_TABLE) && properties.containsKey(Constants.JDBC_QUERY)) {
-        // The query has been autogenerated by Hive, the column names are the
-        // same in the query pushed and the list of hiveColumnNames
-        String fieldNamesProperty =
-            Preconditions.checkNotNull(properties.getProperty(Constants.JDBC_QUERY_FIELD_NAMES, null));
-        String fieldTypesProperty =
-            Preconditions.checkNotNull(properties.getProperty(Constants.JDBC_QUERY_FIELD_TYPES, null));
-        hiveColumnNames = fieldNamesProperty.trim().split(",");
-        jdbcColumnNamesArray = hiveColumnNames;
-        hiveColumnTypesArray = TypeInfoUtils.getTypeInfosFromTypeString(fieldTypesProperty);
-      } else {
-        // The query was hardcoded by user or we are creating the table.
-        // We obtain the column names with the db accessor.
-        List<String> columnNames = dbAccessor.getColumnNames(tableConfig);
-        hiveColumnNames = columnNames.toArray(new String[columnNames.size()]);
-        // These are the column names for the table defined with the JDBC storage handler.
-        jdbcColumnNamesArray = parseProperty(properties.getProperty(serdeConstants.LIST_COLUMNS), ",");
-        if (hiveColumnNames.length != jdbcColumnNamesArray.length) {
-          throw new SerDeException("Expected " + hiveColumnNames.length + " hiveColumnNames. Table definition has "
-              + jdbcColumnNamesArray.length + " hiveColumnNames");
-        }
-        hiveColumnTypesArray = TypeInfoUtils.getTypeInfosFromTypeString(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+      // Extract column names and types from properties
+      List<String> columnNames = dbAccessor.getColumnNames(tableConfig);
+      hiveColumnNames = columnNames.toArray(new String[columnNames.size()]);
+      if (hiveColumnNames.length == 0) {
+        throw new SerDeException("Received an empty Hive column name definition");
       }
-      if (hiveColumnTypesArray.size() == 0) {
+      List<TypeInfo> hiveColumnTypesList =
+          TypeInfoUtils.getTypeInfosFromTypeString(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+      if (hiveColumnTypesList.size() == 0) {
         throw new SerDeException("Received an empty Hive column type definition");
       }

       // Populate column types and inspector
-      hiveColumnTypes = new PrimitiveTypeInfo[hiveColumnTypesArray.size()];
+      hiveColumnTypes = new PrimitiveTypeInfo[hiveColumnTypesList.size()];
       List<ObjectInspector> fieldInspectors = new ArrayList<>(hiveColumnNames.length);
       for (int i = 0; i < hiveColumnNames.length; i++) {
-        TypeInfo ti = hiveColumnTypesArray.get(i);
+        TypeInfo ti = hiveColumnTypesList.get(i);
         if (ti.getCategory() != Category.PRIMITIVE) {
           throw new SerDeException("Non primitive types not supported yet");
         }
@@ -117,7 +97,7 @@ public void initialize(Configuration conf, Properties properties) throws SerDeEx
             PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(hiveColumnTypes[i]));
       }
       inspector =
-          ObjectInspectorFactory.getStandardStructObjectInspector(Arrays.asList(jdbcColumnNamesArray),
+          ObjectInspectorFactory.getStandardStructObjectInspector(Arrays.asList(hiveColumnNames),
               fieldInspectors);
       row = new ArrayList<>(hiveColumnNames.length);
     }
@@ -128,16 +108,6 @@ public void initialize(Configuration conf, Properties properties) throws SerDeEx
     }
   }

-
-  private String[] parseProperty(String propertyValue, String delimiter) {
-    if ((propertyValue == null) || (propertyValue.trim().isEmpty())) {
-      return new String[] {};
-    }
-
-    return propertyValue.split(delimiter);
-  }
-
-
   @Override
   public Object deserialize(Writable blob) throws SerDeException {
     LOGGER.trace("Deserializing from SerDe");

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
index fdaa794..c377da1 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
@@ -16,6 +16,7 @@

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException;

 import java.util.List;
@@ -29,6 +30,12 @@
   int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException;

   JdbcRecordIterator
-      getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException;
+      getRecordIterator(Configuration conf, String partitionColumn, String lowerBound, String upperBound,
+          int limit, int offset) throws HiveJdbcDatabaseAccessException;

+  String getLowerBound(Configuration conf, String partitionColumn) throws HiveJdbcDatabaseAccessException;
+
+  String getUpperBound(Configuration conf, String partitionColumn) throws HiveJdbcDatabaseAccessException;
 }
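The widened iterator contract deserves a gloss: a split is addressed either by a partition-column range or by limit/offset, never both. As the generic implementation below shows, when partitionColumn is non-null the limit and offset arguments are ignored, and vice versa. Illustrative call shapes (the column name and values here are made up):

    // Range-addressed split: the bounds become a WHERE clause on the wrapped query.
    JdbcRecordIterator rangeSplit =
        accessor.getRecordIterator(conf, "a", "4", "7", -1, 0);       // a >= 4 AND a < 7

    // Offset-addressed split: no partition column; LIMIT/OFFSET drive pagination.
    JdbcRecordIterator offsetSplit =
        accessor.getRecordIterator(conf, null, null, null, 100, 200); // LIMIT/OFFSET paging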
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
index abdc5f0..c3a8f5d 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -200,7 +201,9 @@ public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAc

   @Override
   public JdbcRecordIterator
-      getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException {
+      getRecordIterator(Configuration conf, String partitionColumn, String lowerBound, String upperBound,
+          int limit, int offset) throws HiveJdbcDatabaseAccessException {

     Connection conn = null;
     PreparedStatement ps = null;
@@ -209,11 +212,16 @@ public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAc
     try {
       initializeDatabaseConnection(conf);
       String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
-      String limitQuery = addLimitAndOffsetToQuery(sql, limit, offset);
-      LOGGER.info("Query to execute is [{}]", limitQuery);
+      String partitionQuery;
+      if (partitionColumn != null) {
+        partitionQuery = addBoundaryToQuery(sql, partitionColumn, lowerBound, upperBound);
+      } else {
+        partitionQuery = addLimitAndOffsetToQuery(sql, limit, offset);
+      }
+      LOGGER.info("Query to execute is [{}]", partitionQuery);

       conn = dbcpDataSource.getConnection();
-      ps = conn.prepareStatement(limitQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+      ps = conn.prepareStatement(partitionQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
       ps.setFetchSize(getFetchSize(conf));
       rs = ps.executeQuery();

@@ -245,7 +253,6 @@
     }
   }

-
   /*
    * Uses generic JDBC escape functions to add a limit clause to a query string
    */
@@ -256,6 +263,19 @@
     return sql + " {LIMIT " + limit + "}";
   }

+  protected String addBoundaryToQuery(String sql, String partitionColumn, String lowerBound, String upperBound) {
+    String result = "SELECT * FROM (" + sql + ") tmptable WHERE ";
+    if (lowerBound != null) {
+      result += partitionColumn + " >= " + lowerBound;
+    }
+    if (upperBound != null) {
+      if (lowerBound != null) {
+        result += " AND ";
+      }
+      result += partitionColumn + " < " + upperBound;
+    }
+    return result;
+  }

   protected void cleanupResources(Connection conn, PreparedStatement ps, ResultSet rs) {
     try {
@@ -344,4 +364,52 @@
   protected int getFetchSize(Configuration conf) {
     return conf.getInt(JdbcStorageConfig.JDBC_FETCH_SIZE.getPropertyName(), DEFAULT_FETCH_SIZE);
   }
+
+  @Override
+  public String getLowerBound(Configuration conf, String partitionColumn) throws HiveJdbcDatabaseAccessException {
+    return getBound(conf, partitionColumn, "MIN");
+  }
+
+  @Override
+  public String getUpperBound(Configuration conf, String partitionColumn) throws HiveJdbcDatabaseAccessException {
+    return getBound(conf, partitionColumn, "MAX");
+  }
+
+  private String getBound(Configuration conf, String partitionColumn, String func)
+      throws HiveJdbcDatabaseAccessException {
+    Connection conn = null;
+    PreparedStatement ps = null;
+    ResultSet rs = null;
+
+    try {
+      initializeDatabaseConnection(conf);
+      String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+      String boundQuery = "SELECT " + func + "(" + partitionColumn + ") FROM (" + sql + ") tmptable WHERE " +
+          partitionColumn + " IS NOT NULL";
+      LOGGER.info("Query to execute is [{}]", boundQuery);
+
+      conn = dbcpDataSource.getConnection();
+      ps = conn.prepareStatement(boundQuery);
+      rs = ps.executeQuery();
+      if (rs.next()) {
+        return rs.getString(1);
+      } else {
+        LOGGER.warn("The bound query [{}] did not return any results.", boundQuery);
+        throw new HiveJdbcDatabaseAccessException(func + " query did not return any results.");
+      }
+    }
+    catch (HiveJdbcDatabaseAccessException he) {
+      throw he;
+    }
+    catch (Exception e) {
+      LOGGER.error("Caught exception while trying to get " + func + " of " + partitionColumn, e);
+      throw new HiveJdbcDatabaseAccessException(e);
+    }
+    finally {
+      cleanupResources(conn, ps, rs);
+    }
+  }
 }

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
index 86fde7c..e05a595 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
@@ -26,14 +26,22 @@ protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
       return addLimitToQuery(sql, limit);
     } else {
-      return sql + " LIMIT " + offset + "," + limit;
+      if (limit != -1) {
+        return sql + " LIMIT " + offset + "," + limit;
+      } else {
+        return sql;
+      }
     }
   }

   @Override
   protected String addLimitToQuery(String sql, int limit) {
-    return sql + " LIMIT " + limit;
+    if (limit != -1) {
+      return sql + " LIMIT " + limit;
+    } else {
+      return sql;
+    }
   }
 }
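To make the generated SQL concrete, here is what the two new query builders emit for the numeric example earlier; the table and column names are illustrative. One caveat worth noting: the bounds are interpolated into the SQL unquoted, which is fine for numeric literals but relies on the target database accepting bare date/timestamp literals.

    // addBoundaryToQuery("select * from t", "a", "4", "7") returns:
    //   SELECT * FROM (select * from t) tmptable WHERE a >= 4 AND a < 7
    // For the first split (null lower bound) only the upper predicate remains:
    //   SELECT * FROM (select * from t) tmptable WHERE a < 4
    // getBound(conf, "a", "MIN") samples the column extremes with:
    //   SELECT MIN(a) FROM (select * from t) tmptable WHERE a IS NOT NULL

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateSplitter.java
new file mode 100644
index 0000000..2b06513
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DateSplitter.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.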
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DateSplitter implements Splitter {
+  @Override
+  public List<Pair<String, String>> getRanges(String lowerBound, String upperBound, int numPartitions,
+      TypeInfo typeInfo) {
+    List<Pair<String, String>> ranges = new ArrayList<Pair<String, String>>();
+    Date dateLower = Date.valueOf(lowerBound);
+    Date dateUpper = Date.valueOf(upperBound);
+    double dateInterval = (dateUpper.getTime() - dateLower.getTime()) / (double) numPartitions;
+    Date splitDateLower, splitDateUpper;
+    for (int i = 0; i < numPartitions; i++) {
+      splitDateLower = new Date(Math.round(dateLower.getTime() + dateInterval * i));
+      splitDateUpper = new Date(Math.round(dateLower.getTime() + dateInterval * (i + 1)));
+      if (splitDateUpper.after(splitDateLower)) {
+        String splitLower = splitDateLower.toString();
+        String splitUpper = splitDateUpper.toString();
+        if (i == 0) {
+          splitLower = null;
+        }
+        if (i == numPartitions - 1) {
+          splitUpper = null;
+        }
+        ranges.add(new ImmutablePair<String, String>(splitLower, splitUpper));
+      }
+    }
+    return ranges;
+  }
+}

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalSplitter.java
new file mode 100644
index 0000000..bab748c
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DecimalSplitter.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DecimalSplitter implements Splitter {
+  @Override
+  public List<Pair<String, String>> getRanges(String lowerBound, String upperBound, int numPartitions,
+      TypeInfo typeInfo) {
+    List<Pair<String, String>> ranges = new ArrayList<Pair<String, String>>();
+    DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+    int precision = decimalTypeInfo.getPrecision();
+    int scale = decimalTypeInfo.getScale();
+    BigDecimal decimalLower = new BigDecimal(lowerBound);
+    BigDecimal decimalUpper = new BigDecimal(upperBound);
+    BigDecimal decimalInterval = (decimalUpper.subtract(decimalLower)).divide(new BigDecimal(numPartitions),
+        MathContext.DECIMAL64);
+    BigDecimal splitDecimalLower, splitDecimalUpper;
+    for (int i = 0; i < numPartitions; i++) {
+      splitDecimalLower = decimalLower.add(decimalInterval.multiply(new BigDecimal(i)));
+      splitDecimalUpper = decimalLower.add(decimalInterval.multiply(new BigDecimal(i + 1)));
+      if (splitDecimalUpper.compareTo(splitDecimalLower) != 0) {
+        String splitLower = splitDecimalLower.setScale(scale, RoundingMode.HALF_EVEN).toPlainString();
+        String splitUpper = splitDecimalUpper.setScale(scale, RoundingMode.HALF_EVEN).toPlainString();
+        if (i == 0) {
+          splitLower = null;
+        }
+        if (i == numPartitions - 1) {
+          splitUpper = null;
+        }
+        ranges.add(new ImmutablePair<String, String>(splitLower, splitUpper));
+      }
+    }
+    return ranges;
+  }
+}
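As a sanity check on the decimal arithmetic, the boundary values asserted in the unit test further below (bounds 5 and 1000, four partitions, scale 5) can be reproduced with plain BigDecimal. This standalone snippet assumes nothing beyond the math shown above:

    import java.math.BigDecimal;
    import java.math.MathContext;

    public class DecimalBoundaryCheck {
      public static void main(String[] args) {
        BigDecimal lower = new BigDecimal("5");
        BigDecimal upper = new BigDecimal("1000");
        int numPartitions = 4;
        // (1000 - 5) / 4 = 248.75
        BigDecimal interval = upper.subtract(lower)
            .divide(new BigDecimal(numPartitions), MathContext.DECIMAL64);
        for (int i = 0; i <= numPartitions; i++) {
          // Prints 5, 253.75, 502.50, 751.25, 1000.00; at scale 5 the interior
          // boundaries become the 253.75000/502.50000/751.25000 seen in the test.
          System.out.println(lower.add(interval.multiply(new BigDecimal(i))));
        }
      }
    }

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleSplitter.java
new file mode 100644
index 0000000..97034a0
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/DoubleSplitter.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.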
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DoubleSplitter implements Splitter {
+  @Override
+  public List<Pair<String, String>> getRanges(String lowerBound, String upperBound, int numPartitions,
+      TypeInfo typeInfo) {
+    List<Pair<String, String>> ranges = new ArrayList<Pair<String, String>>();
+    double doubleLower = Double.parseDouble(lowerBound);
+    double doubleUpper = Double.parseDouble(upperBound);
+    double doubleInterval = (doubleUpper - doubleLower) / (double) numPartitions;
+    double splitDoubleLower, splitDoubleUpper;
+    for (int i = 0; i < numPartitions; i++) {
+      splitDoubleLower = doubleLower + doubleInterval * i;
+      splitDoubleUpper = doubleLower + doubleInterval * (i + 1);
+      if (splitDoubleUpper > splitDoubleLower) {
+        String splitLower = Double.toString(splitDoubleLower);
+        String splitUpper = Double.toString(splitDoubleUpper);
+        if (i == 0) {
+          splitLower = null;
+        }
+        if (i == numPartitions - 1) {
+          splitUpper = null;
+        }
+        ranges.add(new ImmutablePair<String, String>(splitLower, splitUpper));
+      }
+    }
+    return ranges;
+  }
+}

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongSpitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongSpitter.java
new file mode 100644
index 0000000..bc4b40d
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/LongSpitter.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LongSpitter implements Splitter {
+
+  @Override
+  public List<Pair<String, String>> getRanges(String lowerBound, String upperBound, int numPartitions,
+      TypeInfo typeInfo) {
+    List<Pair<String, String>> ranges = new ArrayList<Pair<String, String>>();
+    long longLower = Long.parseLong(lowerBound);
+    long longUpper = Long.parseLong(upperBound);
+    double longInterval = (longUpper - longLower) / (double) numPartitions;
+    long splitLongLower, splitLongUpper;
+    for (int i = 0; i < numPartitions; i++) {
+      splitLongLower = Math.round(longLower + longInterval * i);
+      splitLongUpper = Math.round(longLower + longInterval * (i + 1));
+      if (splitLongUpper > splitLongLower) {
+        String splitLower = Long.toString(splitLongLower);
+        String splitUpper = Long.toString(splitLongUpper);
+        if (i == 0) {
+          splitLower = null;
+        }
+        if (i == numPartitions - 1) {
+          splitUpper = null;
+        }
+        ranges.add(new ImmutablePair<String, String>(splitLower, splitUpper));
+      }
+    }
+    return ranges;
+  }
+}

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/Splitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/Splitter.java
new file mode 100644
index 0000000..1c8e6a4
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/Splitter.java
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.List;
+
+public interface Splitter {
+  List<Pair<String, String>> getRanges(String lowerBound, String upperBound, int numPartitions, TypeInfo typeInfo);
+}
\ No newline at end of file
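The Math.round() in LongSpitter is what turns the fractional interval into the 4 and 7 boundaries asserted in testRangeSplit_Numeric below. A self-contained check of just that arithmetic:

    public class LongBoundaryCheck {
      public static void main(String[] args) {
        long lower = 1, upper = 10;
        int numPartitions = 3;
        double interval = (upper - lower) / (double) numPartitions; // 3.0
        for (int i = 0; i <= numPartitions; i++) {
          // Prints 1, 4, 7, 10; the splitter then nulls out the outermost
          // boundaries so rows outside the sampled MIN/MAX still land in the
          // first or last split.
          System.out.println(Math.round(lower + interval * i));
        }
      }
    }

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/SplitterFactory.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/SplitterFactory.java
new file mode 100644
index 0000000..c04f6e2
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/SplitterFactory.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.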
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+
+public class SplitterFactory {
+  public static Splitter newSpitter(TypeInfo typeInfo) throws IOException {
+    PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+    switch (primitiveTypeInfo.getPrimitiveCategory()) {
+    case BYTE:
+    case SHORT:
+    case INT:
+    case LONG:
+      return new LongSpitter();
+    case FLOAT:
+    case DOUBLE:
+      return new DoubleSplitter();
+    case DECIMAL:
+      return new DecimalSplitter();
+    case TIMESTAMP:
+      return new TimestampSplitter();
+    case DATE:
+      return new DateSplitter();
+    default:
+      throw new IOException("partitionColumn is " + primitiveTypeInfo.getPrimitiveCategory() +
+          ", only numeric/date/timestamp type can be a partition column");
+    }
+  }
+}

diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampSplitter.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampSplitter.java
new file mode 100644
index 0000000..e0ca52d
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/spitter/TimestampSplitter.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.spitter;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TimestampSplitter implements Splitter {
+  @Override
+  public List<Pair<String, String>> getRanges(String lowerBound, String upperBound, int numPartitions,
+      TypeInfo typeInfo) {
+    List<Pair<String, String>> ranges = new ArrayList<Pair<String, String>>();
+    Timestamp timestampLower = Timestamp.valueOf(lowerBound);
+    Timestamp timestampUpper = Timestamp.valueOf(upperBound);
+    // Note nano is not fully represented as the precision limit
+    double timestampInterval = (timestampUpper.getTime() - timestampLower.getTime()) / (double) numPartitions;
+    Timestamp splitTimestampLower, splitTimestampUpper;
+    for (int i = 0; i < numPartitions; i++) {
+      splitTimestampLower = new Timestamp(Math.round(timestampLower.getTime() + timestampInterval * i));
+      splitTimestampUpper = new Timestamp(Math.round(timestampLower.getTime() + timestampInterval * (i + 1)));
+      if (splitTimestampUpper.after(splitTimestampLower)) {
+        String splitLower = splitTimestampLower.toString();
+        String splitUpper = splitTimestampUpper.toString();
+        if (i == 0) {
+          splitLower = null;
+        }
+        if (i == numPartitions - 1) {
+          splitUpper = null;
+        }
+        ranges.add(new ImmutablePair<String, String>(splitLower, splitUpper));
+      }
+    }
+    return ranges;
+  }
+}
diff --git a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
index b146633..92302be 100644
--- a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
+++ b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/TestJdbcInputFormat.java
@@ -14,7 +14,9 @@
  */
 package org.apache.hive.storage.jdbc;

+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
@@ -33,6 +35,8 @@
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;

@@ -45,7 +49,7 @@

   @Test
-  public void testSplitLogic_noSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
+  public void testLimitSplit_noSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
     PowerMockito.mockStatic(DatabaseAccessorFactory.class);
     BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
     JdbcInputFormat f = new JdbcInputFormat();

     JobConf conf = new JobConf();
     conf.set("mapred.input.dir", "/temp");
-    InputSplit[] splits = f.getSplits(conf, 3);
+    conf.set("hive.sql.numPartitions", "3");
+    InputSplit[] splits = f.getSplits(conf, -1);

     assertThat(splits, is(notNullValue()));
     assertThat(splits.length, is(3));
@@ -63,7 +68,7 @@

   @Test
-  public void testSplitLogic_withSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
+  public void testLimitSplit_withSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
     PowerMockito.mockStatic(DatabaseAccessorFactory.class);
     BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
     JdbcInputFormat f = new JdbcInputFormat();

     JobConf conf = new JobConf();
     conf.set("mapred.input.dir", "/temp");
-    InputSplit[] splits = f.getSplits(conf, 6);
+    conf.set("hive.sql.numPartitions", "6");
+    InputSplit[] splits = f.getSplits(conf, -1);

     assertThat(splits, is(notNullValue()));
     assertThat(splits.length, is(6));
@@ -84,4 +90,114 @@
       assertThat(splits[i].getLength(), is(2L));
     }
   }
+
+  @Test
+  public void testRangeSplit_Numeric() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "int");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "3");
+    conf.set("hive.sql.lowerBound", "1");
+    conf.set("hive.sql.upperBound", "10");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(3));
+
+    assertNull(((JdbcInputSplit) splits[0]).getLowerBound());
+    assertEquals(((JdbcInputSplit) splits[0]).getUpperBound(), "4");
+    assertEquals(((JdbcInputSplit) splits[1]).getLowerBound(), "4");
+    assertEquals(((JdbcInputSplit) splits[1]).getUpperBound(), "7");
+    assertEquals(((JdbcInputSplit) splits[2]).getLowerBound(), "7");
+    assertNull(((JdbcInputSplit) splits[2]).getUpperBound());
+  }
+
+  @Test
+  public void testRangeSplit_Decimal() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "decimal(10,5)");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "4");
+    conf.set("hive.sql.lowerBound", "5");
+    conf.set("hive.sql.upperBound", "1000");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(4));
+
+    assertNull(((JdbcInputSplit) splits[0]).getLowerBound());
+    assertEquals(((JdbcInputSplit) splits[0]).getUpperBound(), "253.75000");
+    assertEquals(((JdbcInputSplit) splits[1]).getLowerBound(), "253.75000");
+    assertEquals(((JdbcInputSplit) splits[1]).getUpperBound(), "502.50000");
+    assertEquals(((JdbcInputSplit) splits[2]).getLowerBound(), "502.50000");
+    assertEquals(((JdbcInputSplit) splits[2]).getUpperBound(), "751.25000");
+    assertEquals(((JdbcInputSplit) splits[3]).getLowerBound(), "751.25000");
+    assertNull(((JdbcInputSplit) splits[3]).getUpperBound());
+  }
+
+  @Test
+  public void testRangeSplit_Timestamp() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+    when(mockDatabaseAccessor.getLowerBound(any(Configuration.class), any(String.class)))
+        .thenReturn("2010-01-01 00:00:00.000000000");
+    when(mockDatabaseAccessor.getUpperBound(any(Configuration.class), any(String.class)))
+        .thenReturn("2018-01-01 12:00:00.000000000");
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "timestamp");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "2");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(2));
+
+    assertNull(((JdbcInputSplit) splits[0]).getLowerBound());
+    assertEquals(((JdbcInputSplit) splits[0]).getUpperBound(), "2014-01-01 06:00:00.0");
+    assertEquals(((JdbcInputSplit) splits[1]).getLowerBound(), "2014-01-01 06:00:00.0");
+    assertNull(((JdbcInputSplit) splits[1]).getUpperBound());
+  }
+
+  @Test
+  public void testRangeSplit_Date() throws HiveJdbcDatabaseAccessException, IOException {
+    PowerMockito.mockStatic(DatabaseAccessorFactory.class);
+    BDDMockito.given(DatabaseAccessorFactory.getAccessor(any(Configuration.class))).willReturn(mockDatabaseAccessor);
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getColumnNames(any(Configuration.class))).thenReturn(Lists.newArrayList("a"));
+    when(mockDatabaseAccessor.getLowerBound(any(Configuration.class), any(String.class))).thenReturn("2010-01-01");
+    when(mockDatabaseAccessor.getUpperBound(any(Configuration.class), any(String.class))).thenReturn("2018-01-01");
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "date");
+    conf.set("hive.sql.partitionColumn", "a");
+    conf.set("hive.sql.numPartitions", "3");
+    InputSplit[] splits = f.getSplits(conf, -1);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(3));
+
+    assertNull(((JdbcInputSplit) splits[0]).getLowerBound());
+    assertEquals(((JdbcInputSplit) splits[0]).getUpperBound(), "2012-09-01");
+    assertEquals(((JdbcInputSplit) splits[1]).getLowerBound(), "2012-09-01");
+    assertEquals(((JdbcInputSplit) splits[1]).getUpperBound(), "2015-05-03");
+    assertEquals(((JdbcInputSplit) splits[2]).getLowerBound(), "2015-05-03");
+    assertNull(((JdbcInputSplit) splits[2]).getUpperBound());
+  }
 }

diff --git a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
index 34f061e..ba43478 100644
--- a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
+++ b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/dao/TestGenericJdbcDatabaseAccessor.java
@@ -111,7 +111,7 @@ public void testGetTotalNumberOfRecords_invalidQuery() throws HiveJdbcDatabaseAc
   public void testGetRecordIterator() throws HiveJdbcDatabaseAccessException {
     Configuration conf = buildConfiguration();
     DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
-    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 2, 0);
+    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 2, 0);

     assertThat(iterator, is(notNullValue()));

@@ -134,7 +134,7 @@ public void testGetRecordIterator() throws HiveJdbcDatabaseAccessException {
   public void testGetRecordIterator_offsets() throws HiveJdbcDatabaseAccessException {
     Configuration conf = buildConfiguration();
     DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
-    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 2, 2);
+    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 2, 2);

     assertThat(iterator, is(notNullValue()));

@@ -158,7 +158,7 @@ public void testGetRecordIterator_emptyResultSet() throws HiveJdbcDatabaseAccess
     Configuration conf = buildConfiguration();
     conf.set(JdbcStorageConfig.QUERY.getPropertyName(), "select * from test_strategy where strategy_id = '25'");
     DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
-    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 0, 2);
+    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 0, 2);

     assertThat(iterator, is(notNullValue()));
     assertThat(iterator.hasNext(), is(false));
@@ -170,7 +170,7 @@ public void testGetRecordIterator_emptyResultSet() throws HiveJdbcDatabaseAccess
   public void testGetRecordIterator_largeOffset() throws HiveJdbcDatabaseAccessException {
     Configuration conf = buildConfiguration();
     DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
-    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 10, 25);
+    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 10, 25);

     assertThat(iterator, is(notNullValue()));
     assertThat(iterator.hasNext(), is(false));
@@ -184,7 +184,7 @@ public void testGetRecordIterator_invalidQuery() throws HiveJdbcDatabaseAccessEx
     conf.set(JdbcStorageConfig.QUERY.getPropertyName(), "select * from strategyx");
     DatabaseAccessor accessor = DatabaseAccessorFactory.getAccessor(conf);
     @SuppressWarnings("unused")
-    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, 0, 2);
+    JdbcRecordIterator iterator = accessor.getRecordIterator(conf, null, null, null, 0, 2);
   }