Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-30404

Fix wrong log for FetchFailed task's successful speculation

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Incomplete
    • 2.4.4
    • None
    • Spark Core

    Description

      Steps to reproduce the bug:

      1. Spark speculation is enabled.

      2. For a running task X.0, a speculative task X.1 is launched.

      3. Then this speculative task X.1 failed by FetchFailedException, and successful(index) is set to true

        def TaskSetManager#handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) {
          ...
          val failureException: Option[Throwable] = reason match {
            case fetchFailed: FetchFailed =>
              logWarning(failureReason)
              if (!successful(index)) {
                successful(index) = true
                tasksSuccessful += 1
              }
          ...
      

      4. When the origin running task X.0 finished successfully, it found successful(index) == true, then output log: Ignoring task-finished event ....

        def TaskSetManager#handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
          ...
          if (!successful(index)) {
            tasksSuccessful += 1
            logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
              s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
              s" ($tasksSuccessful/$numTasks)")
            // Mark successful and stop if all the tasks have succeeded.
            successful(index) = true
            if (tasksSuccessful == numTasks) {
              isZombie = true
            }
          } else {
            logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
              " because task " + index + " has already completed successfully")
          }
          ...
      

      But task X.0 should output log: {{Finished task X.0 in stage .... }}
       

       

      Relevant spark driver logs as follows:

      20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions
      20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0)
      20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
      20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
      
      20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
      20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
      20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks
      20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
      20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
      20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
      20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
      20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.854 s
      20/01/02 11:22:26 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
      
      20/01/02 11:22:26 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
      20/01/02 11:22:26 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
      20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 1.0 with 2 tasks
      20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 9.179.143.4, executor 1, partition 0, NODE_LOCAL, 7929 bytes)
      20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 9.76.13.26, executor 2, partition 1, NODE_LOCAL, 7929 bytes)
      20/01/02 11:22:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 79 ms on 9.179.143.4 (executor 1) (1/2)
      
      20/01/02 11:22:26 INFO TaskSetManager: Marking task 1 in stage 1.0 (on 9.76.13.26) as speculatable because it ran more than 158 ms
      20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3, partition 1, ANY, 7929 bytes)
      20/01/02 11:22:26 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3): FetchFailed(BlockManagerId(1, 9.179.143.4, 7337, None), shuffleId=0, mapId=0, reduceId=1, message=org.apache.spark.shuffle.FetchFailedException: Connection reset by peer)
      20/01/02 11:22:26 INFO TaskSetManager: Task 1.1 in stage 1.0 (TID 4) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
      20/01/02 11:22:26 INFO DAGScheduler: Marking ResultStage 1 (main at NativeMethodAccessorImpl.java:0) as failed due to a fetch failure from ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0)
      20/01/02 11:22:26 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) failed in 0.261 s due to org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
      20/01/02 11:22:26 INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) and ResultStage 1 (main at NativeMethodAccessorImpl.java:0) due to fetch failure
      20/01/02 11:22:26 INFO DAGScheduler: Resubmitting failed stages
      
      20/01/02 11:22:26 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
      20/01/02 11:22:26 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
      20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 0.1 with 1 tasks
      20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 0.1 (TID 5, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
      
      
      
      
      // NOTE: Here should be "INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 10000 ms on 9.76.13.26 (executor 2) (2/2)"
      
      20/01/02 11:22:36 INFO TaskSetManager: Ignoring task-finished event for 1.0 in stage 1.0 because task 1 has already completed successfully
      
      
      
      
      
      20/01/02 11:22:36 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
      20/01/02 11:22:36 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) finished in 10.131 s
      20/01/02 11:22:36 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 51.031212 s
      
      20/01/02 11:22:58 INFO TaskSetManager: Finished task 0.0 in stage 0.1 (TID 5) in 32029 ms on 9.179.143.4 (executor 1) (1/1)
      20/01/02 11:22:58 INFO YarnClusterScheduler: Removed TaskSet 0.1, whose tasks have all completed, from pool
      

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              liangxs Xuesen Liang
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: