Spark / SPARK-22823

Race Condition when reading Broadcast shuffle input. Failed to get broadcast piece


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.0.1, 2.2.1, 2.3.0
    • Fix Version/s: None
    • Component/s: Block Manager, Spark Core
    • Labels:

      Description

      It seems there is a race condition when reading shuffle input that arrives as a broadcast rather than directly. To read the broadcast MapStatuses in

      org.apache.spark.shuffle.BlockStoreShuffleReader::read()
      

      we submit a GetMapOutputStatuses(shuffleId) message to be executed in MapOutputTrackerMaster's pool, which in turn ends up creating a new broadcast at

      org.apache.spark.MapOutputTracker::serializeMapStatuses
      

      if the serialized statuses exceed minBroadcastSize bytes (a simplified sketch of this decision follows).
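      A minimal sketch of that decision, assuming a simplified signature rather than the actual MapOutputTracker::serializeMapStatuses one (the names below are illustrative only): if the serialized statuses are at least minBroadcastSize bytes, a new broadcast is created instead of returning the bytes directly.

      // Hedged sketch only; the real serializeMapStatuses has a different signature
      // and returns both the serialized bytes and the created Broadcast.
      def serializeStatuses(
          statusBytes: Array[Byte],
          minBroadcastSize: Int,
          newBroadcast: Array[Byte] => Long): Either[Array[Byte], Long] = {
        if (statusBytes.length >= minBroadcastSize) {
          Right(newBroadcast(statusBytes)) // large: create a broadcast and return its id
        } else {
          Left(statusBytes)                // small: return the serialized statuses directly
        }
      }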

      As a result, the newly created broadcast may be registered with the driver's BlockManagerMasterEndpoint later than an executor asks the driver for the broadcast piece's location, as modeled in the sketch below.
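      The race can be modeled with a small self-contained sketch; the names here are illustrative, not Spark's API. A "driver" thread registers the piece's location only after the broadcast has been created, while an "executor" thread may query the location map before that happens.

      import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

      object BroadcastRegistrationRace {
        // Stands in for the blockId -> locations map kept by BlockManagerMasterEndpoint.
        private val blockLocations = new ConcurrentHashMap[String, Seq[String]]()

        def main(args: Array[String]): Unit = {
          val pool = Executors.newFixedThreadPool(2)

          // "Driver": serializing the MapStatuses and creating the broadcast takes time,
          // so the piece's location is registered with a delay.
          pool.submit(new Runnable {
            def run(): Unit = {
              Thread.sleep(50)
              blockLocations.put("broadcast_176_piece0", Seq("driver"))
            }
          })

          // "Executor": asks for the piece's location as soon as it needs the map statuses.
          pool.submit(new Runnable {
            def run(): Unit = {
              val locs = Option(blockLocations.get("broadcast_176_piece0")).getOrElse(Seq.empty)
              if (locs.isEmpty) {
                println("Failed to get broadcast_176_piece0 of broadcast_176") // the observed error
              } else {
                println(s"piece found at $locs")
              }
            }
          })

          pool.shutdown()
          pool.awaitTermination(1, TimeUnit.SECONDS)
        }
      }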

      In our project we get the following exception on a regular basis:

      java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_176_piece0 of broadcast_176
              at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1280)
              at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
              at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
              at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
              at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
              at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
              at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:661)
              at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:661)
              at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
              at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:598)
              at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:660)
              at org.apache.spark.MapOutputTracker.getStatuses(MapOutputTracker.scala:203)
              at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:142)
              at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
              at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
              at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
              at org.apache.spark.scheduler.Task.run(Task.scala:86)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.spark.SparkException: Failed to get broadcast_176_piece0 of broadcast_176
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:146)
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
              at scala.collection.immutable.List.foreach(List.scala:381)
              at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:125)
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
              at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1273)
      
      

      This exception appears when we try to read a broadcast piece. To do this we need to fetch the broadcast piece's location from the driver via

      org.apache.spark.storage.BlockManagerMaster::getLocations(blockId: BlockId)
      

      The driver responds with an empty list of locations, and fetching the broadcast piece fails with the exception listed above (a simplified sketch of this step follows).
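      A minimal sketch of that step, assuming a hypothetical helper rather than the actual TorrentBroadcast::readBlocks code: the executor asks the driver for the piece's locations and, if none are registered yet, has nowhere to fetch from and fails.

      // Hedged sketch; fetchLocations stands in for BlockManagerMaster.getLocations(blockId).
      def readBroadcastPiece(
          pieceId: String,
          fetchLocations: String => Seq[String],
          fetchFrom: (String, String) => Array[Byte]): Array[Byte] = {
        val locations = fetchLocations(pieceId)
        if (locations.isEmpty) {
          // The driver has not registered the piece yet; this is the point at which the
          // "Failed to get broadcast piece" exception in the stack trace above is raised.
          throw new java.io.IOException(s"Failed to get $pieceId")
        }
        fetchFrom(locations.head, pieceId) // otherwise fetch the bytes from a known location
      }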

            People

            • Assignee: Unassigned
            • Reporter: Dmitrii Bundin (dbundin)
            • Votes: 0
            • Watchers: 5
