Spark / SPARK-41192

Task finished before speculative task scheduled leads to holding idle executors


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.2.2, 3.3.1
    • Fix Version/s: 3.4.0
    • Component/s: Spark Core

    Description

      When a task finishes before its speculative copy has been scheduled by the DAGScheduler, the speculative task is still counted as pending in the calculation of the number of needed executors, which leads to requesting more executors than are actually needed.
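
      As a rough illustration, the sketch below shows how a pending-task count that still includes stale speculative tasks inflates the executor target. This is a simplified sketch with assumed names and formula, not the actual ExecutorAllocationManager code.

      // Simplified sketch of the executor-target calculation (illustrative names,
      // not the real ExecutorAllocationManager implementation).
      object ExecutorTargetSketch {
        def maxExecutorsNeeded(
            pendingTasks: Int,
            pendingSpeculativeTasks: Int,
            runningTasks: Int,
            taskSlotsPerExecutor: Int): Int = {
          // Speculative tasks that were never scheduled (because the original task
          // already finished) still show up in pendingSpeculativeTasks here, so the
          // target stays high even though no work is left for them.
          val totalTasks = pendingTasks + pendingSpeculativeTasks + runningTasks
          math.ceil(totalTasks.toDouble / taskSlotsPerExecutor).toInt
        }
      }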

      Background & Reproduction

      In one of our production jobs, we found that the ExecutorAllocationManager was holding more executors than needed.

      We found it difficult to reproduce in the test environment. In order to reproduce it stably and debug it, we temporarily commented out the speculative-task scheduling code at TaskSetManager:363, ensuring that tasks complete before the speculative task is scheduled.

      // Original code
      private def dequeueTask(
          execId: String,
          host: String,
          maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
        // Tries to schedule a regular task first; if it returns None, then schedules
        // a speculative task
        dequeueTaskHelper(execId, host, maxLocality, false).orElse(
          dequeueTaskHelper(execId, host, maxLocality, true))
      }

      // Modified code: the speculative task will never be scheduled
      private def dequeueTask(
          execId: String,
          host: String,
          maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
        // Only tries to schedule a regular task; the speculative branch is removed,
        // so a speculative task can never be scheduled
        dequeueTaskHelper(execId, host, maxLocality, false)
      }

      Referring to the example in SPARK-30511:

      You will see that while the last task is running, we hold 38 executors (see attachment), which is exactly ceil((149 + 1) / 4) = 38. But there are actually only 2 tasks running, which requires only Math.max(20, ceil(2 / 4)) = 20 executors.

      ./bin/spark-shell --master yarn --conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=20 --conf spark.dynamicAllocation.maxExecutors=1000

      val n = 4000
      val someRDD = sc.parallelize(1 to n, n)
      someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
        if (index > 3998) {
          Thread.sleep(1000 * 1000)  // Task that keeps running until the end of the job
        } else if (index > 3850) {
          Thread.sleep(50 * 1000)    // Fake running tasks
        } else {
          Thread.sleep(100)
        }
        Array.fill[Int](1)(1).iterator
      }).collect()
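
      A quick back-of-the-envelope check of the two executor counts above (plain Scala arithmetic; the variable names are ours, not Spark's):

      object ExecutorCountCheck extends App {
        val taskSlotsPerExecutor = 4  // spark.executor.cores=4, one core per task
        val minExecutors = 20         // spark.dynamicAllocation.minExecutors=20

        // 149 stale speculative tasks still counted as pending, plus the last running task
        val inflatedTarget = math.ceil((149 + 1).toDouble / taskSlotsPerExecutor).toInt
        println(inflatedTarget)       // 38

        // Only 2 tasks are actually running, bounded below by minExecutors
        val actualTarget = math.max(minExecutors,
          math.ceil(2.toDouble / taskSlotsPerExecutor).toInt)
        println(actualTarget)         // 20
      }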


      I will have a PR ready to fix this issue.
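
      This is not the actual patch, just a sketch of the bookkeeping idea (all names here are hypothetical): track pending speculative tasks by task index and drop the entry when the corresponding task finishes, so it no longer counts toward the number of needed executors.

      import scala.collection.mutable

      // Hypothetical sketch, not the Spark implementation: pending speculative
      // tasks are keyed by task index and discarded once the original task
      // finishes before the speculative copy could be scheduled.
      class SpeculativeTaskBookkeeping {
        private val pendingSpeculativeIndices = mutable.Set[Int]()

        def onSpeculativeTaskSubmitted(taskIndex: Int): Unit =
          pendingSpeculativeIndices += taskIndex

        def onSpeculativeTaskScheduled(taskIndex: Int): Unit =
          pendingSpeculativeIndices -= taskIndex

        // A finished task makes its still-pending speculative copy pointless,
        // so it must stop counting as a pending task.
        def onTaskFinished(taskIndex: Int): Unit =
          pendingSpeculativeIndices -= taskIndex

        def pendingSpeculativeTaskCount: Int = pendingSpeculativeIndices.size
      }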

      Attachments

        1. dynamic-executors
          41 kB
          Yazhi Wang
        2. dynamic-log
          49 kB
          Yazhi Wang


          People

            Assignee: toujours33 Yazhi Wang
            Reporter: toujours33 Yazhi Wang
            Votes: 0
            Watchers: 3

            Dates

              Created:
              Updated:
              Resolved: