Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Fixed
- Fix Versions: 3.2.2, 3.3.1
Description
When a task finishes before its speculative copy has been scheduled by the DAGScheduler, the speculative task is still considered pending and counts toward the number of needed executors, which leads the ExecutorAllocationManager to request more executors than needed.
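To make the miscount concrete, here is a minimal sketch (not Spark's actual code; the object and parameter names are illustrative assumptions) of how the executor target is derived from the task counts:

```scala
// Hedged sketch of the executor-target calculation described above.
// Names are illustrative, not Spark's real identifiers.
object ExecutorTargetSketch {
  def maxExecutorsNeeded(
      pendingTasks: Int,    // includes speculative tasks not yet scheduled
      runningTasks: Int,
      tasksPerExecutor: Int): Int = {
    // Integer ceiling of (pending + running) / tasksPerExecutor
    (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor
  }

  def main(args: Array[String]): Unit = {
    // 149 stale speculative tasks still counted as pending, 1 running task,
    // 4 cores per executor -> a target of 38 executors.
    println(maxExecutorsNeeded(149, 1, 4)) // 38
    // With the stale speculative tasks excluded, only the running tasks count.
    println(maxExecutorsNeeded(0, 2, 4))   // 1
  }
}
```

If the finished tasks' speculative copies were removed from the pending count, the first call would collapse to the second, and dynamic allocation would fall back to the configured minimum.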
Background & Reproduction
In one of our production jobs, we found that the ExecutorAllocationManager was holding more executors than needed.
The issue was difficult to reproduce in a test environment. To reproduce and debug it stably, we temporarily commented out the scheduling of speculative tasks in TaskSetManager (around line 363) to ensure that tasks complete before their speculative copies are scheduled.
```scala
// Original code
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // Tries to schedule a regular task first; if it returns None, then schedules
  // a speculative task
  dequeueTaskHelper(execId, host, maxLocality, false).orElse(
    dequeueTaskHelper(execId, host, maxLocality, true))
}

// Modified code: speculative tasks will never be scheduled
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // Only tries to schedule a regular task; the speculative branch is removed
  dequeueTaskHelper(execId, host, maxLocality, false)
}
```
Referring to the examples in SPARK-30511:
You will see that while the last task is running, 38 executors are held (see attachment), which is exactly ceil((149 + 1) / 4) = 38. But actually only 2 tasks are running, which requires only Math.max(20, ceil(2 / 4)) = 20 executors (the configured minimum).
```shell
./bin/spark-shell --master yarn \
  --conf spark.speculation=true \
  --conf spark.executor.cores=4 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=20 \
  --conf spark.dynamicAllocation.maxExecutors=1000
```

```scala
val n = 4000
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => {
  if (index > 3998) {
    Thread.sleep(1000 * 1000)
  } else if (index > 3850) {
    Thread.sleep(50 * 1000) // Fake running tasks
  } else {
    Thread.sleep(100)
  }
  Array.fill[Int](1)(1).iterator
}).collect()
```
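The numbers observed above can be sketched with a small clamping helper (a hedged illustration, not Spark's actual implementation; the function name is an assumption) that shows how the min/max bounds should cap the target:

```scala
// Hedged sketch: how spark.dynamicAllocation.{min,max}Executors clamp the
// computed executor target. Not Spark's real code; names are illustrative.
object TargetClampSketch {
  def clampedTarget(needed: Int, minExecutors: Int, maxExecutors: Int): Int =
    math.min(maxExecutors, math.max(minExecutors, needed))

  def main(args: Array[String]): Unit = {
    // 2 running tasks on 4-core executors need ceil(2 / 4) = 1 executor,
    // so minExecutors = 20 should keep the target at 20 -- not the 38
    // produced when stale speculative tasks inflate the pending count.
    println(clampedTarget(1, 20, 1000))  // 20
    println(clampedTarget(38, 20, 1000)) // 38 (the buggy, inflated target)
  }
}
```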
I will have a PR ready to fix this issue.