Currently, when there is a fetch failure, you can end up with multiple concurrent attempts for the same stage. Is this intended? At best, it leads to some very confusing behavior, and it makes it hard for the user to make sense of what is going on. At worst, I think this is cause of some very strange errors we've seen errors we've seen from users, where stages start executing before all the dependent stages have completed.
This can happen in the following scenario: there is a fetch failure in attempt 0, so the stage is retried. attempt 1 starts. But, tasks from attempt 0 are still running – some of them can also hit fetch failures after attempt 1 starts. That will cause additional stage attempts to get fired up.
There is an attempt to handle this already https://github.com/apache/spark/blob/16860327286bc08b4e2283d51b4c8fe024ba5006/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1105
but that only checks whether the *stage* is running. It really should check whether that *attempt* is still running, but there isn't enough info to do that.
I'll also post some info on how to reproduce this.