Description
Trying to run some Tez MR jobs that write data out to HDFS using Parquet. When I do, I hit an NPE in the Parquet code:
java.lang.NullPointerException
    at org.apache.hadoop.fs.Path.<init>(Path.java:105)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.getDefaultWorkFile(DeprecatedParquetOutputFormat.java:69)
    at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.access$100(DeprecatedParquetOutputFormat.java:36)
    at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat$RecordWriterWrapper.<init>(DeprecatedParquetOutputFormat.java:89)
    at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.getRecordWriter(DeprecatedParquetOutputFormat.java:77)
    at org.apache.tez.mapreduce.output.MROutput.initialize(MROutput.java:416)
The flow seems to be:
1) The Parquet deprecated output format class tries to read the workOutputPath - https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java#L69
2) This calls FileOutputFormat.getWorkOutputPath(...) - https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java#L229
3) That in turn reads the JobContext.TASK_OUTPUT_DIR ("mapreduce.task.output.dir") property from the job configuration.
4) Since that property has not been set at this point, the returned work output path is null, and constructing a Path from it in the Parquet code throws the NPE.
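The failure mode described in the steps above can be sketched without any Hadoop dependencies. This is a hypothetical, minimal stand-in (the method names here are illustrative, not the real Hadoop signatures): the config lookup returns null when "mapreduce.task.output.dir" is absent, and building a path from that null value fails with a NullPointerException, just as the real Path constructor does when handed the null work output path.

```java
import java.util.HashMap;
import java.util.Map;

public class WorkOutputPathNpeSketch {

    // Stand-in for the JobConf lookup: returns null when the key is absent.
    static String getWorkOutputPath(Map<String, String> conf) {
        return conf.get("mapreduce.task.output.dir");
    }

    // Stand-in for building the task's work file path; dereferencing the
    // null parent path is what surfaces as the NPE in the stack trace.
    static String getDefaultWorkFile(String workOutputPath, String name) {
        if (workOutputPath == null) {
            throw new NullPointerException("work output path is null");
        }
        return workOutputPath + "/" + name;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();

        // Before the committer setup runs, the property is missing -> NPE.
        boolean threw = false;
        try {
            getDefaultWorkFile(getWorkOutputPath(conf), "part-00000");
        } catch (NullPointerException e) {
            threw = true;
        }
        System.out.println("NPE before property set: " + threw);

        // Once the property has been populated, the same call succeeds.
        conf.put("mapreduce.task.output.dir", "/out/_temporary/attempt_0");
        System.out.println(
            getDefaultWorkFile(getWorkOutputPath(conf), "part-00000"));
    }
}
```

The sketch shows why the ordering of the two initialization steps matters: any code path that asks for the work output path before the property is populated will hit the same NPE.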
Looking at the Tez code, the workOutputPath is set in the MROutput.initCommitter method - https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java#L445.
That call, however, is made after outputFormat.getRecordWriter() has already tried to access the workOutputPath.
I tried out a run where I moved this initCommitter call up:
else {
  oldApiTaskAttemptContext =
      new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
          jobConf, taskAttemptId,
          new MRTaskReporter(getContext()));
  initCommitter(jobConf, useNewApi); // before the getRecordWriter call
  oldOutputFormat = jobConf.getOutputFormat();
  outputFormatClassName = oldOutputFormat.getClass().getName();
  FileSystem fs = FileSystem.get(jobConf);
  String finalName = getOutputName();
  oldRecordWriter =
      oldOutputFormat.getRecordWriter(
          fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
}
With this change the job succeeds. If the approach sounds reasonable, I can cut a PR.