diff --git core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
index cf9bba2..e70a0a1 100644
--- core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
+++ core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
@@ -37,7 +37,6 @@ import org.apache.hcatalog.data.transfer.ReadEntity;
import org.apache.hcatalog.data.transfer.ReaderContext;
import org.apache.hcatalog.data.transfer.state.StateProvider;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.shims.HCatHadoopShims;
/**
@@ -63,9 +62,8 @@ public class HCatInputFormatReader extends HCatReader {
try {
Job job = new Job(conf);
- InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(),
- re.getTableName(), re.getFilterString());
- HCatInputFormat.setInput(job, jobInfo);
+ HCatInputFormat.setInput(job.getConfiguration(),
+ re.getDbName(), re.getTableName(), re.getFilterString(), null);
HCatInputFormat hcif = new HCatInputFormat();
ReaderContext cntxt = new ReaderContext();
cntxt.setInputSplits(hcif.getSplits(
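For context on how this reader change is exercised, here is a rough master-side sketch of the data transfer flow that ends in the code above. It assumes the ReadEntity.Builder and DataTransferFactory API from the same org.apache.hcatalog.data.transfer package; the database and table names are placeholders and no part of this sketch is introduced by the patch.

```java
// Master-side sketch only; a minimal illustration, not part of this patch.
import java.util.HashMap;
import java.util.Map;

import org.apache.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hcatalog.data.transfer.HCatReader;
import org.apache.hcatalog.data.transfer.ReadEntity;
import org.apache.hcatalog.data.transfer.ReaderContext;

public class ReadFlowSketch {
  public static ReaderContext prepare() throws Exception {
    // Describe what to read; "default" and "my_table" are placeholder names.
    ReadEntity entity = new ReadEntity.Builder()
        .withDatabase("default")
        .withTable("my_table")
        .build();
    Map<String, String> config = new HashMap<String, String>();
    // DataTransferFactory hands back an HCatInputFormatReader under the covers;
    // prepareRead() is where the HCatInputFormat.setInput(...) call shown above runs.
    HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
    return reader.prepareRead();
  }
}
```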
diff --git core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
index 7a7446f..1d64e41 100644
--- core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
+++ core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
@@ -19,32 +19,45 @@
package org.apache.hcatalog.mapreduce;
import java.io.IOException;
+import java.util.Properties;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
-/** The InputFormat to use to read data from HCatalog. */
+import javax.annotation.Nullable;
+
+/**
+ * The InputFormat to use to read data from HCatalog.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
public class HCatInputFormat extends HCatBaseInputFormat {
/**
- * @see org.apache.hcatalog.mapreduce.HCatInputFormat#setInput(org.apache.hadoop.conf.Configuration, InputJobInfo)
+ * @deprecated As of release 0.5, replaced by
+ * {@link #setInput(Configuration, String, String, String, Properties)}.
+ * Will be removed in a future release.
*/
- public static void setInput(Job job,
- InputJobInfo inputJobInfo) throws IOException {
- setInput(job.getConfiguration(), inputJobInfo);
+ @Deprecated
+ public static void setInput(Job job, InputJobInfo inputJobInfo) throws IOException {
+ setInput(job.getConfiguration(),
+ inputJobInfo.getDatabaseName(),
+ inputJobInfo.getTableName(),
+ inputJobInfo.getFilter(),
+ inputJobInfo.getProperties());
}
/**
- * Set the input information to use for the job. This queries the metadata server
- * with the specified partition predicates, gets the matching partitions, and
- * puts the information in the conf object. The inputInfo object is updated
- * with information needed in the client context.
- * @param conf the job Configuration object
- * @param inputJobInfo the input information about the table to read
- * @throws IOException the exception in communicating with the metadata server
+ * @deprecated As of release 0.5, replaced by
+ * {@link #setInput(Configuration, String, String, String, Properties)}.
+ * Will be removed in a future release.
*/
- public static void setInput(Configuration conf,
- InputJobInfo inputJobInfo) throws IOException {
+ @Deprecated
+ public static void setInput(Configuration conf, InputJobInfo inputJobInfo)
+ throws IOException {
try {
InitializeInput.setInput(conf, inputJobInfo);
} catch (Exception e) {
@@ -52,5 +65,29 @@ public class HCatInputFormat extends HCatBaseInputFormat {
}
}
+ /**
+ * Set inputs to use for the job. This queries the metastore with the given input
+ * specification and serializes matching partitions into the job conf for use by MR tasks.
+ *
+ * @param conf the job configuration
+ * @param dbName database name
+ * @param tableName table name
+ * @param filter filter specification
+ * @param properties properties for the input specification
+ * @throws IOException on all errors
+ */
+ public static void setInput(Configuration conf, String dbName, String tableName,
+ @Nullable String filter, @Nullable Properties properties)
+ throws IOException {
+
+ Preconditions.checkNotNull(conf, "Required argument conf is null");
+ Preconditions.checkNotNull(dbName, "Required argument dbName is null");
+ Preconditions.checkNotNull(tableName, "Required argument tableName is null");
+ try {
+ InitializeInput.setInput(conf, InputJobInfo.create(dbName, tableName, filter, properties));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
}
diff --git core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
index e91ddc7..627c4b3 100644
--- core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
+++ core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
* serialized and written into the JobContext configuration. The inputInfo is also updated with
* info required in the client process context.
*/
-public class InitializeInput {
+class InitializeInput {
private static final Logger LOG = LoggerFactory.getLogger(InitializeInput.class);
@@ -79,8 +79,8 @@ public class InitializeInput {
InputJobInfo inputJobInfo = InputJobInfo.create(
theirInputJobInfo.getDatabaseName(),
theirInputJobInfo.getTableName(),
- theirInputJobInfo.getFilter());
- inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
+ theirInputJobInfo.getFilter(),
+ theirInputJobInfo.getProperties());
conf.set(
HCatConstants.HCAT_KEY_JOB_INFO,
HCatUtil.serialize(getInputJobInfo(conf, inputJobInfo, null)));
diff --git core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
index 8779c5e..2cfbc30 100644
--- core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
+++ core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
@@ -17,6 +17,8 @@
*/
package org.apache.hcatalog.mapreduce;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import java.io.IOException;
@@ -30,17 +32,16 @@ import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
/**
- * Container for metadata read from the metadata server. Users should specify input to
- * their HCatalog MR jobs as follows:
- *
- * HCatInputFormat.setInput(job, InputJobInfo.create(databaseName, tableName, filter));
- *
- * Note: while InputJobInfo is public,
- * HCATALOG-527 discusses
- * removing this class from the public API, by simplifying {@link HCatInputFormat#setInput}
- * to simply take the input specification arguments directly. Use InputJobInfo outside the
- * above context (including serialization) at your own peril!
+ * Container for metadata read from the metadata server.
+ * Prior to release 0.5, InputJobInfo was a key part of the public API, exposed directly
+ * to end-users as an argument to
+ * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job, InputJobInfo)}.
+ * Going forward, we plan to treat InputJobInfo as an implementation detail that is no longer
+ * exposed to end-users. Should you need to use InputJobInfo outside HCatalog itself,
+ * please contact the developer mailing list before depending on this class.
*/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
public class InputJobInfo implements Serializable {
/** The serialization version */
@@ -69,22 +70,22 @@ public class InputJobInfo implements Serializable {
* @param tableName the table name
* @param filter the partition filter
*/
-
public static InputJobInfo create(String databaseName,
String tableName,
- String filter) {
- return new InputJobInfo(databaseName, tableName, filter);
+ String filter,
+ Properties properties) {
+ return new InputJobInfo(databaseName, tableName, filter, properties);
}
-
private InputJobInfo(String databaseName,
String tableName,
- String filter) {
+ String filter,
+ Properties properties) {
this.databaseName = (databaseName == null) ?
MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
this.tableName = tableName;
this.filter = filter;
- this.properties = new Properties();
+ this.properties = properties == null ? new Properties() : properties;
}
/**
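The extra Properties argument on create() (and on the new setInput overload) replaces the old pattern of creating an InputJobInfo and mutating its Properties before calling setInput; the TestSnapshots change later in this patch is the in-tree example of that migration. A before/after sketch, with a placeholder property key and hypothetical class name:

```java
import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.HCatInputFormat;

public class PropertiesMigrationSketch {
  // Before this patch (for comparison only; the three-argument create() no longer exists):
  //   InputJobInfo info = InputJobInfo.create(dbName, tableName, null);
  //   info.getProperties().setProperty("example.input.property", "value");
  //   HCatInputFormat.setInput(job, info);

  // After this patch: client code passes the properties directly.
  public static void configure(Job job, String dbName, String tableName) throws IOException {
    Properties props = new Properties();
    props.setProperty("example.input.property", "value"); // placeholder key/value
    HCatInputFormat.setInput(job.getConfiguration(), dbName, tableName, null, props);
  }
}
```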
diff --git core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
index 74c4888..602ef9a 100644
--- core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
+++ core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -321,8 +321,7 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
job.setInputFormatClass(HCatInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
- InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, filter);
- HCatInputFormat.setInput(job, inputJobInfo);
+ HCatInputFormat.setInput(job.getConfiguration(), dbName, tableName, filter, null);
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(Text.class);
@@ -353,10 +352,9 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
job.setInputFormatClass(HCatInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
- InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, null);
- HCatInputFormat.setInput(job, inputJobInfo);
+ HCatInputFormat.setInput(job.getConfiguration(), dbName, tableName, null, null);
- return HCatInputFormat.getTableSchema(job);
+ return HCatInputFormat.getTableSchema(job.getConfiguration());
}
}
diff --git core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
index a6b381e..4329475 100644
--- core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
+++ core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
@@ -119,7 +119,7 @@ public class TestHCatInputFormat extends HCatBaseTest {
job.setInputFormatClass(HCatInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
- HCatInputFormat.setInput(job, InputJobInfo.create("default", "test_bad_records", null));
+ HCatInputFormat.setInput(job.getConfiguration(), "default", "test_bad_records", null, null);
job.setMapOutputKeyClass(HCatRecord.class);
job.setMapOutputValueClass(HCatRecord.class);
diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
index 4bdb7c3..cd3ffe7 100644
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
@@ -115,8 +115,8 @@ public class HCatLoader extends HCatBaseLoader {
}
} else {
Job clone = new Job(job.getConfiguration());
- HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
- tableName, getPartitionFilterString()));
+ HCatInputFormat.setInput(job.getConfiguration(), dbName, tableName,
+ getPartitionFilterString(), null);
// We will store all the new /changed properties in the job in the
// udf context, so the the HCatInputFormat.setInput method need not
diff --git storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
index 3875185..817e898 100644
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
@@ -63,7 +63,6 @@ import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.junit.Test;
@@ -592,9 +591,7 @@ public class TestHBaseBulkOutputFormat extends SkeletonHBaseTest {
job.setJarByClass(this.getClass());
job.setMapperClass(MapReadAbortedTransaction.class);
job.setInputFormatClass(HCatInputFormat.class);
- InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
- tableName, null);
- HCatInputFormat.setInput(job, inputJobInfo);
+ HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName, null, null);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(BytesWritable.class);
diff --git storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
index 81a3099..9757ff2 100644
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
@@ -61,7 +61,6 @@ import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.junit.Test;
@@ -363,9 +362,7 @@ public class TestHBaseDirectOutputFormat extends SkeletonHBaseTest {
job.setJarByClass(this.getClass());
job.setMapperClass(MapReadAbortedTransaction.class);
job.setInputFormatClass(HCatInputFormat.class);
- InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
- tableName, null);
- HCatInputFormat.setInput(job, inputJobInfo);
+ HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName, null, null);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(BytesWritable.class);
diff --git storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
index 78724a1..9ffe193 100644
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
@@ -216,9 +216,7 @@ public class TestHBaseInputFormat extends SkeletonHBaseTest {
MapReadHTable.resetCounters();
job.setInputFormatClass(HCatInputFormat.class);
- InputJobInfo inputJobInfo = InputJobInfo.create(databaseName, tableName,
- null);
- HCatInputFormat.setInput(job, inputJobInfo);
+ HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName, null, null);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(BytesWritable.class);
@@ -281,10 +279,9 @@ public class TestHBaseInputFormat extends SkeletonHBaseTest {
job.setJarByClass(this.getClass());
job.setMapperClass(MapReadProjHTable.class);
job.setInputFormatClass(HCatInputFormat.class);
- InputJobInfo inputJobInfo = InputJobInfo.create(
- MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
HCatInputFormat.setOutputSchema(job, getProjectionSchema());
- HCatInputFormat.setInput(job, inputJobInfo);
+ HCatInputFormat.setInput(job.getConfiguration(), MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ tableName, null, null);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(BytesWritable.class);
@@ -340,12 +337,11 @@ public class TestHBaseInputFormat extends SkeletonHBaseTest {
job.setMapperClass(MapReadProjectionHTable.class);
job.setInputFormat(HBaseInputFormat.class);
- InputJobInfo inputJobInfo = InputJobInfo.create(
- MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
//Configure projection schema
job.set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(getProjectionSchema()));
Job newJob = new Job(job);
- HCatInputFormat.setInput(newJob, inputJobInfo);
+ HCatInputFormat.setInput(newJob.getConfiguration(), MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ tableName, null, null);
String inputJobString = newJob.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
InputJobInfo info = (InputJobInfo) HCatUtil.deserialize(inputJobString);
job.set(HCatConstants.HCAT_KEY_JOB_INFO, inputJobString);
@@ -411,9 +407,8 @@ public class TestHBaseInputFormat extends SkeletonHBaseTest {
job.setMapperClass(MapReadHTable.class);
MapReadHTable.resetCounters();
job.setInputFormatClass(HCatInputFormat.class);
- InputJobInfo inputJobInfo = InputJobInfo.create(
- MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
- HCatInputFormat.setInput(job, inputJobInfo);
+ HCatInputFormat.setInput(job.getConfiguration(), MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ tableName, null, null);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(BytesWritable.class);
@@ -473,9 +468,8 @@ public class TestHBaseInputFormat extends SkeletonHBaseTest {
job.setJarByClass(this.getClass());
job.setMapperClass(MapReadHTableRunningAbort.class);
job.setInputFormatClass(HCatInputFormat.class);
- InputJobInfo inputJobInfo = InputJobInfo.create(
- MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
- HCatInputFormat.setInput(job, inputJobInfo);
+ HCatInputFormat.setInput(job.getConfiguration(), MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ tableName, null, null);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(BytesWritable.class);
diff --git storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
index e07bf46..f39a597 100644
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -37,7 +38,7 @@ import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
-import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.junit.Test;
@@ -87,15 +88,15 @@ public class TestSnapshots extends SkeletonHBaseTest {
cmdResponse = hcatDriver.run(tableQuery);
assertEquals(0, cmdResponse.getResponseCode());
- InputJobInfo inputInfo = InputJobInfo.create(databaseName, tableName, null);
Configuration conf = new Configuration(hcatConf);
conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
HCatUtil.serialize(getHiveConf().getAllProperties()));
Job job = new Job(conf);
- inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot");
- InitializeInput.setInput(job, inputInfo);
+ Properties properties = new Properties();
+ properties.setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot");
+ HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName, null, properties);
String modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
- inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
+ InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
Map revMap = new HashMap();
revMap.put("cf1", 3L);
@@ -121,9 +122,7 @@ public class TestSnapshots extends SkeletonHBaseTest {
revMap.clear();
revMap.put("cf1", 3L);
hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap, -1);
- inputInfo = InputJobInfo.create(databaseName, tableName, null);
- inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot");
- InitializeInput.setInput(job, inputInfo);
+ HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName, null, properties);
modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
hcatSnapshot = HBaseRevisionManagerUtil.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo());
@@ -138,5 +137,4 @@ public class TestSnapshots extends SkeletonHBaseTest {
cmdResponse = hcatDriver.run(dropDatabase);
assertEquals(0, cmdResponse.getResponseCode());
}
-
}