diff --git core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
index cf9bba2..e70a0a1
--- core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
+++ core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
@@ -37,7 +37,6 @@ import org.apache.hcatalog.data.transfer.ReadEntity;
 import org.apache.hcatalog.data.transfer.ReaderContext;
 import org.apache.hcatalog.data.transfer.state.StateProvider;
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.shims.HCatHadoopShims;
 
 /**
@@ -63,9 +62,8 @@ public class HCatInputFormatReader extends HCatReader {
     try {
       Job job = new Job(conf);
-      InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(),
-        re.getTableName(), re.getFilterString());
-      HCatInputFormat.setInput(job, jobInfo);
+      HCatInputFormat.setInput(job.getConfiguration(),
+        re.getDbName(), re.getTableName(), re.getFilterString(), null);
       HCatInputFormat hcif = new HCatInputFormat();
       ReaderContext cntxt = new ReaderContext();
       cntxt.setInputSplits(hcif.getSplits(
diff --git core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
index 7a7446f..1d64e41
--- core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
+++ core/src/main/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
@@ -19,32 +19,45 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 
-/** The InputFormat to use to read data from HCatalog. */
+import javax.annotation.Nullable;
+
+/**
+ * The InputFormat to use to read data from HCatalog.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public class HCatInputFormat extends HCatBaseInputFormat {
 
   /**
-   * @see org.apache.hcatalog.mapreduce.HCatInputFormat#setInput(org.apache.hadoop.conf.Configuration, InputJobInfo)
+   * @deprecated As of release 0.5, replaced by
+   *             {@link #setInput(Configuration, String, String, String, Properties)}
+   *             Will be removed in a future release.
    */
-  public static void setInput(Job job,
-      InputJobInfo inputJobInfo) throws IOException {
-    setInput(job.getConfiguration(), inputJobInfo);
+  @Deprecated
+  public static void setInput(Job job, InputJobInfo inputJobInfo) throws IOException {
+    setInput(job.getConfiguration(),
+      inputJobInfo.getDatabaseName(),
+      inputJobInfo.getTableName(),
+      inputJobInfo.getFilter(),
+      inputJobInfo.getProperties());
   }
 
   /**
-   * Set the input information to use for the job. This queries the metadata server
-   * with the specified partition predicates, gets the matching partitions, and
-   * puts the information in the conf object. The inputInfo object is updated
-   * with information needed in the client context.
-   * @param conf the job Configuration object
-   * @param inputJobInfo the input information about the table to read
-   * @throws IOException the exception in communicating with the metadata server
+   * @deprecated As of release 0.5, replaced by
+   *             {@link #setInput(Configuration, String, String, String, Properties)}.
+   *             Will be removed in a future release.
    */
-  public static void setInput(Configuration conf,
-      InputJobInfo inputJobInfo) throws IOException {
+  @Deprecated
+  public static void setInput(Configuration conf, InputJobInfo inputJobInfo)
+      throws IOException {
     try {
       InitializeInput.setInput(conf, inputJobInfo);
     } catch (Exception e) {
@@ -52,5 +65,29 @@ public class HCatInputFormat extends HCatBaseInputFormat {
     }
   }
 
+  /**
+   * Set inputs to use for the job. This queries the metastore with the given input
+   * specification and serializes matching partitions into the job conf for use by MR tasks.
+   *
+   * @param conf the job configuration
+   * @param dbName database name
+   * @param tableName table name
+   * @param filter filter specification
+   * @param properties properties for the input specification
+   * @throws IOException on all errors
+   */
+  public static void setInput(Configuration conf, String dbName, String tableName,
+      @Nullable String filter, @Nullable Properties properties)
+      throws IOException {
+
+    Preconditions.checkNotNull(conf, "Required argument conf is null");
+    Preconditions.checkNotNull(dbName, "Required argument dbName is null");
+    Preconditions.checkNotNull(tableName, "Required argument tableName is null");
+    try {
+      InitializeInput.setInput(conf, InputJobInfo.create(dbName, tableName, filter, properties));
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 }
diff --git core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
index e91ddc7..627c4b3
--- core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
+++ core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
  * serialized and written into the JobContext configuration. The inputInfo is also updated with
  * info required in the client process context.
  */
-public class InitializeInput {
+class InitializeInput {
 
   private static final Logger LOG = LoggerFactory.getLogger(InitializeInput.class);
 
@@ -79,8 +79,8 @@ public class InitializeInput {
     InputJobInfo inputJobInfo = InputJobInfo.create(
       theirInputJobInfo.getDatabaseName(),
       theirInputJobInfo.getTableName(),
-      theirInputJobInfo.getFilter());
-    inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
+      theirInputJobInfo.getFilter(),
+      theirInputJobInfo.getProperties());
     conf.set(
       HCatConstants.HCAT_KEY_JOB_INFO,
       HCatUtil.serialize(getInputJobInfo(conf, inputJobInfo, null)));
diff --git core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
index 8779c5e..2cfbc30
--- core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
+++ core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hcatalog.mapreduce;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 
 import java.io.IOException;
@@ -30,17 +32,16 @@ import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
 /**
- * Container for metadata read from the metadata server. Users should specify input to
- * their HCatalog MR jobs as follows:
- * <pre>
- * HCatInputFormat.setInput(job, InputJobInfo.create(databaseName, tableName, filter));
- * </pre>
- * Note: while InputJobInfo is public,
- * HCATALOG-527 discusses
- * removing this class from the public API, by simplifying {@link HCatInputFormat#setInput}
- * to simply take the input specification arguments directly. Use InputJobInfo outside the
- * above context (including serialization) at your own peril!
+ * Container for metadata read from the metadata server.
+ * Prior to release 0.5, InputJobInfo was a key part of the public API, exposed directly
+ * to end-users as an argument to
+ * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job, InputJobInfo)}.
+ * Going forward, we plan on treating InputJobInfo as an implementation detail and no longer
+ * expose it to end-users. Should you have a need to use InputJobInfo outside HCatalog itself,
+ * please contact the developer mailing list before depending on this class.
  */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class InputJobInfo implements Serializable {
 
   /** The serialization version */
@@ -69,22 +70,22 @@ public class InputJobInfo implements Serializable {
    * @param tableName the table name
    * @param filter the partition filter
    */
-  public static InputJobInfo create(String databaseName, String tableName,
-    String filter) {
-    return new InputJobInfo(databaseName, tableName, filter);
+  public static InputJobInfo create(String databaseName, String tableName,
+    String filter,
+    Properties properties) {
+    return new InputJobInfo(databaseName, tableName, filter, properties);
   }
 
-  private InputJobInfo(String databaseName, String tableName,
-    String filter) {
+  private InputJobInfo(String databaseName, String tableName,
+    String filter,
+    Properties properties) {
     this.databaseName = (databaseName == null) ?
       MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
     this.tableName = tableName;
     this.filter = filter;
-    this.properties = new Properties();
+    this.properties = properties == null ? new Properties() : properties;
   }
 
   /**
diff --git core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
index 74c4888..602ef9a
--- core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
+++ core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -321,8 +321,7 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
     job.setInputFormatClass(HCatInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);
 
-    InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, filter);
-    HCatInputFormat.setInput(job, inputJobInfo);
+    HCatInputFormat.setInput(job.getConfiguration(), dbName, tableName, filter, null);
 
     job.setMapOutputKeyClass(BytesWritable.class);
     job.setMapOutputValueClass(Text.class);
@@ -353,10 +352,9 @@ public abstract class HCatMapReduceTest extends HCatBaseTest {
     job.setInputFormatClass(HCatInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);
 
-    InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, null);
-    HCatInputFormat.setInput(job, inputJobInfo);
+    HCatInputFormat.setInput(job.getConfiguration(), dbName, tableName, null, null);
 
-    return HCatInputFormat.getTableSchema(job);
+    return HCatInputFormat.getTableSchema(job.getConfiguration());
   }
 }
diff --git core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
index a6b381e..4329475
--- core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
+++ core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
@@ -119,7 +119,7 @@
     job.setInputFormatClass(HCatInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);
 
-    HCatInputFormat.setInput(job, InputJobInfo.create("default", "test_bad_records", null));
+    HCatInputFormat.setInput(job.getConfiguration(), "default", "test_bad_records", null, null);
 
     job.setMapOutputKeyClass(HCatRecord.class);
     job.setMapOutputValueClass(HCatRecord.class);
diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
index 4bdb7c3..cd3ffe7
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
@@ -115,8 +115,8 @@ public class HCatLoader extends HCatBaseLoader {
       }
     } else {
       Job clone = new Job(job.getConfiguration());
-      HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
-        tableName, getPartitionFilterString()));
+      HCatInputFormat.setInput(job.getConfiguration(), dbName, tableName,
+        getPartitionFilterString(), null);
 
       // We will store all the new /changed properties in the job in the
       // udf context, so the the HCatInputFormat.setInput method need not
diff --git storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
index 3875185..817e898
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
@@ -63,7 +63,6 @@ import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 import org.junit.Test;
@@ -592,9 +591,7 @@ public class TestHBaseBulkOutputFormat extends SkeletonHBaseTest {
     job.setJarByClass(this.getClass());
     job.setMapperClass(MapReadAbortedTransaction.class);
     job.setInputFormatClass(HCatInputFormat.class);
-    InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
-      tableName, null);
-    HCatInputFormat.setInput(job, inputJobInfo);
+    HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName, null, null);
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, outputDir);
     job.setMapOutputKeyClass(BytesWritable.class);
diff --git storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
index 81a3099..9757ff2
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
@@ -61,7 +61,6 @@ import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 import org.junit.Test;
@@ -363,9 +362,7 @@ public class TestHBaseDirectOutputFormat extends SkeletonHBaseTest {
     job.setJarByClass(this.getClass());
     job.setMapperClass(MapReadAbortedTransaction.class);
     job.setInputFormatClass(HCatInputFormat.class);
-    InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
-      tableName, null);
-    HCatInputFormat.setInput(job, inputJobInfo);
+    HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName, null, null);
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, outputDir);
     job.setMapOutputKeyClass(BytesWritable.class);
diff --git storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
index 78724a1..9ffe193
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
@@ -216,9 +216,7 @@ public class TestHBaseInputFormat extends SkeletonHBaseTest {
     MapReadHTable.resetCounters();
     job.setInputFormatClass(HCatInputFormat.class);
-    InputJobInfo inputJobInfo = InputJobInfo.create(databaseName, tableName,
-      null);
-    HCatInputFormat.setInput(job, inputJobInfo);
+    HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName, null, null);
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, outputDir);
     job.setMapOutputKeyClass(BytesWritable.class);
@@ -281,10 +279,9 @@ public class TestHBaseInputFormat extends SkeletonHBaseTest {
     job.setJarByClass(this.getClass());
     job.setMapperClass(MapReadProjHTable.class);
     job.setInputFormatClass(HCatInputFormat.class);
-    InputJobInfo inputJobInfo = InputJobInfo.create(
-      MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
     HCatInputFormat.setOutputSchema(job, getProjectionSchema());
-    HCatInputFormat.setInput(job, inputJobInfo);
+    HCatInputFormat.setInput(job.getConfiguration(), MetaStoreUtils.DEFAULT_DATABASE_NAME,
+      tableName, null, null);
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, outputDir);
     job.setMapOutputKeyClass(BytesWritable.class);
@@ -340,12 +337,11 @@ public class TestHBaseInputFormat extends SkeletonHBaseTest {
     job.setMapperClass(MapReadProjectionHTable.class);
     job.setInputFormat(HBaseInputFormat.class);
-    InputJobInfo inputJobInfo = InputJobInfo.create(
-      MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
     //Configure projection schema
     job.set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(getProjectionSchema()));
     Job newJob = new Job(job);
-    HCatInputFormat.setInput(newJob, inputJobInfo);
+    HCatInputFormat.setInput(newJob.getConfiguration(), MetaStoreUtils.DEFAULT_DATABASE_NAME,
+      tableName, null, null);
     String inputJobString = newJob.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
     InputJobInfo info = (InputJobInfo) HCatUtil.deserialize(inputJobString);
     job.set(HCatConstants.HCAT_KEY_JOB_INFO, inputJobString);
@@ -411,9 +407,8 @@ public class TestHBaseInputFormat extends SkeletonHBaseTest {
     job.setMapperClass(MapReadHTable.class);
     MapReadHTable.resetCounters();
     job.setInputFormatClass(HCatInputFormat.class);
-    InputJobInfo inputJobInfo = InputJobInfo.create(
-      MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
-    HCatInputFormat.setInput(job, inputJobInfo);
+    HCatInputFormat.setInput(job.getConfiguration(), MetaStoreUtils.DEFAULT_DATABASE_NAME,
+      tableName, null, null);
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, outputDir);
     job.setMapOutputKeyClass(BytesWritable.class);
@@ -473,9 +468,8 @@
     job.setJarByClass(this.getClass());
     job.setMapperClass(MapReadHTableRunningAbort.class);
     job.setInputFormatClass(HCatInputFormat.class);
-    InputJobInfo inputJobInfo = InputJobInfo.create(
-      MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null);
-    HCatInputFormat.setInput(job, inputJobInfo);
+    HCatInputFormat.setInput(job.getConfiguration(), MetaStoreUtils.DEFAULT_DATABASE_NAME,
+      tableName, null, null);
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, outputDir);
     job.setMapOutputKeyClass(BytesWritable.class);
diff --git storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
index e07bf46..f39a597
--- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
+++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +38,7 @@ import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
-import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 
 import org.junit.Test;
@@ -87,15 +88,15 @@ public class TestSnapshots extends SkeletonHBaseTest {
     cmdResponse = hcatDriver.run(tableQuery);
     assertEquals(0, cmdResponse.getResponseCode());
 
-    InputJobInfo inputInfo = InputJobInfo.create(databaseName, tableName, null);
     Configuration conf = new Configuration(hcatConf);
     conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
       HCatUtil.serialize(getHiveConf().getAllProperties()));
     Job job = new Job(conf);
-    inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot");
-    InitializeInput.setInput(job, inputInfo);
+    Properties properties = new Properties();
+    properties.setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot");
+    HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName, null, properties);
     String modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
-    inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
+    InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
 
     Map<String, Long> revMap = new HashMap<String, Long>();
     revMap.put("cf1", 3L);
@@ -121,9 +122,7 @@ public class TestSnapshots extends SkeletonHBaseTest {
     revMap.clear();
     revMap.put("cf1", 3L);
     hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap, -1);
-    inputInfo = InputJobInfo.create(databaseName, tableName, null);
-    inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot");
-    InitializeInput.setInput(job, inputInfo);
+    HCatInputFormat.setInput(job.getConfiguration(), databaseName, tableName, null, properties);
     modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
     inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
     hcatSnapshot = HBaseRevisionManagerUtil.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo());
@@ -138,5 +137,4 @@ public class TestSnapshots extends SkeletonHBaseTest {
     cmdResponse = hcatDriver.run(dropDatabase);
     assertEquals(0, cmdResponse.getResponseCode());
   }
-
 }
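
Usage note: a minimal sketch of how a client moves from the deprecated setInput(Job, InputJobInfo) overload to the setInput(Configuration, String, String, String, Properties) overload introduced by this patch. The class name, the mydb.mytable table, and the ds partition filter are illustrative placeholders, not values taken from the patch; filter and properties are @Nullable and may be passed as null when not needed.

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;

public class HCatInputFormatUsageSketch {

  public static Job configureInput(Configuration conf) throws IOException {
    Job job = new Job(conf);

    // Deprecated style, kept only for backwards compatibility by this patch:
    //   HCatInputFormat.setInput(job,
    //       InputJobInfo.create("mydb", "mytable", "ds=\"20130101\""));

    // New style: pass the input specification pieces directly.
    Properties props = new Properties();
    HCatInputFormat.setInput(job.getConfiguration(),
        "mydb", "mytable", "ds=\"20130101\"", props);

    // The table schema is read back from the Configuration rather than the Job,
    // mirroring the HCatMapReduceTest change above.
    HCatSchema schema = HCatInputFormat.getTableSchema(job.getConfiguration());
    System.out.println("Reading columns: " + schema.getFieldNames());

    job.setInputFormatClass(HCatInputFormat.class);
    return job;
  }
}

Passing the database, table, filter, and properties directly keeps InputJobInfo out of client code, which is what allows the patch to mark that class @InterfaceAudience.Private.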