Details
Type: Bug
Status: Resolved
Priority: Minor
Resolution: Incomplete
Affects Version/s: 2.4.4
Fix Version/s: None
Description
Steps to reproduce the bug:
1. Spark speculation is enabled (a minimal configuration sketch is shown after these steps).
2. For a running task X.0, a speculative task X.1 is launched.
3. Then the speculative task X.1 fails with a FetchFailedException, and successful(index) is set to true:
// TaskSetManager#handleFailedTask
def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) {
  ...
  val failureException: Option[Throwable] = reason match {
    case fetchFailed: FetchFailed =>
      logWarning(failureReason)
      if (!successful(index)) {
        successful(index) = true
        tasksSuccessful += 1
      }
  ...
4. When the original running task X.0 finishes successfully, it finds successful(index) == true and therefore outputs the log: Ignoring task-finished event ....
// TaskSetManager#handleSuccessfulTask
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  ...
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
      s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
      s" ($tasksSuccessful/$numTasks)")
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  ...
But task X.0 should instead output the log: {{Finished task X.0 in stage ....}}
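For reference, step 1 above only requires speculative execution to be turned on. A minimal sketch of such a configuration in Scala is shown below; the interval, multiplier, and quantile values are illustrative assumptions and are not taken from this report:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Enable speculative execution so that a second attempt (X.1) can be launched
    // while the original attempt (X.0) is still running. The threshold values are
    // examples only; any settings that trigger speculation will do.
    val conf = new SparkConf()
      .set("spark.speculation", "true")
      .set("spark.speculation.interval", "100ms")
      .set("spark.speculation.multiplier", "1.5")
      .set("spark.speculation.quantile", "0.5")

    val spark = SparkSession.builder().config(conf).getOrCreate()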
The relevant Spark driver logs are as follows:
20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions
20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0)
20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks
20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.854 s
20/01/02 11:22:26 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/01/02 11:22:26 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 11:22:26 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 1.0 with 2 tasks
20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 9.179.143.4, executor 1, partition 0, NODE_LOCAL, 7929 bytes)
20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 9.76.13.26, executor 2, partition 1, NODE_LOCAL, 7929 bytes)
20/01/02 11:22:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 79 ms on 9.179.143.4 (executor 1) (1/2)
20/01/02 11:22:26 INFO TaskSetManager: Marking task 1 in stage 1.0 (on 9.76.13.26) as speculatable because it ran more than 158 ms
20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3, partition 1, ANY, 7929 bytes)
20/01/02 11:22:26 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3): FetchFailed(BlockManagerId(1, 9.179.143.4, 7337, None), shuffleId=0, mapId=0, reduceId=1, message=org.apache.spark.shuffle.FetchFailedException: Connection reset by peer)
20/01/02 11:22:26 INFO TaskSetManager: Task 1.1 in stage 1.0 (TID 4) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
20/01/02 11:22:26 INFO DAGScheduler: Marking ResultStage 1 (main at NativeMethodAccessorImpl.java:0) as failed due to a fetch failure from ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0)
20/01/02 11:22:26 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) failed in 0.261 s due to org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
20/01/02 11:22:26 INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) and ResultStage 1 (main at NativeMethodAccessorImpl.java:0) due to fetch failure
20/01/02 11:22:26 INFO DAGScheduler: Resubmitting failed stages
20/01/02 11:22:26 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 11:22:26 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 0.1 with 1 tasks
20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 0.1 (TID 5, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
// NOTE: The following line should instead be "INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 10000 ms on 9.76.13.26 (executor 2) (2/2)"
20/01/02 11:22:36 INFO TaskSetManager: Ignoring task-finished event for 1.0 in stage 1.0 because task 1 has already completed successfully
20/01/02 11:22:36 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
20/01/02 11:22:36 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) finished in 10.131 s
20/01/02 11:22:36 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 51.031212 s
20/01/02 11:22:58 INFO TaskSetManager: Finished task 0.0 in stage 0.1 (TID 5) in 32029 ms on 9.179.143.4 (executor 1) (1/1)
20/01/02 11:22:58 INFO YarnClusterScheduler: Removed TaskSet 0.1, whose tasks have all completed, from pool
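For illustration only (this issue was resolved as Incomplete, so the following is not an actual Spark patch): one possible direction is to track, separately from successful(index), whether the partition was really finished by a successful attempt, and pick the log message based on that. The field finishedByAttempt below is hypothetical and does not exist in TaskSetManager:

    // Hypothetical field (not in TaskSetManager): true only for partitions that were
    // completed by a genuinely successful task attempt, as opposed to partitions merely
    // marked successful because a speculative copy hit a FetchFailedException.
    private val finishedByAttempt = new Array[Boolean](numTasks)

    // Sketch of the corresponding branch in handleSuccessfulTask:
    if (!finishedByAttempt(index)) {
      finishedByAttempt(index) = true
      if (!successful(index)) {
        successful(index) = true
        tasksSuccessful += 1
      }
      logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
        s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
        s" ($tasksSuccessful/$numTasks)")
      if (tasksSuccessful == numTasks) {
        isZombie = true
      }
    } else {
      logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
        " because task " + index + " has already completed successfully")
    }

With bookkeeping of this kind, the success of task X.0 in the log above would print the expected "Finished task 1.0 in stage 1.0" line even though the speculative copy X.1 had already marked the partition successful after its fetch failure.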