Index: src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java =================================================================== --- src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (revision 1334233) +++ src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (working copy) @@ -37,8 +37,11 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -157,6 +160,8 @@ configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class); configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class, IntWritable.class); + configurer.addOutputFormat("out3", NullOutputFormat.class, Text.class, + IntWritable.class); Path outDir = new Path(workDir.getPath(), job.getJobName()); FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1")); FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2")); @@ -249,19 +254,41 @@ private static class MultiOutWordCountReducer extends Reducer { - private IntWritable result = new IntWritable(); + private IntWritable count = new IntWritable(); @Override - protected void reduce(Text key, Iterable values, Context context) + protected void reduce(Text word, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } - result.set(sum); - MultiOutputFormat.write("out1", result, key, context); - MultiOutputFormat.write("out2", key, result, context); + count.set(sum); + MultiOutputFormat.write("out1", count, word, context); + MultiOutputFormat.write("out2", word, count, context); + MultiOutputFormat.write("out3", word, count, context); } } + private static class NullOutputFormat extends + org.apache.hadoop.mapreduce.lib.output.NullOutputFormat { + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + return new OutputCommitter() { + public void abortTask(TaskAttemptContext taskContext) { } + public void cleanupJob(JobContext jobContext) { } + public void commitJob(JobContext jobContext) { } + public void commitTask(TaskAttemptContext taskContext) { + Assert.fail("needsTaskCommit is false but commitTask was called"); + } + public boolean needsTaskCommit(TaskAttemptContext taskContext) { + return false; + } + public void setupJob(JobContext jobContext) { } + public void setupTask(TaskAttemptContext taskContext) { } + }; + } + } + } Index: src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (revision 1334233) +++ src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (working copy) @@ -467,7 +467,7 @@ } } - private class MultiOutputCommitter extends OutputCommitter { + public class MultiOutputCommitter extends OutputCommitter { private final Map outputCommitters; @@ -516,7 +516,11 @@ public void commitTask(TaskAttemptContext taskContext) throws IOException { for (String alias : outputCommitters.keySet()) { BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); - outputContainer.getBaseCommitter().commitTask(outputContainer.getContext()); + OutputCommitter baseCommitter = outputContainer.getBaseCommitter(); + TaskAttemptContext committerContext = outputContainer.getContext(); + if (baseCommitter.needsTaskCommit(committerContext)) { + baseCommitter.commitTask(committerContext); + } } }