Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java	(revision )
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java	(revision )
@@ -0,0 +1,129 @@
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Singleton registry to track the commit of TaskAttempts.
+ * Used to manage commits for tasks that create dynamic partitions.
+ */
+public class TaskCommitContextRegistry {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskCommitContextRegistry.class);
+
+  private static TaskCommitContextRegistry ourInstance = new TaskCommitContextRegistry();
+
+  /**
+   * Singleton instance getter.
+   */
+  public static TaskCommitContextRegistry getInstance() {
+    return ourInstance;
+  }
+
+  /**
+   * Implement this interface to register call-backs for committing TaskAttempts.
+   */
+  public static interface TaskCommitterProxy {
+
+    /**
+     * Call-back for the Committer's abortTask().
+     */
+    public void abortTask(TaskAttemptContext context) throws IOException;
+
+    /**
+     * Call-back for the Committer's commitTask().
+     */
+    public void commitTask(TaskAttemptContext context) throws IOException;
+  }
+
+  private HashMap<String, TaskCommitterProxy> taskCommitters
+      = new HashMap<String, TaskCommitterProxy>();
+
+  /**
+   * Triggers commit for the TaskAttempt specified by the TaskAttemptContext argument.
+   */
+  public synchronized void commitTask(TaskAttemptContext context) throws IOException {
+    String key = generateKey(context);
+    if (!taskCommitters.containsKey(key)) {
+      throw new IOException("No callback registered for TaskAttemptID:" + key);
+    }
+
+    try {
+      LOG.info("Committing TaskAttempt:" + key);
+      taskCommitters.get(key).commitTask(context);
+    }
+    catch (Throwable t) {
+      throw new IOException("Could not commit TaskAttemptID:" + key, t);
+    }
+
+  }
+
+  private String generateKey(TaskAttemptContext context) throws IOException {
+    String jobInfoString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+    if (StringUtils.isBlank(jobInfoString)) { // Avoid the NPE.
+      throw new IOException("Could not retrieve OutputJobInfo for TaskAttempt " + context.getTaskAttemptID());
+    }
+    OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(jobInfoString);
+    return context.getTaskAttemptID().toString() + "@" + jobInfo.getLocation();
+  }
+
+  /**
+   * Triggers abort for the TaskAttempt specified by the TaskAttemptContext argument.
+   */
+  public synchronized void abortTask(TaskAttemptContext context) throws IOException {
+    String key = generateKey(context);
+    if (!taskCommitters.containsKey(key)) {
+      throw new IOException("No callback registered for TaskAttemptID:" + key);
+    }
+
+    try {
+      LOG.info("Aborting TaskAttempt:" + key);
+      taskCommitters.get(key).abortTask(context);
+    }
+    catch (Throwable t) {
+      throw new IOException("Could not abort TaskAttemptID:" + key, t);
+    }
+  }
+
+  /**
+   * Method to register call-backs to control commits and aborts of TaskAttempts.
+   * @param context The TaskAttemptContext instance for the task-attempt, identifying the output.
+   * @param committer Instance of TaskCommitterProxy, to commit/abort a TaskAttempt.
+   * @throws java.io.IOException On failure.
+   */
+  public synchronized void register(TaskAttemptContext context, TaskCommitterProxy committer) throws IOException {
+    String key = generateKey(context);
+    LOG.info("Registering committer for TaskAttemptID:" + key);
+    if (taskCommitters.containsKey(key)) {
+      LOG.warn("Replacing previous committer:" + committer);
+    }
+    taskCommitters.put(key, committer);
+  }
+
+  /**
+   * Method to discard the committer call-backs for a specified TaskAttemptID.
+   * @param context The TaskAttemptContext instance for the task-attempt, identifying the output.
+   * @throws java.io.IOException On failure.
+   */
+  public synchronized void discardCleanupFor(TaskAttemptContext context) throws IOException {
+    String key = generateKey(context);
+    LOG.info("Discarding all cleanup for TaskAttemptID:" + key);
+    if (!taskCommitters.containsKey(key)) {
+      LOG.warn("No committer registered for TaskAttemptID:" + key);
+    }
+    else {
+      taskCommitters.remove(key);
+    }
+  }
+
+  // Hide constructor, for make benefit glorious Singleton.
+  private TaskCommitContextRegistry() {
+  }
+}
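Reviewer aid, not part of the patch: the intended life-cycle of a registered call-back, pieced together from the API above. The attemptContext variable and the proxy bodies are illustrative assumptions; only the registry calls themselves are real.

    // 1. At the end of RecordWriter.close(), a task registers how its
    //    output should later be committed or aborted:
    TaskCommitContextRegistry.getInstance().register(attemptContext,
        new TaskCommitContextRegistry.TaskCommitterProxy() {
          @Override
          public void commitTask(TaskAttemptContext context) throws IOException {
            // Illustrative: promote this attempt's temporary files.
          }

          @Override
          public void abortTask(TaskAttemptContext context) throws IOException {
            // Illustrative: delete this attempt's partial output.
          }
        });

    // 2. The OutputCommitter later replays the call-back exactly once,
    //    discarding it even if commit fails (this mirrors the try/finally
    //    pattern in FileOutputCommitterContainer below):
    try {
      TaskCommitContextRegistry.getInstance().commitTask(attemptContext);
    } finally {
      TaskCommitContextRegistry.getInstance().discardCleanupFor(attemptContext);
    }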
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java	(revision 1635644)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java	(revision )
@@ -118,8 +118,15 @@
   public void abortTask(TaskAttemptContext context) throws IOException {
     if (!dynamicPartitioningUsed) {
       getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
-    }
-  }
+    } else {
+      try {
+        TaskCommitContextRegistry.getInstance().abortTask(context);
+      }
+      finally {
+        TaskCommitContextRegistry.getInstance().discardCleanupFor(context);
+      }
+    }
+  }
 
   @Override
   public void commitTask(TaskAttemptContext context) throws IOException {
@@ -127,8 +134,15 @@
       //See HCATALOG-499
       FileOutputFormatContainer.setWorkOutputPath(context);
       getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
-    }
-  }
+    } else {
+      try {
+        TaskCommitContextRegistry.getInstance().commitTask(context);
+      }
+      finally {
+        TaskCommitContextRegistry.getInstance().discardCleanupFor(context);
+      }
+    }
+  }
 
   @Override
   public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
@@ -136,7 +150,7 @@
       return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context));
     } else {
-      // called explicitly through FileRecordWriterContainer.close() if dynamic - return false by default
-      return false;
+      // Called explicitly through FileRecordWriterContainer.close() if dynamic partitioning is used.
+      return true;
     }
   }
 
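Why flipping needsTaskCommit() to true matters: in the MapReduce commit protocol the framework only invokes commitTask() when needsTaskCommit() answers true, so the old "return false" silently skipped the call-backs registered above. A rough, simplified paraphrase of the framework-side sequence (Hadoop's task-commit path, not code from this patch):

    // Framework-side sketch; attemptContext is supplied by the framework.
    OutputCommitter committer = outputFormat.getOutputCommitter(attemptContext);
    if (committer.needsTaskCommit(attemptContext)) {
      // With dynamic partitioning this branch now runs, reaching
      // TaskCommitContextRegistry via FileOutputCommitterContainer.commitTask().
      committer.commitTask(attemptContext);
    }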
Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java	(revision 1635644)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java	(revision )
@@ -27,11 +27,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.Reporter;
@@ -44,14 +42,16 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.hcatalog.common.ErrorType;
 import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.data.HCatRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Record writer container for tables using dynamic partitioning. See
  * {@link FileOutputFormatContainer} for more information
  */
 class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionFileRecordWriterContainer.class);
   private final List<Integer> dynamicPartCols;
   private int maxDynamicPartitions;
@@ -97,14 +97,33 @@
       // TaskInputOutput.
       bwriter.close(reporter);
     }
-    for (Map.Entry<String, OutputCommitter> entry : baseDynamicCommitters
-        .entrySet()) {
-      org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
-      OutputCommitter baseOutputCommitter = entry.getValue();
-      if (baseOutputCommitter.needsTaskCommit(currContext)) {
-        baseOutputCommitter.commitTask(currContext);
-      }
-    }
+
+    TaskCommitContextRegistry.getInstance().register(context, new TaskCommitContextRegistry.TaskCommitterProxy() {
+      @Override
+      public void abortTask(TaskAttemptContext context) throws IOException {
+        for (Map.Entry<String, OutputJobInfo> outputJobInfoEntry : dynamicOutputJobInfo.entrySet()) {
+          String dynKey = outputJobInfoEntry.getKey();
+          OutputJobInfo outputJobInfo = outputJobInfoEntry.getValue();
+          LOG.info("Aborting task-attempt for " + outputJobInfo.getLocation());
+          baseDynamicCommitters.get(dynKey)
+              .abortTask(dynamicContexts.get(dynKey));
+        }
+      }
+
+      @Override
+      public void commitTask(TaskAttemptContext context) throws IOException {
+        for (Map.Entry<String, OutputJobInfo> outputJobInfoEntry : dynamicOutputJobInfo.entrySet()) {
+          String dynKey = outputJobInfoEntry.getKey();
+          OutputJobInfo outputJobInfo = outputJobInfoEntry.getValue();
+          LOG.info("Committing task-attempt for " + outputJobInfo.getLocation());
+          org.apache.hadoop.mapred.TaskAttemptContext dynContext = dynamicContexts.get(dynKey);
+          OutputCommitter dynCommitter = baseDynamicCommitters.get(dynKey);
+          if (dynCommitter.needsTaskCommit(dynContext)) {
+            dynCommitter.commitTask(dynContext);
+          }
+        }
+      }
+    });
   }
 
   @Override
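A closing note on the registry's keying, for anyone extending this: generateKey() combines the TaskAttemptID with the output location, so one task-attempt writing to several HCatalog outputs keeps independent call-backs, and discardCleanupFor() removes only the entry for the context it is given. A hedged sketch; the contexts, proxies, and key values below are made up for illustration:

    // generateKey() yields "<TaskAttemptID>@<OutputJobInfo.getLocation()>", e.g.:
    //   attempt_201501070000_0001_r_000002_0@/warehouse/page_views
    //   attempt_201501070000_0001_r_000002_0@/warehouse/click_stream
    TaskCommitContextRegistry registry = TaskCommitContextRegistry.getInstance();
    registry.register(pageViewsContext, pageViewsProxy);     // hypothetical contexts whose
    registry.register(clickStreamContext, clickStreamProxy); // configs name different locations
    registry.commitTask(pageViewsContext);                   // fires only the page_views proxy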