When a fetch failure occurs, the DAGScheduler re-launches the previous stage (to re-generate output that was missing), and then re-launches all tasks in the stage with the fetch failure that hadn't completed when the fetch failure occurred (the DAGScheduler re-lanches all of the tasks whose output data is not available – which is equivalent to the set of tasks that hadn't yet completed).
The assumption when this code was originally written was that when a fetch failure occurred, the output from at least one of the tasks in the previous stage was no longer available, so all of the tasks in the current stage would eventually fail due to not being able to access that output. This assumption does not hold for some large-scale, long-running workloads. E.g., there's one use case where a job has ~100k tasks that each run for about 1 hour, and only the first 5-10 minutes are spent fetching data. Because of the large number of tasks, it's very common to see a few tasks fail in the fetch phase, and it's wasteful to re-run other tasks that had finished fetching data so aren't affected by the fetch failure (and may be most of the way through their hour-long execution). The DAGScheduler should not re-start these tasks.