SPARK-49472

Resubmit the task on executor decommission

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Bug
    • Affects Version/s: 3.5.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      Tasks are re-run many times and a long-running stage cannot complete if
      both spark.decommission.enabled and spark.shuffle.service.enabled are enabled.
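
      For reference, a minimal sketch (illustration only; the app name is made up and this
      is not taken from the original job) of the configuration combination under which this
      happens:

      import org.apache.spark.{SparkConf, SparkContext}

      // Executors are asked to decommission on scale-down, and shuffle files are
      // served by the external (YARN) shuffle service (port 7337 in the log below).
      val conf = new SparkConf()
        .setAppName("decommission-resubmit-repro")
        .set("spark.decommission.enabled", "true")
        .set("spark.shuffle.service.enabled", "true")
      val sc = new SparkContext(conf)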


      Below is the additional logging we added:

      ---
       .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 8 ++++++--
       1 file changed, 6 insertions(+), 2 deletions(-)
      
      diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
      index a1a54daf5f8..5846827c832 100644
      --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
      +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
      @@ -1117,6 +1117,7 @@ private[spark] class TaskSetManager(
       
         /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
         override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
      +    logInfo(s"Executor lost: execId: $execId, host: $host, reason: $reason.")
           // Re-enqueue any tasks with potential shuffle data loss that ran on the failed executor
           // if this is a shuffle map stage, and we are not using an external shuffle server which
           // could serve the shuffle outputs or the executor lost is caused by decommission (which
      @@ -1148,8 +1149,11 @@ private[spark] class TaskSetManager(
                   //    This shouldn't not happen ideally since TaskSetManager handles executor lost first
                   //    before DAGScheduler. So the map statues for the successful task must be available
                   //    at this moment. keep it here in case the handling order changes.
      -            locationOpt.exists(_.host != host)
      -
      +            val isShuffleMapOutputLoss = locationOpt.exists(_.host != host)
      +            logInfo(s"Is shuffle map output available: partition id: ${info.partitionId}, " +
      +              s"tid: ${tid}, locationOpt: ${locationOpt}, " +
      +              s"isShuffleMapOutputLoss: ${isShuffleMapOutputLoss}.")
      +            isShuffleMapOutputLoss
                 case _ => false
               }
               // We may have a running task whose partition has been marked as successful,
      

      Output:

      24/08/26 16:56:22 INFO YarnClusterSchedulerBackend: Decommission executors: 1608
      24/08/26 16:56:22 INFO YarnClusterSchedulerBackend: Notify executor 1608 to decommission.
      24/08/26 16:56:22 INFO BlockManagerMasterEndpoint: Mark BlockManagers (BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 30502, None)) as being decommissioning.
      24/08/26 16:56:22 INFO ExecutorAllocationManager: Executors 1608 removed due to idle timeout.
      24/08/26 16:56:23 INFO TaskSetManager: Finished task 4992.1 in stage 7.0 (TID 16851) in 807662 ms on hdc42-mcc10-01-0710-4001-001.company.com (executor 1300) (5000/6000)
      24/08/26 16:56:23 INFO TaskSetManager: Finished task 1335.2 in stage 7.0 (TID 16713) in 903010 ms on hdc42-mcc10-01-0110-7303-009.company.com (executor 1141) (5001/6000)
      24/08/26 16:56:23 INFO TaskSetManager: Finished task 2290.1 in stage 7.0 (TID 16573) in 1115189 ms on hdc42-mcc10-01-1110-3305-038.company (executor 568) (5002/6000)
      24/08/26 16:56:23 INFO TaskSetManager: Finished task 349.1 in stage 7.0 (TID 16916) in 777120 ms on hdc42-mcc10-01-0110-5803-003.company.com (executor 1345) (5003/6000)
      24/08/26 16:56:23 INFO YarnAllocator: Driver requested a total number of 499 executor(s) for resource profile id: 0.
      24/08/26 16:56:24 INFO YarnClusterScheduler: Executor 1608 on hdc42-mcc10-01-0110-7302-007.company.com is decommissioned after 1.3 s.
      24/08/26 16:56:24 INFO TaskSetManager: Executor lost: execId: 1608, host: hdc42-mcc10-01-0110-7302-007.company.com, reason: Executor decommission: spark scale down.
      24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 3749, tid: 10302, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
      24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3749), so marking it as still running.
      24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 763, tid: 16636, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
      24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 763), so marking it as still running.
      24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 2971, tid: 15433, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
      24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 2971), so marking it as still running.
      24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 5835, tid: 16587, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
      24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 5835), so marking it as still running.
      24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 611, tid: 15118, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
      24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 611), so marking it as still running.
      24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 3750, tid: 10303, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
      24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3750), so marking it as still running.
      24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 3740, tid: 13610, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
      24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3740), so marking it as still running.
      24/08/26 16:56:24 INFO DAGScheduler: Executor lost: 1608 (epoch 1)
      24/08/26 16:56:24 INFO BlockManagerMasterEndpoint: Trying to remove executor 1608 from BlockManagerMaster.
      

      Attachments

        1. stage.png
          120 kB
          Yuming Wang
        2. task.png
          384 kB
          Yuming Wang


          Activity

            yumwang Yuming Wang added a comment -

            https://github.com/apache/spark/blob/b2f9aab8a2cabf04ac3267de0670bdd403831390/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L1056-L1057

            Before SPARK-41469, the condition is:

            isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie
            

            After SPARK-41469, the condition is:

            isShuffleMapTasks &&
                  (reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled) && !isZombie
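
            For illustration only (a hedged sketch, not the actual TaskSetManager code), plugging
            in the situation from the log above - a ShuffleMapTask stage, the external shuffle
            service enabled, and an ExecutorDecommission loss reason - gives:

            val isShuffleMapTasks = true
            val externalShuffleServiceEnabled = true
            val isZombie = false
            val reasonIsDecommission = true // reason.isInstanceOf[ExecutorDecommission]

            // Before SPARK-41469: the external shuffle service still serves the map output,
            // so nothing is re-enqueued.
            val before = isShuffleMapTasks && !externalShuffleServiceEnabled && !isZombie // false

            // After SPARK-41469: a decommission reason forces re-enqueueing even with the
            // external shuffle service enabled, matching the "Resubmitted ShuffleMapTask" lines.
            val after = isShuffleMapTasks &&
              (reasonIsDecommission || !externalShuffleServiceEnabled) && !isZombie // true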

             

            Ngone51 wuyi added a comment -

            yumwang The resubmission of tasks is the expected behaviour here. When decommission is enabled, we expect the entire node to go down along with the external shuffle service. So, without the external shuffle service, we'd like to rerun the tasks. The issue seems to be caused by the aggressive Spark scale-down for a long-running stage.

            yumwang Yuming Wang added a comment -

            Thanks Ngone51. Our workflow is:

            1. Decommission the executor.
            2. Wait 2 hours to let downstream stages consume the shuffle data.
            3. Stop the NodeManager, apply the patch (a JDK-related or OS-related patch), then restart the node. This step takes about 0.5 hours.

            We do not clean up the shuffle data at any point in this process; we expect to reuse the previously written shuffle data as much as possible.

            Ngone51 wuyi added a comment -

            yumwang It sounds like an atypical use of decommission to me. We usually expect the entire node to shut down shortly after the executor decommission (e.g., within 2 minutes for an EC2 spot instance interruption), which also implies the shuffle data would be lost under decommission if shuffle migration is disabled.
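
            For completeness, a minimal sketch (assuming the standard storage decommission
            settings; not a change proposed in this issue) of enabling shuffle block migration
            so the map outputs survive the node shutdown:

            import org.apache.spark.SparkConf

            // Sketch only: enable block manager decommissioning and shuffle block migration
            // so map outputs are copied to surviving executors before the node goes away.
            val conf = new SparkConf()
              .set("spark.decommission.enabled", "true")
              .set("spark.storage.decommission.enabled", "true")
              .set("spark.storage.decommission.shuffleBlocks.enabled", "true")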

             

             

            yumwang Yuming Wang added a comment -

            Thank you Ngone51. We will fix this internally.


            People

              Assignee: Unassigned
              Reporter: yumwang Yuming Wang
              Votes: 0
              Watchers: 2
