diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
index 4df912a..66d0b69 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
@@ -52,13 +52,13 @@
  * {@link FileOutputFormatContainer} for more information
  */
 class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
+  public static final String DYN_JOBINFO = "dynamicOutputJobInfo";
+
   private final List<Integer> dynamicPartCols;
   private int maxDynamicPartitions;
 
   private final Map<String, RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
   private final Map<String, SerDe> baseDynamicSerDe;
-  private final Map<String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters;
-  private final Map<String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts;
   private final Map<String, ObjectInspector> dynamicObjectInspectors;
   private Map<String, OutputJobInfo> dynamicOutputJobInfo;
 
@@ -82,8 +82,6 @@ public DynamicPartitionFileRecordWriterContainer(
     this.baseDynamicSerDe = new HashMap<String, SerDe>();
     this.baseDynamicWriters =
         new HashMap<String, RecordWriter<? super WritableComparable<?>, ? super Writable>>();
-    this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
-    this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
     this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
     this.dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>();
   }
@@ -97,14 +95,12 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedExc
       // TaskInputOutput.
       bwriter.close(reporter);
     }
-    for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters
-        .entrySet()) {
-      org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
-      OutputCommitter baseOutputCommitter = entry.getValue();
-      if (baseOutputCommitter.needsTaskCommit(currContext)) {
-        baseOutputCommitter.commitTask(currContext);
-      }
+    // postpone the commit task to FileOutputCommitterContainer
+    ArrayList<String> jobInfoList = new ArrayList<String>();
+    for (Map.Entry<String, OutputJobInfo> entry : dynamicOutputJobInfo.entrySet()) {
+      jobInfoList.add(HCatUtil.serialize(dynamicOutputJobInfo.get(entry.getKey())));
     }
+    context.getConfiguration().set(DYN_JOBINFO, HCatUtil.serialize(jobInfoList));
   }
 
   @Override
@@ -156,17 +152,10 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio
       // but may become an issue for cases when the method is used to perform
       // other setup tasks.
 
-      // Get Output Committer
-      org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter =
-          currTaskContext.getJobConf().getOutputCommitter();
-
       // Create currJobContext the latest so it gets all the config changes
       org.apache.hadoop.mapred.JobContext currJobContext =
           HCatMapRedUtil.createJobContext(currTaskContext);
 
-      // Set up job.
-      baseOutputCommitter.setupJob(currJobContext);
-
       // Recreate to refresh jobConf of currTask context.
       currTaskContext =
           HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
@@ -178,9 +167,6 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio
           new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext)
               .getWorkPath().toString());
 
-      // Set up task.
-      baseOutputCommitter.setupTask(currTaskContext);
-
       Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir"));
       Path childPath =
           new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext, "part", ""));
@@ -192,12 +178,10 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio
 
       baseDynamicWriters.put(dynKey, baseRecordWriter);
       baseDynamicSerDe.put(dynKey, currSerDe);
-      baseDynamicCommitters.put(dynKey, baseOutputCommitter);
-      dynamicContexts.put(dynKey, currTaskContext);
       dynamicObjectInspectors.put(dynKey,
           InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
       dynamicOutputJobInfo.put(dynKey,
-          HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration()));
+          HCatOutputFormat.getJobInfo(currTaskContext.getConfiguration()));
     }
 
     return new LocalFileWriter(baseDynamicWriters.get(dynKey), dynamicObjectInspectors.get(dynKey),
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index 491da14..2a4194a 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hive.hcatalog.common.ErrorType;
 import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.common.HCatException;
@@ -118,6 +119,17 @@ public FileOutputCommitterContainer(JobContext context,
   public void abortTask(TaskAttemptContext context) throws IOException {
     if (!dynamicPartitioningUsed) {
       getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    } else {
+      String jobInfoStr = context.getConfiguration().get(DynamicPartitionFileRecordWriterContainer.DYN_JOBINFO);
+      if (jobInfoStr != null) {
+        ArrayList<String> jobInfoList = (ArrayList<String>) HCatUtil.deserialize(jobInfoStr);
+        org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
+        for (String jobStr : jobInfoList) {
+          OutputJobInfo localJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobStr);
+          FileOutputCommitter committer = new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext);
+          committer.abortTask(currTaskContext);
+        }
+      }
     }
   }
 
@@ -127,6 +139,17 @@ public void commitTask(TaskAttemptContext context) throws IOException {
       //See HCATALOG-499
       FileOutputFormatContainer.setWorkOutputPath(context);
       getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    } else {
+      String jobInfoStr = context.getConfiguration().get(DynamicPartitionFileRecordWriterContainer.DYN_JOBINFO);
+      if (jobInfoStr != null) {
+        ArrayList<String> jobInfoList = (ArrayList<String>) HCatUtil.deserialize(jobInfoStr);
+        org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
+        for (String jobStr : jobInfoList) {
+          OutputJobInfo localJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobStr);
+          FileOutputCommitter committer = new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext);
+          committer.commitTask(currTaskContext);
+        }
+      }
     }
   }
 
@@ -135,8 +158,7 @@ public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
     if (!dynamicPartitioningUsed) {
       return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context));
     } else {
-      // called explicitly through FileRecordWriterContainer.close() if dynamic - return false by default
-      return false;
+      return true;
     }
   }
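
For context on the handoff this patch introduces: DynamicPartitionFileRecordWriterContainer.close() no longer commits each dynamic partition itself. Instead it serializes one entry per partition into the task configuration under DYN_JOBINFO, and FileOutputCommitterContainer.commitTask()/abortTask() deserialize that list and commit or abort each partition's output (with needsTaskCommit() now returning true so the framework actually invokes commitTask()). The following is a minimal stand-alone sketch of that round trip, not HCatalog code: serialize()/deserialize() here are local stand-ins for HCatUtil.serialize()/HCatUtil.deserialize(), and the partition locations are made-up strings standing in for serialized OutputJobInfo instances.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Base64;

import org.apache.hadoop.conf.Configuration;

public class DynJobInfoHandoffSketch {
  // Same property name the patch adds to DynamicPartitionFileRecordWriterContainer.
  static final String DYN_JOBINFO = "dynamicOutputJobInfo";

  // Local stand-ins for HCatUtil.serialize()/deserialize(): Java serialization
  // encoded as a Base64 string so the value survives a Configuration round trip.
  static String serialize(Serializable obj) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  static Object deserialize(String encoded) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder().decode(encoded)))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(false);

    // Writer side (close()): one serialized entry per dynamic partition.
    // Plain location strings stand in here for serialized OutputJobInfo objects.
    ArrayList<String> jobInfoList = new ArrayList<String>();
    jobInfoList.add(serialize("/warehouse/tbl/ds=2024-01-01"));
    jobInfoList.add(serialize("/warehouse/tbl/ds=2024-01-02"));
    conf.set(DYN_JOBINFO, serialize(jobInfoList));

    // Committer side (commitTask()/abortTask()): read the list back and
    // commit or abort each partition's task output.
    String jobInfoStr = conf.get(DYN_JOBINFO);
    if (jobInfoStr != null) {
      @SuppressWarnings("unchecked")
      ArrayList<String> restored = (ArrayList<String>) deserialize(jobInfoStr);
      for (String entry : restored) {
        String location = (String) deserialize(entry);
        System.out.println("would commit task output under " + location);
      }
    }
  }
}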