Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java	(revision 1304455)
+++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java	(revision )
@@ -27,30 +27,13 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
-import org.apache.hadoop.hive.ql.metadata.HiveUtils;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde.Constants;
-import org.apache.hadoop.hive.serde2.Deserializer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -64,7 +47,7 @@
  * info required in the client process context.
  */
 public class InitializeInput {
-
+
   private static final Log LOG = LogFactory.getLog(InitializeInput.class);
 
   /** The prefix for keys used for storage driver arguments */
@@ -78,23 +61,39 @@
   }
 
   /**
-   * Set the input to use for the Job. This queries the metadata server with the specified partition predicates,
-   * gets the matching partitions, puts the information in the configuration object.
+   * Set the input to use for the Job. This queries the metadata server with the specified
+   * partition predicates, gets the matching partitions, and puts the information in the job
+   * configuration object.
+   *
+   * To ensure a known InputJobInfo state, only the database name, table name, filter, and
+   * properties are preserved. All other modifications to the given InputJobInfo are discarded.
+   *
+   * After calling setInput, the InputJobInfo can be retrieved from the job configuration as follows:
+   * {code}
+   * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
+   *     job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+   * {code}
+   *
    * @param job the job object
-   * @param inputJobInfo information on the Input to read
+   * @param theirInputJobInfo information about the input to read
    * @throws Exception
    */
-  public static void setInput(Job job, InputJobInfo inputJobInfo) throws Exception {
+  public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
+    InputJobInfo inputJobInfo = InputJobInfo.create(
+        theirInputJobInfo.getDatabaseName(),
+        theirInputJobInfo.getTableName(),
+        theirInputJobInfo.getFilter());
+    inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
 
-    //* Create and initialize an InputJobInfo object
-    //* Serialize the InputJobInfo and save in the Job's Configuration object
     job.getConfiguration().set(
-      HCatConstants.HCAT_KEY_JOB_INFO,
+        HCatConstants.HCAT_KEY_JOB_INFO,
-      getSerializedHcatKeyJobInfo(job, inputJobInfo,null));
+        HCatUtil.serialize(getInputJobInfo(job, inputJobInfo, null)));
   }
 
-  public static String getSerializedHcatKeyJobInfo(Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
+  /**
+   * Returns the given InputJobInfo after populating it with data queried from the metadata service.
+   */
+  private static InputJobInfo getInputJobInfo(Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
     //* Create and initialize an InputJobInfo object
     HiveMetaStoreClient client = null;
 
@@ -144,7 +143,7 @@
       }
       inputJobInfo.setPartitions(partInfoList);
 
-      return HCatUtil.serialize(inputJobInfo);
+      return inputJobInfo;
     } finally {
       if (client != null ) {
         client.close();
@@ -174,19 +173,19 @@
     return ptnKeyValues;
   }
 
-  static PartInfo extractPartInfo(StorageDescriptor sd,
-      Map<String,String> parameters, Configuration conf,
+  static PartInfo extractPartInfo(StorageDescriptor sd,
+      Map<String,String> parameters, Configuration conf,
       InputJobInfo inputJobInfo) throws IOException{
     HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd);
     StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd,parameters);
 
     Properties hcatProperties = new Properties();
-    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,
+    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,
        storerInfo);
 
     // copy the properties from storageHandler to jobProperties
     Map<String,String> jobProperties = HCatUtil.getInputJobProperties(
-        storageHandler,
+        storageHandler,
        inputJobInfo);
 
     for (String key : parameters.keySet()){
@@ -194,7 +193,7 @@
        hcatProperties.put(key, parameters.get(key));
      }
    }
-    // FIXME
+    // FIXME
    // Bloating partinfo with inputJobInfo is not good
    return new PartInfo(schema, storageHandler, sd.getLocation(), hcatProperties,
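
Usage note (not part of the patch): a minimal sketch of how a client might exercise the revised setInput path. It uses only names that appear in this patch (InputJobInfo.create, InitializeInput.setInput, HCatUtil.deserialize, HCatConstants.HCAT_KEY_JOB_INFO); the driver setup, database/table names, and the printed fields are illustrative assumptions, not something the patch specifies.

{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.InitializeInput;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class SetInputSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical driver setup; in practice the Job comes from the client application.
    Job job = new Job(new Configuration());

    // Database name "default", table name "mytable", and the null filter are assumptions.
    // Per the new setInput contract, only database, table, filter, and properties survive;
    // any other state on this InputJobInfo is rebuilt from the metastore.
    InputJobInfo inputJobInfo = InputJobInfo.create("default", "mytable", null);

    InitializeInput.setInput(job, inputJobInfo);

    // Read back the populated InputJobInfo that setInput serialized into the job
    // configuration, as the new javadoc example describes.
    InputJobInfo stored = (InputJobInfo) HCatUtil.deserialize(
        job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
    System.out.println("Input table: "
        + stored.getDatabaseName() + "." + stored.getTableName());
  }
}
{code}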