The scheduler handles node failures by looking for a special FetchFailedException thrown by the shuffle block fetcher. This is handled in Executor and then passed as a special msg back to the driver: https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/core/src/main/scala/org/apache/spark/executor/Executor.scala#L403
However, user code exists in between the shuffle block fetcher and that catch block – it could intercept the exception, wrap it with something else, and throw a different exception. If that happens, spark treats it as an ordinary task failure, and retries the task, rather than regenerating the missing shuffle data. The task eventually is retried 4 times, its doomed to fail each time, and the job is failed.
You might think that no user code should do that – but even sparksql does it:
Here's an example stack trace. This is from Spark 1.6, so the sql code is not the same, but the problem is still there:
17/01/13 19:18:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1983.0 (TID 304851, xxx): org.apache.spark.SparkException: Task failed while writing rows.
Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect to xxx/yyy:zzz
17/01/13 19:19:29 ERROR scheduler.TaskSetManager: Task 0 in stage 1983.0 failed 4 times; aborting job
I think the right fix here is to also set a fetch failure status in the TaskContextImpl, so the executor can check that instead of just one exception.