Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java	(revision )
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java	(revision )
@@ -0,0 +1,117 @@
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Singleton registry to track the commit of TaskAttempts.
+ * Used to manage commits for tasks that create dynamic partitions.
+ */
+public class TaskCommitContextRegistry {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskCommitContextRegistry.class);
+
+  private static TaskCommitContextRegistry ourInstance = new TaskCommitContextRegistry();
+
+  /**
+   * Singleton instance getter.
+   */
+  public static TaskCommitContextRegistry getInstance() {
+    return ourInstance;
+  }
+
+  /**
+   * Implement this interface to register call-backs for committing TaskAttempts.
+   */
+  public static interface TaskCommitterProxy {
+
+    /**
+     * Call-back for the committer's abortTask().
+     */
+    public void abortTask(TaskAttemptContext context) throws IOException;
+
+    /**
+     * Call-back for the committer's commitTask().
+     */
+    public void commitTask(TaskAttemptContext context) throws IOException;
+  }
+
+  private HashMap<TaskAttemptID, TaskCommitterProxy> taskCommitters
+      = new HashMap<TaskAttemptID, TaskCommitterProxy>();
+
+  /**
+   * Triggers commit for the TaskAttempt specified by the TaskAttemptContext argument.
+   */
+  public synchronized void commitTask(TaskAttemptContext context) throws IOException {
+    TaskAttemptID id = context.getTaskAttemptID();
+    if (!taskCommitters.containsKey(id)) {
+      throw new IOException("No callback registered for TaskAttemptID:" + id);
+    }
+
+    try {
+      LOG.info("Committing TaskAttempt:" + id);
+      taskCommitters.get(id).commitTask(context);
+    }
+    catch (Throwable t) {
+      throw new IOException("Could not commit TaskAttemptID:" + id, t);
+    }
+
+  }
+
+  /**
+   * Triggers abort for the TaskAttempt specified by the TaskAttemptContext argument.
+   */
+  public synchronized void abortTask(TaskAttemptContext context) throws IOException {
+
+    TaskAttemptID id = context.getTaskAttemptID();
+    if (!taskCommitters.containsKey(id)) {
+      throw new IOException("No callback registered for TaskAttemptID:" + id);
+    }
+
+    try {
+      LOG.info("Aborting TaskAttempt:" + id);
+      taskCommitters.get(id).abortTask(context);
+    }
+    catch (Throwable t) {
+      throw new IOException("Could not abort TaskAttemptID:" + id, t);
+    }
+  }
+
+  /**
+   * Registers call-backs to control commits and aborts of TaskAttempts.
+   * @param id TaskAttemptID identifying the TaskAttempt.
+   * @param committer Instance of TaskCommitterProxy, to commit/abort a TaskAttempt.
+   * @throws java.io.IOException On failure.
+   */
+  public synchronized void register(TaskAttemptID id, TaskCommitterProxy committer) throws IOException {
+    LOG.info("Registering committer for TaskAttemptID:" + id);
+    if (taskCommitters.containsKey(id)) {
+      LOG.warn("Replacing previous committer:" + committer);
+    }
+    taskCommitters.put(id, committer);
+  }
+
+  /**
+   * Discards the committer call-backs for a specified TaskAttemptID.
+   * @param id TaskAttemptID identifying the TaskAttempt.
+   * @throws java.io.IOException On failure.
+   */
+  public synchronized void discardCleanupFor(TaskAttemptID id) throws IOException {
+    LOG.info("Discarding all cleanup for TaskAttemptID:" + id);
+    if (!taskCommitters.containsKey(id)) {
+      LOG.warn("No committer registered for TaskAttemptID:" + id);
+    }
+    else {
+      taskCommitters.remove(id);
+    }
+  }
+
+  // Hide constructor, for make benefit glorious Singleton.
+  private TaskCommitContextRegistry() {
+  }
+}
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java	(revision 1631016)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java	(revision )
@@ -118,6 +118,8 @@
   public void abortTask(TaskAttemptContext context) throws IOException {
     if (!dynamicPartitioningUsed) {
       getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    } else {
+      TaskCommitContextRegistry.getInstance().abortTask(context);
     }
   }
 
@@ -127,6 +129,8 @@
       //See HCATALOG-499
       FileOutputFormatContainer.setWorkOutputPath(context);
       getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    } else {
+      TaskCommitContextRegistry.getInstance().commitTask(context);
     }
   }
 
@@ -136,7 +140,7 @@
       return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context));
     } else {
-      // called explicitly through FileRecordWriterContainer.close() if dynamic - return false by default
-      return false;
+      // called explicitly through FileRecordWriterContainer.close() if dynamic - return true by default
+      return true;
     }
   }
 
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java	(revision 1631016)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java	(revision )
@@ -46,12 +46,15 @@
 import org.apache.hive.hcatalog.common.HCatException;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.data.HCatRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Record writer container for tables using dynamic partitioning. See
  * {@link FileOutputFormatContainer} for more information
  */
 class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionFileRecordWriterContainer.class);
   private final List<Integer> dynamicPartCols;
   private int maxDynamicPartitions;
 
@@ -97,14 +100,24 @@
       // TaskInputOutput.
       bwriter.close(reporter);
     }
-    for (Map.Entry<String, OutputCommitter> entry : baseDynamicCommitters
-        .entrySet()) {
-      org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
-      OutputCommitter baseOutputCommitter = entry.getValue();
-      if (baseOutputCommitter.needsTaskCommit(currContext)) {
-        baseOutputCommitter.commitTask(currContext);
-      }
-    }
+
+    TaskCommitContextRegistry.getInstance().register(context.getTaskAttemptID(), new TaskCommitContextRegistry.TaskCommitterProxy() {
+      @Override
+      public void abortTask(TaskAttemptContext context) throws IOException {
+        for (OutputJobInfo outputJobInfo : dynamicOutputJobInfo.values()) {
+          LOG.info("Aborting task-attempt for " + outputJobInfo.getLocation());
+          new FileOutputCommitter(new Path(outputJobInfo.getLocation()), context).abortTask(context);
+        }
+      }
+
+      @Override
+      public void commitTask(TaskAttemptContext context) throws IOException {
+        for (OutputJobInfo outputJobInfo : dynamicOutputJobInfo.values()) {
+          LOG.info("Committing task-attempt for " + outputJobInfo.getLocation());
+          new FileOutputCommitter(new Path(outputJobInfo.getLocation()), context).commitTask(context);
+        }
+      }
+    });
   }
 
   @Override
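
Reviewer note (not part of the patch): a minimal, standalone sketch of the flow this change introduces, shown under some assumptions. The class name TaskCommitContextRegistryDemo, the "demo" job/attempt ids, and the println bodies are made up for illustration; TaskAttemptContextImpl and TaskType are the Hadoop 2.x classes from hadoop-mapreduce-client-core. The record-writer's close() registers a TaskCommitterProxy, and FileOutputCommitterContainer later routes commitTask()/abortTask() through the registry, which works because needsTaskCommit() now returns true when dynamic partitioning is used.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hive.hcatalog.mapreduce.TaskCommitContextRegistry;

public class TaskCommitContextRegistryDemo {
  public static void main(String[] args) throws IOException {
    // Made-up attempt id, standing in for the one MapReduce hands the task.
    TaskAttemptID id = new TaskAttemptID("demo", 1, TaskType.MAP, 0, 0);
    TaskAttemptContext context = new TaskAttemptContextImpl(new Configuration(), id);

    // Step 1: the record-writer's close() registers per-partition commit logic,
    // as DynamicPartitionFileRecordWriterContainer does in the patch above.
    TaskCommitContextRegistry.getInstance().register(id, new TaskCommitContextRegistry.TaskCommitterProxy() {
      @Override
      public void abortTask(TaskAttemptContext ctxt) throws IOException {
        // Stand-in for aborting one FileOutputCommitter per partition location.
        System.out.println("Aborting attempt " + ctxt.getTaskAttemptID());
      }

      @Override
      public void commitTask(TaskAttemptContext ctxt) throws IOException {
        // Stand-in for committing one FileOutputCommitter per partition location.
        System.out.println("Committing attempt " + ctxt.getTaskAttemptID());
      }
    });

    // Step 2: the framework calls FileOutputCommitterContainer.commitTask(),
    // which (with dynamic partitioning) delegates to the registry as shown here.
    TaskCommitContextRegistry.getInstance().commitTask(context);

    // Step 3: once the attempt is committed, its callbacks can be discarded.
    TaskCommitContextRegistry.getInstance().discardCleanupFor(id);
  }
}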