In SPARK-17370 (a patch authored by Eric Liang and reviewed by me), we added logic to the DAGScheduler to mark external shuffle service instances as unavailable upon task failure when the task failure reason was "SlaveLost" and this was known to be caused by worker death. If the Spark Master discovered that a worker was dead, it would notify any drivers with executors on that worker to mark those executors as dead. The linked patch piggybacked on this logic: it made the executor-death notification also signal worker death, and it treated executor death caused by worker death as implying the loss of that worker's shuffle files.
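The rule described above can be summarized as a small decision function. This is a hypothetical Python sketch, not Spark's actual Scala code. `ExecutorLossReason`, its `worker_lost` flag, and `shuffle_files_lost` are illustrative names. The sketch assumes that shuffle files outlive executor death whenever an external shuffle service is enabled.

```python
from dataclasses import dataclass


@dataclass
class ExecutorLossReason:
    """Hypothetical stand-in for the reason attached to an executor-loss event."""
    message: str
    # Assumed flag: True only when the Master reported that the whole worker died.
    worker_lost: bool = False


def shuffle_files_lost(reason: ExecutorLossReason, external_shuffle_enabled: bool) -> bool:
    """Decide whether the lost executor's map outputs must be unregistered."""
    if not external_shuffle_enabled:
        # Without an external shuffle service the executor serves its own
        # shuffle files, so losing the executor always loses them.
        return True
    # With an external shuffle service the files outlive the executor and are
    # lost only when the worker hosting the service is gone as well.
    return reason.worker_lost


print(shuffle_files_lost(ExecutorLossReason("scaled down"), external_shuffle_enabled=True))  # False
print(shuffle_files_lost(ExecutorLossReason("worker lost", worker_lost=True), external_shuffle_enabled=True))  # True
```

This rule is only evaluated when an executor-loss event actually reaches the driver. A worker death that produces no such event never triggers it.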
However, there are modes of external shuffle service loss that this mechanism does not detect, leaving the system prone to race conditions. Consider the following sequence of events:
- Spark standalone is configured to run an external shuffle service embedded in the Worker.
- An application has shuffle outputs and executors on Worker A.
- A stage that depends on the outputs of tasks that ran on Worker A starts.
- All of the application's executors on Worker A are removed for reasons other than worker death, for example because they died with exceptions or were scaled down via the dynamic allocation APIs. Worker A itself is still healthy at this point.
- At this point the MapOutputTracker still records map output locations on Worker A's shuffle service. This is expected behavior.
- Worker A dies at an instant where the application has no executors running on it.
- The Master knows that Worker A died but does not inform the driver, because the driver had no executors on that worker at the time of its death.
- Some task from the running stage attempts to fetch map outputs from Worker A but these requests time out because Worker A's shuffle service isn't available.
- Due to other logic in the scheduler, these preventable FetchFailures don't end up invalidating the now-unavailable map output locations (this is a distinct bug / behavior which I'll discuss in a separate JIRA ticket).
- This leads to several unsuccessful stage reattempts and, ultimately, to job failure.
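The race can be reproduced with a small toy model. This is a hypothetical Python sketch, not Spark code. The `Driver` and `Master` classes, their method names, and the host and executor IDs are invented for illustration, and fetch retries and stage reattempts are not modeled. The sketch shows that once the last executor on Worker A is gone, the current notification rule sends the driver nothing when Worker A dies, so the stale locations survive.

```python
class Driver:
    """Toy driver: tracks where map outputs live and which executors it has."""

    def __init__(self):
        # (shuffle_id, map_id) -> worker whose external shuffle service serves it
        self.map_outputs = {}
        # executor_id -> worker the executor runs on
        self.executors = {}

    def on_executor_lost(self, executor_id, worker_lost):
        worker = self.executors.pop(executor_id)
        if worker_lost:
            # Worker death implies the embedded shuffle service is gone too.
            self.map_outputs = {k: w for k, w in self.map_outputs.items() if w != worker}


class Master:
    """Toy Master modeling the current notification rule."""

    def __init__(self, drivers):
        self.drivers = drivers

    def worker_died(self, worker):
        for driver in self.drivers:
            # Only executors on the dead worker trigger a notification, so a
            # driver with no executors there never hears about the death.
            for executor_id, w in list(driver.executors.items()):
                if w == worker:
                    driver.on_executor_lost(executor_id, worker_lost=True)


driver = Driver()
master = Master([driver])
driver.executors = {"exec-1": "worker-A"}
driver.map_outputs = {(0, 0): "worker-A", (0, 1): "worker-A"}

# The executor is removed for a reason other than worker death (e.g. dynamic
# allocation), so its map outputs correctly stay registered.
driver.on_executor_lost("exec-1", worker_lost=False)

# Worker A dies while the application has no executors on it.
master.worker_died("worker-A")

stale = sorted(k for k, w in driver.map_outputs.items() if w == "worker-A")
print(stale)  # [(0, 0), (0, 1)]
```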
A simple way to address this would be to have the Master explicitly notify all drivers of every Worker death, even drivers that don't currently have executors on that Worker. The Spark Standalone scheduler backend can receive the explicit WorkerLost message and propagate the appropriate calls to the task scheduler and DAGScheduler to invalidate map output locations served by the now-dead external shuffle service.
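A minimal sketch of the proposed notification path follows. It is written in Python rather than Spark's Scala. `WorkerLost` mirrors the message named above, but its fields, the stub classes, and the `worker_removed` method are hypothetical. The sketch assumes that a map output's location can be identified by the host of the shuffle service that serves it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerLost:
    """Hypothetical message the Master would send to every driver on worker death."""
    worker_id: str
    host: str


class DAGSchedulerStub:
    def __init__(self, map_outputs):
        # (shuffle_id, map_id) -> host of the external shuffle service
        self.map_outputs = dict(map_outputs)

    def worker_removed(self, host):
        # The shuffle service embedded in the dead worker is gone, so every
        # map output it served is unregistered and must be recomputed.
        self.map_outputs = {k: h for k, h in self.map_outputs.items() if h != host}


class TaskSchedulerStub:
    def __init__(self, dag_scheduler):
        self.dag_scheduler = dag_scheduler

    def worker_removed(self, host):
        self.dag_scheduler.worker_removed(host)


class StandaloneBackendStub:
    def __init__(self, task_scheduler):
        self.task_scheduler = task_scheduler

    def receive(self, message):
        if isinstance(message, WorkerLost):
            # Handled even if this application has no executors on the worker.
            self.task_scheduler.worker_removed(message.host)


class MasterStub:
    def __init__(self, backends):
        self.backends = backends

    def worker_died(self, worker_id, host):
        # Proposed behavior: notify every registered driver, not only those
        # with executors on the dead worker.
        for backend in self.backends:
            backend.receive(WorkerLost(worker_id, host))


dag = DAGSchedulerStub({(0, 0): "host-a", (0, 1): "host-b"})
backend = StandaloneBackendStub(TaskSchedulerStub(dag))
MasterStub([backend]).worker_died("worker-A", "host-a")
print(dag.map_outputs)  # {(0, 1): 'host-b'}
```

Routing the message through the backend and task scheduler, rather than letting the Master touch map output state directly, keeps the driver-side invalidation logic in the DAGScheduler, where executor-loss handling already lives.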
This ticket relates to SPARK-20115 in that both aim to address issues where the external shuffle service is unavailable. The key difference is the detection mechanism: SPARK-20115 marks the external shuffle service as unavailable whenever any fetch failure occurs from it, whereas the proposal here relies on more explicit signals. This JIRA ticket's proposal is scoped only to Spark Standalone mode. As a compromise, we might consider treating all of a single shuffle's outputs on a single external shuffle service as lost following a fetch failure (to be discussed in a separate JIRA).
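The following hypothetical Python sketch shows the difference in scope between the two fetch-failure-driven policies. The function name and the `policy` values are illustrative, not Spark APIs. It assumes map outputs are keyed by `(shuffle_id, map_id)` and located by shuffle-service host.

```python
def invalidate_after_fetch_failure(map_outputs, failed_host, failed_shuffle_id, policy):
    """Return the map outputs that remain registered after a fetch failure.

    map_outputs maps (shuffle_id, map_id) -> host of the external shuffle service.
    """
    if policy == "whole_service":
        # SPARK-20115-style: any fetch failure marks the whole service
        # unavailable, dropping outputs of every shuffle it serves.
        def lost(shuffle_id, host):
            return host == failed_host
    elif policy == "single_shuffle":
        # Compromise: only the failed shuffle's outputs on that service are dropped.
        def lost(shuffle_id, host):
            return host == failed_host and shuffle_id == failed_shuffle_id
    else:
        raise ValueError(f"unknown policy: {policy!r}")
    return {key: host for key, host in map_outputs.items() if not lost(key[0], host)}


outputs = {(0, 0): "host-a", (1, 0): "host-a", (0, 1): "host-b"}
print(invalidate_after_fetch_failure(outputs, "host-a", 0, "whole_service"))
# {(0, 1): 'host-b'}
print(invalidate_after_fetch_failure(outputs, "host-a", 0, "single_shuffle"))
# {(1, 0): 'host-a', (0, 1): 'host-b'}
```

Both policies react only to observed fetch failures. The proposal in this ticket instead relies on an explicit worker-death signal from the Master.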