Spark / SPARK-5259

Do not submit stage until its dependencies' map outputs are registered

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.1.1, 1.2.0
    • Fix Version/s: 1.6.0
    • Component/s: Spark Core
    • Labels:
      None

      Description

      We should track pending tasks by partition ID instead of Task objects.

      Before this change, failure and retry could result in a stage being submitted before the map outputs from its dependencies were registered. This was due to an error in the condition for registering map outputs.
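
      As a rough sketch of the idea behind the fix (hypothetical names, not the actual patch from the pull request), keying the pending set by partition ID makes a completion from any attempt of the stage count:

        import scala.collection.mutable

        // Hypothetical sketch: track outstanding work by partition ID, so a
        // successful task from any attempt of the stage is counted, regardless
        // of which Task object ran it.
        class StageState(numPartitions: Int) {
          // Partitions that still need a registered map output.
          val pendingPartitions: mutable.HashSet[Int] =
            mutable.HashSet(0 until numPartitions: _*)

          // Called when any attempt reports a successful map task for this partition.
          def markPartitionComplete(partitionId: Int): Unit =
            pendingPartitions -= partitionId

          // Only submit downstream stages once every partition's output is registered.
          def isAvailable: Boolean = pendingPartitions.isEmpty
        }

      With this bookkeeping, availability and the pending set cannot disagree, because both are driven by the same partition-ID events.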

      A more complete explanation of the original problem:

      While a shuffle map stage is being retried, there may be two TaskSets running for it at the same time. Call them taskSet0.0 and taskSet0.1; taskSet0.1 re-runs the tasks of taskSet0.0 that have not completed yet.

      If taskSet0.0 then finishes tasks that taskSet0.1 has not completed, so that between the two attempts every partition has a map output, stage.isAvailable becomes true:

        def isAvailable: Boolean = {
          if (!isShuffleMap) {
            true
          } else {
            numAvailableOutputs == numPartitions
          }
        } 
      

      But stage.pendingTasks is not empty, and a non-empty pendingTasks is what guards against registering the map statuses with MapOutputTracker too early.

      The reason is that when a task completes successfully, it is removed from pendingTasks by object reference, because Task does not override hashCode() and equals():

        pendingTasks -= task

      A completed task from taskSet0.0 is a different object from taskSet0.1's pending task for the same partition, so the removal is a no-op; numAvailableOutputs, however, is counted by partition ID. The two measures therefore diverge.
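
      A minimal standalone sketch of that divergence (hypothetical Task class mirroring the reference-equality behavior; not Spark's actual classes):

        import scala.collection.mutable

        object ReferenceEqualityDemo extends App {
          // Like Spark's Task at the time: no equals()/hashCode() override,
          // so set membership is decided by object reference.
          class Task(val partitionId: Int)

          val pendingTasks = mutable.HashSet[Task]()       // tracked by Task object
          val availablePartitions = mutable.HashSet[Int]() // tracked by partition ID

          val oldAttemptTask = new Task(0) // launched by taskSet0.0
          val newAttemptTask = new Task(0) // re-launched by taskSet0.1, same partition
          pendingTasks += newAttemptTask

          // The old attempt's task for partition 0 now completes successfully:
          availablePartitions += oldAttemptTask.partitionId // counted by partition ID
          pendingTasks -= oldAttemptTask                    // no-op: a different object!

          assert(availablePartitions.size == 1) // all outputs "available"
          assert(pendingTasks.nonEmpty)         // yet a task still looks pending
        }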

      Here is a test case that demonstrates the problem:

        test("Make sure mapStage.pendingtasks is set() " +
          "while MapStage.isAvailable is true while stage was retry ") {
          val firstRDD = new MyRDD(sc, 6, Nil)
          val firstShuffleDep = new ShuffleDependency(firstRDD, null)
          val firstShuffleId = firstShuffleDep.shuffleId
          val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
          val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
          val shuffleId = shuffleDep.shuffleId
          val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
          submit(reduceRdd, Array(0, 1))
          complete(taskSets(0), Seq(
            (Success, makeMapStatus("hostB", 1)),
            (Success, makeMapStatus("hostB", 2)),
            (Success, makeMapStatus("hostC", 3)),
            (Success, makeMapStatus("hostB", 4)),
            (Success, makeMapStatus("hostB", 5)),
            (Success, makeMapStatus("hostC", 6))
          ))
          complete(taskSets(1), Seq(
            (Success, makeMapStatus("hostA", 1)),
            (Success, makeMapStatus("hostB", 2)),
            (Success, makeMapStatus("hostA", 1)),
            (Success, makeMapStatus("hostB", 2)),
            (Success, makeMapStatus("hostA", 1))
          ))
          runEvent(ExecutorLost("exec-hostA"))
          runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, null, null))
          runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, null, null))
          runEvent(CompletionEvent(taskSets(1).tasks(0),
            FetchFailed(null, firstShuffleId, -1, 0, "Fetch metadata failed"),
            null, null, null, null))
          scheduler.resubmitFailedStages()
          runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
            makeMapStatus("hostC", 1), null, null, null))
          runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
            makeMapStatus("hostC", 1), null, null, null))
          runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
            makeMapStatus("hostC", 1), null, null, null))
          runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
            makeMapStatus("hostB", 2), null, null, null))
          val stage = scheduler.stageIdToStage(taskSets(1).stageId)
          assert(stage.attemptId == 2)
          assert(stage.isAvailable)
          assert(stage.pendingTasks.size == 0)
        }
      
      
      

          Activity

          Apache Spark added a comment -

          User 'suyanNone' has created a pull request for this issue:
          https://github.com/apache/spark/pull/4055

          Apache Spark added a comment -

          User 'squito' has created a pull request for this issue:
          https://github.com/apache/spark/pull/7699

          Reynold Xin added a comment -

          I have retargeted this and downgraded it from Blocker to Critical since it's been there for a while and not a regression.

          Imran Rashid added a comment -

          Issue resolved by pull request 7699
          https://github.com/apache/spark/pull/7699

          xukun added a comment -

          squito SuYan Would it be possible to backport this to branch 1.5?

            People

            • Assignee:
              SuYan
            • Reporter:
              SuYan
            • Votes:
              1
            • Watchers:
              8
