diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
index 4df912a..d45a3a7 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
@@ -52,6 +52,8 @@
  * {@link FileOutputFormatContainer} for more information
  */
 class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
+  public static final String DYN_JOBINFO = "dynamicOutputJobInfo";
+
   private final List<Integer> dynamicPartCols;
   private int maxDynamicPartitions;
 
@@ -97,14 +99,12 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedExc
       // TaskInputOutput.
       bwriter.close(reporter);
     }
-    for (Map.Entry<String, OutputCommitter> entry : baseDynamicCommitters
-        .entrySet()) {
-      org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
-      OutputCommitter baseOutputCommitter = entry.getValue();
-      if (baseOutputCommitter.needsTaskCommit(currContext)) {
-        baseOutputCommitter.commitTask(currContext);
-      }
+    // postpone the commit task to FileOutputCommitterContainer
+    ArrayList<String> jobInfoList = new ArrayList<String>();
+    for (Map.Entry<String, OutputJobInfo> entry : dynamicOutputJobInfo.entrySet()) {
+      jobInfoList.add(HCatUtil.serialize(dynamicOutputJobInfo.get(entry.getKey())));
     }
+    context.getConfiguration().set(DYN_JOBINFO, HCatUtil.serialize(jobInfoList));
   }
 
   @Override
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index 491da14..0712ef3 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hive.hcatalog.common.ErrorType;
 import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.common.HCatException;
@@ -118,6 +119,17 @@ public FileOutputCommitterContainer(JobContext context,
   public void abortTask(TaskAttemptContext context) throws IOException {
     if (!dynamicPartitioningUsed) {
       getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    } else {
+      String jobInfoStr = context.getConfiguration().get(DynamicPartitionFileRecordWriterContainer.DYN_JOBINFO);
+      if (jobInfoStr != null){
+        ArrayList<String> jobInfoList = (ArrayList<String>)HCatUtil.deserialize(jobInfoStr);
+        org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
+        for (String jobStr : jobInfoList) {
+          OutputJobInfo localJobInfo = (OutputJobInfo)HCatUtil.deserialize(jobStr);
+          FileOutputCommitter committer = new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext);
+          committer.abortTask(currTaskContext);
+        }
+      }
     }
   }
 
@@ -127,6 +139,17 @@ public void commitTask(TaskAttemptContext context) throws IOException {
       //See HCATALOG-499
       FileOutputFormatContainer.setWorkOutputPath(context);
       getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    } else {
+      String jobInfoStr = context.getConfiguration().get(DynamicPartitionFileRecordWriterContainer.DYN_JOBINFO);
+      if (jobInfoStr != null) {
+        ArrayList<String> jobInfoList = (ArrayList<String>)HCatUtil.deserialize(jobInfoStr);
+        org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
+        for (String jobStr : jobInfoList) {
+          OutputJobInfo localJobInfo = (OutputJobInfo)HCatUtil.deserialize(jobStr);
+          FileOutputCommitter committer = new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext);
+          committer.commitTask(currTaskContext);
+        }
+      }
     }
   }