DAGScheduler should not launch multiple concurrent attempts for one stage on fetch failures

      When there is a fetch failure, DAGScheduler is supposed to fail the stage, retry the necessary portions of the preceding shuffle stage which generated the shuffle data, and eventually rerun the stage.

      We generally expect multiple fetch failures to arrive together, but we only want to restart the stage once. The code already attempts to handle this: https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1108

              // It is likely that we receive multiple FetchFailed for a single stage (because we have
              // multiple tasks running concurrently on different executors). In that case, it is possible
              // the fetch failure has already been handled by the scheduler.
              if (runningStages.contains(failedStage)) {

      However, this logic is flawed because the stage may have been *resubmitted* by the time we get these fetch failures. In that case, runningStages.contains(failedStage) will be true, but we've already handled these failures.
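
      The race described above can be reproduced in a toy model. The sketch below is not Spark code: ToyScheduler, FetchFailure, handleNaive and handleGuarded are hypothetical names, and the retry is assumed to start immediately rather than after a delay. It compares a check that only asks whether the stage is running against one possible guard, which additionally assumes each failure carries the id of the stage attempt that produced it.

```scala
import scala.collection.mutable

// Toy model only; none of these names come from Spark's DAGScheduler.
final case class FetchFailure(stageId: Int, stageAttemptId: Int)

final class ToyScheduler {
  private val runningStages = mutable.Set[Int]()
  private val latestAttempt = mutable.Map[Int, Int]()
  var resubmissions = 0

  def submit(stageId: Int): Unit = {
    // First submission is attempt 0, each later submission increments it.
    latestAttempt(stageId) = latestAttempt.get(stageId).map(_ + 1).getOrElse(0)
    runningStages += stageId
  }

  // Same shape as the quoted check: dedupe only on "is the stage running?".
  def handleNaive(f: FetchFailure): Unit =
    if (runningStages.contains(f.stageId)) resubmit(f.stageId)

  // Assumed guard: additionally ignore failures reported by an older attempt.
  def handleGuarded(f: FetchFailure): Unit =
    if (latestAttempt.get(f.stageId).contains(f.stageAttemptId) &&
        runningStages.contains(f.stageId)) resubmit(f.stageId)

  private def resubmit(stageId: Int): Unit = {
    runningStages -= stageId
    resubmissions += 1
    submit(stageId) // assumption: the retry is running before the next failure arrives
  }
}

object FetchFailureDedupDemo {
  // Delivers two fetch failures from attempt 0 and returns the number of resubmissions.
  def run(handle: (ToyScheduler, FetchFailure) => Unit): Int = {
    val scheduler = new ToyScheduler
    scheduler.submit(1)
    Seq(FetchFailure(1, 0), FetchFailure(1, 0)).foreach(handle(scheduler, _))
    scheduler.resubmissions
  }

  def main(args: Array[String]): Unit = {
    val naive = run(_.handleNaive(_))
    val guarded = run(_.handleGuarded(_))
    println(s"naive=$naive guarded=$guarded") // prints "naive=2 guarded=1"
  }
}
```

      In this model the second failure from attempt 0 arrives after the retry is already running, so the running-only check resubmits the stage a second time, while the attempt-id guard ignores it.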

      This results in multiple concurrent non-zombie attempts for one stage. Besides being confusing and wasting resources, this can also lead to later stages being submitted before the previous stage has registered its map output. This happens because

      (a) when one attempt finishes all its tasks, it may not register its map output because the stage still has pending tasks from other attempts: https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1046

                  if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {

      and (b) submitStage thinks the following stage is ready to go, because getMissingParentStages considers the stage complete as long as it has all of its map outputs: https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L397

                      if (!mapStage.isAvailable) {
                        missing += mapStage

      So the following stage is submitted repeatedly, but it is doomed to fail because the shuffle output it reads has never been registered with the map output tracker. Here's an example failure in this case:

      WARN TaskSetManager: Lost task 5.0 in stage 3.2 (TID 294, FetchFailed(null, shuffleId=0, mapId=-1, reduceId=5, message=
      org.apache.spark.shuffle.MetadataFetchFailedException: Missing output locations for shuffle ...
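
      The interaction of (a) and (b) can also be sketched in a toy model. Everything here is hypothetical (ToyTask, ToyShuffleStage, and the "registered" flag standing in for the map output tracker); it assumes the pending-task set is shared by all attempts of a stage while availability is judged only by whether every partition has an output location.

```scala
import scala.collection.mutable

// Toy model only; these types are illustrative stand-ins, not Spark classes.
final case class ToyTask(attempt: Int, partition: Int)

final class ToyShuffleStage(val numPartitions: Int) {
  // Assumption: one pending set shared by every attempt of the stage.
  val pendingTasks = mutable.Set[ToyTask]()
  // Partition -> location of its map output, as tracked on the stage itself.
  val outputLocs = mutable.Map[Int, String]()
  // Analogue of condition (b): complete once every partition has an output.
  def isAvailable: Boolean = outputLocs.size == numPartitions
}

object MapOutputRegistrationDemo {
  // Returns (stage looks available, output was registered with the tracker).
  def simulate(): (Boolean, Boolean) = {
    val stage = new ToyShuffleStage(2)
    var registered = false // stand-in for registering with the map output tracker

    // Two concurrent attempts each enqueue a task per partition.
    for (attempt <- 0 to 1; p <- 0 until stage.numPartitions)
      stage.pendingTasks += ToyTask(attempt, p)

    // Attempt 0 finishes every task; attempt 1 is still outstanding.
    for (p <- 0 until stage.numPartitions) {
      stage.pendingTasks -= ToyTask(0, p)
      stage.outputLocs(p) = s"exec-$p"
      // Analogue of condition (a): register only when nothing is pending.
      if (stage.pendingTasks.isEmpty) registered = true
    }
    (stage.isAvailable, registered)
  }

  def main(args: Array[String]): Unit =
    println(simulate()) // prints "(true,false)"
}
```

      The stage reports itself as available, so a child stage would be submitted, yet nothing was registered, which matches the "Missing output locations" failure above.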

      Note that this is a subset of the problems originally described in SPARK-7308, limited to just the issues affecting the DAGScheduler.


