Spark / SPARK-30511

Spark marks intentionally killed speculative tasks as pending, which leads to holding idle executors


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 3.0.0
    • Component/s: Scheduler, Spark Core
    • Labels: None

    Description

      TL;DR
      When speculative tasks fail or get killed, they are still counted as pending and contribute to the computed number of needed executors.

      Symptom

      In one of our production jobs (running 4 tasks per executor), we found that it was holding 6 executors at the end with only 2 tasks running (1 of them speculative). With more logging enabled, we found that the job printed:

      pendingTasks is 0 pendingSpeculativeTasks is 17 totalRunningTasks is 2
      

      At that point the job had only 1 speculative task running, and 16 speculative tasks had been intentionally killed because their corresponding original tasks had finished.

      An easy repro of the issue (`--conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=1000` in cluster mode):

      val n = 4000
      val someRDD = sc.parallelize(1 to n, n)
      someRDD.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => {
        if (index < 300 && index >= 150) {
          Thread.sleep(index * 1000) // Fake running tasks
        } else if (index == 300) {
          Thread.sleep(1000 * 1000) // Fake long running tasks
        }
        it.toList.map(x => index + ", " + x).iterator
      }).collect
      

      While the last task is running, the job still holds 38 executors (see attachment), which is exactly (152 + 3) / 4 = 38 under integer division.
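
      The arithmetic above can be checked with a small sketch. It assumes the executor target is a ceiling division of the tasks counted as running or pending by the number of task slots per executor, which is what the (152 + 3) / 4 expression suggests. ExecutorMath and executorsNeeded are hypothetical names used only for illustration, not Spark APIs.

```scala
// Hypothetical helper for illustration only; not Spark's actual code.
object ExecutorMath {
  // Integer ceiling division: how many executors are needed to give
  // every counted task one slot, with tasksPerExecutor slots per executor.
  def executorsNeeded(runningOrPendingTasks: Int, tasksPerExecutor: Int): Int =
    (runningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

  def main(args: Array[String]): Unit = {
    // 152 tasks counted as running or pending, 4 slots per executor (repro above).
    val held = executorsNeeded(152, 4)
    // Assumption: if only a single task were counted, one executor would suffice.
    val needed = executorsNeeded(1, 4)
    println(s"held=$held needed=$needed")
  }
}
```

      With 152 counted tasks the target is 38 executors, whereas a single counted task would need only 1, so almost all of the held executors are idle.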

      The Bug

      Consider the code of pendingSpeculativeTasks:

      stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
        numTasks - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
      }.sum
      

      Here, stageAttemptToNumSpeculativeTasks(stageAttempt) is incremented in onSpeculativeTaskSubmitted but never decremented. The only cleanup, stageAttemptToNumSpeculativeTasks -= stageAttempt, is performed on stage completion. This means Spark keeps counting ended speculative tasks as pending, which leads it to hold more executors than it actually needs!
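
      The bookkeeping described above can be illustrated with a toy model. This is a sketch under stated assumptions, not Spark's ExecutorAllocationManager: stage attempts are plain Ints, onSpeculativeTaskStart and onSpeculativeTaskEnd are hypothetical hooks, and the model assumes a speculative task's index is removed from the running set when the task ends. The decrementOnTaskEnd flag shows one possible fix (decrementing the submitted count when a speculative task ends), which is not necessarily what the eventual PR does.

```scala
import scala.collection.mutable

// Toy model for illustration; names mirror the description, behavior is assumed.
class SpeculationCounters(decrementOnTaskEnd: Boolean) {
  private val stageAttemptToNumSpeculativeTasks = mutable.HashMap[Int, Int]()
  private val stageAttemptToSpeculativeTaskIndices =
    mutable.HashMap[Int, mutable.HashSet[Int]]()

  // Incremented whenever a speculative task is submitted.
  def onSpeculativeTaskSubmitted(stageAttempt: Int): Unit =
    stageAttemptToNumSpeculativeTasks(stageAttempt) =
      stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1

  // Hypothetical hook: records a speculative task as running.
  def onSpeculativeTaskStart(stageAttempt: Int, taskIndex: Int): Unit =
    stageAttemptToSpeculativeTaskIndices
      .getOrElseUpdate(stageAttempt, mutable.HashSet[Int]()) += taskIndex

  // Hypothetical hook: the task is no longer running; the submitted count
  // is only decremented when decrementOnTaskEnd is true.
  def onSpeculativeTaskEnd(stageAttempt: Int, taskIndex: Int): Unit = {
    stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_ -= taskIndex)
    if (decrementOnTaskEnd) {
      stageAttemptToNumSpeculativeTasks.get(stageAttempt).foreach { n =>
        stageAttemptToNumSpeculativeTasks(stageAttempt) = n - 1
      }
    }
  }

  // Same expression as in the description.
  def pendingSpeculativeTasks: Int =
    stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
      numTasks - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
    }.sum
}

object SpeculationDemo {
  // Submit and start 3 speculative tasks in stage attempt 0, then kill 2 of them.
  def replay(c: SpeculationCounters): Int = {
    (0 until 3).foreach { i =>
      c.onSpeculativeTaskSubmitted(0)
      c.onSpeculativeTaskStart(0, i)
    }
    c.onSpeculativeTaskEnd(0, 0)
    c.onSpeculativeTaskEnd(0, 1)
    c.pendingSpeculativeTasks
  }

  def main(args: Array[String]): Unit = {
    println(s"without decrement: ${replay(new SpeculationCounters(false))}")
    println(s"with decrement:    ${replay(new SpeculationCounters(true))}")
  }
}
```

      In this model, the two killed speculative tasks stay counted as pending when the submitted count is never decremented, and the pending count drops to zero once it is decremented on task end.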

      I will have a PR ready to fix this issue, along with SPARK-28403.


          People

            Assignee: Zebing Lin (zebingl)
            Reporter: Zebing Lin (zebingl)