Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
2.1.0
-
None
Description
In current DAGScheduler handleTaskCompletion code, when event.reason is Success, it will first do stage.pendingPartitions -= task.partitionId, which maybe a bug when FetchFailed happens. Think about below:
- Stage 0 runs and generates shuffle output data.
- Stage 1 reads the output from stage 0 and generates more shuffle data. It has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are launched on executorA.
- ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the driver. The driver marks executorA as lost and updates failedEpoch;
- The driver resubmits stage 0 so the missing output can be re-generated, and then once it completes, resubmits stage 1 with ShuffleMapTask1x and ShuffleMapTask2x.
- ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes on executorA and sends Success back to driver. This causes DAGScheduler::handleTaskCompletion to remove partition 2 from stage.pendingPartitions (line 1149), but it does not add the partition to the set of output locations (line 1192), because the task’s epoch is less than the failure epoch for the executor (because of the earlier failure on executor A)
- ShuffleMapTask1x successfully finishes on executorB, causing the driver to remove partition 1 from stage.pendingPartitions. Combined with the previous step, this means that there are no more pending partitions for the stage, so the DAGScheduler marks the stage as finished (line 1196). However, the shuffle stage is not available (line 1215) because the completion for ShuffleMapTask2 was ignored because of its epoch, so the DAGScheduler resubmits the stage.
- ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is called for the re-submitted stage, it throws an error, because there’s an existing active task set
To reproduce the bug:
1. We need to do some modification in ShuffleBlockFetcherIterator: check whether the task's index in TaskSetManager and stage attempt equal to 0 at the same time, if so, throw FetchFailedException;
2. Rebuild spark then submit following job:
val rdd = sc.parallelize(List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2) rdd.reduceByKey { (v1, v2) => { Thread.sleep(10000) v1 + v2 } }.map { keyAndValue => { (keyAndValue._1 % 2, keyAndValue._2) } }.reduceByKey { (v1, v2) => { Thread.sleep(10000) v1 + v2 } }.collect
Attachments
Issue Links
- blocks
-
SPARK-19502 Remove unnecessary code to re-submit stages in the DAGScheduler
- Closed
- is duplicated by
-
SPARK-14658 when executor lost DagScheduer may submit one stage twice even if the first running taskset for this stage is not finished
- Resolved
- relates to
-
SPARK-19262 DAGScheduler should handle stage's pendingPartitions properly in handleTaskCompletion.
- Closed
- links to