commit 175a8bbee2c4bc8b050d02a2bd163ce307cdc15a
Author: Nitay Joffe
Date:   Fri Sep 28 14:54:08 2012 -0400

    HCOF refactor to allow calling without Job

diff --git src/java/org/apache/hcatalog/common/HCatUtil.java src/java/org/apache/hcatalog/common/HCatUtil.java
index 10446e1..ed379bd 100644
--- src/java/org/apache/hcatalog/common/HCatUtil.java
+++ src/java/org/apache/hcatalog/common/HCatUtil.java
@@ -470,7 +470,7 @@ public class HCatUtil {
 
     public static void
     configureOutputStorageHandler(HCatStorageHandler storageHandler,
-                                  JobContext context,
+                                  Configuration conf,
                                   OutputJobInfo outputJobInfo) {
         //TODO replace IgnoreKeyTextOutputFormat with a
         //HiveOutputFormatWrapper in StorageHandler
@@ -480,7 +480,7 @@ public class HCatUtil {
                 outputJobInfo.getTableInfo().getStorerInfo().getProperties());
             if (tableDesc.getJobProperties() == null)
                 tableDesc.setJobProperties(new HashMap<String, String>());
-            for (Map.Entry<String, String> el : context.getConfiguration()) {
+            for (Map.Entry<String, String> el : conf) {
                 tableDesc.getJobProperties().put(el.getKey(), el.getValue());
             }
 
@@ -494,7 +494,7 @@ public class HCatUtil {
                 jobProperties);
 
             for (Map.Entry<String, String> el : jobProperties.entrySet()) {
-                context.getConfiguration().set(el.getKey(), el.getValue());
+                conf.set(el.getKey(), el.getValue());
             }
         } catch (IOException e) {
             throw new IllegalStateException(
@@ -502,6 +502,13 @@ public class HCatUtil {
         }
     }
 
+    public static void
+    configureOutputStorageHandler(HCatStorageHandler storageHandler,
+                                  JobContext context,
+                                  OutputJobInfo outputJobInfo) {
+        configureOutputStorageHandler(storageHandler, context.getConfiguration(), outputJobInfo);
+    }
+
     /**
      * Replace the contents of dest with the contents of src
      * @param src
diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
index 3532696..ca631d6 100644
--- src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -60,10 +60,14 @@ public abstract class HCatBaseInputFormat
 
   // TODO needs to go in InitializeInput? as part of InputJobInfo
   public static HCatSchema getOutputSchema(JobContext context)
       throws IOException {
-    String os = context.getConfiguration().get(
-        HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+    return getOutputSchema(context.getConfiguration());
+  }
+
+  public static HCatSchema getOutputSchema(Configuration conf)
+      throws IOException {
+    String os = conf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
     if (os == null) {
-      return getTableSchema(context);
+      return getTableSchema(conf);
     } else {
       return (HCatSchema) HCatUtil.deserialize(os);
     }
@@ -232,7 +236,22 @@ public abstract class HCatBaseInputFormat
    */
   public static HCatSchema getTableSchema(JobContext context)
       throws IOException {
-    InputJobInfo inputJobInfo = getJobInfo(context);
+    return getTableSchema(context.getConfiguration());
+  }
+
+
+  /**
+   * Gets the HCatTable schema for the table specified in the HCatInputFormat.setInput call
+   * on the specified job context. This information is available only after HCatInputFormat.setInput
+   * has been called for a JobContext.
+   * @param conf the Configuration object
+   * @return the table schema
+   * @throws IOException if HCatInputFormat.setInput has not been called
+   *                     for the current context
+   */
+  public static HCatSchema getTableSchema(Configuration conf)
+      throws IOException {
+    InputJobInfo inputJobInfo = getJobInfo(conf);
     HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
     for (HCatFieldSchema field :
         inputJobInfo.getTableInfo().getDataColumns().getFields())
@@ -253,7 +272,20 @@ public abstract class HCatBaseInputFormat
    */
   private static InputJobInfo getJobInfo(JobContext jobContext)
       throws IOException {
-    String jobString = jobContext.getConfiguration().get(
+    return getJobInfo(jobContext.getConfiguration());
+  }
+
+  /**
+   * Gets the InputJobInfo object by reading the Configuration and deserializing
+   * the string. If InputJobInfo is not present in the configuration, throws an
+   * exception since that means HCatInputFormat.setInput has not been called.
+   * @param conf the Configuration object
+   * @return the InputJobInfo object
+   * @throws IOException if the InputJobInfo is not present in the configuration
+   */
+  private static InputJobInfo getJobInfo(Configuration conf)
+      throws IOException {
+    String jobString = conf.get(
         HCatConstants.HCAT_KEY_JOB_INFO);
     if (jobString == null) {
       throw new IOException("job information not found in JobContext."
diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
index d741b7f..c42416b 100644
--- src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
+++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -44,10 +45,21 @@ public abstract class HCatBaseOutputFormat extends OutputFormat<WritableComparable<?>, HCatRecord> {
    * @param job the job object
    * @param schema the schema for the data
    * @throws IOException
    */
   public static void setSchema(final Job job, final HCatSchema schema) throws IOException {
-    OutputJobInfo jobInfo = getJobInfo(job);
+    setSchema(job.getConfiguration(), schema);
+  }
+
+  /**
+   * Set the schema for the data being written out to the partition. The
+   * table schema is used by default for the partition if this is not called.
+   * @param conf the Configuration object
+   * @param schema the schema for the data
+   * @throws IOException
+   */
+  public static void setSchema(final Configuration conf, final HCatSchema schema) throws IOException {
+    OutputJobInfo jobInfo = getJobInfo(conf);
     Map<String, String> partMap = jobInfo.getPartitionValues();
     setPartDetails(jobInfo, schema, partMap);
-    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
+    conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
   }
diff --git src/java/org/apache/hcatalog/mapreduce/InitializeInput.java src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
index ce65623..0a58618 100644
--- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
+++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
@@ -69,14 +69,37 @@ public class InitializeInput {
    * @throws Exception
    */
   public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
+    setInput(job.getConfiguration(), theirInputJobInfo);
+  }
+
+  /**
+   * Set the input to use for the Job. This queries the metadata server with the specified
+   * partition predicates, gets the matching partitions, and puts the information in the job
+   * configuration object.
+   *
+   * To ensure a known InputJobInfo state, only the database name, table name, filter, and
+   * properties are preserved. All other modifications to the given InputJobInfo are discarded.
+   *
+   * After calling setInput, InputJobInfo can be retrieved from the job configuration as follows:
+   * {code}
+   * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
+   *     job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+   * {code}
+   *
+   * @param conf the job Configuration object
+   * @param theirInputJobInfo information on the Input to read
+   * @throws Exception
+   */
+  public static void setInput(Configuration conf,
+      InputJobInfo theirInputJobInfo) throws Exception {
     InputJobInfo inputJobInfo = InputJobInfo.create(
         theirInputJobInfo.getDatabaseName(),
         theirInputJobInfo.getTableName(),
         theirInputJobInfo.getFilter());
     inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
-    job.getConfiguration().set(
+    conf.set(
         HCatConstants.HCAT_KEY_JOB_INFO,
-        HCatUtil.serialize(getInputJobInfo(job, inputJobInfo, null)));
+        HCatUtil.serialize(getInputJobInfo(conf, inputJobInfo, null)));
   }
 
   /**
@@ -84,12 +107,23 @@ public class InitializeInput {
    */
   private static InputJobInfo getInputJobInfo(
       Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
+    Configuration conf = null;
+    if (job != null) {
+      conf = job.getConfiguration();
+    }
+    return getInputJobInfo(conf, inputJobInfo, locationFilter);
+  }
+
+  /**
+   * Returns the given InputJobInfo after populating with data queried from the metadata service.
+   */
+  private static InputJobInfo getInputJobInfo(
+      Configuration conf, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
     HiveMetaStoreClient client = null;
     HiveConf hiveConf = null;
     try {
-      if (job != null) {
-        hiveConf = HCatUtil.getHiveConf(job.getConfiguration());
+      if (conf != null) {
+        hiveConf = HCatUtil.getHiveConf(conf);
       } else {
         hiveConf = new HiveConf(HCatInputFormat.class);
       }
@@ -118,7 +152,7 @@ public class InitializeInput {
         HCatSchema schema = HCatUtil.extractSchema(
             new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
         PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
-            ptn.getParameters(), job.getConfiguration(), inputJobInfo);
+            ptn.getParameters(), conf, inputJobInfo);
         partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
         partInfoList.add(partInfo);
       }
@@ -127,7 +161,7 @@ public class InitializeInput {
         //Non partitioned table
         HCatSchema schema = HCatUtil.extractSchema(table);
         PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
-            table.getParameters(), job.getConfiguration(), inputJobInfo);
+            table.getParameters(), conf, inputJobInfo);
         partInfo.setPartitionValues(new HashMap<String, String>());
         partInfoList.add(partInfo);
       }
diff --git src/java/org/apache/hcatalog/mapreduce/Security.java src/java/org/apache/hcatalog/mapreduce/Security.java
index 041a898..f505c4d 100644
--- src/java/org/apache/hcatalog/mapreduce/Security.java
+++ src/java/org/apache/hcatalog/mapreduce/Security.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -99,7 +100,7 @@ final class Security {
   }
 
   void handleSecurity(
-      Job job,
+      Credentials credentials,
       OutputJobInfo outputJobInfo,
       HiveMetaStoreClient client,
       Configuration conf,
@@ -144,20 +145,32 @@ final class Security {
           if (jtToken == null) {
             //we don't need to cancel this token as the TokenRenewer for JT tokens
            //takes care of cancelling them
-            job.getCredentials().addToken(new Text("hcat jt token"),
-                HCatUtil.getJobTrackerDelegationToken(conf, ugi.getUserName()));
+            credentials.addToken(
+                new Text("hcat jt token"),
+                HCatUtil.getJobTrackerDelegationToken(conf, ugi.getUserName())
+            );
           }
         }
-        job.getCredentials().addToken(new Text(ugi.getUserName() + "_" + tokenSignature), hiveToken);
+        credentials.addToken(new Text(ugi.getUserName() + "_" + tokenSignature), hiveToken);
         // this will be used by the outputcommitter to pass on to the metastore client
         // which in turn will pass on to the TokenSelector so that it can select
         // the right token.
-        job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+        conf.set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
       }
     }
   }
 
+  void handleSecurity(
+      Job job,
+      OutputJobInfo outputJobInfo,
+      HiveMetaStoreClient client,
+      Configuration conf,
+      boolean harRequested)
+      throws IOException, MetaException, TException, Exception {
+    handleSecurity(job.getCredentials(), outputJobInfo, client, conf, harRequested);
+  }
+
   // we should cancel hcat token if it was acquired by hcat
   // and not if it was supplied (ie Oozie). In the latter
   // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
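
In practical terms, the input-side overloads let a caller that holds only a Configuration, with no Job or JobContext in scope (for example, a framework that is not a MapReduce driver), initialize HCatalog input and read schemas. A minimal sketch of that flow, assuming a reachable Hive metastore and a table default.mytable (both placeholders, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InitializeInput;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class NoJobInputSetup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Queries the metastore and serializes the resulting InputJobInfo
    // into conf; before this commit a Job was required to carry it.
    InitializeInput.setInput(conf, InputJobInfo.create("default", "mytable", null));

    // The table schema can now be read back from the same Configuration
    // through the new getTableSchema(Configuration) overload.
    HCatSchema schema = HCatInputFormat.getTableSchema(conf);
    System.out.println("columns: " + schema.getFields().size());
  }
}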
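
The retrieval idiom documented in the new setInput javadoc works the same way against the Configuration directly. A small helper sketch; the class and method names are hypothetical, and only the HCAT_KEY_JOB_INFO key and the HCatUtil.deserialize call come from the commit:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class JobInfoReader {
  // conf is assumed to be the Configuration previously passed to setInput.
  public static InputJobInfo readJobInfo(Configuration conf) throws IOException {
    return (InputJobInfo) HCatUtil.deserialize(
        conf.get(HCatConstants.HCAT_KEY_JOB_INFO));
  }
}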
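
The design choice running through all five files is the same: each public Job/JobContext entry point survives as a thin wrapper that unwraps job.getConfiguration() (or job.getCredentials(), for Security.handleSecurity) and delegates to a new Configuration-based overload, so existing callers compile and behave unchanged. From the caller's side the two forms below are therefore equivalent (sketch only; the table name is a placeholder and running it needs a live metastore):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.InitializeInput;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class OverloadEquivalence {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());

    // The pre-existing Job-based entry point...
    InitializeInput.setInput(job, InputJobInfo.create("default", "mytable", null));

    // ...now simply forwards to the Configuration-based one:
    InitializeInput.setInput(job.getConfiguration(),
        InputJobInfo.create("default", "mytable", null));
  }
}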