Spark / SPARK-40987

Avoid creating a directory when deleting a block, causing DAGScheduler to not work


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.2.2, 3.3.1
    • Fix Version/s: 3.2.3, 3.3.2, 3.4.0
    • Component/s: Spark Core
    • Labels: None

    Description

      When the driver submits a job, DAGScheduler calls sc.broadcast(taskBinaryBytes).

      TorrentBroadcast#writeBlocks may fail during BlockManager#putBytes due to disk problems.

      When the put fails, BlockManager#doPut calls BlockManager#removeBlockInternal to clean up the partially written block.

      BlockManager#removeBlockInternal calls DiskStore#remove to clean up blocks on disk.

      Because the block's directory does not yet exist, DiskStore#remove tries to create it, and on a failing disk this directory creation itself throws an exception.
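To make the failure concrete, here is a minimal, self-contained sketch — not Spark's actual DiskStore/DiskBlockManager code; `getFile`, `removeViaGetFile`, and `removeWithoutCreating` are illustrative names. A remove path that locates files through a directory-creating `getFile` turns "delete a block that was never written" into an attempt to create a directory, which throws on a failing disk; a side-effect-free lookup makes the same removal a harmless no-op.

```scala
import java.io.{File, IOException}

// Simplified model: a block file lives in a lazily created sub-directory,
// and getFile() creates that directory on demand.
def getFile(root: File, blockName: String): File = {
  val subDir = new File(root, (blockName.hashCode & 0xf).toString)
  if (!subDir.exists() && !subDir.mkdirs()) {
    // On a failing disk, mkdirs() fails, so cleanup itself now throws.
    throw new IOException(s"Failed to create directory $subDir")
  }
  new File(subDir, blockName)
}

// A remove path that reuses getFile() inherits the directory creation:
// deleting a block that was never written first tries to create its directory.
def removeViaGetFile(root: File, blockName: String): Boolean = {
  val f = getFile(root, blockName) // may throw on a bad disk
  f.exists() && f.delete()
}

// Safer sketch: locate the file without side effects, so removing a
// block that never reached disk is a no-op instead of an exception.
def removeWithoutCreating(root: File, blockName: String): Boolean = {
  val subDir = new File(root, (blockName.hashCode & 0xf).toString)
  val f = new File(subDir, blockName)
  f.exists() && f.delete()
}
```

On a healthy disk the difference is only the stray directory left behind; on a failing one it is the difference between a clean removal and the exception described above.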

      As a result, the block's entry in BlockInfoManager#blockInfoWrappers is not removed and its write lock is never released.

      The catch block in TorrentBroadcast#writeBlocks then calls BlockManager#removeBroadcast to clean up the broadcast.
      Because the block's write lock in BlockInfoManager#blockInfoWrappers was never released, the dag-scheduler-event-loop thread of the DAGScheduler waits for it forever.
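The resulting hang can be sketched with a plain ReentrantLock — a simplification, since Spark's BlockInfoManager uses its own per-block locking: if the failed put's cleanup throws before the block's write lock is released, every later removal attempt parks forever, which is exactly the parked dag-scheduler-event-loop thread in the stack trace below.

```scala
import java.util.concurrent.locks.ReentrantLock

// Stand-in for the per-block write lock held while a put is in flight.
val blockLock = new ReentrantLock()

// A put whose cleanup can itself throw (e.g. the directory-creating remove)
// must release the lock in a finally block; otherwise the lock leaks and the
// later removeBroadcast() can never acquire it.
def putThatFails(): Unit = {
  blockLock.lock()
  try {
    throw new java.io.IOException("disk failure while writing broadcast_0_piece0")
  } finally {
    blockLock.unlock() // without this, later removal attempts park forever
  }
}
```

The sketch shows the invariant the fix has to restore: a failed put must leave the block lock free so that removeBroadcast can proceed instead of deadlocking.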

      22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: XXXXX.
      22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast

      "dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000]
         java.lang.Thread.State: WAITING (parking)
          at sun.misc.Unsafe.park(Native Method)
          - parking to wait for  <0x00000007add3d8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
          at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
          at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
          at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
          at org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown Source)
          at org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
          at org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
          at org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
          at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
          at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
          at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
          at org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown Source)
          at scala.collection.Iterator.foreach(Iterator.scala:943)
          at scala.collection.Iterator.foreach$(Iterator.scala:943)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
          at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
          at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
          at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
          at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
          at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
          at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
          at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
          at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
          at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
          at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)
          at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

People

    Assignee: dzcxzl
    Reporter: dzcxzl