Spark / SPARK-5945

Spark should not retry a stage infinitely on a FetchFailedException

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.6.0
    • Component/s: Spark Core
    • Labels: None

      Description

      While investigating SPARK-5928, I noticed some very strange behavior in the way Spark retries stages after a FetchFailedException. It seems that on a FetchFailedException, instead of simply killing the task and retrying, Spark aborts the stage and retries it. If it just retried the task, the task might fail 4 times and then trigger the usual job-killing mechanism. But by killing the stage instead, the max-retry logic is skipped (it looks to me like there is no limit on retries of a stage).

      After a bit of discussion with Kay Ousterhout, it seems the idea is that if a fetch fails, we assume that the block manager we are fetching from has failed, and that the stage will succeed if we retry it without that block manager. In that case, it wouldn't make any sense to retry the task, since it's doomed to fail every time, so we might as well kill the whole stage. But this raises two questions:

      1) Is it really safe to assume that a FetchFailedException means that the BlockManager has failed, and it will work if we just try another one? SPARK-5928 shows that there are at least some cases where that assumption is wrong. Even if we fix that case, this logic seems brittle to the next case we find. I guess the idea is that this behavior is what gives us the "R" in RDD ... but it seems like it's not really that robust and maybe should be reconsidered.

      2) Should stages only be retried a limited number of times? It would be pretty easy to put in a limited number of retries per stage. Though again, we encounter issues with keeping things resilient. Theoretically one stage could accumulate many retries because of failures in different stages further downstream, so we might need to track the cause of each retry as well to still get the desired behavior.

      In general it just seems there is some flakiness in the retry logic. This is the only reproducible example I have at the moment, but I vaguely recall hitting other cases of strange behavior with retries when trying to run long pipelines. E.g., if one executor is stuck in a GC during a fetch, the fetch fails, but the executor eventually comes back and the stage gets retried, and then the same GC issue happens the second time around, etc.

      Copied from SPARK-5928, here's the example program that can regularly produce a loop of stage failures. Note that it will only fail from a remote fetch, so it can't be run locally – I ran with MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m

          val rdd = sc.parallelize(1 to 1e6.toInt, 1).map { ignore =>
            val n = 3e3.toInt
            val arr = new Array[Byte](n)
            // fill with random bytes so the array doesn't compress to something small
            scala.util.Random.nextBytes(arr)
            arr
          }
          rdd.map { x => (1, x) }.groupByKey().count()


          Activity

          SuYan added a comment -

          I encountered a stage retrying infinitely when an executor was lost, because of a Spark bug which I already fixed in SPARK-5259.

          To solve the stage-retry problem, I added a retry limit:

          case FetchFailed(...) =>
            ...
            if (disallowStageRetryForTest) {
              abortStage(failedStage, "Fetch failure will not retry stage due to testing config")
            } else if (failedStage.attemptId >= maxStageFailures) {
              abortStage(failedStage, "Fetch failure will not retry stage " +
                "due to reaching the max failure count: " + maxStageFailures)
            } else if (failedStages.isEmpty && eventProcessActor != null) {
              // Don't schedule an event to resubmit failed stages if failedStages isn't empty, because
              ...
            }
          
          Ilya Ganelin added a comment -

          Hi Imran - I'd be happy to tackle this. Could you please assign it to me? Thank you.

          Imran Rashid added a comment -

          Hi Ilya Ganelin,

          Sorry for taking a while to respond. I think the main issue here is not so much implementing the code (as SuYan has already shown, the required patch is small). The big issue is figuring out what the desired semantics are (see the questions I listed above), which means getting feedback from all the required people on this one. But if you want to drive that process, that sounds great; it would really be appreciated!

          Apache Spark added a comment -

          User 'ilganeli' has created a pull request for this issue:
          https://github.com/apache/spark/pull/5636

          Kay Ousterhout added a comment -

          Commenting here rather than on GitHub for archiving purposes!

          I took a look at the proposed pull request, and I'd be in favor of a much simpler approach: for each stage, we track the number of failures (from any cause), and then fail the job once a stage fails 4 times (4, for consistency with the max task failures). If the stage succeeds, we can reset the count to 0, to avoid the potential problem Imran Rashid mentioned for stages that are re-used by many jobs (so the counter would be numConsecutiveFailures or something like that). This can just be added to the Stage class, I think. This is consistent with the approach we use for tasks, where if a task has failed 4 times (for any reason), we abort the stage.

          I also would advocate against adding a configuration parameter for this. I can't imagine a case where someone would want to keep trying after 4 failures, and I think sometimes configuration parameters for things like this lead people to believe they can fix a problem by changing the configuration variable (just up the max number of failures!!) when really there is some bigger underlying issue they should fix. 4 seems to have worked well for tasks, so I'd just use the same default here (and it's always easy to add a configuration variable later on if lots of people say they need it).
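
          A minimal sketch of the counting scheme described in this comment, assuming illustrative names (StageAttempts, consecutiveFailures, MaxConsecutiveStageFailures) rather than Spark's actual identifiers:

              object StageFailureTrackingSketch {
                // Same default as the max task failures discussed above.
                val MaxConsecutiveStageFailures = 4

                class StageAttempts(val stageId: Int) {
                  private var consecutiveFailures = 0

                  // Called when the stage fails (e.g. on a fetch failure); returns true
                  // if the stage may be resubmitted, false if the job should be aborted.
                  def recordFailureAndCanRetry(): Boolean = {
                    consecutiveFailures += 1
                    consecutiveFailures < MaxConsecutiveStageFailures
                  }

                  // Called when the stage completes successfully, so a stage that is
                  // re-used by later jobs starts again with a clean slate.
                  def recordSuccess(): Unit = {
                    consecutiveFailures = 0
                  }
                }
              }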

          Show
          kayousterhout Kay Ousterhout added a comment - Commenting here rather than on the github for archiving purposes! I took at look at the proposed pull request, and I'd be in favor of a much simpler approach, where for each stage, we track the number of failures (from any cause), and then fail the job once a stage fails 4 times (4, for consistency with the max task failures). If the stage succeeds, we can reset the count to 0, to avoid the potential problem Imran Rashid mentioned for stages that are re-used by many jobs (so the counter would be numConsecutiveFailures or something like that). This can just be added to the Stage class, I think. This is consistent with the approach we use for tasks, where if a task has failed 4 times (for any reason), we abort the stage. I also would advocate against adding a configuration parameter for this. I can't imagine a case where someone would want to keep trying after 4 failures, and I think sometimes configuration parameters for things like this lead people to believe they can fix a problem by changing the configuration variable (just up the max number of failures!!) when really there is some bigger underlying issue they should fix. 4 seems to have worked well for tasks, so I'd just use the same default here (and it's always easy to add a configuration variable later on if lots of people say they need it).
          Ilya Ganelin added a comment -

          Kay Ousterhout - thanks for the review. If I understand correctly, your suggestion would still address Imran Rashid's second comment, since the first stage would always (or mostly) succeed, i.e. it wouldn't have N consecutive failures. So even if subsequent stages fail, those wouldn't count towards the failure count for this particular stage, since it would have been reset when it succeeded.

          Do you have any thoughts on the first comment? Specifically, is retrying a stage likely to succeed at all or is it a waste of effort in the first place?

          Kay Ousterhout added a comment -

          When there's a FetchFailedException, we mark the corresponding map tasks as failed and re-run all of the failed tasks from the previous stage. When that's done, we re-run the stage that hit the original FetchFailedException – so in the normal case, that stage should succeed the second time.
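
          A hedged pseudocode sketch of the recovery flow described above; the types and helper names below are illustrative stand-ins, not DAGScheduler's real API:

              object FetchFailureRecoverySketch {
                case class StageRef(id: Int)

                // Stand-ins for the scheduler actions; in real Spark these are
                // internal DAGScheduler operations.
                def markMapOutputLost(mapStage: StageRef, mapId: Int): Unit =
                  println(s"mark output of map task $mapId in stage ${mapStage.id} as lost")
                def resubmit(stage: StageRef): Unit =
                  println(s"resubmit stage ${stage.id}; only missing tasks actually re-run")

                def onFetchFailed(failedStage: StageRef, mapStage: StageRef, mapId: Int): Unit = {
                  // 1. Treat the map output that could not be fetched as lost.
                  markMapOutputLost(mapStage, mapId)
                  // 2. Re-run the parent (map) stage to regenerate the missing output.
                  resubmit(mapStage)
                  // 3. Re-run the stage that saw the FetchFailedException; normally
                  //    it succeeds on this second attempt.
                  resubmit(failedStage)
                }
              }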

          Imran Rashid added a comment -

          Good point about moving the design discussion to JIRA, thanks Kay.

          Yes, I totally agree that we want to retry at least once; it's definitely "normal" that in a big cluster a node will go bad from time to time, and the good thing is Spark knows how to recover. I also agree that we shouldn't care about the cause of the failure, just the failure count.

          I totally see your points about the different configuration parameters, but let me just play devil's advocate. Yes, configuration parameters are confusing to users, but that's a reason we should have sensible defaults, and most users should never need to touch them. That doesn't mean nobody will want them. Tasks and stages are in some ways very different things – tasks are meant to be very small and lightweight, so failing a few extra times is no big deal. But stages can be really big – I would imagine that in most cases you actually might want to fail completely if the stage fails even twice, just because you can waste so much time in stage failure. Then again, at the other extreme, with really big clusters and unstable hardware, maybe two failures won't be that big a deal, so some users will want it higher.

          I disagree that it's easy to add the config later. Yes, it's easy to make the code change. But it's hard to deploy the change in a production environment. And I can see this being a parameter that a devops team needs to play with for their exact system / workload / SLAs, etc. – I'm not sure at all, but I think we just don't know, and so we should leave the door open.

          I also can't see any reason why anyone would want infinite retries – but I'm hesitant (and asked for the change) just because of the change from the old behavior. I guess Int.MaxValue is close enough if somebody needs it?

          Kay Ousterhout added a comment -

          I realized there might be a cleaner solution here: I wonder if we should just break the FetchFailedException into two subtypes, one of which is UnrecoverableFetchFailedException. If a shuffle block is too large, there's no point in retrying the stage; we should just fail it. I realized maybe this is what you were alluding to in your earlier comment, when you asked whether it's worth it to retry. It seems like, at the point when we throw the exception, we do know whether it's worth it to retry, and it would be cleaner to just throw an appropriate exception. Thoughts?
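
          A minimal sketch of the two-subtype idea above; the simplified classes below are illustrative only (UnrecoverableFetchFailedException is the proposed name and does not exist in Spark, and the real org.apache.spark.shuffle.FetchFailedException has a different constructor):

              object FetchFailureTypesSketch {
                // Simplified stand-in for Spark's FetchFailedException.
                class FetchFailedException(msg: String) extends Exception(msg)
                // Thrown when a retry cannot help, e.g. a shuffle block that is too
                // large to ever fetch successfully.
                class UnrecoverableFetchFailedException(msg: String)
                  extends FetchFailedException(msg)

                def handle(e: FetchFailedException): Unit = e match {
                  case _: UnrecoverableFetchFailedException =>
                    println("abort the stage immediately; retrying cannot succeed")
                  case _ =>
                    println("resubmit the map stage and retry, up to the stage failure limit")
                }
              }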

          Imran Rashid added a comment -

          I think we want to do something more than just handle the case of blocks that are too large. Putting in a special case to avoid retrying at all in that case would be fine, but to me that is a separate issue; the more important thing is to put in a general retry limit. There can be all sorts of other reasons for fetch failures (I just looked into a case where an OOM would kill an executor, which led to 10 hours of stage retry attempts before somebody manually killed the job).

          Kay Ousterhout added a comment -

          Good point – that makes sense.

          I'm still in favor of not adding a config parameter: I think we're in agreement that it's hard to imagine a case where someone wants this to be more than 4, so making it 4 seems strictly better than the current approach where it is infinite. If people complain or want to configure it to be less, we can always change this in a minor release.

          Ilya Ganelin added a comment -

          So to recap:
          a) Move failure count tracking into Stage
          b) Reset failure count on Stage success, so even if that stage is re-submitted due to failures downstream, we never hit the cap
          c) Remove config parameter.

          Imran Rashid added a comment -

          Kay Ousterhout, can you please clarify – did you want to just hardcode it to 4, or did you want to reuse spark.task.maxFailures for stage failures as well?

          Kay Ousterhout added a comment -

          I wanted to hardcode it to 4 (I totally agree with the sentiment you expressed earlier in this thread that it doesn't make sense / is very confusing to re-use a config parameter for two different things).

          Imran Rashid added a comment -

          Blocked by SPARK-7308, because you need to know which attempt is being failed.

          Daniel Darabos added a comment -

          At the moment we have a ton of these infinite retries. A stage is retried a few dozen times, then its parent goes missing and Spark starts retrying the parent until it also goes missing... We are still debugging the cause of our fetch failures, but I just wanted to mention that if there were a spark.stage.maxFailures option, we would be setting it to 1 at this point.

          Thanks for all the work on this bug. Even if it's not fixed yet, it's very informative.

          Reynold Xin added a comment -

          I have retargeted this and downgraded it from Blocker to Critical, since it has been around for a while and is not a regression.


            People

            • Assignee: Ilya Ganelin
            • Reporter: Imran Rashid
            • Votes: 4
            • Watchers: 19
