A customer's cluster has a node that goes down while a Spark application is running. (They are running Spark on YARN with the external shuffle service enabled.) An executor is lost (apparently the only one running on the node). This executor-lost event is handled in the DAGScheduler, which removes the executor from its BlockManagerMaster. At this point, no shuffle files are unregistered for the executor or the node. Soon after, tasks trying to fetch shuffle files output by that executor fail with FetchFailed (because the node is down, there is no NodeManager available to serve the shuffle files). By rights, such fetch failures should cause the executor's shuffle files to be unregistered, but they do not.
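A minimal sketch of the gating logic as I understand it. The names here (failedEpoch, removeExecutor, removeOutputsOnExecutor) mirror the DAGScheduler/MapOutputTracker concepts but are illustrative, not Spark's actual source:

```scala
import scala.collection.mutable

object ExecutorLostSketch {
  val failedEpoch = mutable.HashMap.empty[String, Long] // execId -> epoch at which its loss was last handled
  var currentEpoch = 0L                                 // stands in for the MapOutputTracker epoch
  val externalShuffleServiceEnabled = true              // the customer's setup

  def removeExecutor(execId: String): Unit =
    println(s"BlockManagerMaster: removed executor $execId")

  def removeOutputsOnExecutor(execId: String): Unit =
    println(s"MapOutputTracker: unregistered shuffle output of $execId")

  // Executor-lost path: gated on the epoch so the same failure is not
  // handled twice at the same epoch.
  def handleExecutorLost(execId: String, epoch: Long): Unit = {
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < epoch) {
      failedEpoch(execId) = epoch
      removeExecutor(execId)
      // With the external shuffle service enabled, the scheduler assumes the
      // NodeManager can still serve the executor's shuffle files, so it skips
      // unregistering them -- even when the whole node (NodeManager included)
      // is gone.
      if (!externalShuffleServiceEnabled) {
        removeOutputsOnExecutor(execId)
      }
    }
  }
}
```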
Due to the task failures, the stage is re-attempted. Tasks continue to fail with fetch failures from the lost executor's shuffle output. This time, since the failed epoch for the executor is higher, the executor is removed again (which doesn't really do anything; the executor was already removed when it was lost), and this time the shuffle output is unregistered.
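Continuing the sketch: the fetch-failure path runs through the same epoch gate, which is why the first attempt's fetch failures are ignored and only the retry's higher-epoch failure finally unregisters the output. Again, handleFetchFailure and the epoch bump between attempts are illustrative assumptions, not Spark's exact code:

```scala
object FetchFailureSketch {
  import ExecutorLostSketch._

  // Fetch-failure path: a failing task carries the epoch it was launched
  // with, and the same gate applies, so failures from tasks launched before
  // the loss was recorded fall through without unregistering anything.
  def handleFetchFailure(execId: String, taskEpoch: Long): Unit = {
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < taskEpoch) {
      failedEpoch(execId) = taskEpoch
      removeExecutor(execId)          // effectively a no-op: already removed
      removeOutputsOnExecutor(execId) // the step that finally clears the stale output
    }
  }

  def main(args: Array[String]): Unit = {
    handleExecutorLost("exec-1", epoch = currentEpoch)   // node down at epoch 0: output NOT unregistered
    handleFetchFailure("exec-1", taskEpoch = 0L)         // attempt 1 tasks (launched at epoch 0): gate blocks
    currentEpoch += 1 // assumed: the retry's tasks are launched at a newer epoch
    handleFetchFailure("exec-1", taskEpoch = currentEpoch) // attempt 2: gate passes, output unregistered
  }
}
```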
So it takes two stage attempts instead of one to clear the shuffle output. A stage gets 4 consecutive attempts by default. The customer was unlucky: two nodes went down during the stage, so the same problem happened twice. That used up all 4 stage attempts, the stage was aborted, and the job failed with it.
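The arithmetic lines up exactly: two node losses at two attempts each is 2 x 2 = 4, which is the default value of spark.stage.maxConsecutiveAttempts. A hedged mitigation sketch that raises that budget (this only buys headroom; it does not fix the delayed unregistration itself):

```scala
import org.apache.spark.sql.SparkSession

object RetryHeadroom {
  def main(args: Array[String]): Unit = {
    // Mitigation, not a fix: allow more consecutive stage attempts so a
    // stage can survive more of these two-attempt cycles before aborting.
    val spark = SparkSession.builder()
      .appName("shuffle-retry-headroom")
      .config("spark.stage.maxConsecutiveAttempts", "8")
      .getOrCreate()
  }
}
```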