Spark's DAGScheduler currently does not recompute all the lost shuffle blocks on a host when a FetchFailed exception occurs while fetching shuffle blocks from another executor with the external shuffle service enabled. Instead, it recomputes only the shuffle blocks produced by the executor for which the FetchFailed exception occurred. This works for the internal shuffle scenario, where executors serve their own shuffle blocks, so only that executor's shuffle blocks need to be considered lost. However, when the external shuffle service is in use, a FetchFailed exception indicates that the external shuffle service running on that host has become unavailable. That is sufficient to assume that all the shuffle blocks managed by the shuffle service on that host are lost. Recomputing only the shuffle blocks associated with the executor for which the FetchFailed exception occurred is therefore not sufficient. Because multiple executors may be running on that host, we need to recompute all the shuffle blocks managed by that service.
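The difference between the two invalidation scopes can be illustrated with a small model. This is only a sketch in plain Python, not DAGScheduler code: `MapOutput`, `lost_outputs`, and the executor and host names are hypothetical and exist purely for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MapOutput:
    """Hypothetical record of where one map task's shuffle output lives."""
    map_id: int
    executor_id: str
    host: str


def lost_outputs(outputs, failed_executor, failed_host, external_shuffle_service):
    """Return the map outputs to treat as lost after a FetchFailed.

    With the external shuffle service, every output on the failed host is
    considered lost; otherwise only the failed executor's outputs are.
    """
    if external_shuffle_service:
        return [o for o in outputs if o.host == failed_host]
    return [o for o in outputs if o.executor_id == failed_executor]


# Two executors share host-A; a third runs on host-B.
outputs = [
    MapOutput(0, "exec-1", "host-A"),
    MapOutput(1, "exec-2", "host-A"),
    MapOutput(2, "exec-3", "host-B"),
]

per_executor = lost_outputs(outputs, "exec-1", "host-A", external_shuffle_service=False)
per_host = lost_outputs(outputs, "exec-1", "host-A", external_shuffle_service=True)
```

In this model, the per-executor scope marks only map output 0 as lost, while the per-host scope also marks map output 1, which was written by a different executor on the same host.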
Because the shuffle blocks of the other executors on the host are not recomputed, subsequent attempts of the reduce stage fail as well: the newly scheduled tasks keep trying to fetch those blocks from their old location and keep throwing FetchFailed exceptions. The job ultimately fails after the reduce stage has been retried 4 times.
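The retry behavior can be reproduced in a toy simulation. This is a simplified sketch, not Spark's scheduler: it assumes that each stage attempt handles exactly one FetchFailed, that recomputed outputs land on a healthy host, and that five executors shared the failed host. The function `run_reduce_stage` and all executor and host names are hypothetical; the limit of 4 attempts is taken from the description above.

```python
MAX_STAGE_ATTEMPTS = 4  # retry limit as stated in the description


def run_reduce_stage(locations, dead_hosts, invalidate_whole_host):
    """Simulate reduce-stage attempts; return (succeeded, attempts_used).

    locations maps map_id -> (executor_id, host). Assumption: one
    FetchFailed is handled per attempt, and lost outputs are recomputed
    on a healthy host before the next attempt.
    """
    locations = dict(locations)
    for attempt in range(1, MAX_STAGE_ATTEMPTS + 1):
        # Find the first map output that still points at a dead host.
        failed = next(
            (loc for _, loc in sorted(locations.items()) if loc[1] in dead_hosts),
            None,
        )
        if failed is None:
            return True, attempt
        bad_exec, bad_host = failed
        # Recompute either the whole host's outputs or one executor's.
        for map_id, (executor, host) in list(locations.items()):
            lost = host == bad_host if invalidate_whole_host else executor == bad_exec
            if lost:
                locations[map_id] = ("exec-new", "host-healthy")
    return False, MAX_STAGE_ATTEMPTS


# Five executors wrote shuffle output on host-A, whose shuffle service died.
locations = {i: (f"exec-{i + 1}", "host-A") for i in range(5)}

per_executor_result = run_reduce_stage(locations, {"host-A"}, invalidate_whole_host=False)
per_host_result = run_reduce_stage(locations, {"host-A"}, invalidate_whole_host=True)
```

Under these assumptions, per-executor invalidation exhausts all 4 attempts because each attempt uncovers another stale location on the same host, whereas per-host invalidation succeeds on the second attempt.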