Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-41187

[Core] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen



    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 3.1.2, 3.2.0, 3.1.3, 3.3.1
    • 3.3.2, 3.4.0
    • Spark Core
    • None


      We have a long running thriftserver, which we found memory leak happened. One of the memory leak is like below.

      The event queue size in our prod env is set to very large to avoid message drop, but we still find the message drop in log. And the event processing time is very long , event is accumulated in queue.

      In heap dump we found LiveExecutor  instances number is also become very huge. After check the heap dump, Finally we found the reason. 

      The reason is:

      For a shuffle map stage tasks, if a executor lost happen,  the finished task will be resubmitted, and send out a taskEnd Message with reason "Resubmitted" in TaskSetManager.scala, this will cause the  activeTask in AppStatusListner's liveStage become negative

      override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
        // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
        // and we are not using an external shuffle server which could serve the shuffle outputs.
        // The reason is the next stage wouldn't be able to fetch the data from this dead executor
        // so we would need to rerun these tasks on other executors.
        if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
          for ((tid, info) <- taskInfos if info.executorId == execId) {
            val index = info.index
            // We may have a running task whose partition has been marked as successful,
            // this partition has another task completed in another stage attempt.
            // We treat it as a running task and will call handleFailedTask later.
            if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid)) {
              successful(index) = false
              copiesRunning(index) -= 1
              tasksSuccessful -= 1
              // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
              // stage finishes when a total of tasks.size tasks finish.
                tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)


      if liveStage activeTask is negative, it will never be removed, thus cause the executor moved to deadExecutors will never to removed, cause it need to check there is no stage submission less than its remove time before removed. 

      /** Was the specified executor active for any currently live stages? */
      private def isExecutorActiveForLiveStages(exec: LiveExecutor): Boolean = {
        liveStages.values.asScala.exists { stage =>
          stage.info.submissionTime.getOrElse(0L) < exec.removeTime.getTime
      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        // remove any dead executors that were not running for any currently active stages
        deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))


      Add the corresponding logs in prod env as attachment. The resubmitted task number is equals to the activeTasks in heap dump for that stage.

      Hope I describe it clear, I will create a pull request later,  we just ignore the resubmitted message in AppStatusListener to fix it.


        1. image-20221113232233952.png
          1.66 MB
        2. image-20221113232214179.png
          423 kB
        3. image-2022-11-18-11-09-34-760.png
          592 kB
        4. image-2022-11-18-11-01-57-435.png
          374 kB
        5. image-2022-11-18-10-57-49-230.png
          275 kB



            yimo_yym wineternity
            yimo_yym wineternity
            0 Vote for this issue
            2 Start watching this issue