      I have a simple test case for dynamic allocation on YARN that fails with the following stack trace-

      15/04/16 00:52:14 ERROR Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0
      java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -21 from the cluster manager. Please specify a positive number!
      	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338)
      	at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137)
      	at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294)
      	at org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263)
      	at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230)
      	at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189)
      	at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
      	at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
      	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
      	at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)

      My test is as follows-

      1. Start spark-shell with a single executor.
      2. Run a select count(*) query. The number of executors rises as input size is non-trivial.
      3. After the job finishes, the number of executors falls as most of them become idle.
      4. Rerun the same query again, and the request to add executors fails with the above error. In fact, the job itself continues to run with whatever executors it already has, but it never gets more executors unless the shell is closed and restarted.

      In fact, this error only happens when I configure executorIdleTimeout very small. For eg, I can reproduce it with the following configs-

      spark.dynamicAllocation.executorIdleTimeout     5
      spark.dynamicAllocation.schedulerBacklogTimeout 5

      Although I can simply increase executorIdleTimeout to something like 60 secs to avoid the error, I think this is still a bug to be fixed.

      The root cause seems that numExecutorsPending accidentally becomes negative if executors are killed too aggressively (i.e. executorIdleTimeout is too small) because under that circumstance, the new target # of executors can be smaller than the current # of executors. When that happens, ExecutorAllocationManager ends up trying to add a negative number of executors, which throws an exception.


