diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java index 93a03ad..5685e09 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; @@ -82,8 +83,9 @@ private static volatile HiveClientCache hiveClientCache; public static boolean checkJobContextIfRunningFromBackend(JobContext j) { - if (j.getConfiguration().get("mapred.task.id", "").equals("") && - !("true".equals(j.getConfiguration().get("pig.illustrating")))) { + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(j); + if (configuration.get("mapred.task.id", "").equals("") && + !("true".equals(configuration.get("pig.illustrating")))) { return false; } return true; @@ -632,7 +634,7 @@ public static JobConf getJobConfFromContext(JobContext jobContext) { // we need to convert the jobContext into a jobConf // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat) // begin conversion.. - jobConf = new JobConf(jobContext.getConfiguration()); + jobConf = new JobConf(ShimLoader.getHadoopShims().getConfiguration(jobContext)); // ..end of conversion diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java index 8669ded..6e35fdf 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java @@ -24,10 +24,12 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -65,9 +67,11 @@ public ReaderContext prepareRead() throws HCatException { HCatInputFormat hcif = HCatInputFormat.setInput( job, re.getDbName(), re.getTableName(), re.getFilterString()); ReaderContextImpl cntxt = new ReaderContextImpl(); - cntxt.setInputSplits(hcif.getSplits( - ShimLoader.getHadoopShims().getHCatShim().createJobContext(job.getConfiguration(), null))); - cntxt.setConf(job.getConfiguration()); + HadoopShims shims = ShimLoader.getHadoopShims(); + Configuration configuration = shims.getConfiguration(job); + JobContext jobContext = shims.getHCatShim().createJobContext(configuration, null); + cntxt.setInputSplits(hcif.getSplits(jobContext)); + cntxt.setConf(configuration); return cntxt; } catch (IOException e) { throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e); diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java 
hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java index cead40d..f233c10 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputCommitterContainer.java @@ -21,8 +21,10 @@ import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -92,10 +94,11 @@ public void cleanupJob(JobContext context) throws IOException { //Cancel HCat and JobTracker tokens HiveMetaStoreClient client = null; try { - HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration()); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); + HiveConf hiveConf = HCatUtil.getHiveConf(configuration); client = HCatUtil.getHiveClient(hiveConf); String tokenStrForm = client.getTokenStrForm(); - if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { + if (tokenStrForm != null && configuration.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { client.cancelDelegationToken(tokenStrForm); } } catch (Exception e) { diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java index 3a07b0c..42f05c9 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java @@ -19,6 +19,8 @@ package org.apache.hive.hcatalog.mapreduce; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -27,6 +29,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.HCatRecord; @@ -65,9 +68,11 @@ static synchronized String getOutputName(int partition) { @Override public RecordWriter, HCatRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - String name = getOutputName(context.getTaskAttemptID().getTaskID().getId()); + TaskAttemptID attemptID = ShimLoader.getHadoopShims().getHCatShim().getTaskAttemptID(context); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); + String name = getOutputName(attemptID.getTaskID().getId()); return new DefaultRecordWriterContainer(context, - getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context))); + getBaseOutputFormat().getRecordWriter(null, new JobConf(configuration), name, InternalUtil.createReporter(context))); } @@ -82,7 +87,8 @@ static synchronized String getOutputName(int partition) { @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - return new 
DefaultOutputCommitterContainer(context, new JobConf(context.getConfiguration()).getOutputCommitter()); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); + return new DefaultOutputCommitterContainer(context, new JobConf(configuration).getOutputCommitter()); } /** @@ -92,10 +98,11 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) */ @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); org.apache.hadoop.mapred.OutputFormat, ? super Writable> outputFormat = getBaseOutputFormat(); - JobConf jobConf = new JobConf(context.getConfiguration()); + JobConf jobConf = new JobConf(configuration); outputFormat.checkOutputSpecs(null, jobConf); - HCatUtil.copyConf(jobConf, context.getConfiguration()); + HCatUtil.copyConf(jobConf, configuration); } } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java index 209d7bc..52495bb 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java @@ -21,10 +21,12 @@ import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -52,13 +54,14 @@ public DefaultRecordWriterContainer(TaskAttemptContext context, org.apache.hadoop.mapred.RecordWriter, ? 
super Writable> baseRecordWriter) throws IOException, InterruptedException { super(context, baseRecordWriter); - jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); - storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); + jobInfo = HCatOutputFormat.getJobInfo(configuration); + storageHandler = HCatUtil.getStorageHandler(configuration, jobInfo.getTableInfo().getStorerInfo()); HCatOutputFormat.configureOutputStorageHandler(context); - serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration()); + serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), configuration); hcatRecordOI = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()); try { - InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo); + InternalUtil.initializeOutputSerDe(serDe, configuration, jobInfo); } catch (SerDeException e) { throw new IOException("Failed to initialize SerDe", e); } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java index 4df912a..eed61a6 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java @@ -25,17 +25,19 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -44,7 +46,6 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; -import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.HCatRecord; /** @@ -129,14 +130,16 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context); + JobConf jobConf = ShimLoader.getHadoopShims().getHCatShim().getJobConf(currTaskContext); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(currTaskContext); configureDynamicStorageHandler(currTaskContext, dynamicPartValues); - localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration()); + localJobInfo = HCatBaseOutputFormat.getJobInfo(configuration); // Setup serDe. 
SerDe currSerDe = - ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf()); + ReflectionUtils.newInstance(storageHandler.getSerDeClass(), jobConf); try { - InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), + InternalUtil.initializeOutputSerDe(currSerDe, configuration, localJobInfo); } catch (SerDeException e) { throw new IOException("Failed to initialize SerDe", e); @@ -145,7 +148,7 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio // create base OutputFormat org.apache.hadoop.mapred.OutputFormat baseOF = ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), - currTaskContext.getJobConf()); + jobConf); // We are skipping calling checkOutputSpecs() for each partition // As it can throw a FileAlreadyExistsException when more than one @@ -158,7 +161,7 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio // Get Output Committer org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = - currTaskContext.getJobConf().getOutputCommitter(); + jobConf.getOutputCommitter(); // Create currJobContext the latest so it gets all the config changes org.apache.hadoop.mapred.JobContext currJobContext = @@ -168,12 +171,17 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio baseOutputCommitter.setupJob(currJobContext); // Recreate to refresh jobConf of currTask context. + TaskAttemptID attemptID = + ShimLoader.getHadoopShims().getHCatShim().getTaskAttemptID(currTaskContext); currTaskContext = HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(), - currTaskContext.getTaskAttemptID(), currTaskContext.getProgressible()); + attemptID, currTaskContext.getProgressible()); + + jobConf = ShimLoader.getHadoopShims().getHCatShim().getJobConf(currTaskContext); + configuration = ShimLoader.getHadoopShims().getConfiguration(currTaskContext); // Set temp location. - currTaskContext.getConfiguration().set( + configuration.set( "mapred.work.output.dir", new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext) .getWorkPath().toString()); @@ -181,13 +189,13 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio // Set up task. 
baseOutputCommitter.setupTask(currTaskContext); - Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir")); + Path parentDir = new Path(configuration.get("mapred.work.output.dir")); Path childPath = new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext, "part", "")); RecordWriter baseRecordWriter = - baseOF.getRecordWriter(parentDir.getFileSystem(currTaskContext.getConfiguration()), - currTaskContext.getJobConf(), childPath.toString(), + baseOF.getRecordWriter(parentDir.getFileSystem(configuration), + jobConf, childPath.toString(), InternalUtil.createReporter(currTaskContext)); baseDynamicWriters.put(dynKey, baseRecordWriter); @@ -196,8 +204,10 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio dynamicContexts.put(dynKey, currTaskContext); dynamicObjectInspectors.put(dynKey, InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())); + dynamicOutputJobInfo.put(dynKey, - HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration())); + HCatOutputFormat.getJobInfo( + ShimLoader.getHadoopShims().getConfiguration((dynamicContexts.get(dynKey))))); } return new LocalFileWriter(baseDynamicWriters.get(dynKey), dynamicObjectInspectors.get(dynKey), diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java index 491da14..d9ef2d1 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java @@ -99,13 +99,14 @@ public FileOutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException { super(context, baseCommitter); - jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); + jobInfo = HCatOutputFormat.getJobInfo(configuration); dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed(); this.partitionsDiscovered = !dynamicPartitioningUsed; - cachedStorageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); + cachedStorageHandler = HCatUtil.getStorageHandler(configuration, jobInfo.getTableInfo().getStorerInfo()); Table table = new Table(jobInfo.getTableInfo().getTable()); - if (dynamicPartitioningUsed && Boolean.valueOf((String)table.getProperty("EXTERNAL")) + if (dynamicPartitioningUsed && Boolean.valueOf(table.getProperty("EXTERNAL")) && jobInfo.getCustomDynamicPath() != null && jobInfo.getCustomDynamicPath().length() > 0) { customDynamicLocationUsed = true; @@ -157,6 +158,9 @@ public void setupTask(TaskAttemptContext context) throws IOException { @Override public void abortJob(JobContext jobContext, State state) throws IOException { + + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(jobContext); + try { if (dynamicPartitioningUsed) { discoverPartitions(jobContext); @@ -168,28 +172,28 @@ public void abortJob(JobContext jobContext, State state) throws IOException { } else if (dynamicPartitioningUsed) { for (JobContext currContext : contextDiscoveredByPath.values()) { try { - new JobConf(currContext.getConfiguration()) + new JobConf(ShimLoader.getHadoopShims().getConfiguration(currContext)) .getOutputCommitter().abortJob(currContext, - state); + state); } catch (Exception e) { 
throw new IOException(e); } } } Path src; - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext.getConfiguration()); + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(configuration); Path tblPath = new Path(jobInfo.getTableInfo().getTableLocation()); if (dynamicPartitioningUsed) { if (!customDynamicLocationUsed) { src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable() .getPartitionKeysSize())); } else { - src = new Path(getCustomPartitionRootLocation(jobInfo, jobContext.getConfiguration())); + src = new Path(getCustomPartitionRootLocation(jobInfo, configuration)); } } else { src = new Path(jobInfo.getLocation()); } - FileSystem fs = src.getFileSystem(jobContext.getConfiguration()); + FileSystem fs = src.getFileSystem(configuration); // Note fs.delete will fail on Windows. The reason is in OutputCommitter, // Hadoop is still writing to _logs/history. On Linux, OS don't care file is still // open and remove the directory anyway, but on Windows, OS refuse to remove a @@ -215,12 +219,13 @@ private static boolean getOutputDirMarking(Configuration conf) { @Override public void commitJob(JobContext jobContext) throws IOException { + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(jobContext); if (dynamicPartitioningUsed) { discoverPartitions(jobContext); // Commit each partition so it gets moved out of the job work // dir for (JobContext context : contextDiscoveredByPath.values()) { - new JobConf(context.getConfiguration()) + new JobConf(ShimLoader.getHadoopShims().getConfiguration(context)) .getOutputCommitter().commitJob(context); } } @@ -230,11 +235,10 @@ public void commitJob(JobContext jobContext) throws IOException { } registerPartitions(jobContext); // create _SUCCESS FILE if so requested. 
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext.getConfiguration()); - if (getOutputDirMarking(jobContext.getConfiguration())) { + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(configuration); + if (getOutputDirMarking(configuration)) { Path outputPath = new Path(jobInfo.getLocation()); - FileSystem fileSys = outputPath.getFileSystem(jobContext - .getConfiguration()); + FileSystem fileSys = outputPath.getFileSystem(configuration); // create a file in the folder to mark it if (fileSys.exists(outputPath)) { Path filePath = new Path(outputPath, @@ -664,9 +668,10 @@ private Path getFinalPath(FileSystem fs, Path file, Path src, * Run to discover dynamic partitions available */ private void discoverPartitions(JobContext context) throws IOException { + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); if (!partitionsDiscovered) { // LOG.info("discover ptns called"); - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(configuration); harProcessor.setEnabled(jobInfo.getHarRequested()); @@ -674,7 +679,7 @@ private void discoverPartitions(JobContext context) throws IOException { int maxDynamicPartitions = jobInfo.getMaxDynamicPartitions(); Path loadPath = new Path(jobInfo.getLocation()); - FileSystem fs = loadPath.getFileSystem(context.getConfiguration()); + FileSystem fs = loadPath.getFileSystem(configuration); // construct a path pattern (e.g., /*/*) to find all dynamically generated paths String dynPathSpec = loadPath.toUri().getPath(); @@ -713,10 +718,10 @@ private void discoverPartitions(JobContext context) throws IOException { st.getPath().toString()); } partitionsDiscoveredByPath.put(st.getPath().toString(), fullPartSpec); - JobConf jobConf = (JobConf)context.getConfiguration(); + JobConf jobConf = (JobConf)configuration; JobContext currContext = HCatMapRedUtil.createJobContext( jobConf, - context.getJobID(), + ShimLoader.getHadoopShims().getJobID(context), InternalUtil.createReporter(HCatMapRedUtil.createTaskAttemptContext(jobConf, ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))); HCatOutputFormat.configureOutputStorageHandler(currContext, jobInfo, fullPartSpec); @@ -739,8 +744,8 @@ private void registerPartitions(JobContext context) throws IOException{ if (dynamicPartitioningUsed){ discoverPartitions(context); } - OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); - Configuration conf = context.getConfiguration(); + Configuration conf = ShimLoader.getHadoopShims().getConfiguration(context); + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(conf); Table table = new Table(jobInfo.getTableInfo().getTable()); Path tblPath = new Path(table.getTTable().getSd().getLocation()); FileSystem fs = tblPath.getFileSystem(conf); @@ -973,8 +978,8 @@ private void cancelDelegationTokens(JobContext context) throws IOException{ LOG.info("Cancelling delegation token for the job."); HiveMetaStoreClient client = null; try { - HiveConf hiveConf = HCatUtil - .getHiveConf(context.getConfiguration()); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); + HiveConf hiveConf = HCatUtil.getHiveConf(configuration); client = HCatUtil.getHiveClient(hiveConf); // cancel the deleg. 
tokens that were acquired for this job now that // we are done - we should cancel if the tokens were acquired by @@ -983,8 +988,8 @@ private void cancelDelegationTokens(JobContext context) throws IOException{ // the conf will not be set String tokenStrForm = client.getTokenStrForm(); if (tokenStrForm != null - && context.getConfiguration().get( - HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { + && configuration.get( + HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { client.cancelDelegationToken(tokenStrForm); } } catch (MetaException e) { diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java index 1a7595f..2a55dc9 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java @@ -19,10 +19,9 @@ package org.apache.hive.hcatalog.mapreduce; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; @@ -32,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -73,38 +73,39 @@ public FileOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat serde = storageHandler.getSerDeClass(); SerDe sd = (SerDe) ReflectionUtils.newInstance(serde, - context.getConfiguration()); - context.getConfiguration().set("mapred.output.value.class", + configuration); + configuration.set("mapred.output.value.class", sd.getSerializedClass().getName()); RecordWriter, HCatRecord> rw; - if (HCatBaseOutputFormat.getJobInfo(context.getConfiguration()).isDynamicPartitioningUsed()){ + if (HCatBaseOutputFormat.getJobInfo(configuration).isDynamicPartitioningUsed()){ // When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. Can use null. // (That's because records can't be written until the values of the dynamic partitions are deduced. // By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.) 
rw = new DynamicPartitionFileRecordWriterContainer( (org.apache.hadoop.mapred.RecordWriter)null, context); } else { - Path parentDir = new Path(context.getConfiguration().get("mapred.work.output.dir")); - Path childPath = new Path(parentDir,FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()), "part")); + Path parentDir = new Path(configuration.get("mapred.work.output.dir")); + Path childPath = new Path(parentDir,FileOutputFormat.getUniqueName(new JobConf(configuration), "part")); rw = new StaticPartitionFileRecordWriterContainer( getBaseOutputFormat().getRecordWriter( - parentDir.getFileSystem(context.getConfiguration()), - new JobConf(context.getConfiguration()), + parentDir.getFileSystem(configuration), + new JobConf(configuration), childPath.toString(), InternalUtil.createReporter(context)), context); @@ -114,10 +115,11 @@ public FileOutputFormatContainer(org.apache.hadoop.mapred.OutputFormatmapred abstraction if (outputPath != null) - context.getConfiguration().set("mapred.work.output.dir", + configuration.set("mapred.work.output.dir", new FileOutputCommitter(new Path(outputPath), context).getWorkPath().toString()); } } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java index 2a883d6..c0fff72 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java @@ -20,29 +20,20 @@ package org.apache.hive.hcatalog.mapreduce; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.HCatRecord; @@ -73,15 +64,16 @@ public FileRecordWriterContainer( TaskAttemptContext context) throws IOException, InterruptedException { super(context, baseWriter); this.context = context; - jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); + jobInfo = HCatOutputFormat.getJobInfo(configuration); storageHandler = - HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo() + HCatUtil.getStorageHandler(configuration, jobInfo.getTableInfo() .getStorerInfo()); - serDe = 
ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration()); + serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), configuration); objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()); try { - InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo); + InternalUtil.initializeOutputSerDe(serDe, configuration, jobInfo); } catch (SerDeException e) { throw new IOException("Failed to inialize SerDe", e); } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java index 33807f5..a2c1b18 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; @@ -99,7 +100,7 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema) @Override public List getSplits(JobContext jobContext) throws IOException, InterruptedException { - Configuration conf = jobContext.getConfiguration(); + Configuration conf = ShimLoader.getHadoopShims().getConfiguration(jobContext); //Get the job info from the configuration, //throws exception if not initialized @@ -185,13 +186,12 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema) HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split); PartInfo partitionInfo = hcatSplit.getPartitionInfo(); - JobContext jobContext = taskContext; - Configuration conf = jobContext.getConfiguration(); + Configuration conf = ShimLoader.getHadoopShims().getConfiguration(taskContext); HiveStorageHandler storageHandler = HCatUtil.getStorageHandler( conf, partitionInfo); - JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext); + JobConf jobConf = HCatUtil.getJobConfFromContext(taskContext); Map jobProperties = partitionInfo.getJobProperties(); HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf); diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java index 4f7a74a..a8fe008 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputFormat; @@ -73,18 +74,19 @@ public void checkOutputSpecs(JobContext context */ protected OutputFormat, HCatRecord> getOutputFormat(JobContext context) throws IOException { - OutputJobInfo jobInfo = getJobInfo(context.getConfiguration()); - HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); + OutputJobInfo jobInfo = 
getJobInfo(configuration); + HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(configuration, jobInfo.getTableInfo().getStorerInfo()); // Always configure storage handler with jobproperties/jobconf before calling any methods on it configureOutputStorageHandler(context); if (storageHandler instanceof FosterStorageHandler) { return new FileOutputFormatContainer(ReflectionUtils.newInstance( - storageHandler.getOutputFormatClass(),context.getConfiguration())); + storageHandler.getOutputFormatClass(),configuration)); } else { return new DefaultOutputFormatContainer(ReflectionUtils.newInstance( - storageHandler.getOutputFormatClass(),context.getConfiguration())); + storageHandler.getOutputFormatClass(),configuration)); } } @@ -125,10 +127,10 @@ static void configureOutputStorageHandler( @SuppressWarnings("unchecked") static void configureOutputStorageHandler( JobContext jobContext, List dynamicPartVals) throws IOException { - Configuration conf = jobContext.getConfiguration(); + Configuration conf = ShimLoader.getHadoopShims().getConfiguration(jobContext); try { OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); - HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(jobContext.getConfiguration(),jobInfo.getTableInfo().getStorerInfo()); + HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,jobInfo.getTableInfo().getStorerInfo()); Map partitionValues = jobInfo.getPartitionValues(); String location = jobInfo.getLocation(); diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java index b651cb3..6fb1f02 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.mapreduce; +import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.conf.Configuration; @@ -32,8 +33,10 @@ public class HCatMapRedUtil { public static TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapreduce.TaskAttemptContext context) { - return createTaskAttemptContext(new JobConf(context.getConfiguration()), - org.apache.hadoop.mapred.TaskAttemptID.forName(context.getTaskAttemptID().toString()), + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); + String attemptID = ShimLoader.getHadoopShims().getHCatShim().getTaskAttemptID(context).toString(); + return createTaskAttemptContext(new JobConf(configuration), + org.apache.hadoop.mapred.TaskAttemptID.forName(attemptID), Reporter.NULL); } @@ -48,9 +51,10 @@ public static TaskAttemptContext createTaskAttemptContext(JobConf conf, TaskAtte return ShimLoader.getHadoopShims().newTaskAttemptID(jobId, isMap, taskId, id); } public static org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapreduce.JobContext context) { - return createJobContext((JobConf)context.getConfiguration(), - context.getJobID(), - Reporter.NULL); + HadoopShims shims = ShimLoader.getHadoopShims(); + return createJobContext((JobConf) shims.getConfiguration(context), + shims.getJobID(context), + Reporter.NULL); } public static JobContext createJobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID id, Progressable progressable) { diff --git 
hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatRecordReader.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatRecordReader.java index 3ee6157..6bb96ae 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatRecordReader.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatRecordReader.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; @@ -91,9 +92,10 @@ public void initialize(org.apache.hadoop.mapreduce.InputSplit split, baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext); createDeserializer(hcatSplit, storageHandler, taskContext); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(taskContext); // Pull the output schema out of the TaskAttemptContext outputSchema = (HCatSchema) HCatUtil.deserialize( - taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA)); + configuration.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA)); if (outputSchema == null) { outputSchema = hcatSplit.getTableSchema(); @@ -103,7 +105,7 @@ public void initialize(org.apache.hadoop.mapreduce.InputSplit split, // TODO This should be passed in the TaskAttemptContext instead dataSchema = hcatSplit.getDataSchema(); - errorTracker = new InputErrorTracker(taskContext.getConfiguration()); + errorTracker = new InputErrorTracker(configuration); } private org.apache.hadoop.mapred.RecordReader createBaseRecordReader(HCatSplit hcatSplit, @@ -120,8 +122,9 @@ public void initialize(org.apache.hadoop.mapreduce.InputSplit split, private void createDeserializer(HCatSplit hcatSplit, HiveStorageHandler storageHandler, TaskAttemptContext taskContext) throws IOException { + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(taskContext); deserializer = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), - taskContext.getConfiguration()); + configuration); try { InternalUtil.initializeDeserializer(deserializer, storageHandler.getConf(), diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java index 9b97939..e0c1eeb 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -176,7 +177,7 @@ private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s) } static Reporter createReporter(TaskAttemptContext context) { - return new ProgressReporter(context); + return ShimLoader.getHadoopShims().getHCatShim().createReporter(context); } /** diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/MultiOutputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/MultiOutputFormat.java index 
69a7345..0559a3d 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/MultiOutputFormat.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/MultiOutputFormat.java @@ -47,7 +47,9 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.hcatalog.common.HCatUtil; import org.slf4j.Logger; @@ -186,10 +188,12 @@ public static JobConfigurer createConfigurer(Job job) { * @return a copy of the JobContext with the alias configuration populated */ public static JobContext getJobContext(String alias, JobContext context) { - String aliasConf = context.getConfiguration().get(getAliasConfName(alias)); - JobContext aliasContext = ShimLoader.getHadoopShims().getHCatShim().createJobContext( - context.getConfiguration(), context.getJobID()); - addToConfig(aliasConf, aliasContext.getConfiguration()); + HadoopShims shims = ShimLoader.getHadoopShims(); + Configuration configuration = shims.getConfiguration(context); + String aliasConf = configuration.get(getAliasConfName(alias)); + JobContext aliasContext = shims.getHCatShim().createJobContext( + configuration, shims.getJobID(context)); + addToConfig(aliasConf, shims.getConfiguration(aliasContext)); return aliasContext; } @@ -201,10 +205,12 @@ public static JobContext getJobContext(String alias, JobContext context) { * @return a copy of the TaskAttemptContext with the alias configuration populated */ public static TaskAttemptContext getTaskAttemptContext(String alias, TaskAttemptContext context) { - String aliasConf = context.getConfiguration().get(getAliasConfName(alias)); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(context); + TaskAttemptID attemptID = ShimLoader.getHadoopShims().getHCatShim().getTaskAttemptID(context); + String aliasConf = configuration.get(getAliasConfName(alias)); TaskAttemptContext aliasContext = ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext( - context.getConfiguration(), context.getTaskAttemptID()); - addToConfig(aliasConf, aliasContext.getConfiguration()); + configuration, attemptID); + addToConfig(aliasConf, ShimLoader.getHadoopShims().getConfiguration(aliasContext)); return aliasContext; } @@ -233,7 +239,8 @@ public void checkOutputSpecs(JobContext context) throws IOException, Interrupted OutputFormat outputFormat = getOutputFormatInstance(aliasContext); outputFormat.checkOutputSpecs(aliasContext); // Copy credentials and any new config added back to JobContext - context.getCredentials().addAll(aliasContext.getCredentials()); + Credentials credentials = (Credentials)ShimLoader.getHadoopShims().getCredentials(context); + credentials.addAll((Credentials) ShimLoader.getHadoopShims().getCredentials(aliasContext)); setAliasConf(alias, context, aliasContext); } } @@ -254,8 +261,10 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOE private static OutputFormat getOutputFormatInstance(JobContext context) { OutputFormat outputFormat; try { - outputFormat = ReflectionUtils.newInstance(context.getOutputFormatClass(), - context.getConfiguration()); + HadoopShims shims = ShimLoader.getHadoopShims(); + outputFormat = ReflectionUtils.newInstance( + shims.getOutputFormatClass(context), + 
shims.getConfiguration(context)); } catch (ClassNotFoundException e) { throw new IllegalStateException(e); } @@ -263,7 +272,7 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOE } private static String[] getOutputFormatAliases(JobContext context) { - return context.getConfiguration().getStrings(MO_ALIASES); + return ShimLoader.getHadoopShims().getConfiguration(context).getStrings(MO_ALIASES); } /** @@ -282,9 +291,9 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOE * configuration. */ private static void setAliasConf(String alias, JobContext userJob, JobContext aliasContext) { - Configuration userConf = userJob.getConfiguration(); + Configuration userConf = ShimLoader.getHadoopShims().getConfiguration(userJob); StringBuilder builder = new StringBuilder(); - for (Entry conf : aliasContext.getConfiguration()) { + for (Entry conf : ShimLoader.getHadoopShims().getConfiguration(aliasContext)) { String key = conf.getKey(); String value = conf.getValue(); String jobValue = userConf.getRaw(key); @@ -451,7 +460,7 @@ public MultiRecordWriter(TaskAttemptContext context) throws IOException, for (String alias : aliases) { LOGGER.info("Creating record writer for alias: " + alias); TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context); - Configuration aliasConf = aliasContext.getConfiguration(); + Configuration aliasConf = ShimLoader.getHadoopShims().getConfiguration(aliasContext); // Create output directory if not already created. String outDir = aliasConf.get("mapred.output.dir"); if (outDir != null) { diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/ProgressReporter.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/ProgressReporter.java index 40b9bb1..65a6dae 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/ProgressReporter.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/ProgressReporter.java @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +@Deprecated class ProgressReporter extends StatusReporter implements Reporter { private TaskInputOutputContext context = null; diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java index 39ef86e..980bca5 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java @@ -177,7 +177,8 @@ void handleSecurity( // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set void cancelToken(HiveMetaStoreClient client, JobContext context) throws IOException, MetaException { String tokenStrForm = client.getTokenStrForm(); - if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { + if (tokenStrForm != null && + ShimLoader.getHadoopShims().getConfiguration(context).get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { try { client.cancelDelegationToken(tokenStrForm); } catch (TException e) { diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java index 0c3d707..defba64 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java +++ 
hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; @@ -38,14 +39,14 @@ public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - context.setStatus(split.toString()); + ShimLoader.getHadoopShims().getHCatShim().createReporter(context).setStatus(split.toString()); return new RCFileMapReduceRecordReader(); } @Override public List getSplits(JobContext job) throws IOException { - - job.getConfiguration().setLong( + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(job); + configuration.setLong( ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"), SequenceFile.SYNC_INTERVAL); return super.getSplits(job); diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceOutputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceOutputFormat.java index 64699f2..769ddf0 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceOutputFormat.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceOutputFormat.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; @@ -66,7 +67,8 @@ public static void setColumnNumber(Configuration conf, int columnNum) { FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(task); Path outputPath = committer.getWorkPath(); - FileSystem fs = outputPath.getFileSystem(task.getConfiguration()); + Configuration configuration = ShimLoader.getHadoopShims().getConfiguration(task); + FileSystem fs = outputPath.getFileSystem(configuration); if (!fs.exists(outputPath)) { fs.mkdirs(outputPath); @@ -77,10 +79,10 @@ public static void setColumnNumber(Configuration conf, int columnNum) { CompressionCodec codec = null; if (getCompressOutput(task)) { Class codecClass = getOutputCompressorClass(task, DefaultCodec.class); - codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, task.getConfiguration()); + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration); } - final RCFile.Writer out = new RCFile.Writer(fs, task.getConfiguration(), file, task, codec); + final RCFile.Writer out = new RCFile.Writer(fs, configuration, file, task, codec); return new RecordWriter, BytesRefArrayWritable>() { diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceRecordReader.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceRecordReader.java index fd42b5a..28c29ca 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceRecordReader.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceRecordReader.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.RCFile.Reader; import 
org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; @@ -104,7 +105,7 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx FileSplit fSplit = (FileSplit) split; Path path = fSplit.getPath(); - Configuration conf = context.getConfiguration(); + Configuration conf = ShimLoader.getHadoopShims().getConfiguration(context); this.in = new RCFile.Reader(path.getFileSystem(conf), path, conf); this.end = fSplit.getStart() + fSplit.getLength(); diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java index 02a13b3..b10a7d5 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java @@ -42,7 +42,6 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -229,7 +228,7 @@ private void writeThenReadByRecordReader(int intervalRecordCount, RCFileMapReduceInputFormat inputFormat = new RCFileMapReduceInputFormat(); Configuration jonconf = new Configuration(cloneConf); jonconf.set("mapred.input.dir", testDir.toString()); - JobContext context = new Job(jonconf); + Job context = new Job(jonconf); context.getConfiguration().setLong( ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE"), maxSplitSize); List splits = inputFormat.getSplits(context); diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java index d03f270..617792a 100644 --- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java +++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java @@ -71,6 +71,7 @@ import org.apache.hadoop.mapred.lib.TotalOrderPartitioner; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UnixUserGroupInformation; @@ -861,6 +862,22 @@ public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext con } @Override + public Class> getOutputFormatClass(org.apache.hadoop.mapreduce.JobContext context) + throws ClassNotFoundException { + return context.getOutputFormatClass(); + } + + @Override + public Object getCredentials(org.apache.hadoop.mapreduce.JobContext context) { + return null; + } + + @Override + public JobID getJobID(org.apache.hadoop.mapreduce.JobContext context) { + return context.getJobID(); + } + + @Override public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException { boolean origDisableHDFSCache = conf.getBoolean("fs." 
+ uri.getScheme() + ".impl.disable.cache", false); diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java index b85a69c..ebd2bf1 100644 --- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java +++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java @@ -43,10 +43,13 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.ClusterStatus; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobInProgress; import org.apache.hadoop.mapred.JobTracker; import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TaskLogServlet; import org.apache.hadoop.mapred.WebHCatJTShim20S; import org.apache.hadoop.mapred.lib.TotalOrderPartitioner; @@ -58,13 +61,14 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.VersionInfo; /** - * Implemention of shims against Hadoop 0.20 with Security. + * Implementation of shims against Hadoop 0.20 with Security. */ public class Hadoop20SShims extends HadoopShimsSecure { @@ -282,6 +286,88 @@ public TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttem } @Override + public JobConf getJobConf(org.apache.hadoop.mapred.TaskAttemptContext context) { + return context.getJobConf(); + } + + @Override + public org.apache.hadoop.mapred.TaskAttemptID getTaskAttemptID(org.apache.hadoop.mapred.TaskAttemptContext context) { + return context.getTaskAttemptID(); + } + + @Override + public TaskAttemptID getTaskAttemptID(org.apache.hadoop.mapreduce.TaskAttemptContext context) { + return context.getTaskAttemptID(); + } + + @Override + public Reporter createReporter(TaskAttemptContext tcontext) { + + final TaskInputOutputContext context; + final TaskAttemptContext taskAttemptContext; + + if (tcontext instanceof TaskInputOutputContext) { + context = (TaskInputOutputContext) tcontext; + taskAttemptContext = null; + } else { + context = null; + taskAttemptContext = tcontext; + } + + return new Reporter() { + @Override + public void setStatus(String status) { + if (context != null) { + context.setStatus(status); + } + } + + @Override + public Counters.Counter getCounter(Enum name) { + return context != null ? (Counters.Counter) context.getCounter(name) : null; + } + + @Override + public Counters.Counter getCounter(String group, String name) { + return (context != null) ? 
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index e98b54d..8f4933a 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -52,6 +51,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.Reporter;
@@ -62,8 +63,10 @@ import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@@ -78,7 +81,7 @@ import com.google.common.collect.Iterables;
 
 /**
- * Implemention of shims against Hadoop 0.23.0.
+ * Implementation of shims against Hadoop 0.23.0.
  */
 public class Hadoop23Shims extends HadoopShimsSecure {
@@ -435,6 +438,88 @@ public JobContext createJobContext(Configuration conf,
   }
 
   @Override
+  public JobConf getJobConf(org.apache.hadoop.mapred.TaskAttemptContext context) {
+    return context.getJobConf();
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.TaskAttemptID getTaskAttemptID(org.apache.hadoop.mapred.TaskAttemptContext context) {
+    return context.getTaskAttemptID();
+  }
+
+  @Override
+  public TaskAttemptID getTaskAttemptID(org.apache.hadoop.mapreduce.TaskAttemptContext context) {
+    return context.getTaskAttemptID();
+  }
+
+  @Override
+  public Reporter createReporter(TaskAttemptContext tcontext) {
+
+    final TaskInputOutputContext context;
+    final TaskAttemptContext taskAttemptContext;
+
+    if (tcontext instanceof TaskInputOutputContext) {
+      context = (TaskInputOutputContext) tcontext;
+      taskAttemptContext = null;
+    } else {
+      context = null;
+      taskAttemptContext = tcontext;
+    }
+
+    return new Reporter() {
+      @Override
+      public void setStatus(String status) {
+        if (context != null) {
+          context.setStatus(status);
+        }
+      }
+
+      @Override
+      public Counters.Counter getCounter(Enum<?> name) {
+        return context != null ? (Counters.Counter) context.getCounter(name) : null;
+      }
+
+      @Override
+      public Counters.Counter getCounter(String group, String name) {
+        return (context != null) ? (Counters.Counter) context.getCounter(group, name) : null;
+      }
+
+      @Override
+      public void incrCounter(Enum<?> key, long amount) {
+        if (context != null) {
+          context.getCounter(key).increment(amount);
+        }
+      }
+
+      @Override
+      public void incrCounter(String group, String counter, long amount) {
+        if (context != null) {
+          context.getCounter(group, counter).increment(amount);
+        }
+      }
+
+      @Override
+      public InputSplit getInputSplit() throws UnsupportedOperationException {
+        return null;
+      }
+
+      @Override
+      public float getProgress() {
+        return 0.0f;
+      }
+
+      @Override
+      public void progress() {
+        if (context != null) {
+          context.progress();
+        } else {
+          taskAttemptContext.progress();
+        }
+      }
+    };
+  }
+
+  @Override
   public void commitJob(OutputFormat outputFormat, Job job) throws IOException {
     // Do nothing as this was fixed by MAPREDUCE-1447.
   }
@@ -701,6 +786,22 @@ public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext con
   }
 
   @Override
+  public Class<? extends OutputFormat<?, ?>> getOutputFormatClass(org.apache.hadoop.mapreduce.JobContext context)
+      throws ClassNotFoundException {
+    return context.getOutputFormatClass();
+  }
+
+  @Override
+  public Object getCredentials(JobContext context) {
+    return context.getCredentials();
+  }
+
+  @Override
+  public JobID getJobID(JobContext context) {
+    return context.getJobID();
+  }
+
+  @Override
   public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException {
     return FileSystem.newInstance(uri, conf);
   }
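
Note that Hadoop20Shims.getCredentials returns null while the secure shims return the context's credentials as an Object, so callers have to null-check and cast. A hedged caller-side sketch, not part of this patch; the class and method names are made up, and the cast to org.apache.hadoop.security.Credentials is an assumption consistent with the secure shims above:

    // Sketch only; CredentialsShimExample is not a class in this patch.
    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.token.Token;

    public class CredentialsShimExample {
      public static Token<?> lookupToken(JobContext context, String alias) {
        Object credentials = ShimLoader.getHadoopShims().getCredentials(context);
        if (credentials == null) {
          return null; // Hadoop 0.20 without security: no credentials available
        }
        return ((Credentials) credentials).getToken(new Text(alias));
      }
    }
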
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index eefd5e5..f95950a 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -524,6 +524,14 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus,
   public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(JobConf conf,
       org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable);
 
+  public org.apache.hadoop.mapred.TaskAttemptID getTaskAttemptID(org.apache.hadoop.mapred.TaskAttemptContext context);
+
+  public JobConf getJobConf(org.apache.hadoop.mapred.TaskAttemptContext context);
+
+  public TaskAttemptID getTaskAttemptID(TaskAttemptContext context);
+
+  public Reporter createReporter(TaskAttemptContext context);
+
   public JobContext createJobContext(Configuration conf, JobID jobId);
 
   public org.apache.hadoop.mapred.JobContext createJobContext(JobConf conf, JobID jobId, Progressable progressable);
@@ -665,6 +673,13 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus,
    */
  public Configuration getConfiguration(JobContext context);
 
+  public Class<? extends OutputFormat<?, ?>> getOutputFormatClass(JobContext context)
+      throws ClassNotFoundException;
+
+  public Object getCredentials(JobContext context);
+
+  public JobID getJobID(JobContext context);
+
   public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException;
 
   public void getMergedCredentials(JobConf jobConf) throws IOException;
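
The interface additions above give callers version-independent access to the job's output format class, credentials, and JobID. A minimal sketch of how a caller might combine them, not part of this patch; the class and method names are illustrative:

    // Sketch only; OutputFormatShimExample is not a class in this patch.
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.util.ReflectionUtils;

    public class OutputFormatShimExample {
      public static OutputFormat<?, ?> newOutputFormat(JobContext context)
          throws ClassNotFoundException {
        HadoopShims shims = ShimLoader.getHadoopShims();
        // Resolve everything through the shim so no mapreduce JobContext method
        // is linked directly.
        JobID jobId = shims.getJobID(context);
        OutputFormat<?, ?> outputFormat = ReflectionUtils.newInstance(
            shims.getOutputFormatClass(context), shims.getConfiguration(context));
        System.out.println("Output format for " + jobId + ": " + outputFormat.getClass().getName());
        return outputFormat;
      }
    }
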