Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-23423

Application declines any offers when killed+active executors rich spark.dynamicAllocation.maxExecutors

    XMLWordPrintableJSON

Details

    Description

      Hi

      Mesos Version:1.1.0

      I've noticed rather strange behavior of MesosCoarseGrainedSchedulerBackend when running on Mesos with dynamic allocation on and limiting number of max executors by spark.dynamicAllocation.maxExecutors.

      Suppose we have long running driver that has cyclic pattern of resource consumption(with some idle times in between), due to dyn.allocation it receives offers and then releases them after current chunk of work processed.

      Since at https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L573 the backend compares numExecutors < executorLimit and 

      numExecutors is defined as slaves.values.map(_.taskIDs.size).sum and slaves holds all slaves ever "met", i.e. both active and killed (see comment https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L122) 

      On the other hand, number of taskIds should be updated due to statusUpdate, but suppose this update is lost(actually I don't see logs of 'is now TASK_KILLED') so this number of executors might be wrong

       

      I've created test that "reproduces" this behavior, not sure how good it is:

      //MesosCoarseGrainedSchedulerBackendSuite
      test("max executors registered stops to accept offers when dynamic allocation enabled") {
        setBackend(Map(
          "spark.dynamicAllocation.maxExecutors" -> "1",
          "spark.dynamicAllocation.enabled" -> "true",
          "spark.dynamicAllocation.testing" -> "true"))
      
        backend.doRequestTotalExecutors(1)
      
        val (mem, cpu) = (backend.executorMemory(sc), 4)
      
        val offer1 = createOffer("o1", "s1", mem, cpu)
        backend.resourceOffers(driver, List(offer1).asJava)
        verifyTaskLaunched(driver, "o1")
      
        backend.doKillExecutors(List("0"))
        verify(driver, times(1)).killTask(createTaskId("0"))
      
        val offer2 = createOffer("o2", "s2", mem, cpu)
        backend.resourceOffers(driver, List(offer2).asJava)
        verify(driver, times(1)).declineOffer(offer2.getId)
      }

       

       

      Workaround: Don't set maxExecutors with dynamicAllocation on

       

      Please advice

      Igor

      marking you friends since you were last to touch this piece of code and probably can advice something(vanzin, skonto, susanxhuynh)

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              igor.berman Igor Berman
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: