Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
Description
The accumulator could be undercounted when a retried task has an rdd cache. See the example below; you can also find the complete, reproducible example at https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc
test("SPARK-XXX") { // Set up a cluster with 2 executors val conf = new SparkConf() .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite") sc = new SparkContext(conf) // Set up a custom task scheduler. The scheduler will fail the first task attempt of the job // submitted below. In particular, the failed first attempt task would success on computation // (accumulator accounting, result caching) but only fail to report its success status due // to the concurrent executor lost. The second task attempt would success. taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) val myAcc = sc.longAccumulator("myAcc") // Initiate a rdd with only one partition so there's only one task and specify the storage level // with MEMORY_ONLY_2 so that the rdd result will be cached on both two executors. val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => myAcc.add(100) iter.map(x => x + 1) }.persist(StorageLevel.MEMORY_ONLY_2) // This will pass since the second task attempt will succeed assert(rdd.count() === 10) // This will fail due to `myAcc.add(100)` won't be executed during the second task attempt's // execution. Because the second task attempt will load the rdd cache directly instead of // executing the task function so `myAcc.add(100)` is skipped. assert(myAcc.value === 100) }
We could also hit this issue with decommission, even if the rdd has only one copy. For example, decommission could migrate the rdd cache block to another executor (the effect is then the same as having 2 copies), and the decommissioned executor could be lost before the task reports its success status to the driver.
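For reference, a minimal sketch of the decommission variant, assuming the standard decommission configs (spark.decommission.enabled, spark.storage.decommission.enabled, spark.storage.decommission.rddBlocks.enabled); actually triggering the decommission-then-executor-loss race still needs a custom scheduler or fault injection like the linked branch:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Sketch only: enable rdd cache block migration on decommission, so even a
// single MEMORY_ONLY copy can end up on a surviving executor while the original
// executor is lost before the task's success status reaches the driver.
val conf = new SparkConf()
  .setMaster("local-cluster[2, 1, 1024]")
  .setAppName("AccumulatorDecommissionRepro")
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.rddBlocks.enabled", "true")
val sc = new SparkContext(conf)

val myAcc = sc.longAccumulator("myAcc")
val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
  myAcc.add(100)
  iter.map(x => x + 1)
}.persist(StorageLevel.MEMORY_ONLY) // a single cached copy is enough here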
And the issue is a bit more complicated to fix than expected. I have tried several fixes, but none of them is ideal:
Option 1: Clean up any rdd cache related to the failed task (a rough sketch follows the list): in practice, this option already fixes the issue in most cases. However, because of asynchronous communication, the rdd cache could in theory be reported to the driver right after the driver cleans up the failed task's caches, so this option can't resolve the issue thoroughly;
Option 2: Disallow rdd cache reuse across task attempts for the same task (sketched below): this option fixes the issue completely. The problem is that it also affects cases where the rdd cache could safely be reused across attempts (e.g., when there is no accumulator operation in the task), which can cause a performance regression;
Option 3: Introduce an accumulator cache: first, this requires a new framework for supporting an accumulator cache; second, the driver would need extra logic to decide whether the cached accumulator value should be reported to the user, to avoid overcounting. For example, in the case of task retry, the value should be reported; however, in the case of rdd cache reuse, the value shouldn't be reported (should it?);
Option 4: Validate task success when a task tries to load the rdd cache (sketched below): this defines an rdd cache as valid/accessible only if the task that produced it has succeeded. This could be either overkill or a bit complex, because Spark currently cleans up the task state once the task finishes, so we would need to maintain a structure to know whether a task has ever succeeded.
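For Option 1, a minimal sketch of what the driver-side cleanup could look like, assuming it runs from driver code inside the org.apache.spark package (BlockManagerMaster is private[spark]); handleFailedTaskCache and the rddIdsCachedByTask argument are hypothetical, and the asynchronous block report described above can still arrive after this cleanup:

import org.apache.spark.SparkEnv
import org.apache.spark.storage.RDDBlockId

// Hypothetical driver-side cleanup for Option 1, illustration only.
// rddIdsCachedByTask is an assumed mapping from the failed task to the rdd ids
// it may have cached; Spark does not expose such a mapping as-is.
def handleFailedTaskCache(partitionId: Int, rddIdsCachedByTask: Seq[Int]): Unit = {
  val master = SparkEnv.get.blockManager.master
  rddIdsCachedByTask.foreach { rddId =>
    // Ask the block managers to drop the cache block the failed attempt produced.
    master.removeBlock(RDDBlockId(rddId, partitionId))
  }
  // A block report for a replica of this block can still reach the driver after
  // this point, which is why Option 1 is not a complete fix.
}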
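For Option 2, a simplified sketch of the kind of guard it implies; in real Spark the check would have to live in the internal cache-read path (RDD.getOrCompute), and the attempt-number check here is a coarse approximation of "do not reuse a cache produced by an earlier attempt of the same task":

import org.apache.spark.TaskContext

// Hypothetical guard for Option 2, illustration only: retried attempts skip the
// rdd cache so the task function (and its accumulator updates) always re-runs.
// This avoids the undercount but also gives up the cache even when reuse would
// be safe (e.g. when the task performs no accumulator updates).
def mayReuseRddCache(): Boolean = {
  val ctx = TaskContext.get()
  ctx == null || ctx.attemptNumber() == 0
}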
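For Option 4, a sketch of the extra bookkeeping the option would require; TaskSuccessRegistry is hypothetical, since Spark currently drops per-task state once a task finishes:

import scala.collection.concurrent.TrieMap

// Hypothetical driver-side structure for Option 4, illustration only: an rdd
// cache block is treated as valid/reusable only once some attempt of the task
// that produced it has been reported as successful.
object TaskSuccessRegistry {
  // (stageId, partitionId) -> whether any attempt of that task has succeeded
  private val succeeded = TrieMap.empty[(Int, Int), Boolean]

  def markSucceeded(stageId: Int, partitionId: Int): Unit =
    succeeded.put((stageId, partitionId), true)

  def cacheIsValid(stageId: Int, partitionId: Int): Boolean =
    succeeded.getOrElse((stageId, partitionId), false)
}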