SPARK-30849

Application failed due to failure to get MapStatuses broadcast


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      We encountered an issue in Spark 2.1. The exception is as follows:

      	Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, most recent failure: Lost task 18.3 in stage 2.0 (TID 13819, xxxx , executor 8): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_9_piece1 of broadcast_9
      java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_9_piece1 of broadcast_9
      	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
      	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
      	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
      	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
      	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
      	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
      	at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
      	at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
      	at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
      	at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
      	at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
      	at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
      	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
      	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
      	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
      

      I looked into the code and the logs; it seems this is caused by the mapStatuses broadcast id being sent to the executor, but the broadcast being invalidated by the driver immediately afterwards, before the executor actually fetches its pieces.

      This can be described as follows:

      Let's say we have an rdd1:
      rdd2 = rdd1.repartition(100) // stage 0
      rdd3 = rdd2.map(xxx)  // stage 1
      rdd4 = rdd2.map(xxx)  // stage 2
      // then join the two and save the result
      rdd3.join(rdd4).save
      
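      For concreteness, here is a minimal self-contained Scala sketch of the same job shape; the input data, key function, and output path are illustrative placeholders, not taken from the original job:

      import org.apache.spark.{SparkConf, SparkContext}

      object RepartitionJoinRepro {
        def main(args: Array[String]): Unit = {
          val sc = new SparkContext(new SparkConf().setAppName("RepartitionJoinRepro"))

          val rdd1 = sc.parallelize(0 until 1000000).map(i => (i % 1000, i))
          val rdd2 = rdd1.repartition(100)                    // stage 0: shuffle write
          val rdd3 = rdd2.map { case (k, v) => (k, v + 1) }   // stage 1: reads stage 0's map outputs
          val rdd4 = rdd2.map { case (k, v) => (k, v * 2) }   // stage 2: reads the same map outputs
          // Both join inputs depend on rdd2's shuffle, so the tasks of stage 1 and
          // stage 2 each fetch the same MapStatuses broadcast from the driver.
          rdd3.join(rdd4).saveAsTextFile("/tmp/repro-output") // illustrative path

          sc.stop()
        }
      }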

      When a FetchFailedException happens in stage 1, stage 0 and stage 1 are resubmitted and re-executed while stage 2 is still running. Stage 2's tasks fetch mapStatuses from the driver, but the mapStatuses cache is invalidated as soon as tasks of the resubmitted stage 0.1 complete and registerMapOutput is called.
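      To make the race explicit, below is a simplified stand-alone model of this interleaving. All the names (FakeDriver, FakeBroadcast, the registerMapOutput body) are illustrative; this is not Spark's actual internal code:

      // Simplified model of the race; all names are illustrative, not Spark's real API.
      import java.util.concurrent.atomic.AtomicReference

      final class FakeBroadcast(val id: Long, val payload: Array[Byte]) {
        @volatile var destroyed: Boolean = false
      }

      object FakeDriver {
        // The driver caches serialized MapStatuses in a broadcast and hands out its id.
        private val cached = new AtomicReference(new FakeBroadcast(9L, Array[Byte](1, 2, 3)))

        def currentBroadcastId: Long = cached.get().id

        // Models what happens when a task of the resubmitted stage 0.1 completes:
        // the map output is re-registered and the old cached broadcast is destroyed.
        def registerMapOutput(): Unit = {
          val old = cached.getAndSet(new FakeBroadcast(10L, Array[Byte](4, 5, 6)))
          old.destroyed = true
        }

        def lookup(id: Long): Option[FakeBroadcast] =
          Option(cached.get()).filter(b => b.id == id && !b.destroyed)
      }

      object RaceDemo extends App {
        // A stage 2 task first receives the broadcast id from the driver...
        val id = FakeDriver.currentBroadcastId
        // ...then a stage 0.1 task completes and invalidates the cache in between...
        FakeDriver.registerMapOutput()
        // ...so the task's later fetch fails, as in the stack trace above.
        val statuses = FakeDriver.lookup(id)
          .map(_.payload)
          .getOrElse(throw new RuntimeException(
            s"Failed to get broadcast_${id}_piece1 of broadcast_$id"))
      }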

      I checked the master branch; it seems the correctness issues on `repartition` have been fixed there, but I think this issue may still exist.

      Some screenshots (attached below):

      Attachments

        1. image-2020-02-16-11-13-18-195.png (222 kB, liupengcheng)
        2. image-2020-02-16-11-17-32-103.png (255 kB, liupengcheng)


            People

              Assignee: Unassigned
              Reporter: liupengcheng
              Votes: 0
              Watchers: 1
