We've seen a bad disk lead to corruption in a shuffle block, which lead to tasks repeatedly failing after fetching the data with an IOException. The tasks get retried, but the same corrupt data gets fetched again, and the tasks keep failing. As there isn't a fetch-failure, the jobs eventually fail, spark never tries to regenerate the shuffle data.
This is the same as
SPARK-4105, but that fix only covered small blocks. There was some discussion during that change about this limitation (https://github.com/apache/spark/pull/15923#discussion_r88756017) and followups to cover larger blocks (which would involve spilling to disk to avoid OOM), but it looks like that never happened.
I can think of a few approaches to this:
1) wrap the shuffle block input stream with another input stream, that converts all exceptions into FetchFailures. This is similar to the fix of
SPARK-4105, but that reads the entire input stream up-front, and instead I'm proposing to do it within the InputStream itself so its streaming and does not have a large memory overhead.
2) Add checksums to shuffle blocks. This was proposed here and abandoned as being too complex.
3) Try to tackle this with blacklisting instead: when there is any failure in a task that is reading shuffle data, assign some "blame" to the source of the shuffle data, and eventually blacklist the source. It seems really tricky to get sensible heuristics for this, though.