SPARK-19262: DAGScheduler should handle stage's pendingPartitions properly in handleTaskCompletion.

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: Scheduler, Spark Core
    • Labels: None

    Description

      In the current DAGScheduler.handleTaskCompletion code, when event.reason is Success, the scheduler first does stage.pendingPartitions -= task.partitionId, which can be a bug when a FetchFailed happens. Consider the following scenario (a self-contained sketch of the bookkeeping follows the list):
      1. There are two executors, A and B; executorA is assigned ShuffleMapTask1 and ShuffleMapTask2;
      2. ShuffleMapTask1 tries to fetch blocks locally but fails;
      3. The driver receives the FetchFailed caused by ShuffleMapTask1 on executorA, marks executorA as lost, and updates failedEpoch;
      4. The driver resubmits the stage, containing ShuffleMapTask1x and ShuffleMapTask2x;
      5. ShuffleMapTask2 finishes successfully on executorA and sends Success back to the driver;
      6. The driver receives the Success and does stage.pendingPartitions -= task.partitionId, but then finds the task's epoch is not big enough (epoch <= failedEpoch(execId)), treats the completion as bogus, and does not add the MapStatus to the stage;
      7. ShuffleMapTask1x finishes successfully on executorB;
      8. The driver receives the Success from ShuffleMapTask1x on executorB and does stage.pendingPartitions -= task.partitionId; no partitions remain pending, but not all partitions are available because of step 6;
      9. The driver resubmits the stage; at this moment ShuffleMapTask2x is still running, so TaskSchedulerImpl.submitTasks finds a conflicting TaskSet and throws an IllegalStateException;
      10. The job fails.
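
      The hazard is in the driver-side bookkeeping: pendingPartitions shrinks on every Success, while the MapStatus is only registered when the completion passes the epoch check. Below is a minimal, self-contained model of that bookkeeping (the object, field, and method names are hypothetical, chosen only to mirror the scenario; this is not the actual DAGScheduler code):

          import scala.collection.mutable

          // Hypothetical stand-in for the driver-side state described above; not Spark source.
          object PendingPartitionsDemo {
            val pendingPartitions = mutable.HashSet(0, 1)              // ShuffleMapTask1, ShuffleMapTask2
            val outputLocs        = mutable.HashMap.empty[Int, String] // partition -> executor holding its MapStatus
            val failedEpoch       = mutable.HashMap("executorA" -> 1L) // executorA already marked as lost (step 3)

            def handleSuccess(partition: Int, epoch: Long, execId: String): Unit = {
              pendingPartitions -= partition                           // removed unconditionally
              if (failedEpoch.contains(execId) && epoch <= failedEpoch(execId)) {
                println(s"Ignoring possibly bogus completion of partition $partition from $execId")
              } else {
                outputLocs(partition) = execId                         // MapStatus registered
              }
            }

            def main(args: Array[String]): Unit = {
              // Step 6: ShuffleMapTask2 (old epoch 0) succeeds on the now-failed executorA.
              handleSuccess(partition = 1, epoch = 0L, execId = "executorA")
              // Step 8: ShuffleMapTask1x (new epoch 2) succeeds on executorB.
              handleSuccess(partition = 0, epoch = 2L, execId = "executorB")

              // pendingPartitions is now empty while partition 1 has no registered output,
              // so the stage looks finished yet is not available and gets resubmitted while
              // ShuffleMapTask2x is still running (steps 8-10).
              println(s"pendingPartitions = $pendingPartitions")
              println(s"outputLocs        = $outputLocs")
            }
          }

      One way to keep the two views consistent would be to remove a partition from pendingPartitions only when its MapStatus is actually accepted.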

      To reproduce the bug:
      1. Modify ShuffleBlockFetcherIterator so that it checks whether the task's index in the TaskSetManager and the stage attempt are both 0 and, if so, throws a FetchFailedException;
      2. Rebuild Spark, then submit the following job (a quick sanity check of its shape follows the code):

          val rdd = sc.parallelize(
            List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
          rdd.reduceByKey { (v1, v2) =>
            Thread.sleep(10000) // keep the shuffle map tasks running long enough to overlap with the retry
            v1 + v2
          }.map { keyAndValue =>
            (keyAndValue._1 % 2, keyAndValue._2)
          }.reduceByKey { (v1, v2) =>
            Thread.sleep(10000)
            v1 + v2
          }.collect
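
      The job is shaped to match the scenario: parallelize(..., 2) gives the first map stage two tasks, the first reduceByKey typically keeps those two partitions under the default partitioner (unless spark.default.parallelism overrides it), and the 10-second sleeps keep tasks of the original and resubmitted attempts in flight at the same time. The shape can be sanity-checked in spark-shell before triggering the failure (sc is the shell's SparkContext; the exact toDebugString output varies by Spark version):

          // Same RDD as in the job above; collect is not called here.
          val rdd = sc.parallelize(
            List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
          println(rdd.partitions.length)   // 2 -> two ShuffleMapTasks in the first map stage

          val result = rdd
            .reduceByKey((v1, v2) => { Thread.sleep(10000); v1 + v2 })
            .map(kv => (kv._1 % 2, kv._2))
            .reduceByKey((v1, v2) => { Thread.sleep(10000); v1 + v2 })
          println(result.toDebugString)    // the lineage shows two ShuffledRDDs, i.e. two shuffle dependencies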
      

People

    • Assignee: Unassigned
    • Reporter: Jin Xing (jinxing6042@126.com)
    • Votes: 0
    • Watchers: 2
