commit aee62ab755ddbafd515074348eb441df7c1cc1e7
Author: Nitay Joffe
Date:   Fri Sep 28 14:54:08 2012 -0400

    HCATALOG-516: HCOF refactor to allow calling without Job

diff --git src/java/org/apache/hcatalog/common/HCatUtil.java src/java/org/apache/hcatalog/common/HCatUtil.java
index 10446e1..0aa1014 100644
--- src/java/org/apache/hcatalog/common/HCatUtil.java
+++ src/java/org/apache/hcatalog/common/HCatUtil.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;

+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.common.JavaUtils;
@@ -467,10 +469,11 @@ public class HCatUtil {
         return jobProperties;
     }

-
+    @InterfaceAudience.Private
+    @InterfaceStability.Evolving
     public static void configureOutputStorageHandler(HCatStorageHandler storageHandler,
-                                                     JobContext context,
+                                                     Configuration conf,
                                                      OutputJobInfo outputJobInfo) {
         //TODO replace IgnoreKeyTextOutputFormat with a
         //HiveOutputFormatWrapper in StorageHandler
@@ -480,7 +483,7 @@ public class HCatUtil {
                 outputJobInfo.getTableInfo().getStorerInfo().getProperties());
             if (tableDesc.getJobProperties() == null)
                 tableDesc.setJobProperties(new HashMap<String, String>());
-            for (Map.Entry<String, String> el : context.getConfiguration()) {
+            for (Map.Entry<String, String> el : conf) {
                 tableDesc.getJobProperties().put(el.getKey(), el.getValue());
             }

@@ -494,7 +497,7 @@ public class HCatUtil {
                 jobProperties);

             for (Map.Entry<String, String> el : jobProperties.entrySet()) {
-                context.getConfiguration().set(el.getKey(), el.getValue());
+                conf.set(el.getKey(), el.getValue());
             }
         } catch (IOException e) {
             throw new IllegalStateException(
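HCatUtil.configureOutputStorageHandler is marked private/evolving and now takes the bare Configuration instead of a JobContext, so callers that still hold a JobContext unwrap the configuration themselves. A minimal sketch of such a call site, assuming the HCatalog package locations above; the wrapper class and method are hypothetical, not part of this patch:

    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.mapreduce.HCatStorageHandler;
    import org.apache.hcatalog.mapreduce.OutputJobInfo;

    class OutputHandlerConfigSketch {
        // A caller that still holds a JobContext passes its Configuration through
        // to the refactored utility method.
        static void configure(JobContext jobContext,
                              HCatStorageHandler storageHandler,
                              OutputJobInfo outputJobInfo) {
            HCatUtil.configureOutputStorageHandler(
                storageHandler, jobContext.getConfiguration(), outputJobInfo);
        }
    }
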
diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
index 3532696..8ca35dc 100644
--- src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -58,12 +58,11 @@ public abstract class HCatBaseInputFormat
     private Class inputFileFormatClass;

     // TODO needs to go in InitializeInput? as part of InputJobInfo
-    public static HCatSchema getOutputSchema(JobContext context)
+    private static HCatSchema getOutputSchema(Configuration conf)
         throws IOException {
-        String os = context.getConfiguration().get(
-            HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+        String os = conf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
         if (os == null) {
-            return getTableSchema(context);
+            return getTableSchema(conf);
         } else {
             return (HCatSchema) HCatUtil.deserialize(os);
         }
@@ -98,12 +97,13 @@ public abstract class HCatBaseInputFormat
     @Override
     public List<InputSplit> getSplits(JobContext jobContext)
         throws IOException, InterruptedException {
+        Configuration conf = jobContext.getConfiguration();

         //Get the job info from the configuration,
         //throws exception if not initialized
         InputJobInfo inputJobInfo;
         try {
-            inputJobInfo = getJobInfo(jobContext);
+            inputJobInfo = getJobInfo(conf);
         } catch (Exception e) {
             throw new IOException(e);
         }
@@ -117,7 +117,6 @@ public abstract class HCatBaseInputFormat

         HCatStorageHandler storageHandler;
         JobConf jobConf;
-        Configuration conf = jobContext.getConfiguration();
         //For each matching partition, call getSplits on the underlying InputFormat
         for (PartInfo partitionInfo : partitionInfoList) {
             jobConf = HCatUtil.getJobConfFromContext(jobContext);
@@ -183,16 +182,17 @@ public abstract class HCatBaseInputFormat
         HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
         PartInfo partitionInfo = hcatSplit.getPartitionInfo();
         JobContext jobContext = taskContext;
+        Configuration conf = jobContext.getConfiguration();

         HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
-            jobContext.getConfiguration(), partitionInfo);
+            conf, partitionInfo);

         JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
         Map<String, String> jobProperties = partitionInfo.getJobProperties();
         HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

         Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
-            getOutputSchema(jobContext), partitionInfo
+            getOutputSchema(conf), partitionInfo
         );

         return new HCatRecordReader(storageHandler, valuesNotInDataCols);
@@ -222,17 +222,27 @@ public abstract class HCatBaseInputFormat
     }

     /**
+     * @see org.apache.hcatalog.mapreduce.HCatBaseInputFormat#getTableSchema(org.apache.hadoop.conf.Configuration)
+     * @deprecated Use {@link #getTableSchema(org.apache.hadoop.conf.Configuration)}
+     */
+    public static HCatSchema getTableSchema(JobContext context)
+        throws IOException {
+        return getTableSchema(context.getConfiguration());
+    }
+
+
+    /**
     * Gets the HCatTable schema for the table specified in the HCatInputFormat.setInput call
     * on the specified job context. This information is available only after HCatInputFormat.setInput
     * has been called for a JobContext.
-    * @param context the context
+    * @param conf the Configuration object
     * @return the table schema
     * @throws IOException if HCatInputFormat.setInput has not been called
     *                     for the current context
     */
-    public static HCatSchema getTableSchema(JobContext context)
+    public static HCatSchema getTableSchema(Configuration conf)
         throws IOException {
-        InputJobInfo inputJobInfo = getJobInfo(context);
+        InputJobInfo inputJobInfo = getJobInfo(conf);
         HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
         for (HCatFieldSchema field : inputJobInfo.getTableInfo().getDataColumns().getFields())
@@ -247,13 +257,13 @@ public abstract class HCatBaseInputFormat
     * Gets the InputJobInfo object by reading the Configuration and deserializing
     * the string. If InputJobInfo is not present in the configuration, throws an
     * exception since that means HCatInputFormat.setInput has not been called.
-    * @param jobContext the job context
+    * @param conf the Configuration object
     * @return the InputJobInfo object
     * @throws IOException the exception
     */
-    private static InputJobInfo getJobInfo(JobContext jobContext)
+    private static InputJobInfo getJobInfo(Configuration conf)
         throws IOException {
-        String jobString = jobContext.getConfiguration().get(
+        String jobString = conf.get(
             HCatConstants.HCAT_KEY_JOB_INFO);
         if (jobString == null) {
             throw new IOException("job information not found in JobContext."
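With getTableSchema(Configuration) now public on HCatBaseInputFormat, a caller that has no Job or JobContext can read the table schema from a bare Configuration. A minimal sketch, assuming HCatInputFormat.setInput has already populated the configuration; the wrapper class is hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;

    class SchemaProbe {
        // Throws IOException if HCatInputFormat.setInput has not been called
        // for this configuration, mirroring the javadoc above.
        static HCatSchema readSchema(Configuration conf) throws IOException {
            return HCatBaseInputFormat.getTableSchema(conf);
        }
    }
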
diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
index d741b7f..7e2f0ec 100644
--- src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
+++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -40,14 +41,22 @@ public abstract class HCatBaseOutputFormat extends OutputFormat
                                   dynamicPartVals) throws IOException {
+        Configuration conf = jobContext.getConfiguration();
         try {
-            OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-            HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(jobContext.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+            OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+            HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, jobInfo.getTableInfo().getStorerInfo());

             Map<String, String> partitionValues = jobInfo.getPartitionValues();
             String location = jobInfo.getLocation();
@@ -143,7 +161,7 @@ public abstract class HCatBaseOutputFormat extends OutputFormat
             partMap = jobInfo.getPartitionValues();
         setPartDetails(jobInfo, schema, partMap);
-        job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
+        conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
     }

     /**
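The output side now reads the serialized OutputJobInfo straight off the Configuration under HCatConstants.HCAT_KEY_OUTPUT_INFO, as the hunks above show. A rough sketch of that lookup in isolation; the helper class is hypothetical and only the IOException from deserialization is declared:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.mapreduce.OutputJobInfo;

    class OutputJobInfoReader {
        // Deserializes the OutputJobInfo that HCatOutputFormat.setOutput stored
        // in the configuration; callers must have run setOutput first.
        static OutputJobInfo read(Configuration conf) throws IOException {
            String serialized = conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
            return (OutputJobInfo) HCatUtil.deserialize(serialized);
        }
    }
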
diff --git src/java/org/apache/hcatalog/mapreduce/InitializeInput.java src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
index df5afad..e91ddc7 100644
--- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
+++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
@@ -50,6 +50,13 @@ public class InitializeInput {
     private static final Logger LOG = LoggerFactory.getLogger(InitializeInput.class);

     /**
+     * @see org.apache.hcatalog.mapreduce.InitializeInput#setInput(org.apache.hadoop.conf.Configuration, InputJobInfo)
+     */
+    public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
+        setInput(job.getConfiguration(), theirInputJobInfo);
+    }
+
+    /**
     * Set the input to use for the Job. This queries the metadata server with the specified
     * partition predicates, gets the matching partitions, and puts the information in the job
     * configuration object.
@@ -63,32 +70,32 @@ public class InitializeInput {
     *   job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
     * {code}
     *
-    * @param job the job object
+    * @param conf the job Configuration object
     * @param theirInputJobInfo information on the Input to read
     * @throws Exception
     */
-    public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
+    public static void setInput(Configuration conf,
+                                InputJobInfo theirInputJobInfo) throws Exception {
         InputJobInfo inputJobInfo = InputJobInfo.create(
             theirInputJobInfo.getDatabaseName(),
             theirInputJobInfo.getTableName(),
             theirInputJobInfo.getFilter());
         inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
-        job.getConfiguration().set(
+        conf.set(
             HCatConstants.HCAT_KEY_JOB_INFO,
-            HCatUtil.serialize(getInputJobInfo(job, inputJobInfo, null)));
+            HCatUtil.serialize(getInputJobInfo(conf, inputJobInfo, null)));
     }

     /**
     * Returns the given InputJobInfo after populating with data queried from the metadata service.
     */
     private static InputJobInfo getInputJobInfo(
-        Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
-
+        Configuration conf, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
         HiveMetaStoreClient client = null;
         HiveConf hiveConf = null;
         try {
-            if (job != null) {
-                hiveConf = HCatUtil.getHiveConf(job.getConfiguration());
+            if (conf != null) {
+                hiveConf = HCatUtil.getHiveConf(conf);
             } else {
                 hiveConf = new HiveConf(HCatInputFormat.class);
             }
@@ -117,7 +124,7 @@ public class InitializeInput {
                 HCatSchema schema = HCatUtil.extractSchema(
                     new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
                 PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
-                    ptn.getParameters(), job.getConfiguration(), inputJobInfo);
+                    ptn.getParameters(), conf, inputJobInfo);
                 partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table, ptn));
                 partInfoList.add(partInfo);
             }
@@ -126,7 +133,7 @@ public class InitializeInput {
             //Non partitioned table
             HCatSchema schema = HCatUtil.extractSchema(table);
             PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
-                table.getParameters(), job.getConfiguration(), inputJobInfo);
+                table.getParameters(), conf, inputJobInfo);
             partInfo.setPartitionValues(new HashMap<String, String>());
             partInfoList.add(partInfo);
         }
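setInput(Configuration, InputJobInfo) lets a caller initialize HCatalog input without constructing a Job first; the old Job overload is now a one-line wrapper. A hedged sketch of the Configuration-only path, with made-up database and table names:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.mapreduce.InitializeInput;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    class ConfOnlyInputSetup {
        // Queries the metastore and serializes the resolved InputJobInfo into
        // the configuration under HCatConstants.HCAT_KEY_JOB_INFO.
        static void setup(Configuration conf) throws Exception {
            InitializeInput.setInput(conf, InputJobInfo.create("default", "my_table", null));
        }
    }
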
diff --git src/java/org/apache/hcatalog/mapreduce/Security.java src/java/org/apache/hcatalog/mapreduce/Security.java
index 041a898..f505c4d 100644
--- src/java/org/apache/hcatalog/mapreduce/Security.java
+++ src/java/org/apache/hcatalog/mapreduce/Security.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -99,7 +100,7 @@ final class Security {
     }

     void handleSecurity(
-        Job job,
+        Credentials credentials,
         OutputJobInfo outputJobInfo,
         HiveMetaStoreClient client,
         Configuration conf,
@@ -144,20 +145,32 @@ final class Security {
                     if (jtToken == null) {
                         //we don't need to cancel this token as the TokenRenewer for JT tokens
                         //takes care of cancelling them
-                        job.getCredentials().addToken(new Text("hcat jt token"),
-                            HCatUtil.getJobTrackerDelegationToken(conf, ugi.getUserName()));
+                        credentials.addToken(
+                            new Text("hcat jt token"),
+                            HCatUtil.getJobTrackerDelegationToken(conf, ugi.getUserName())
+                        );
                     }
                 }

-                job.getCredentials().addToken(new Text(ugi.getUserName() + "_" + tokenSignature), hiveToken);
+                credentials.addToken(new Text(ugi.getUserName() + "_" + tokenSignature), hiveToken);
                 // this will be used by the outputcommitter to pass on to the metastore client
                 // which in turn will pass on to the TokenSelector so that it can select
                 // the right token.
-                job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+                conf.set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
             }
         }
     }

+    void handleSecurity(
+        Job job,
+        OutputJobInfo outputJobInfo,
+        HiveMetaStoreClient client,
+        Configuration conf,
+        boolean harRequested)
+        throws IOException, MetaException, TException, Exception {
+        handleSecurity(job.getCredentials(), outputJobInfo, client, conf, harRequested);
+    }
+
     // we should cancel hcat token if it was acquired by hcat
     // and not if it was supplied (ie Oozie). In the latter
     // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
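Taken together, each touched class follows the same shape: the old Job or JobContext entry point stays as a thin wrapper that delegates to a Configuration (or Credentials) based method, so frameworks that never build a Job can still call in. A generic illustration of that pattern with entirely hypothetical names, not code from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    class ConfigurationFirstApi {
        /** @deprecated Use {@link #doWork(Configuration)} instead. */
        @Deprecated
        public static void doWork(Job job) {
            doWork(job.getConfiguration());
        }

        public static void doWork(Configuration conf) {
            // all of the real work reads and writes the Configuration directly
            conf.setBoolean("example.flag", true);
        }
    }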