diff --git itests/qtest/pom.xml itests/qtest/pom.xml
index 1b49e88..1c3b601 100644
--- itests/qtest/pom.xml
+++ itests/qtest/pom.xml
@@ -119,6 +119,13 @@
       <classifier>tests</classifier>
       <scope>test</scope>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-jdbc-handler</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index d344464..fca8093 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -410,6 +410,7 @@ minillap.query.files=acid_bucket_pruning.q,\
intersect_all.q,\
intersect_distinct.q,\
intersect_merge.q,\
+ jdbc_handler.q,\
llap_udf.q,\
llapdecider.q,\
reduce_deduplicate.q,\
diff --git jdbc-handler/pom.xml jdbc-handler/pom.xml
new file mode 100644
index 0000000..364886a
--- /dev/null
+++ jdbc-handler/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-jdbc-handler</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive JDBC Handler</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <!-- intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty.aggregate</groupId>
+          <artifactId>jetty-all</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- inter-project -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-all.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>${h2database.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
new file mode 100644
index 0000000..bfa7a26
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
+
+import java.io.IOException;
+
+public class JdbcInputFormat extends HiveInputFormat<LongWritable, MapWritable> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcInputFormat.class);
+ private DatabaseAccessor dbAccessor = null;
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+  public RecordReader<LongWritable, MapWritable>
+      getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+
+ if (!(split instanceof JdbcInputSplit)) {
+ throw new RuntimeException("Incompatible split type " + split.getClass().getName() + ".");
+ }
+
+ return new JdbcRecordReader(job, (JdbcInputSplit) split);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ try {
+ if (numSplits <= 0) {
+ numSplits = 1;
+ }
+ LOGGER.debug("Creating {} input splits", numSplits);
+ if (dbAccessor == null) {
+ dbAccessor = DatabaseAccessorFactory.getAccessor(job);
+ }
+
+ int numRecords = dbAccessor.getTotalNumberOfRecords(job);
+ int numRecordsPerSplit = numRecords / numSplits;
+ int numSplitsWithExtraRecords = numRecords % numSplits;
+
+ LOGGER.debug("Num records = {}", numRecords);
+ InputSplit[] splits = new InputSplit[numSplits];
+ Path[] tablePaths = FileInputFormat.getInputPaths(job);
+
+ int offset = 0;
+ for (int i = 0; i < numSplits; i++) {
+ int numRecordsInThisSplit = numRecordsPerSplit;
+ if (i < numSplitsWithExtraRecords) {
+ numRecordsInThisSplit++;
+ }
+
+ splits[i] = new JdbcInputSplit(numRecordsInThisSplit, offset, tablePaths[0]);
+ offset += numRecordsInThisSplit;
+ }
+
+ return splits;
+ }
+ catch (Exception e) {
+ LOGGER.error("Error while splitting input data.", e);
+ throw new IOException(e);
+ }
+ }
+
+
+ /**
+ * For testing purposes only
+ *
+ * @param dbAccessor
+ * DatabaseAccessor object
+ */
+ public void setDbAccessor(DatabaseAccessor dbAccessor) {
+ this.dbAccessor = dbAccessor;
+ }
+
+}
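
Note on the split arithmetic above: the remainder of `numRecords / numSplits` is handed out one extra row per split, so the LIMIT/OFFSET windows are contiguous and cover the result set exactly once. A minimal standalone sketch of that sizing logic (the record and split counts are hypothetical; the real `numRecords` comes from `DatabaseAccessor`):

```java
// Standalone sketch of the split-sizing loop in JdbcInputFormat.getSplits().
public class SplitSizingSketch {
  public static void main(String[] args) {
    int numRecords = 10;
    int numSplits = 3;
    int numRecordsPerSplit = numRecords / numSplits;        // 3
    int numSplitsWithExtraRecords = numRecords % numSplits; // 1

    int offset = 0;
    for (int i = 0; i < numSplits; i++) {
      int numRecordsInThisSplit = numRecordsPerSplit + (i < numSplitsWithExtraRecords ? 1 : 0);
      // Split i will issue a query window of LIMIT numRecordsInThisSplit OFFSET offset.
      System.out.printf("split %d: limit=%d offset=%d%n", i, numRecordsInThisSplit, offset);
      offset += numRecordsInThisSplit;
    }
    // Prints limit=4 offset=0, limit=3 offset=4, limit=3 offset=7.
  }
}
```
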
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
new file mode 100644
index 0000000..a691cc2
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class JdbcInputSplit extends FileSplit implements InputSplit {
+
+ private static final String[] EMPTY_ARRAY = new String[] {};
+
+ private int limit = 0;
+ private int offset = 0;
+
+
+  public JdbcInputSplit() {
+    super((Path) null, 0, 0, EMPTY_ARRAY);
+  }
+
+
+  public JdbcInputSplit(long limit, long offset, Path dummyPath) {
+    // The path is only a placeholder for Hive; the split is really defined by limit/offset.
+    super(dummyPath, 0, 0, EMPTY_ARRAY);
+    this.setLimit((int) limit);
+    this.setOffset((int) offset);
+  }
+
+
+ public JdbcInputSplit(int limit, int offset) {
+ super((Path) null, 0, 0, EMPTY_ARRAY);
+ this.limit = limit;
+ this.offset = offset;
+ }
+
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeInt(limit);
+ out.writeInt(offset);
+ }
+
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ limit = in.readInt();
+ offset = in.readInt();
+ }
+
+
+ @Override
+ public long getLength() {
+ return limit;
+ }
+
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return EMPTY_ARRAY;
+ }
+
+
+ public int getLimit() {
+ return limit;
+ }
+
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+
+ public int getOffset() {
+ return offset;
+ }
+
+
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
+
+}
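
Since only `limit` and `offset` are appended to the serialized `FileSplit` fields, a quick round trip through `write()`/`readFields()` confirms both survive shipping to the task. A test-style sketch (the dummy path is arbitrary, but `write()` needs it non-null because `FileSplit` serializes it):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class JdbcInputSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    JdbcInputSplit split = new JdbcInputSplit(100, 40, new Path("/tmp/dummy"));

    DataOutputBuffer out = new DataOutputBuffer();
    split.write(out);                       // FileSplit fields, then limit, then offset

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    JdbcInputSplit copy = new JdbcInputSplit();
    copy.readFields(in);

    System.out.println(copy.getLimit());    // 100
    System.out.println(copy.getOffset());   // 40
  }
}
```
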
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java
new file mode 100644
index 0000000..26fb3cd
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class JdbcOutputFormat implements OutputFormat<NullWritable, MapWritable>,
+                                         HiveOutputFormat<NullWritable, MapWritable> {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public RecordWriter getHiveRecordWriter(JobConf jc,
+ Path finalOutPath,
+                                         Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties tableProperties,
+ Progressable progress) throws IOException {
+ throw new UnsupportedOperationException("Write operations are not allowed.");
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, MapWritable> getRecordWriter(FileSystem ignored,
+ JobConf job,
+ String name,
+ Progressable progress) throws IOException {
+ throw new UnsupportedOperationException("Write operations are not allowed.");
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+ // do nothing
+ }
+
+}
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
new file mode 100644
index 0000000..0a24bd9
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
+import org.apache.hive.storage.jdbc.dao.JdbcRecordIterator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class JdbcRecordReader implements RecordReader<LongWritable, MapWritable> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcRecordReader.class);
+ private DatabaseAccessor dbAccessor = null;
+ private JdbcRecordIterator iterator = null;
+ private JdbcInputSplit split = null;
+ private JobConf conf = null;
+ private int pos = 0;
+
+
+ public JdbcRecordReader(JobConf conf, JdbcInputSplit split) {
+ LOGGER.debug("Initializing JdbcRecordReader");
+ this.split = split;
+ this.conf = conf;
+ }
+
+
+ @Override
+ public boolean next(LongWritable key, MapWritable value) throws IOException {
+ try {
+ LOGGER.debug("JdbcRecordReader.next called");
+ if (dbAccessor == null) {
+ dbAccessor = DatabaseAccessorFactory.getAccessor(conf);
+ iterator = dbAccessor.getRecordIterator(conf, split.getLimit(), split.getOffset());
+ }
+
+ if (iterator.hasNext()) {
+ LOGGER.debug("JdbcRecordReader has more records to read.");
+ key.set(pos);
+ pos++;
+        Map<String, String> record = iterator.next();
+        if ((record != null) && (!record.isEmpty())) {
+          for (Entry<String, String> entry : record.entrySet()) {
+            value.put(new Text(entry.getKey()), new Text(entry.getValue()));
+ }
+ return true;
+ }
+ else {
+ LOGGER.debug("JdbcRecordReader got null record.");
+ return false;
+ }
+ }
+ else {
+ LOGGER.debug("JdbcRecordReader has no more records to read.");
+ return false;
+ }
+ }
+ catch (Exception e) {
+ LOGGER.error("An error occurred while reading the next record from DB.", e);
+ return false;
+ }
+ }
+
+
+ @Override
+ public LongWritable createKey() {
+ return new LongWritable();
+ }
+
+
+ @Override
+ public MapWritable createValue() {
+ return new MapWritable();
+ }
+
+
+ @Override
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ if (iterator != null) {
+ iterator.close();
+ }
+ }
+
+
+ @Override
+ public float getProgress() throws IOException {
+ if (split == null) {
+ return 0;
+ }
+ else {
+ return split.getLength() > 0 ? pos / (float) split.getLength() : 1.0f;
+ }
+ }
+
+
+ public void setDbAccessor(DatabaseAccessor dbAccessor) {
+ this.dbAccessor = dbAccessor;
+ }
+
+
+ public void setIterator(JdbcRecordIterator iterator) {
+ this.iterator = iterator;
+ }
+
+}
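
The `dao` package (`DatabaseAccessor`, `DatabaseAccessorFactory`, `JdbcRecordIterator`) is not shown in this hunk. Inferred from the call sites in `JdbcInputFormat`, `JdbcRecordReader`, and the SerDe below, the contract would look roughly like the sketch here; the actual interfaces in the patch may declare more specific exception types:

```java
// Inferred sketch only -- reconstructed from call sites, not copied from the patch.
package org.apache.hive.storage.jdbc.dao;

import org.apache.hadoop.conf.Configuration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public interface DatabaseAccessor {
  List<String> getColumnNames(Configuration conf) throws Exception;  // used by JdbcSerDe.initialize()
  int getTotalNumberOfRecords(Configuration conf) throws Exception;  // used by JdbcInputFormat.getSplits()
  JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int offset)
      throws Exception;                                              // used by JdbcRecordReader.next()
}

interface JdbcRecordIterator extends Iterator<Map<String, String>> {
  void close();                                                      // used by JdbcRecordReader.close()
}
```
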
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
new file mode 100644
index 0000000..f35c33d
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+public class JdbcSerDe extends AbstractSerDe {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSerDe.class);
+
+ private StructObjectInspector objectInspector;
+ private int numColumns;
+ private String[] hiveColumnTypeArray;
+  private List<String> columnNames;
+  private List<String> row;
+
+
+ /*
+ * This method gets called multiple times by Hive. On some invocations, the properties will be empty.
+ * We need to detect when the properties are not empty to initialise the class variables.
+ *
+ * @see org.apache.hadoop.hive.serde2.Deserializer#initialize(org.apache.hadoop.conf.Configuration, java.util.Properties)
+ */
+ @Override
+ public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+ try {
+ LOGGER.debug("Initializing the SerDe");
+
+ // Hive cdh-4.3 does not provide the properties object on all calls
+ if (tbl.containsKey(JdbcStorageConfig.DATABASE_TYPE.getPropertyName())) {
+ Configuration tableConfig = JdbcStorageConfigManager.convertPropertiesToConfiguration(tbl);
+
+ DatabaseAccessor dbAccessor = DatabaseAccessorFactory.getAccessor(tableConfig);
+ columnNames = dbAccessor.getColumnNames(tableConfig);
+ numColumns = columnNames.size();
+
+ String[] hiveColumnNameArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMNS), ",");
+ if (numColumns != hiveColumnNameArray.length) {
+ throw new SerDeException("Expected " + numColumns + " columns. Table definition has "
+ + hiveColumnNameArray.length + " columns");
+ }
+        List<String> hiveColumnNames = Arrays.asList(hiveColumnNameArray);
+
+ hiveColumnTypeArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES), ":");
+ if (hiveColumnTypeArray.length == 0) {
+ throw new SerDeException("Received an empty Hive column type definition");
+ }
+
+        List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>(numColumns);
+ for (int i = 0; i < numColumns; i++) {
+ fieldInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ }
+
+ objectInspector =
+ ObjectInspectorFactory.getStandardStructObjectInspector(hiveColumnNames,
+ fieldInspectors);
+        row = new ArrayList<String>(numColumns);
+ }
+ }
+ catch (Exception e) {
+ LOGGER.error("Caught exception while initializing the SqlSerDe", e);
+ throw new SerDeException(e);
+ }
+ }
+
+
+ private String[] parseProperty(String propertyValue, String delimiter) {
+ if ((propertyValue == null) || (propertyValue.trim().isEmpty())) {
+ return new String[] {};
+ }
+
+ return propertyValue.split(delimiter);
+ }
+
+
+ @Override
+ public Object deserialize(Writable blob) throws SerDeException {
+ LOGGER.debug("Deserializing from SerDe");
+ if (!(blob instanceof MapWritable)) {
+ throw new SerDeException("Expected MapWritable. Got " + blob.getClass().getName());
+ }
+
+ if ((row == null) || (columnNames == null)) {
+ throw new SerDeException("JDBC SerDe hasn't been initialized properly");
+ }
+
+ row.clear();
+ MapWritable input = (MapWritable) blob;
+ Text columnKey = new Text();
+
+ for (int i = 0; i < numColumns; i++) {
+ columnKey.set(columnNames.get(i));
+ Writable value = input.get(columnKey);
+ if (value == null) {
+ row.add(null);
+ }
+ else {
+ row.add(value.toString());
+ }
+ }
+
+ return row;
+ }
+
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return objectInspector;
+ }
+
+
+ @Override
+  public Class<? extends Writable> getSerializedClass() {
+ return MapWritable.class;
+ }
+
+
+ @Override
+ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+ throw new UnsupportedOperationException("Writes are not allowed");
+ }
+
+
+ @Override
+ public SerDeStats getSerDeStats() {
+ return null;
+ }
+
+}
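
`deserialize()` is just a keyed lookup: for each Hive column name it probes the `MapWritable` produced by the record reader, and a missing key becomes a SQL NULL. A standalone illustration of that lookup with hypothetical column names (no SerDe initialization or database required):

```java
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeserializeSketch {
  public static void main(String[] args) {
    // What JdbcRecordReader would hand over for one row.
    MapWritable input = new MapWritable();
    input.put(new Text("visitor_id"), new Text("42"));
    input.put(new Text("first_visit"), new Text("2016-01-01"));

    // Hypothetical column list; the SerDe gets this from DatabaseAccessor.getColumnNames().
    List<String> columnNames = Arrays.asList("visitor_id", "first_visit", "last_visit");

    List<String> row = new ArrayList<String>(columnNames.size());
    Text columnKey = new Text();
    for (String name : columnNames) {
      columnKey.set(name);
      Writable value = input.get(columnKey);
      row.add(value == null ? null : value.toString()); // absent column -> NULL
    }
    System.out.println(row); // [42, 2016-01-01, null]
  }
}
```
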
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java
new file mode 100644
index 0000000..946ee0c
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class JdbcStorageHandler implements HiveStorageHandler {
+
+ private Configuration conf;
+
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+
+ @SuppressWarnings("rawtypes")
+ @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+ return JdbcInputFormat.class;
+ }
+
+
+ @SuppressWarnings("rawtypes")
+ @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+ return JdbcOutputFormat.class;
+ }
+
+
+ @Override
+  public Class<? extends AbstractSerDe> getSerDeClass() {
+ return JdbcSerDe.class;
+ }
+
+
+ @Override
+ public HiveMetaHook getMetaHook() {
+ return null;
+ }
+
+
+ @Override
+  public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ Properties properties = tableDesc.getProperties();
+ JdbcStorageConfigManager.copyConfigurationToJob(properties, jobProperties);
+ }
+
+
+ @Override
+  public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ Properties properties = tableDesc.getProperties();
+ JdbcStorageConfigManager.copyConfigurationToJob(properties, jobProperties);
+ }
+
+
+ @Override
+  public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ // Nothing to do here...
+ }
+
+
+ @Override
+ public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+ return null;
+ }
+
+ @Override
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+
+ }
+
+}
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java
new file mode 100644
index 0000000..194fad8
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
+
+import java.beans.XMLDecoder;
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Translates the Hive query condition into a condition that can be run on the underlying database.
+ */
+public class QueryConditionBuilder {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(QueryConditionBuilder.class);
+ private static final String EMPTY_STRING = "";
+ private static QueryConditionBuilder instance = null;
+
+
+ public static QueryConditionBuilder getInstance() {
+ if (instance == null) {
+ instance = new QueryConditionBuilder();
+ }
+
+ return instance;
+ }
+
+
+ private QueryConditionBuilder() {
+
+ }
+
+
+ public String buildCondition(Configuration conf) {
+ if (conf == null) {
+ return EMPTY_STRING;
+ }
+
+ String filterXml = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+ String hiveColumns = conf.get(serdeConstants.LIST_COLUMNS);
+ String columnMapping = conf.get(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName());
+
+ if ((filterXml == null) || ((columnMapping == null) && (hiveColumns == null))) {
+ return EMPTY_STRING;
+ }
+
+ if (hiveColumns == null) {
+ hiveColumns = "";
+ }
+
+    Map<String, String> columnMap = buildColumnMapping(columnMapping, hiveColumns);
+ String condition = createConditionString(filterXml, columnMap);
+ return condition;
+ }
+
+
+  /*
+   * Builds the mapping from Hive column names to the corresponding column names in the underlying database.
+   */
+  private Map<String, String> buildColumnMapping(String columnMapping, String hiveColumns) {
+    if ((columnMapping == null) || (columnMapping.trim().isEmpty())) {
+      return createIdentityMap(hiveColumns);
+    }
+
+    Map<String, String> columnMap = new HashMap<String, String>();
+ String[] mappingPairs = columnMapping.toLowerCase().split(",");
+ for (String mapPair : mappingPairs) {
+ String[] columns = mapPair.split("=");
+ columnMap.put(columns[0].trim(), columns[1].trim());
+ }
+
+ return columnMap;
+ }
+
+
+  /*
+   * When no mapping is defined, it is assumed that the Hive column names are equivalent to the column names in the
+   * underlying table.
+   */
+  private Map<String, String> createIdentityMap(String hiveColumns) {
+    Map<String, String> columnMap = new HashMap<String, String>();
+ String[] columns = hiveColumns.toLowerCase().split(",");
+
+ for (String col : columns) {
+ columnMap.put(col.trim(), col.trim());
+ }
+
+ return columnMap;
+ }
+
+
+  /*
+   * Walk the Hive AST and translate the Hive column names to their equivalent mappings. This is basically a cheat.
+   */
+  private String createConditionString(String filterXml, Map<String, String> columnMap) {
+ if ((filterXml == null) || (filterXml.trim().isEmpty())) {
+ return EMPTY_STRING;
+ }
+
+ try (XMLDecoder decoder = new XMLDecoder(new ByteArrayInputStream(filterXml.getBytes("UTF-8")))) {
+ Object object = decoder.readObject();
+ if (!(object instanceof ExprNodeDesc)) {
+ LOGGER.error("Deserialized filter expression is not of the expected type");
+ throw new RuntimeException("Deserialized filter expression is not of the expected type");
+ }
+
+ ExprNodeDesc conditionNode = (ExprNodeDesc) object;
+ walkTreeAndTranslateColumnNames(conditionNode, columnMap);
+ return conditionNode.getExprString();
+ }
+ catch (Exception e) {
+ LOGGER.error("Error during condition build", e);
+ return EMPTY_STRING;
+ }
+ }
+
+
+ /*
+ * Translate column names by walking the AST
+ */
+  private void walkTreeAndTranslateColumnNames(ExprNodeDesc node, Map<String, String> columnMap) {
+ if (node == null) {
+ return;
+ }
+
+ if (node instanceof ExprNodeColumnDesc) {
+ ExprNodeColumnDesc column = (ExprNodeColumnDesc) node;
+ String hiveColumnName = column.getColumn().toLowerCase();
+ if (columnMap.containsKey(hiveColumnName)) {
+ String dbColumnName = columnMap.get(hiveColumnName);
+ String finalName = formatColumnName(dbColumnName);
+ column.setColumn(finalName);
+ }
+ }
+ else {
+ if (node.getChildren() != null) {
+ for (ExprNodeDesc childNode : node.getChildren()) {
+ walkTreeAndTranslateColumnNames(childNode, columnMap);
+ }
+ }
+ }
+ }
+
+
+ /**
+ * This is an ugly hack for handling date column types because Hive doesn't have a built-in type for dates
+ */
+ private String formatColumnName(String dbColumnName) {
+ if (dbColumnName.contains(":")) {
+ String[] typeSplit = dbColumnName.split(":");
+
+ if (typeSplit[1].equalsIgnoreCase("date")) {
+ return "{d " + typeSplit[0] + "}";
+ }
+
+ return typeSplit[0];
+ }
+ else {
+ return dbColumnName;
+ }
+ }
+}
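
The `hive.sql.column.mapping` property is a comma-separated list of `hiveColumn=dbColumn` pairs, and a `:date` suffix on the database side makes `formatColumnName()` emit the JDBC `{d ...}` date escape. A standalone sketch of both steps (the mapping string is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class ColumnMappingSketch {
  public static void main(String[] args) {
    // Hypothetical value of the hive.sql.column.mapping table property.
    String columnMapping = "visitor_id=vid,sentiment=sentiment,visit_date=vdate:date";

    // Same parse as buildColumnMapping().
    Map<String, String> columnMap = new HashMap<String, String>();
    for (String pair : columnMapping.toLowerCase().split(",")) {
      String[] columns = pair.split("=");
      columnMap.put(columns[0].trim(), columns[1].trim());
    }

    // Same rewrite as formatColumnName() for a column tagged with a type suffix.
    String dbColumnName = columnMap.get("visit_date");   // "vdate:date"
    String[] typeSplit = dbColumnName.split(":");
    String rendered = (typeSplit.length > 1 && typeSplit[1].equalsIgnoreCase("date"))
        ? "{d " + typeSplit[0] + "}"
        : typeSplit[0];
    System.out.println(rendered);                        // {d vdate}
  }
}
```
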
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java
new file mode 100644
index 0000000..7a787d4
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java
@@ -0,0 +1,23 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.conf;
+
+import java.util.Properties;
+
+public interface CustomConfigManager {
+
+ void checkRequiredProperties(Properties properties);
+
+}
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManagerFactory.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManagerFactory.java
new file mode 100644
index 0000000..eed0dff
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManagerFactory.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.conf;
+
+import java.util.Properties;
+
+/**
+ * Factory for creating custom config managers based on the database type
+ */
+public class CustomConfigManagerFactory {
+
+ private static CustomConfigManager nopConfigManager = new NopCustomConfigManager();
+
+
+ private CustomConfigManagerFactory() {
+ }
+
+
+ public static CustomConfigManager getCustomConfigManagerFor(DatabaseType databaseType) {
+ switch (databaseType) {
+ case MYSQL:
+ return nopConfigManager;
+
+ default:
+ return nopConfigManager;
+ }
+ }
+
+ private static class NopCustomConfigManager implements CustomConfigManager {
+
+ @Override
+ public void checkRequiredProperties(Properties properties) {
+ return;
+ }
+
+ }
+
+}
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java
new file mode 100644
index 0000000..a2bdbe4
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java
@@ -0,0 +1,21 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.conf;
+
+public enum DatabaseType {
+ MYSQL,
+ H2,
+ DERBY
+}
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java
new file mode 100644
index 0000000..ff6357d
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.conf;
+
+public enum JdbcStorageConfig {
+ DATABASE_TYPE("database.type", true),
+ JDBC_URL("jdbc.url", true),
+ JDBC_DRIVER_CLASS("jdbc.driver", true),
+ QUERY("query", true),
+ JDBC_FETCH_SIZE("jdbc.fetch.size", false),
+ COLUMN_MAPPING("column.mapping", false);
+
+ private String propertyName;
+ private boolean required = false;
+
+
+ JdbcStorageConfig(String propertyName, boolean required) {
+ this.propertyName = propertyName;
+ this.required = required;
+ }
+
+
+ JdbcStorageConfig(String propertyName) {
+ this.propertyName = propertyName;
+ }
+
+
+ public String getPropertyName() {
+ return JdbcStorageConfigManager.CONFIG_PREFIX + "." + propertyName;
+ }
+
+
+ public boolean isRequired() {
+ return required;
+ }
+
+}
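
Every constant resolves to a table property under the `hive.sql` prefix defined in `JdbcStorageConfigManager`; `DATABASE_TYPE`, for instance, becomes `hive.sql.database.type`. A quick sketch that lists the full property names and which ones the config manager insists on:

```java
public class PropertyNameSketch {
  public static void main(String[] args) {
    for (JdbcStorageConfig config : JdbcStorageConfig.values()) {
      // Prints e.g. "hive.sql.database.type (required)" or "hive.sql.jdbc.fetch.size"
      System.out.println(config.getPropertyName() + (config.isRequired() ? " (required)" : ""));
    }
  }
}
```
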
diff --git jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
new file mode 100644
index 0000000..5267cda
--- /dev/null
+++ jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hive.storage.jdbc.QueryConditionBuilder;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * Main configuration handler class
+ */
+public class JdbcStorageConfigManager {
+
+ public static final String CONFIG_PREFIX = "hive.sql";
+  private static final EnumSet<JdbcStorageConfig> DEFAULT_REQUIRED_PROPERTIES =
+ EnumSet.of(JdbcStorageConfig.DATABASE_TYPE,
+ JdbcStorageConfig.JDBC_URL,
+ JdbcStorageConfig.JDBC_DRIVER_CLASS,
+ JdbcStorageConfig.QUERY);
+
+
+ private JdbcStorageConfigManager() {
+ }
+
+
+  public static void copyConfigurationToJob(Properties props, Map<String, String> jobProps) {
+ checkRequiredPropertiesAreDefined(props);
+ for (Entry