This ticket describes a fault-tolerance edge-case which can cause Spark jobs to fail if a single external shuffle service process reboots and fails to recover the list of registered executors (something which can happen when using YARN if NodeManager recovery is disabled) and the Spark job has a large number of executors per host.
I believe this problem can be worked around today via a change of configurations, but I'm filing this issue to (a) better document this problem, and (b) propose either a change of default configurations or additional DAGScheduler logic to better handle this failure mode.
The external shuffle service process is mostly stateless except for a map tracking the set of registered applications and executors.
When processing a shuffle fetch request, the shuffle service first checks whether the requested block ID's executor is registered; if it is not registered then the shuffle service throws an "Executor is not registered" exception, which surfaces as a FetchFailed error in the executor requesting the shuffle block.
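To make the failure mode concrete, here is a minimal Python sketch of that registration lookup. This is purely illustrative: the real service is implemented in Java, and the class, method, and field names below are made up for this sketch.

```python
class ShuffleServiceState:
    """Illustrative stand-in for the shuffle service's in-memory registry.

    The real service keys its registry by (appId, execId); if the process
    restarts without state recovery, the registry comes back empty even
    though the shuffle files are probably still on disk.
    """

    def __init__(self):
        # (app_id, exec_id) -> {block_id: block data}
        self.registered_executors = {}

    def register_executor(self, app_id, exec_id, blocks):
        self.registered_executors[(app_id, exec_id)] = blocks

    def get_block_data(self, app_id, exec_id, block_id):
        blocks = self.registered_executors.get((app_id, exec_id))
        if blocks is None:
            # The failure mode described above: after an unrecovered restart,
            # every fetch for a pre-restart executor hits this branch.
            raise RuntimeError(
                f"Executor is not registered (appId={app_id}, execId={exec_id})")
        return blocks[block_id]
```

A restart is modeled by constructing a fresh `ShuffleServiceState`: every fetch against it fails, regardless of what is on disk.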
In normal operation this error should not occur because executors shouldn't be mis-routing shuffle fetch requests. However, this can happen if the shuffle service crashes and restarts, causing it to lose its in-memory executor registration state. With YARN this state can be recovered from disk if YARN NodeManager recovery is enabled (using the mechanism added in
SPARK-9439), but I don't believe that we perform state recovery in Standalone and Mesos modes (see SPARK-24223).
If state cannot be recovered then map outputs cannot be served (even though the files probably still exist on disk). In theory, this shouldn't cause Spark jobs to fail because we can always redundantly recompute lost / unfetchable map outputs.
However, in practice this can cause total job failures in deployments where the node with the failed shuffle service was running a large number of executors: by default, the DAGScheduler unregisters map outputs only from the individual executor whose shuffle blocks could not be fetched (see code), so it can take several rounds of failed stage attempts to discover and clear outputs from all executors on the faulty host. If the number of executors on a host exceeds the stage retry limit then this can exhaust stage retry attempts and cause the job to fail.
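To see why per-executor unregistration exhausts retries, consider this toy simulation. The default retry limit here mirrors spark.stage.maxConsecutiveAttempts (which defaults to 4); the function and parameter names are hypothetical, and this is a deliberately simplified model of the scheduler's behavior, not its actual logic.

```python
def job_succeeds(stale_executors, max_consecutive_attempts=4,
                 unregister_output_on_host=False):
    """Model repeated stage attempts after a shuffle service loses its state.

    `stale_executors` is the number of executors on the failed host whose
    map outputs are still (wrongly) registered. Each failed stage attempt
    surfaces one FetchFailed, after which the scheduler clears either one
    executor's outputs (the default) or the whole host's outputs.
    """
    failures = 0
    while stale_executors > 0:
        failures += 1  # this attempt hits a FetchFailed and fails
        if unregister_output_on_host:
            stale_executors = 0   # one failure clears the whole host
        else:
            stale_executors -= 1  # default: clear only the failing executor
        if failures >= max_consecutive_attempts:
            return False  # "Stage failed 4 times" -> job aborted
    return True
```

With the default behavior, a host running 3 stale executors recovers within the retry budget, but one running 4 or more does not; with host-wide unregistration a single failed attempt is enough.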
This "multiple rounds of recomputation to discover all failed executors on a host" problem was addressed by
SPARK-19753, which added a spark.files.fetchFailure.unRegisterOutputOnHost configuration which promotes executor fetch failures into host-wide fetch failures (clearing output from all neighboring executors upon a single failure). However, that configuration is false by default.
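Until any default changes, the workaround is a configuration change. For example, in spark-defaults.conf:

```
# Promote a single fetch failure into a host-wide one, clearing map outputs
# from every executor on the failing host in one round.
spark.files.fetchFailure.unRegisterOutputOnHost  true
```

On YARN, this can be combined with yarn.nodemanager.recovery.enabled=true in yarn-site.xml so the shuffle service can recover its registration state across NodeManager restarts in the first place.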
I have a few ideas about how we can improve this situation:
- Update the YARN external shuffle service documentation to recommend enabling node manager recovery.
- Consider defaulting spark.files.fetchFailure.unRegisterOutputOnHost to true. This would improve out-of-the-box resiliency for large clusters. The trade-off is a reduction in efficiency when there are transient "false positive" fetch failures, but I suspect that case is unlikely in practice (so the change of default could be an acceptable trade-off). See prior discussion on GitHub.
- Modify DAGScheduler to add special-case handling for "Executor is not registered" exceptions that trigger FetchFailures: if we see this exception then it implies that the shuffle service failed to recover state, implying that all of its prior outputs are effectively unavailable. In this case, it might be safe to unregister all host outputs irrespective of whether the unRegisterOutputOnHost flag is set.
- This might require us to string-match on exceptions (so we can be backwards-compatible with old shuffle services, freeing users from needing to upgrade / restart NMs to pick up this fix).
- I suppose there's the potential for a race condition: the shuffle service restarts and registers fresh executors that produce new map outputs, only for us to unnecessarily clear those outputs as part of cleaning up the pre-restart outputs. If we assume that shuffle service and executor deaths are coupled (i.e. that death of the shuffle service process implies death of all executors on that host, which I believe is true of both YARN NodeManager and Standalone Worker death), then we could be a bit more precise and invalidate outputs only from the dead executors on that host.
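As a sketch of the third idea, the special case could live wherever the DAGScheduler already classifies FetchFailed errors. The following Python pseudocode is purely illustrative: the real logic is Scala inside the DAGScheduler, and the function name and return shape here are invented for this sketch.

```python
# The string we would match against FetchFailed messages for
# backwards-compatibility with old shuffle services.
EXECUTOR_NOT_REGISTERED = "Executor is not registered"

def outputs_to_unregister(failure_message, exec_id, host,
                          unregister_output_on_host=False):
    """Decide which map outputs to drop after a FetchFailed.

    Models the proposed special case: an "Executor is not registered"
    failure implies the shuffle service on `host` lost its registration
    state, so all of that host's prior outputs are effectively
    unavailable and should be unregistered regardless of the
    unRegisterOutputOnHost flag.
    """
    shuffle_service_lost_state = (
        failure_message is not None
        and EXECUTOR_NOT_REGISTERED in failure_message)
    if unregister_output_on_host or shuffle_service_lost_state:
        return ("host", host)        # clear outputs from the whole host
    return ("executor", exec_id)     # default: clear only this executor
```

One attraction of this approach is that it degrades gracefully: failures that don't carry the marker string fall back to today's per-executor behavior.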
I'm open to other suggestions, too.