Index: src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java	(revision 1411975)
+++ src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java	(working copy)
@@ -32,6 +32,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -194,18 +195,21 @@
         //create base OutputFormat
         org.apache.hadoop.mapred.OutputFormat baseOF = ReflectionUtils.newInstance(
             storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
+        try {
+            // HCATALOG-553: Though we are ignoring the FileAlreadyExistsException, we still need it.
+            // As checkOutputSpecs is also responsible for obtaining the right delegation token,
+            // this call talks to the namenode, so there is scope for optimization here.
+            baseOF.checkOutputSpecs(null, currTaskContext.getJobConf());
+        } catch (FileAlreadyExistsException e) {
+            // HCATALOG-490: Ignore since a FileAlreadyExistsException can be thrown when more than one mapper is writing
+            // to the same partition.
+        }
+
         //get Output Committer
         org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
         //create currJobContext the latest so it gets all the config changes
         org.apache.hadoop.mapred.JobContext currJobContext =
             HCatMapRedUtil.createJobContext(currTaskContext);
-        //We are skipping calling checkOutputSpecs() for each partition
-        //As it can throw a FileAlreadyExistsException when more than one mapper is writing to a partition
-        //See HCATALOG-490, also to avoid contacting the namenode for each new FileOutputFormat instance
-        //In general this should be ok for most FileOutputFormat implementations
-        //but may become an issue for cases when the method is used to perform other setup tasks
-
-        //setupJob()
         baseOutputCommitter.setupJob(currJobContext);
         //recreate to refresh jobConf of currTask context
         currTaskContext =