Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java	(revision 1298977)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java	(revision )
@@ -102,12 +102,11 @@
     @Override
     public void commitTask(TaskAttemptContext context) throws IOException {
         if (!dynamicPartitioningUsed){
-            OutputJobInfo outputJobInfo = HCatOutputFormat.getJobInfo(context);
             //TODO fix this hack, something wrong with pig
             //running multiple storers in a single job, the real output dir got overwritten or something
             //the location in OutputJobInfo is still correct so we'll use that
             //TestHCatStorer.testMultiPartColsInData() used to fail without this
-            context.getConfiguration().set("mapred.output.dir",outputJobInfo.getLocation());
+            resetMapRedOutputDirFromJobInfo(context.getConfiguration());
             getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
         }
     }
@@ -125,6 +124,9 @@
     @Override
     public void setupJob(JobContext context) throws IOException {
         if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+            // TODO: Hack! Pig messes up mapred.output.dir when 2 Storers are used in the same Pig script.
+            // Workaround: Set mapred.output.dir from OutputJobInfo.
+            resetMapRedOutputDirFromJobInfo(context.getConfiguration());
             getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context));
         }
         // in dynamic usecase, called through FileRecordWriterContainer
@@ -140,13 +142,16 @@
     @Override
     public void abortJob(JobContext jobContext, State state) throws IOException {
         org.apache.hadoop.mapred.JobContext
-            marpedJobContext = HCatMapRedUtil.createJobContext(jobContext);
+            mapRedJobContext = HCatMapRedUtil.createJobContext(jobContext);
         if (dynamicPartitioningUsed){
             discoverPartitions(jobContext);
         }
         if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().abortJob(marpedJobContext, state);
+            // TODO: Hack! Pig messes up mapred.output.dir when 2 Storers are used in the same Pig script.
+            // Workaround: Set mapred.output.dir from OutputJobInfo.
+            resetMapRedOutputDirFromJobInfo(mapRedJobContext.getConfiguration());
+            getBaseOutputCommitter().abortJob(mapRedJobContext, state);
         }
         else if (dynamicPartitioningUsed){
             for(JobContext currContext : contextDiscoveredByPath.values()){
@@ -219,6 +224,9 @@
             discoverPartitions(jobContext);
         }
         if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+            // TODO: Hack! Pig messes up mapred.output.dir when 2 Storers are used in the same Pig script.
+            // Workaround: Set mapred.output.dir from OutputJobInfo.
+            resetMapRedOutputDirFromJobInfo(jobContext.getConfiguration());
             getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
         }
         // create _SUCCESS FILE if so requested.
@@ -256,7 +264,10 @@
         if( table.getPartitionKeys().size() == 0 ) {
             //non partitioned table
             if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+                // TODO: Hack! Pig messes up mapred.output.dir when 2 Storers are used in the same Pig script.
+                // Workaround: Set mapred.output.dir from OutputJobInfo.
+                resetMapRedOutputDirFromJobInfo(context.getConfiguration());
-                getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
+                getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
             }
             else if (dynamicPartitioningUsed){
                 for(JobContext currContext : contextDiscoveredByPath.values()){
@@ -688,4 +699,15 @@
         }
     }
 
+    /**
+     * TODO: Clean up this hack! Resets mapred.output.dir from OutputJobInfo.
+     * This works around PIG-2578, where Pig clobbers the output directory
+     * if multiple storers are used in the same Pig script.
+     * @param config The configuration whose mapred.output.dir is to be reset.
+     */
+    private void resetMapRedOutputDirFromJobInfo(Configuration config) {
+        String outputLocation = jobInfo.getLocation();
+        if (outputLocation != null)
+            config.set("mapred.output.dir", outputLocation);
+    }
 }
Index: src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java	(revision 1298977)
+++ src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java	(revision )
@@ -231,7 +231,6 @@
 
     public RecordWriter<WritableComparable<?>, HCatRecord>
         getRecordWriter(TaskAttemptContext context)
         throws IOException, InterruptedException {
-        getOutputFormat(context).getOutputCommitter(context).setupJob(context);
         return getOutputFormat(context).getRecordWriter(context);
     }
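
Note on the workaround pattern: the sketch below distills what the patch does in
commitTask/setupJob/abortJob/commitJob/cleanupJob. Each storer remembers its own
output location (OutputJobInfo.getLocation()) at setup time; because a second
storer in the same Pig script can overwrite the shared mapred.output.dir key
(PIG-2578), the location is restored from the remembered value before delegating
to the base committer. This is a minimal, self-contained sketch under assumed
names: the class name and the /warehouse paths are hypothetical, and only
org.apache.hadoop.conf.Configuration and the mapred.output.dir key come from
Hadoop itself.

import org.apache.hadoop.conf.Configuration;

public class MapRedOutputDirResetSketch {

    // Stands in for OutputJobInfo.getLocation(): the output location recorded
    // for *this* storer when the job was set up.
    private final String recordedLocation;

    public MapRedOutputDirResetSketch(String recordedLocation) {
        this.recordedLocation = recordedLocation;
    }

    // Mirrors resetMapRedOutputDirFromJobInfo(): restore mapred.output.dir from
    // the recorded location, skipping the reset when nothing was recorded.
    void resetOutputDir(Configuration conf) {
        if (recordedLocation != null) {
            conf.set("mapred.output.dir", recordedLocation);
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.set("mapred.output.dir", "/warehouse/table_a"); // storer A's dir

        // A second storer in the same script clobbers the shared key (PIG-2578).
        conf.set("mapred.output.dir", "/warehouse/table_b");

        // Before commit/abort/cleanup, storer A restores its own location.
        new MapRedOutputDirResetSketch("/warehouse/table_a").resetOutputDir(conf);
        System.out.println(conf.get("mapred.output.dir")); // prints /warehouse/table_a
    }
}

The restore is repeated at every commit/abort/cleanup entry point rather than
done once at setup, because Pig may run the other storer's setup between this
storer's setup and its commit, clobbering the key again in the meantime.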