Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Fixed
- Affects Version/s: 3.4.0
- Labels: None
Description
test("SPARK-XXX") { val conf = new SparkConf().setAppName("test").setMaster("local-cluster[1,4,1024]") sc = new SparkContext(conf) val req = new TaskResourceRequests().cpus(3) val rp = new ResourceProfileBuilder().require(req).build() val res = sc.parallelize(Seq(0, 1), 2).withResources(rp).map { x => Thread.sleep(5000) x * 2 }.collect() assert(res === Array(0, 2)) }
In this test, the two tasks should be scheduled one after the other, since each task requires 3 CPU cores and the executor has only 4. However, the logs show that both tasks are launched concurrently.
It turns out that the taskCpus used for task scheduling comes from the task set's TaskResourceProfile (taskCpus = 3):
val rpId = taskSet.taskSet.resourceProfileId
val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, conf)
while the taskCpus subtracted from the executor's free cores in ExecutorData comes from the executor's ResourceProfile (taskCpus = 1):
val rpId = executorData.resourceProfileId
val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
executorData.freeCores -= taskCpus
This makes the accounting of available cores inconsistent: the scheduler checks whether 3 cores are free before launching a task, but each launch only subtracts 1 from freeCores, so after the first task the 4-core executor still appears to have 3 free cores and the second task is launched as well.
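The mismatch can be reproduced with a minimal, self-contained sketch (plain Scala, not Spark code; the names below are hypothetical, only the core counts mirror the report): the offer check admits a task whenever at least 3 cores appear free, but each launch subtracts only 1 core, so a 4-core executor accepts both tasks.

object OverScheduleSketch {
  def main(args: Array[String]): Unit = {
    val executorTotalCores = 4   // local-cluster[1,4,1024]: one executor with 4 cores
    var freeCores = executorTotalCores

    // taskCpus seen by the scheduling check (task set's TaskResourceProfile, cpus(3))
    val schedulingTaskCpus = 3
    // taskCpus subtracted from the executor's free cores (executor's default ResourceProfile)
    val bookkeepingTaskCpus = 1

    var launched = 0
    var pendingTasks = 2         // the test job has 2 partitions
    while (pendingTasks > 0 && freeCores >= schedulingTaskCpus) {
      freeCores -= bookkeepingTaskCpus   // only 1 core is deducted per launch
      launched += 1
      pendingTasks -= 1
      println(s"launched task $launched, freeCores now $freeCores")
    }
    println(s"tasks launched concurrently: $launched")   // prints 2, not 1
  }
}

If both places derived taskCpus from the same ResourceProfile, the sketch would launch only one task at a time, which is the behaviour the test expects.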
Attachments
Issue Links
- is caused by: SPARK-39853 Support stage level schedule for standalone cluster when dynamic allocation is disabled (Resolved)
- links to