Description
FileFormatWriter uses the current time to create a Job ID that is used when calling Hadoop committers. This ID is used to produce task and task attempt IDs used in commits.
The problem is that Spark generates this Job ID in executeTask for every task:
/** Writes data out in a single Spark task. */
private def executeTask(
    description: WriteJobDescription,
    sparkStageId: Int,
    sparkPartitionId: Int,
    sparkAttemptNumber: Int,
    committer: FileCommitProtocol,
    iterator: Iterator[InternalRow]): WriteTaskResult = {
  val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
  val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
  val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
  ...
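To see why the IDs drift, here is a minimal, self-contained sketch of the derivation. It assumes a timestamp-based job tracker ID like the one SparkHadoopWriterUtils builds; the "yyyyMMddHHmmss" format, the object name, and the sleep are illustrative, not Spark's actual code:

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

object JobIdDrift {
  // Assumed shape of the derivation: the job tracker ID is the wall-clock
  // time formatted to second granularity.
  def createJobID(time: Date, id: Int): JobID = {
    val jobTrackerID = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
    new JobID(jobTrackerID, id)
  }

  def main(args: Array[String]): Unit = {
    val stageId = 3

    // Two tasks of the same stage each call createJobID with their own clock.
    val jobIdTask0 = createJobID(new Date(), stageId)
    Thread.sleep(1500)  // the second task happens to start in the next second
    val jobIdTask1 = createJobID(new Date(), stageId)

    val attempt0 = new TaskAttemptID(new TaskID(jobIdTask0, TaskType.MAP, 0), 0)
    val attempt1 = new TaskAttemptID(new TaskID(jobIdTask1, TaskType.MAP, 1), 0)

    // e.g. attempt_20240101120000_0003_m_000000_0
    //  vs. attempt_20240101120001_0003_m_000001_0
    println(attempt0)
    println(attempt1)
    // The job portions differ, so a committer sees two unrelated jobs.
    assert(attempt0.getJobID != attempt1.getJobID)
  }
}

Any two tasks whose clocks fall in different seconds end up under different job prefixes, even though they belong to the same write job.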
Because executeTask runs once per task, each task derives its own Job ID from its own current time, so the Job ID is not consistent across the tasks of a single write job. This violates the contract expected by Hadoop committers.
Any committer that relies on identical task IDs across attempts for correctness is broken by this. For example, a Hadoop committer should be able to rename an output file to a path based on the task ID to ensure that only one copy of a task's output is committed.
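To make that failure mode concrete, here is a hypothetical rename-style commit (not Spark's actual committer code; the helper, paths, and ID strings are made up). It is idempotent only when every attempt of a task computes the same task ID:

import java.nio.file.{Files, Path, StandardCopyOption}

object RenameCommit {
  // Hypothetical commit: the final file name is derived from the task ID,
  // so committing the same task twice must land on the same path and
  // leave exactly one copy.
  def commitTask(attemptOutput: Path, outputDir: Path, taskIdString: String): Path = {
    val finalPath = outputDir.resolve(s"part-$taskIdString")
    Files.move(attemptOutput, finalPath, StandardCopyOption.REPLACE_EXISTING)
    finalPath
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("commit-demo")
    def attemptFile(name: String): Path =
      Files.writeString(dir.resolve(name), "rows")

    // Consistent Job ID: both attempts of task 0 map to the same final
    // path, so the second rename replaces the first -- one copy survives.
    commitTask(attemptFile("a0"), dir, "task_20240101120000_0003_m_000000")
    commitTask(attemptFile("a1"), dir, "task_20240101120000_0003_m_000000")

    // Per-task Job IDs: the two attempts of task 1 embed different
    // timestamps, rename to two different paths, and both copies survive.
    commitTask(attemptFile("b0"), dir, "task_20240101120000_0003_m_000001")
    commitTask(attemptFile("b1"), dir, "task_20240101120001_0003_m_000001")

    Files.list(dir).forEach(p => println(p.getFileName))  // three part files
  }
}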
We hit this issue when preemption killed a task just after its commit operation. The commit coordinator authorized a second task commit because the first never reported completion due to the preemption. Since the two attempts had generated different Job IDs, their task attempt IDs did not collide, and both commits left output behind instead of the second replacing the first.
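For contrast, here is a sketch of a consistent scheme (an assumed shape, not necessarily the exact upstream fix): the driver picks the timestamp once, before launching tasks, and ships it to every task, so all tasks reconstruct the same Job ID.

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.mapreduce.JobID

object ConsistentJobId {
  // Same timestamp-based derivation as before, but driven by a shared
  // instant rather than each task's own clock.
  def jobIdFor(jobInstantMs: Long, stageId: Int): JobID = {
    val jt = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
      .format(new Date(jobInstantMs))
    new JobID(jt, stageId)
  }

  def main(args: Array[String]): Unit = {
    val jobInstantMs = System.currentTimeMillis()  // chosen once, driver side

    // Every task derives its IDs from the shared instant, so the job
    // portion of every task attempt ID matches.
    val idsSeenByTasks = (0 until 4).map(_ => jobIdFor(jobInstantMs, stageId = 3))
    assert(idsSeenByTasks.distinct.size == 1)
    println(idsSeenByTasks.head)  // e.g. job_20240101120000_0003
  }
}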