We recently had a scenario where a race condition occurred when a task from previous stage attempt just finished before new attempt for the same stage was created due to fetch failure, so the new task created in the second attempt on the same partition id was retrying multiple times due to TaskCommitDenied Exception without realizing that the task in earlier attempt was already successful.
For example, consider a task with partition id 9000 and index 9000 running in stage 4.0. We see a fetch failure so thus, we spawn a new stage attempt 4.1. Just within this timespan, the above task completes successfully, thus, marking the partition id 9000 as complete for 4.0. However, as stage 4.1 has not yet been created, the taskset info for that stage is not available to the TaskScheduler so, naturally, the partition id 9000 has not been marked completed for 4.1. Stage 4.1 now spawns task with index 2000 on the same partition id 9000. This task fails due to CommitDeniedException and since, it does not see the corresponding partition id as been marked successful, it keeps retrying multiple times until the job finally succeeds. It doesn't cause any job failures because the DAG scheduler is tracking the partitions separate from the task set managers.
Steps to Reproduce:
- Run any large job involving shuffle operation.
- When the ShuffleMap stage finishes and the ResultStage begins running, cause this stage to throw a fetch failure exception(Try deleting certain shuffle files on any host).
- Observe the task attempt numbers for the next stage attempt. Please note that this issue is an intermittent one, so it might not happen all the time.