Details
-
Bug
-
Status: Closed
-
Critical
-
Resolution: Fixed
-
None
-
None
Description
While investigating SPARK-5928, I noticed some very strange behavior in the way spark retries stages after a FetchFailedException. It seems that on a FetchFailedException, instead of simply killing the task and retrying, Spark aborts the stage and retries. If it just retried the task, the task might fail 4 times and then trigger the usual job killing mechanism. But by killing the stage instead, the max retry logic is skipped (it looks to me like there is no limit for retries on a stage).
After a bit of discussion with Kay Ousterhout, it seems the idea is that if a fetch fails, we assume that the block manager we are fetching from has failed, and that it will succeed if we retry the stage w/out that block manager. In that case, it wouldn't make any sense to retry the task, since its doomed to fail every time, so we might as well kill the whole stage. But this raises two questions:
1) Is it really safe to assume that a FetchFailedException means that the BlockManager has failed, and ti will work if we just try another one? SPARK-5928 shows that there are at least some cases where that assumption is wrong. Even if we fix that case, this logic seems brittle to the next case we find. I guess the idea is that this behavior is what gives us the "R" in RDD ... but it seems like its not really that robust and maybe should be reconsidered.
2) Should stages only be retried a limited number of times? It would be pretty easy to put in a limited number of retries per stage. Though again, we encounter issues with keeping things resilient. Theoretically one stage could have many retries, but due to failures in different stages further downstream, so we might need to track the cause of each retry as well to still have the desired behavior.
In general it just seems there is some flakiness in the retry logic. This is the only reproducible example I have at the moment, but I vaguely recall hitting other cases of strange behavior w/ retries when trying to run long pipelines. Eg., if one executor is stuck in a GC during a fetch, the fetch fails, but the executor eventually comes back and the stage gets retried again, but the same GC issues happen the second time around, etc.
Copied from SPARK-5928, here's the example program that can regularly produce a loop of stage failures. Note that it will only fail from a remote fetch, so it can't be run locally – I ran with MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m
val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore => val n = 3e3.toInt val arr = new Array[Byte](n) //need to make sure the array doesn't compress to something small scala.util.Random.nextBytes(arr) arr } rdd.map { x => (1, x)}.groupByKey().count()
Attachments
Issue Links
- is blocked by
-
SPARK-6746 Refactor large functions in DAGScheduler to improve readibility
- Closed
-
SPARK-7308 Should there be multiple concurrent attempts for one stage?
- Resolved
- links to