Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Duplicate
Affects Version/s: 2.1.0
Fix Version/s: None
Component/s: None
Description
In the current DAGScheduler handleTaskCompletion code, when event.reason is Success, the scheduler first does stage.pendingPartitions -= task.partitionId, which can be a bug when a FetchFailed happens. Consider the following scenario (a simplified, self-contained sketch of the logic follows the list):
1. There are two executors, A and B; executorA is assigned ShuffleMapTask1 and ShuffleMapTask2;
2. ShuffleMapTask1 tries to fetch blocks locally but fails;
3. The driver receives the FetchFailed caused by ShuffleMapTask1 on executorA, marks executorA as lost, and updates failedEpoch;
4. The driver resubmits the stage, now containing ShuffleMapTask1x and ShuffleMapTask2x;
5. ShuffleMapTask2 finishes successfully on executorA and sends Success back to the driver;
6. The driver receives the Success and does stage.pendingPartitions -= task.partitionId, but then finds that the task's epoch is not new enough (epoch <= failedEpoch(execId)), treats the completion as bogus, and does not add the MapStatus to the stage;
7. ShuffleMapTask1x finishes successfully on executorB;
8. The driver receives the Success from ShuffleMapTask1x on executorB and does stage.pendingPartitions -= task.partitionId; there are now no pending partitions, but the driver then finds that not all partitions are available because of step 6;
9. The driver resubmits the stage, but at this moment ShuffleMapTask2x is still running; TaskSchedulerImpl.submitTasks finds a conflicting task set and throws an IllegalStateException;
10. The job fails.
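To make the interaction concrete, here is a minimal, self-contained Scala model of that ordering. It is only a sketch, not the actual DAGScheduler code: the names (PendingPartitionsModel, onShuffleMapTaskSuccess, outputLocs) are illustrative stand-ins, and it only mirrors the "decrement pendingPartitions first, epoch-check later" shape that leads to step 9.

import scala.collection.mutable

// Simplified stand-in for the bookkeeping the driver keeps per ShuffleMapStage.
object PendingPartitionsModel {
  val numPartitions     = 2
  val pendingPartitions = mutable.HashSet(0, 1)            // partitions with no finished task yet
  val outputLocs        = mutable.HashMap[Int, String]()   // partitionId -> executor holding the map output
  val failedEpoch       = mutable.HashMap[String, Long]()  // executorId  -> epoch at which it was marked lost

  def onShuffleMapTaskSuccess(partitionId: Int, execId: String, taskEpoch: Long): Unit = {
    // The partition is removed from pendingPartitions unconditionally first ...
    pendingPartitions -= partitionId
    // ... but the map output is only registered when the task ran after the
    // executor's recorded failure; otherwise the completion is ignored as bogus.
    if (failedEpoch.contains(execId) && taskEpoch <= failedEpoch(execId)) {
      println(s"ignoring possibly bogus completion of partition $partitionId from executor $execId")
    } else {
      outputLocs(partitionId) = execId
    }
    // Once nothing is pending but an output location is still missing, the stage
    // is resubmitted even though another attempt may still be running (steps 8-9).
    if (pendingPartitions.isEmpty && outputLocs.size < numPartitions) {
      println("no pending partitions, but map outputs are missing -> resubmit stage")
    }
  }
}

// Replaying steps 3, 6 and 8 with the model (e.g. in the Scala REPL):
PendingPartitionsModel.failedEpoch("A") = 1L               // step 3: executorA lost at epoch 1
PendingPartitionsModel.onShuffleMapTaskSuccess(1, "A", 0L) // step 6: old-epoch Success is treated as bogus
PendingPartitionsModel.onShuffleMapTaskSuccess(0, "B", 1L) // step 8: pending is empty, output for partition 1 missing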
To reproduce the bug:
1. Modify ShuffleBlockFetcherIterator so that it checks whether the task's index in the TaskSetManager and the stage attempt are both 0; if so, throw a FetchFailedException (a sketch of this injection follows the job below);
2. Rebuild Spark and then submit the following job:
val rdd = sc.parallelize(
  List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)

rdd.reduceByKey { (v1, v2) =>
  Thread.sleep(10000)
  v1 + v2
}.map { keyAndValue =>
  (keyAndValue._1 % 2, keyAndValue._2)
}.reduceByKey { (v1, v2) =>
  Thread.sleep(10000)
  v1 + v2
}.collect
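For step 1, the injected failure can look roughly like the sketch below. Everything here is hypothetical: the helper name, how the task index and stage attempt are obtained inside ShuffleBlockFetcherIterator, and the exact FetchFailedException constructor arguments all depend on the Spark version being patched.

import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId

// Hypothetical helper called from ShuffleBlockFetcherIterator before fetching a block;
// taskIndex and stageAttempt are assumed to be threaded in by the modification itself.
def maybeInjectFetchFailure(taskIndex: Int, stageAttempt: Int, address: BlockManagerId,
                            shuffleId: Int, mapId: Int, reduceId: Int): Unit = {
  // Fail only the first task of the first stage attempt, so that exactly one FetchFailed
  // reaches the driver (steps 2-3) while ShuffleMapTask2 keeps running on the same executor.
  if (taskIndex == 0 && stageAttempt == 0) {
    throw new FetchFailedException(address, shuffleId, mapId, reduceId,
      "injected fetch failure to reproduce the pendingPartitions issue")
  }
}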
Issue Links
- is related to SPARK-19263: DAGScheduler should avoid sending conflicting task set (Resolved)