SPARK-10042: Use consistent behavior for Internal Accumulators across stage retries

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 1.5.0
    • Fix Version/s: None
    • Component/s: Spark Core, Web UI

    Description

      andrewor14

      The internal accumulators introduced in SPARK-8735 aren't counted consistently across stage retries. Whether a given partition's updates end up counted once or multiple times is essentially arbitrary, because it depends on which partitions happen to get recomputed.

      First, a little interlude on how stage failure & retry work. When there is a fetch failure, Spark looks at the block manager (BM) that it failed to fetch data from, and it assumes none of the data from that BM is available. It fails the stage with the fetch failure, then goes back to the ShuffleMapStage that produced the data. It looks at which partitions were stored in the failed BM, and it reruns just those partitions. Meanwhile, all currently running tasks for the current stage keep running, potentially producing more fetch failures. In fact, some of those tasks can even keep running until the dependent stage has been re-run and this stage has been restarted. (Yes, this can and does happen under real workloads, and is the cause of SPARK-8029, a serious failure in real workloads.)

      If Spark has lost multiple BMs (which might mean it has lost all the shuffle map output of an earlier stage), there are a few different ways that shuffle map output will get regenerated. Perhaps there will be enough tasks running to trigger fetch failures on all the lost BMs before the earlier stage is restarted, so by the time the stage is re-scheduled, the scheduler knows to rerun all the tasks. Or maybe it only gets a failure on one block manager, so it re-generates the map output for that one block manager, and then on trying the downstream stage, it realizes another block manager is down, and repeats the process, one BM at a time, until everything has been regenerated. Or perhaps, as it is regenerating the map output from the first failure, the "zombie" tasks from the failed stage that are still running trigger fetch failures from all the other block managers. In that case, as soon as the shuffle map stage is done regenerating data for one BM, it will immediately regenerate the data for the other lost BMs before trying the downstream stage. (And then there are assorted combinations as well.)
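
      To make that concrete, here's a toy model in plain Scala (not the actual DAGScheduler bookkeeping; the block manager names and the partition-to-BM mapping are made up) showing how the set of map partitions to recompute depends entirely on which fetch failures are reported before the map stage is resubmitted:

      // Toy model only: map output availability tracked per (partition -> block manager).
      val outputLocation = Map(0 -> "BM-A", 1 -> "BM-B", 2 -> "BM-A", 3 -> "BM-B", 4 -> "BM-A")

      // Map outputs currently considered available (initially all of them).
      var available = outputLocation.keySet

      def handleFetchFailure(lostBM: String): Set[Int] = {
        // Everything that lived on the failed block manager is now missing.
        available --= outputLocation.collect { case (p, bm) if bm == lostBM => p }.toSet
        val toRecompute = outputLocation.keySet -- available
        println(s"fetch failure on $lostBM -> recompute partitions ${toRecompute.toList.sorted}")
        toRecompute
      }

      // If only BM-A's failure arrives before resubmission, only partitions 0, 2, 4
      // are recomputed; BM-B's partitions wait for a later fetch failure.
      handleFetchFailure("BM-A")
      // If BM-B's failure also arrives in time (e.g. from a zombie task), the same
      // resubmission ends up recomputing everything that was lost.
      handleFetchFailure("BM-B")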

      This means that it is totally unpredictable how many partitions will get rerun for the ShuffleMapStage that was previously successful. E.g., run your example program:

      import org.apache.spark._
      import org.apache.spark.shuffle.FetchFailedException
      
      val data = sc.parallelize(1 to 1e3.toInt, 500).map(identity).groupBy(identity)
      val shuffleHandle = data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
      
      // Simulate fetch failures
      val mappedData = data.map { case (i, _) =>
        val taskContext = TaskContext.get
        if (taskContext.attemptNumber() == 0 && taskContext.partitionId() == 50) {
          // Cause the post-shuffle stage to fail on its first attempt with a single task failure
          val env = SparkEnv.get
          val bmAddress = env.blockManager.blockManagerId
          val shuffleId = shuffleHandle.shuffleId
          val mapId = 0
          val reduceId = taskContext.partitionId()
          val message = "Simulated fetch failure"
          throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message)
        } else {
          (i, i)
        }
      }
      
      mappedData.reduceByKey(_ + _, 500).count()

      With the current condition for resetting the accumulators, that is (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute), in local mode all partitions will get re-run, so the condition holds. Then try running it with local-cluster[2,1,1024] (which will create two block managers). Here's some example debug output from when I ran it:

      === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
        - all partitions: 0, 1, 2, 3, 4
        - partitions to compute: 0, 1, 2, 3, 4
        - internal accum values:
      === STAGE 0 IS CREATING NEW ACCUMULATORS ===
      === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
        - all partitions: 0, 1, 2, 3, 4
        - partitions to compute: 0, 1, 2, 3, 4
        - internal accum values:
      === STAGE 1 IS CREATING NEW ACCUMULATORS ===
      15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 6, 192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639), shuffleId=0, mapId=0, reduceId=0, message=
      org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
      ...)
      === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
        - all partitions: 0, 1, 2, 3, 4
        - partitions to compute: 1, 2, 4
        - internal accum values: 0
      === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
        - all partitions: 0, 1, 2, 3, 4
        - partitions to compute: 0, 2, 3, 4
        - internal accum values: 3936
      15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.1 (TID 11, 192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639), shuffleId=0, mapId=0, reduceId=0, message=
      org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
      ...
      === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
        - all partitions: 0, 1, 2, 3, 4
        - partitions to compute: 2
        - internal accum values: 0
      === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
        - all partitions: 0, 1, 2, 3, 4
        - partitions to compute: 0, 4
        - internal accum values: 7872
      15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.2 (TID 15, 192.168.1.106): FetchFailed(BlockManagerId(1, 192.168.1.106, 61640), shuffleId=0, mapId=0, reduceId=0, message=
      org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
      ...
      === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
        - all partitions: 0, 1, 2, 3, 4
        - partitions to compute: 0, 1, 3, 4
        - internal accum values: 0
      === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
        - all partitions: 0, 1, 2, 3, 4
        - partitions to compute: 0, 1, 2, 3, 4
        - internal accum values: 9840
      === STAGE 1 IS CREATING NEW ACCUMULATORS ===
      ...
      

      As you can see, partitionsToCompute != allPartitions in most cases. For example, in the second submission of stage 1, we would have *double counted* the accumulators for partitions 0, 2, 3, 4. By the third submission of stage 1, we would have *triple counted* partitions 0 & 4. Or then again, we might just reset the values and count each partition once, as we do in the final submission shown here.
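
      Here's a minimal sketch (not the real submitMissingTasks code; the 1000-byte per-task update is invented) of why that reset condition leads to double counting: the accumulator is only zeroed when every partition is being recomputed, so a partial resubmission adds its partitions' updates on top of what those same partitions already contributed:

      val allPartitions = Set(0, 1, 2, 3, 4)
      var internalAccumValue: Option[Long] = None   // None ~ stage.internalAccumulators.isEmpty

      def submitMissingTasks(partitionsToCompute: Set[Int]): Unit = {
        // The reset condition quoted above.
        if (internalAccumValue.isEmpty || allPartitions == partitionsToCompute) {
          internalAccumValue = Some(0L)             // "CREATING NEW ACCUMULATORS"
        }
        // Pretend every completed task adds a 1000-byte update.
        internalAccumValue = internalAccumValue.map(_ + 1000L * partitionsToCompute.size)
        println(s"computed ${partitionsToCompute.toList.sorted}, accum = ${internalAccumValue.get}")
      }

      submitMissingTasks(Set(0, 1, 2, 3, 4))  // first attempt: 5000
      submitMissingTasks(Set(0, 2, 3, 4))     // partial retry: 9000, those partitions double counted
      submitMissingTasks(Set(0, 4))           // partial retry: 11000, partitions 0 and 4 triple counted
      submitMissingTasks(Set(0, 1, 2, 3, 4))  // full retry: reset, back to a single count of 5000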

      I had earlier suggested that we should never reset the value, just initialize it once and have the value keep increasing. But maybe that isn't what you want; maybe you want to always reset the value, so the metrics clearly apply to that one stage attempt alone. In any case, we are stuck with the fact that skipped stages (which come from a shared narrow dependency) do not share the same Stage object, even though they are conceptually the same stage to a user. So retries from skipped stages also suggest that our goal should be for each attempt to have a cleared value for the accumulators, since that is the behavior we're stuck with on retries via a skipped stage in any case. We could either always reset the internal accumulators, or make them a property of the stage *attempt*, which gets initialized with the attempt and never reset.
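
      As a rough illustration of the second option, here is a hedged sketch of per-attempt accumulators; StageAttempt is an invented name for illustration, not a Spark class. Each attempt gets its own value, initialized once and never reset, so a reported value always describes exactly one attempt:

      case class StageAttempt(stageId: Int, attemptId: Int, var internalAccumValue: Long = 0L)

      val first = StageAttempt(stageId = 1, attemptId = 0)
      first.internalAccumValue += 3936                      // updates from attempt 0's tasks
      val retry = StageAttempt(stageId = 1, attemptId = 1)  // retry starts from 0, not from 3936
      retry.internalAccumValue += 1968
      println(s"attempt 0: ${first.internalAccumValue}, attempt 1: ${retry.internalAccumValue}")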

      Another option would be for the UI to just display the update from each task, rather than the accumulator value at the end of the task:

      https://github.com/apache/spark/blob/cf016075a006034c24c5b758edb279f3e151d25d/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L372

      That would make the global value of the accumulator entirely irrelevant. In fact, I'm not certain how to interpret the sum of the memory used in each task. If I have 10K tasks running in 20 slots, the sum across all 10K tasks is probably over-estimating the memory actually in use at any time by roughly 500x (10,000 / 20). It's even stranger to report the quartiles of that partial sum as tasks complete. I highly doubt most users will understand what that summary metric means, and even if they did, it seems to have very little value.

      (Only using the update from each task would also mean that we wouldn't be using the accumulators to "accumulate" anything; they would just become the place we happen to cram our per-task metrics.)
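
      A small sketch of what the per-task-update option would summarize (the perTaskPeakBytes numbers are made up): quartiles over the individual task updates describe tasks, while quartiles over the running total mostly reflect completion order:

      val perTaskPeakBytes = Seq(100L, 120L, 90L, 300L, 110L, 95L, 105L, 115L)
      val runningTotals = perTaskPeakBytes.scanLeft(0L)(_ + _).tail   // accumulator value seen as each task finishes

      def quartiles(xs: Seq[Long]): (Long, Long, Long) = {
        val s = xs.sorted
        (s(s.size / 4), s(s.size / 2), s(3 * s.size / 4))
      }

      println(s"quartiles of per-task updates:  ${quartiles(perTaskPeakBytes)}")
      println(s"quartiles of the running total: ${quartiles(runningTotals)}")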

    People

    • Assignee: Unassigned
    • Reporter: Imran Rashid (irashid)