Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-34193

Potential race condition during decommissioning with TorrentBroadcast

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 3.1.0, 3.1.1, 3.1.2, 3.2.0
    • 3.1.1
    • Spark Core
    • None

    Description

      I found this while back porting so the line numbers should be ignored, but the core of the issue is that we shouldn't be failing the job on this (I don't think). We could fix this by allowing broadcast blocks to be put or having the torrent broadcast ignore this exception.

      [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 8, 192.168.1.57, executor 1): java.io.IOException: org.apache.spark.storage.BlockSavedOnDecommissionedBlockManagerException: Block broadcast_2_piece0 cannot be saved on decommissioned executor[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 8, 192.168.1.57, executor 1): java.io.IOException: org.apache.spark.storage.BlockSavedOnDecommissionedBlockManagerException: Block broadcast_2_piece0 cannot be saved on decommissioned executor[info] at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1333)[info] at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:215)[info] at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)[info] at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)[info] at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)[info] at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)[info] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:84)[info] at org.apache.spark.scheduler.Task.run(Task.scala:123)[info] at org.apache.spark.executor.Executor$TaskRunner$$anonfun$12.apply(Executor.scala:448)[info] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)[info] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:454)[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[info] at java.lang.Thread.run(Thread.java:748)[info] Caused by: org.apache.spark.storage.BlockSavedOnDecommissionedBlockManagerException: Block broadcast_2_piece0 cannot be saved on decommissioned executor[info] at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1105)[info] at org.apache.spark.storage.BlockManager.doPutBytes(BlockManager.scala:1010)[info] at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:986)[info] at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:181)[info] at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:159)[info] at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:159)[info] at scala.collection.immutable.List.foreach(List.scala:392)[info] at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:159)[info] at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$apply$2.apply(TorrentBroadcast.scala:239)[info] at scala.Option.getOrElse(Option.scala:121)[info] at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:219)[info] at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326)[info] ... 13 more[info][info] Driver stacktrace:[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1928)[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1916)[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1915)[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)[info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1915)[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:951)[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:951)[info]   at scala.Option.foreach(Option.scala:257)[info]   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:951)[info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2149)[info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2098)[info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2087)[info]   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)[info]   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:762)[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2089)[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2110)[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129)[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)[info]   at org.apache.spark.rdd.RDD.count(RDD.scala:1213)[info]   at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.org$apache$spark$storage$BlockManagerDecommissionIntegrationSuite$$runDecomTest(BlockManagerDecommissionIntegrationSuite.scala:276)[info]   at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite$$anonfun$1.apply$mcV$sp(BlockManagerDecommissionIntegrationSuite.scala:61)[info]   at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite$$anonfun$1.apply(BlockManagerDecommissionIntegrationSuite.scala:61)[info]   at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite$$anonfun$1.apply(BlockManagerDecommissionIntegrationSuite.scala:61)[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:147)[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:54)[info]   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:54)[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)[info]   at scala.collection.immutable.List.foreach(List.scala:392)[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)[info]   at org.scalatest.Suite$class.run(Suite.scala:1147)[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:54)[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:54)[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[info]   at java.lang.Thread.run(Thread.java:748)[info]   Cause: java.io.IOException: org.apache.spark.storage.BlockSavedOnDecommissionedBlockManagerException: Block broadcast_2_piece0 cannot be saved on decommissioned executor[info]   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1333)[info]   at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:215)[info]   at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)[info]   at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)[info]   at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)[info]   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)[info]   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:84)[info]   at org.apache.spark.scheduler.Task.run(Task.scala:123)[info]   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$12.apply(Executor.scala:448)[info]   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)[info]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:454)[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[info]   at java.lang.Thread.run(Thread.java:748)[info]   Cause: org.apache.spark.storage.BlockSavedOnDecommissionedBlockManagerException: Block broadcast_2_piece0 cannot be saved on decommissioned executor[info]   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1105)[info]   at org.apache.spark.storage.BlockManager.doPutBytes(BlockManager.scala:1010)[info]   at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:986)[info]   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:181)[info]   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:159)[info]   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:159)[info]   at scala.collection.immutable.List.foreach(List.scala:392)[info]   at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:159)[info]   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$apply$2.apply(TorrentBroadcast.scala:239)[info]   at scala.Option.getOrElse(Option.scala:121)[info]   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:219)[info]   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326)[info]   at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:215)[info]   at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)[info]   at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)[info]   at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)[info]   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)[info]   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:84)[info]   at org.apache.spark.scheduler.Task.run(Task.scala:123)[info]   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$12.apply(Executor.scala:448)[info]   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)[info]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:454)[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[info]   at java.util.concurrent.ThreadPoolExecutor$W

      Attachments

        Activity

          People

            holden Holden Karau
            holden Holden Karau
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: