Spark / SPARK-48394

Cleanup mapIdToMapIndex on mapoutput unregister



    Description

There is only one valid map status for a given mapIndex at any time in Spark. mapIdToMapIndex should follow the same rule to avoid inconsistency.
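The invariant can be sketched as follows. This is a hypothetical, simplified model (the real bookkeeping lives in Spark's ShuffleStatus; the names register/unregister and the tuple-valued map are illustrative only): whenever the map status for an index is replaced or unregistered, the previous mapId's entry must also be dropped from mapIdToMapIndex, so both maps always describe the same single valid output.

```scala
import scala.collection.mutable

// Hypothetical sketch: keep mapIdToMapIndex in sync with mapStatuses.
object SyncSketch {
  val mapStatuses     = mutable.Map[Int, (Long, String)]() // mapIndex -> (mapId, location)
  val mapIdToMapIndex = mutable.Map[Long, Int]()           // mapId -> mapIndex

  def register(mapIndex: Int, mapId: Long, location: String): Unit = {
    // Replacing an index must also evict the previous mapId's entry,
    // otherwise a stale mapId keeps resolving to this index.
    mapStatuses.get(mapIndex).foreach { case (oldId, _) => mapIdToMapIndex -= oldId }
    mapStatuses(mapIndex) = (mapId, location)
    mapIdToMapIndex(mapId) = mapIndex
  }

  def unregister(mapIndex: Int): Unit =
    mapStatuses.remove(mapIndex).foreach { case (oldId, _) => mapIdToMapIndex -= oldId }
}
```

With this cleanup, re-registering an index removes the stale mapId, and the dangling lookup described below cannot happen.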


The issue leads to shuffle fetch failures and, ultimately, job failure. It happens this way:

1. Stage A computes partition P0 in task t1 (TID, a.k.a. mapId) on executor e1.
2. Executor e1 starts decommissioning.
3. Executor e1 reports a false-positive loss to the driver during its decommission.
4. Stage B, which reuses Stage A's shuffle dependency, computes partition P0 again in task t2 on executor e2.
5. When task t2 finishes, there are two entries ((t1 -> P0), (t2 -> P0)) for the same partition in mapIdToMapIndex, but only one entry (mapStatuses(P0) = MapStatus(t2, e2)) in mapStatuses.
6. Executor e1 starts to migrate task t1's map status (to executor e3, for example) and calls updateMapOutput on the driver. Because of step 5, the lookup uses mapId t1 to resolve mapIndex P0, and then uses P0 to fetch task t2's map status.

      // updateMapOutput
      val mapIndex = mapIdToMapIndex.get(mapId)
      val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
7. Task t2's map status location is then updated to executor e3, although its output is actually still on executor e2. This eventually leads to the fetch failure.
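The sequence above can be reproduced in a toy model of the driver-side bookkeeping. The names (MapStatus, mapStatuses, mapIdToMapIndex, updateMapOutput) mirror Spark's, but this is a self-contained sketch, not Spark code:

```scala
import scala.collection.mutable

// Toy stand-in for Spark's MapStatus: which task produced the output, and where it lives.
case class MapStatus(mapId: Long, location: String)

class TrackerSketch {
  // One MapStatus slot per map index (partition).
  val mapStatuses = mutable.Map[Int, MapStatus]()
  // mapId (task attempt) -> map index. Without cleanup, stale entries linger here.
  val mapIdToMapIndex = mutable.Map[Long, Int]()

  def registerMapOutput(mapIndex: Int, status: MapStatus): Unit = {
    mapStatuses(mapIndex) = status
    mapIdToMapIndex(status.mapId) = mapIndex
  }

  // Mirrors the updateMapOutput lookup: resolve the index by mapId,
  // then update whatever status currently occupies that index.
  def updateMapOutput(mapId: Long, newLocation: String): Unit =
    for (idx <- mapIdToMapIndex.get(mapId); st <- mapStatuses.get(idx))
      mapStatuses(idx) = st.copy(location = newLocation)
}

object Repro {
  def main(args: Array[String]): Unit = {
    val t = new TrackerSketch
    t.registerMapOutput(0, MapStatus(mapId = 1L, location = "e1")) // step 1: t1 on e1
    // Steps 2-4: e1 decommissions, its loss is (falsely) reported, P0 is recomputed.
    t.registerMapOutput(0, MapStatus(mapId = 2L, location = "e2")) // t2 on e2
    // Step 5: both t1 and t2 now resolve to map index 0.
    assert(t.mapIdToMapIndex.keySet == Set(1L, 2L))
    // Step 6: e1's migration of t1's output reaches the driver...
    t.updateMapOutput(mapId = 1L, newLocation = "e3")
    // ...and corrupts t2's status: it now claims e3, though t2's data is on e2.
    println(t.mapStatuses(0))
  }
}
```

Removing t1's mapIdToMapIndex entry when its map output is unregistered (as the title proposes) makes the step-6 lookup miss, so t2's status is left untouched.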



People

    Assignee: Ngone51 wuyi
    Reporter: Ngone51 wuyi