diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
index 93a03ad..5685e09 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
@@ -82,8 +83,9 @@ private static volatile HiveClientCache hiveClientCache;
 
   public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
-    if (j.getConfiguration().get("mapred.task.id", "").equals("") &&
-        !("true".equals(j.getConfiguration().get("pig.illustrating")))) {
+    Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(j);
+    if (configuration.get("mapred.task.id", "").equals("") &&
+        !("true".equals(configuration.get("pig.illustrating")))) {
       return false;
     }
     return true;
@@ -632,7 +634,7 @@ public static JobConf getJobConfFromContext(JobContext jobContext) {
     // we need to convert the jobContext into a jobConf
     // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
     // begin conversion..
-    jobConf = new JobConf(jobContext.getConfiguration());
+    jobConf = new JobConf(ShimLoader.getHadoopShims().getConfiguration(jobContext));
     // ..end of conversion
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
index 8669ded..6e35fdf 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
@@ -24,10 +24,12 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -65,9 +67,11 @@ public ReaderContext prepareRead() throws HCatException {
       HCatInputFormat hcif = HCatInputFormat.setInput(
         job, re.getDbName(), re.getTableName(), re.getFilterString());
       ReaderContextImpl cntxt = new ReaderContextImpl();
-      cntxt.setInputSplits(hcif.getSplits(
-          ShimLoader.getHadoopShims().getHCatShim().createJobContext(job.getConfiguration(), null)));
-      cntxt.setConf(job.getConfiguration());
+      HadoopShims shims = ShimLoader.getHadoopShims();
+      Configuration configuration = shims.getConfiguration(job);
+      JobContext jobContext = shims.getHCatShim().createJobContext(configuration, null);
+      cntxt.setInputSplits(hcif.getSplits(jobContext));
+      cntxt.setConf(configuration);
       return cntxt;
     } catch (IOException e) {
       throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
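Taken together, the two files above establish the pattern the rest of this patch repeats: HCatalog stops calling JobContext.getConfiguration() directly and asks the shim layer for the configuration instead, which lets the same HCatalog bytecode link against Hadoop lines where JobContext is a class (0.20/1.x) and lines where it is an interface (0.23/2.x). The sketch below is illustrative only and is not part of the patch; the class and method names are invented, and it assumes nothing beyond ShimLoader.getHadoopShims() and the HadoopShims.getConfiguration(JobContext) accessor visible in these hunks.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.mapreduce.JobContext;

    // Illustrative sketch, not part of the patch.
    public final class ShimConfExample {
      private ShimConfExample() {}

      /** Resolve a JobContext's Configuration without touching JobContext directly. */
      public static Configuration resolveConf(JobContext context) {
        HadoopShims shims = ShimLoader.getHadoopShims();
        return shims.getConfiguration(context);
      }

      /** Mirrors HCatUtil.checkJobContextIfRunningFromBackend, but through the shim accessor. */
      public static boolean isRunningFromBackend(JobContext context) {
        Configuration conf = resolveConf(context);
        // A task id is only present inside a launched task; Pig "illustrate" mode is the exception.
        return !conf.get("mapred.task.id", "").isEmpty()
            || "true".equals(conf.get("pig.illustrating"));
      }
    }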
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
index cead40d..f233c10 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
@@ -21,8 +21,10 @@
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -92,10 +94,11 @@ public void cleanupJob(JobContext context) throws IOException {
     //Cancel HCat and JobTracker tokens
     HiveMetaStoreClient client = null;
     try {
-      HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration());
+      Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context);
+      HiveConf hiveConf = HCatUtil.getHiveConf(configuration);
       client = HCatUtil.getHiveClient(hiveConf);
       String tokenStrForm = client.getTokenStrForm();
-      if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+      if (tokenStrForm != null && configuration.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
         client.cancelDelegationToken(tokenStrForm);
       }
     } catch (Exception e) {
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java
index 3a07b0c..ad593e6 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java
@@ -19,6 +19,8 @@
 package org.apache.hive.hcatalog.mapreduce;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -92,10 +94,11 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
    */
   @Override
   public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+    Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context);
     org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getBaseOutputFormat();
-    JobConf jobConf = new JobConf(context.getConfiguration());
+    JobConf jobConf = new JobConf(configuration);
     outputFormat.checkOutputSpecs(null, jobConf);
-    HCatUtil.copyConf(jobConf, context.getConfiguration());
+    HCatUtil.copyConf(jobConf, configuration);
   }
 
 }
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index 491da14..d9ef2d1 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -99,13 +99,14 @@ public FileOutputCommitterContainer(JobContext context,
                   org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException {
     super(context, baseCommitter);
-    jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration());
+    Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context);
+    jobInfo = HCatOutputFormat.getJobInfo(configuration);
     dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
     this.partitionsDiscovered = !dynamicPartitioningUsed;
-    cachedStorageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+    cachedStorageHandler = HCatUtil.getStorageHandler(configuration, jobInfo.getTableInfo().getStorerInfo());
     Table table = new Table(jobInfo.getTableInfo().getTable());
-    if (dynamicPartitioningUsed && Boolean.valueOf((String)table.getProperty("EXTERNAL"))
+    if (dynamicPartitioningUsed && Boolean.valueOf(table.getProperty("EXTERNAL"))
       && jobInfo.getCustomDynamicPath() != null
       && jobInfo.getCustomDynamicPath().length() > 0) {
       customDynamicLocationUsed = true;
@@ -157,6 +158,9 @@ public void setupTask(TaskAttemptContext context) throws IOException {
 
   @Override
   public void abortJob(JobContext jobContext, State state) throws IOException {
+
+    Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(jobContext);
+
     try {
       if (dynamicPartitioningUsed) {
         discoverPartitions(jobContext);
@@ -168,28 +172,28 @@ public void abortJob(JobContext jobContext, State state) throws IOException {
       } else if (dynamicPartitioningUsed) {
         for (JobContext currContext : contextDiscoveredByPath.values()) {
           try {
-            new JobConf(currContext.getConfiguration())
+            new JobConf(ShimLoader.getHadoopShims().getConfiguration(currContext))
               .getOutputCommitter().abortJob(currContext,
-              state);
+                state);
           } catch (Exception e) {
             throw new IOException(e);
           }
         }
       }
       Path src;
-      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext.getConfiguration());
+      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(configuration);
       Path tblPath = new Path(jobInfo.getTableInfo().getTableLocation());
       if (dynamicPartitioningUsed) {
         if (!customDynamicLocationUsed) {
           src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
             .getPartitionKeysSize()));
         } else {
-          src = new Path(getCustomPartitionRootLocation(jobInfo, jobContext.getConfiguration()));
+          src = new Path(getCustomPartitionRootLocation(jobInfo, configuration));
         }
       } else {
         src = new Path(jobInfo.getLocation());
       }
-      FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
+      FileSystem fs = src.getFileSystem(configuration);
       // Note fs.delete will fail on Windows. The reason is in OutputCommitter,
       // Hadoop is still writing to _logs/history. On Linux, OS don't care file is still
       // open and remove the directory anyway, but on Windows, OS refuse to remove a
@@ -215,12 +219,13 @@ private static boolean getOutputDirMarking(Configuration conf) {
 
   @Override
   public void commitJob(JobContext jobContext) throws IOException {
+    Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(jobContext);
     if (dynamicPartitioningUsed) {
       discoverPartitions(jobContext);
       // Commit each partition so it gets moved out of the job work
       // dir
       for (JobContext context : contextDiscoveredByPath.values()) {
-        new JobConf(context.getConfiguration())
+        new JobConf(ShimLoader.getHadoopShims().getConfiguration(context))
           .getOutputCommitter().commitJob(context);
       }
     }
@@ -230,11 +235,10 @@ public void commitJob(JobContext jobContext) throws IOException {
     }
     registerPartitions(jobContext);
     // create _SUCCESS FILE if so requested.
-    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext.getConfiguration());
-    if (getOutputDirMarking(jobContext.getConfiguration())) {
+    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(configuration);
+    if (getOutputDirMarking(configuration)) {
       Path outputPath = new Path(jobInfo.getLocation());
-      FileSystem fileSys = outputPath.getFileSystem(jobContext
-        .getConfiguration());
+      FileSystem fileSys = outputPath.getFileSystem(configuration);
       // create a file in the folder to mark it
       if (fileSys.exists(outputPath)) {
         Path filePath = new Path(outputPath,
@@ -664,9 +668,10 @@ private Path getFinalPath(FileSystem fs, Path file, Path src,
    * Run to discover dynamic partitions available
    */
   private void discoverPartitions(JobContext context) throws IOException {
+    Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context);
     if (!partitionsDiscovered) {
       // LOG.info("discover ptns called");
-      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration());
+      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(configuration);
 
       harProcessor.setEnabled(jobInfo.getHarRequested());
@@ -674,7 +679,7 @@ private void discoverPartitions(JobContext context) throws IOException {
       int maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
 
       Path loadPath = new Path(jobInfo.getLocation());
-      FileSystem fs = loadPath.getFileSystem(context.getConfiguration());
+      FileSystem fs = loadPath.getFileSystem(configuration);
 
       // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
       String dynPathSpec = loadPath.toUri().getPath();
@@ -713,10 +718,10 @@ private void discoverPartitions(JobContext context) throws IOException {
             st.getPath().toString());
         }
         partitionsDiscoveredByPath.put(st.getPath().toString(), fullPartSpec);
-        JobConf jobConf = (JobConf)context.getConfiguration();
+        JobConf jobConf = (JobConf)configuration;
         JobContext currContext = HCatMapRedUtil.createJobContext(
           jobConf,
-          context.getJobID(),
+          ShimLoader.getHadoopShims().getJobID(context),
           InternalUtil.createReporter(HCatMapRedUtil.createTaskAttemptContext(jobConf,
             ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID())));
         HCatOutputFormat.configureOutputStorageHandler(currContext, jobInfo, fullPartSpec);
@@ -739,8 +744,8 @@ private void registerPartitions(JobContext context) throws IOException{
     if (dynamicPartitioningUsed){
       discoverPartitions(context);
     }
-    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration());
-    Configuration conf = context.getConfiguration();
+    Configuration conf = ShimLoader.getHadoopShims().getConfiguration(context);
+    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(conf);
     Table table = new Table(jobInfo.getTableInfo().getTable());
     Path tblPath = new Path(table.getTTable().getSd().getLocation());
     FileSystem fs = tblPath.getFileSystem(conf);
@@ -973,8 +978,8 @@ private void cancelDelegationTokens(JobContext context) throws IOException{
     LOG.info("Cancelling delegation token for the job.");
     HiveMetaStoreClient client = null;
     try {
-      HiveConf hiveConf = HCatUtil
-          .getHiveConf(context.getConfiguration());
+      Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context);
+      HiveConf hiveConf = HCatUtil.getHiveConf(configuration);
       client = HCatUtil.getHiveClient(hiveConf);
       // cancel the deleg. tokens that were acquired for this job now that
       // we are done - we should cancel if the tokens were acquired by
@@ -983,8 +988,8 @@ private void cancelDelegationTokens(JobContext context) throws IOException{
       // the conf will not be set
       String tokenStrForm = client.getTokenStrForm();
       if (tokenStrForm != null
-        && context.getConfiguration().get(
-          HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+        && configuration.get(
+          HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
         client.cancelDelegationToken(tokenStrForm);
       }
     } catch (MetaException e) {
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
index 1a7595f..7b91ddf 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
@@ -19,10 +19,9 @@
 package org.apache.hive.hcatalog.mapreduce;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
@@ -32,6 +31,7 @@
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -114,10 +114,11 @@ public FileOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat
       getSplits(JobContext jobContext) throws IOException, InterruptedException {
-    Configuration conf = jobContext.getConfiguration();
+    Configuration conf = ShimLoader.getHadoopShims().getConfiguration(jobContext);
     //Get the job info from the configuration,
     //throws exception if not initialized
@@ -185,13 +186,12 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
     HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
-    JobContext jobContext = taskContext;
-    Configuration conf = jobContext.getConfiguration();
+    Configuration conf = taskContext.getConfiguration();
 
     HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(
       conf, partitionInfo);
 
-    JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
+    JobConf jobConf = HCatUtil.getJobConfFromContext(taskContext);
     Map jobProperties = partitionInfo.getJobProperties();
     HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
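The record-reader hunk above also shows the second half of the bridge: once the configuration has been resolved, the mapreduce-side context still has to be turned into a mapred JobConf and overlaid with the storage handler's per-partition job properties. Below is a rough, illustrative sketch of that hand-off, not part of the patch; it uses only HCatUtil.getJobConfFromContext and HCatUtil.copyJobPropertiesToJobConf as they appear in these hunks, and the class name and partitionJobProperties parameter are invented for the example.

    import java.util.Map;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hive.hcatalog.common.HCatUtil;

    // Illustrative sketch, not part of the patch.
    public final class JobConfBridgeExample {
      private JobConfBridgeExample() {}

      public static JobConf toJobConf(TaskAttemptContext taskContext,
                                      Map<String, String> partitionJobProperties) {
        // Wrap the context's Configuration in a JobConf (see the HCatUtil hunk at the top of this patch).
        JobConf jobConf = HCatUtil.getJobConfFromContext(taskContext);
        // Overlay the per-partition job properties so the storage handler's
        // underlying input format sees the settings it was configured with.
        HCatUtil.copyJobPropertiesToJobConf(partitionJobProperties, jobConf);
        return jobConf;
      }
    }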
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java
index 4f7a74a..a8fe008 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -73,18 +74,19 @@ public void checkOutputSpecs(JobContext context
    */
   protected OutputFormat<WritableComparable<?>, HCatRecord> getOutputFormat(JobContext context)
     throws IOException {
-    OutputJobInfo jobInfo = getJobInfo(context.getConfiguration());
-    HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(),
+    Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context);
+    OutputJobInfo jobInfo = getJobInfo(configuration);
+    HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(configuration,
       jobInfo.getTableInfo().getStorerInfo());
     // Always configure storage handler with jobproperties/jobconf before calling any methods on it
     configureOutputStorageHandler(context);
     if (storageHandler instanceof FosterStorageHandler) {
       return new FileOutputFormatContainer(ReflectionUtils.newInstance(
-        storageHandler.getOutputFormatClass(),context.getConfiguration()));
+        storageHandler.getOutputFormatClass(),configuration));
     } else {
       return new DefaultOutputFormatContainer(ReflectionUtils.newInstance(
-        storageHandler.getOutputFormatClass(),context.getConfiguration()));
+        storageHandler.getOutputFormatClass(),configuration));
     }
   }
@@ -125,10 +127,10 @@ static void configureOutputStorageHandler(
   @SuppressWarnings("unchecked")
   static void configureOutputStorageHandler(
     JobContext jobContext, List dynamicPartVals) throws IOException {
-    Configuration conf = jobContext.getConfiguration();
+    Configuration conf = ShimLoader.getHadoopShims().getConfiguration(jobContext);
     try {
       OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-      HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(jobContext.getConfiguration(),jobInfo.getTableInfo().getStorerInfo());
+      HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,jobInfo.getTableInfo().getStorerInfo());
 
       Map partitionValues = jobInfo.getPartitionValues();
       String location = jobInfo.getLocation();
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java
index b651cb3..e90cc33 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java
@@ -19,6 +19,7 @@
 package org.apache.hive.hcatalog.mapreduce;
 
+import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.conf.Configuration;
@@ -48,9 +49,10 @@ public static TaskAttemptContext createTaskAttemptContext(JobConf conf, TaskAtte
     return ShimLoader.getHadoopShims().newTaskAttemptID(jobId, isMap, taskId, id);
   }
 
   public static org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapreduce.JobContext context) {
-    return createJobContext((JobConf)context.getConfiguration(),
-      context.getJobID(),
-      Reporter.NULL);
+    HadoopShims shims = ShimLoader.getHadoopShims();
+    return createJobContext((JobConf) shims.getConfiguration(context),
+      shims.getJobID(context),
+      Reporter.NULL);
   }
 
   public static JobContext createJobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID id, Progressable progressable) {
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/MultiOutputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/MultiOutputFormat.java
index 69a7345..c67da80 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/MultiOutputFormat.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/MultiOutputFormat.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.slf4j.Logger;
@@ -186,10 +187,12 @@ public static JobConfigurer createConfigurer(Job job) {
    * @return a copy of the JobContext with the alias configuration populated
    */
   public static JobContext getJobContext(String alias, JobContext context) {
-    String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
-    JobContext aliasContext = ShimLoader.getHadoopShims().getHCatShim().createJobContext(
-      context.getConfiguration(), context.getJobID());
-    addToConfig(aliasConf, aliasContext.getConfiguration());
+    HadoopShims shims = ShimLoader.getHadoopShims();
+    Configuration configuration = shims.getConfiguration(context);
+    String aliasConf = configuration.get(getAliasConfName(alias));
+    JobContext aliasContext = shims.getHCatShim().createJobContext(
+      configuration, shims.getJobID(context));
+    addToConfig(aliasConf, configuration);
     return aliasContext;
   }
@@ -233,7 +236,8 @@ public void checkOutputSpecs(JobContext context) throws IOException, Interrupted
       OutputFormat outputFormat = getOutputFormatInstance(aliasContext);
       outputFormat.checkOutputSpecs(aliasContext);
       // Copy credentials and any new config added back to JobContext
-      context.getCredentials().addAll(aliasContext.getCredentials());
+      Credentials credentials = (Credentials)ShimLoader.getHadoopShims().getCredentials(context);
+      credentials.addAll((Credentials)ShimLoader.getHadoopShims().getCredentials(aliasContext));
       setAliasConf(alias, context, aliasContext);
     }
   }
@@ -255,7 +259,7 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOE
     OutputFormat outputFormat;
     try {
       outputFormat = ReflectionUtils.newInstance(context.getOutputFormatClass(),
-        context.getConfiguration());
+        ShimLoader.getHadoopShims().getConfiguration(context));
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
     }
@@ -263,7 +267,7 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOE
   }
 
   private static String[] getOutputFormatAliases(JobContext context) {
-    return context.getConfiguration().getStrings(MO_ALIASES);
+    return ShimLoader.getHadoopShims().getConfiguration(context).getStrings(MO_ALIASES);
   }
 
   /**
@@ -282,9 +286,9 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOE
    * configuration.
    */
   private static void setAliasConf(String alias, JobContext userJob, JobContext aliasContext) {
-    Configuration userConf = userJob.getConfiguration();
+    Configuration userConf = ShimLoader.getHadoopShims().getConfiguration(userJob);
     StringBuilder builder = new StringBuilder();
-    for (Entry conf : aliasContext.getConfiguration()) {
+    for (Entry conf : ShimLoader.getHadoopShims().getConfiguration(aliasContext)) {
       String key = conf.getKey();
       String value = conf.getValue();
       String jobValue = userConf.getRaw(key);
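In the MultiOutputFormat hunks above, credentials are now fetched through HadoopShims.getCredentials(JobContext), which is typed as Object and, per the Hadoop20Shims hunk later in this patch, returns null on the plain 0.20 line. The sketch below is not what MultiOutputFormat itself does; it is an illustrative, defensive way a caller could merge alias credentials under that assumption (class and method names are invented for the example).

    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.security.Credentials;

    // Illustrative sketch, not part of the patch.
    public final class CredentialMergeExample {
      private CredentialMergeExample() {}

      /** Merge tokens/secrets from an alias context back into the user's context via the shim accessor. */
      public static void mergeCredentials(JobContext userContext, JobContext aliasContext) {
        Object userCreds = ShimLoader.getHadoopShims().getCredentials(userContext);
        Object aliasCreds = ShimLoader.getHadoopShims().getCredentials(aliasContext);
        // The instanceof checks are a defensive assumption: Hadoop20Shims returns null
        // because the non-security 0.20 line has no Credentials support.
        if (userCreds instanceof Credentials && aliasCreds instanceof Credentials) {
          ((Credentials) userCreds).addAll((Credentials) aliasCreds);
        }
      }
    }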
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java
index 39ef86e..980bca5 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java
@@ -177,7 +177,8 @@ void handleSecurity(
   // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
   void cancelToken(HiveMetaStoreClient client, JobContext context) throws IOException, MetaException {
     String tokenStrForm = client.getTokenStrForm();
-    if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+    if (tokenStrForm != null &&
+        ShimLoader.getHadoopShims().getConfiguration(context).get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
       try {
         client.cancelDelegationToken(tokenStrForm);
       } catch (TException e) {
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java
index 0c3d707..222ddbf 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.LongWritable;
@@ -44,8 +45,8 @@
   @Override
   public List getSplits(JobContext job) throws IOException {
-
-    job.getConfiguration().setLong(
+    Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(job);
+    configuration.setLong(
       ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"),
       SequenceFile.SYNC_INTERVAL);
     return super.getSplits(job);
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
index 02a13b3..b10a7d5 100644
--- hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
@@ -42,7 +42,6 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -229,7 +228,7 @@ private void writeThenReadByRecordReader(int intervalRecordCount,
     RCFileMapReduceInputFormat inputFormat = new RCFileMapReduceInputFormat();
     Configuration jonconf = new Configuration(cloneConf);
     jonconf.set("mapred.input.dir", testDir.toString());
-    JobContext context = new Job(jonconf);
+    Job context = new Job(jonconf);
     context.getConfiguration().setLong(
       ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE"), maxSplitSize);
     List splits = inputFormat.getSplits(context);
diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index d03f270..5d0cd14 100644
--- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -861,6 +861,16 @@ public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext con
   }
 
   @Override
+  public Object getCredentials(org.apache.hadoop.mapreduce.JobContext context) {
+    return null;
+  }
+
+  @Override
+  public JobID getJobID(org.apache.hadoop.mapreduce.JobContext context) {
+    return context.getJobID();
+  }
+
+  @Override
   public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException {
     boolean origDisableHDFSCache =
       conf.getBoolean("fs." + uri.getScheme() + ".impl.disable.cache", false);
diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index b85a69c..c2e0c98 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -64,7 +64,7 @@
 
 /**
- * Implemention of shims against Hadoop 0.20 with Security.
+ * Implementation of shims against Hadoop 0.20 with Security.
  */
 public class Hadoop20SShims extends HadoopShimsSecure {
 
@@ -493,6 +493,16 @@ public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext con
   }
 
   @Override
+  public Object getCredentials(JobContext context) {
+    return context.getCredentials();
+  }
+
+  @Override
+  public JobID getJobID(JobContext context) {
+    return context.getJobID();
+  }
+
+  @Override
   public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException {
     boolean origDisableHDFSCache =
       conf.getBoolean("fs." + uri.getScheme() + ".impl.disable.cache", false);
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index e98b54d..15a968b 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -28,7 +28,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -78,7 +77,7 @@
 import com.google.common.collect.Iterables;
 
 /**
- * Implemention of shims against Hadoop 0.23.0.
+ * Implementation of shims against Hadoop 0.23.0.
  */
 public class Hadoop23Shims extends HadoopShimsSecure {
 
@@ -701,6 +700,16 @@ public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext con
   }
 
   @Override
+  public Object getCredentials(JobContext context) {
+    return context.getCredentials();
+  }
+
+  @Override
+  public JobID getJobID(JobContext context) {
+    return context.getJobID();
+  }
+
+  @Override
   public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException {
     return FileSystem.newInstance(uri, conf);
   }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index eefd5e5..879aaba 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -665,6 +665,10 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus,
    */
   public Configuration getConfiguration(JobContext context);
 
+  public Object getCredentials(JobContext context);
+
+  public JobID getJobID(JobContext context);
+
   public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException;
 
   public void getMergedCredentials(JobConf jobConf) throws IOException;
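The shim changes end with the two new accessors on the common HadoopShims interface, implemented per Hadoop line above. The following caller is illustrative only and not part of the patch; the class name, method name, and the returned string format are invented, and the getJobID result is only used via String.valueOf so the example does not assume its exact package.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.mapreduce.JobContext;

    // Illustrative sketch, not part of the patch.
    public final class ShimAccessorsExample {
      private ShimAccessorsExample() {}

      public static String describe(JobContext context) {
        HadoopShims shims = ShimLoader.getHadoopShims();
        Configuration conf = shims.getConfiguration(context);  // never call context.getConfiguration() directly
        String jobId = String.valueOf(shims.getJobID(context)); // class-vs-interface difference hidden by the shim
        Object credentials = shims.getCredentials(context);     // Object; may be null on Hadoop 0.20 per this patch
        return "job=" + jobId
            + ", taskId=" + conf.get("mapred.task.id", "")
            + ", hasCredentials=" + (credentials != null);
      }
    }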