Details
Description
When using multiple named outputs with different Key/Value Schemas, the last provided schema overrides any previous schema definitions after first write attempt. This happens due to issue with the following code in AvroMultipleOutputs.java:509
/begin/
Job job = new Job(context.getConfiguration());
...
setSchema(job, keySchema, valSchema);
taskContext = createTaskAttemptContext(
job.getConfiguration(), context.getTaskAttemptID());
/end/
Every time this code runs, actual configuration instance passed to createTaskAttemptContext remains the same, because Job constructor creates new configuration copy only if it is not instanceof JobConf. This way we have properties "avro.schema.output.XXX" overwrote each time new TaskAttemptContext is initialised and also mistakenly shared Configuration instance for all TaskAttemptContextes
Proposed fix:
a) use "Job getInstance(Configuration conf)" or
b) call "new Job(new Configuration(context.getConfiguration))"