Spark / SPARK-5206

Accumulators are not re-registered during recovery from checkpoint


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 1.1.0
    • Fix Version/s: None
    • Component/s: DStreams

    Description

      I got the following exception when my streaming application restarted after a crash and recovered from its checkpoint:

      15/01/12 10:31:06 sparkDriver-akka.actor.default-dispatcher-4 ERROR scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(41, 4)
      java.util.NoSuchElementException: key not found: 1
      at scala.collection.MapLike$class.default(MapLike.scala:228)
      at scala.collection.AbstractMap.default(Map.scala:58)
      at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938)
      at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
      at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
      at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
      at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
      at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388)
      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
      at akka.actor.ActorCell.invoke(ActorCell.scala:456)
      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
      at akka.dispatch.Mailbox.run(Mailbox.scala:219)
      at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
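
      For context, here is a minimal sketch of the kind of streaming application that can hit this path (the checkpoint directory, host, port, and all names below are illustrative, not taken from this report). On a fresh start, createContext() runs on the driver and the accumulator is registered; after a crash, StreamingContext.getOrCreate rebuilds the DStream graph from the checkpoint instead, so createContext() is skipped:

      import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext._  // accumulator implicits in Spark 1.x
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object AccumulatorCheckpointRepro {
        def main(args: Array[String]): Unit = {
          val checkpointDir = "/tmp/spark-checkpoint"  // illustrative path

          def createContext(): StreamingContext = {
            val conf = new SparkConf().setAppName("AccumulatorCheckpointRepro")
            val ssc = new StreamingContext(conf, Seconds(1))
            ssc.checkpoint(checkpointDir)

            // Created (and registered with the Accumulators singleton) only
            // here, i.e. only on a fresh start of the driver.
            val counter = ssc.sparkContext.accumulator(0L)

            ssc.socketTextStream("localhost", 9999).foreachRDD { rdd =>
              rdd.foreach(_ => counter += 1L)
            }
            ssc
          }

          // After a crash, this recovers the graph (including the closure that
          // captured `counter`) from the checkpoint without re-running
          // createContext(), so the accumulator is never re-registered.
          val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
          ssc.start()
          ssc.awaitTermination()
        }
      }

      Once a recovered task that updated the accumulator completes, DAGScheduler.handleTaskCompletion looks the accumulator's id up in the singleton's map and fails as above.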

      I guess the cause is that an Accumulator is registered with the singleton Accumulators object at Line 58 of org.apache.spark.Accumulable:
      Accumulators.register(this, true)
      This code needs to be executed on the driver exactly once, when the accumulator is created. But when the application is recovered from a checkpoint, the accumulator is deserialized as part of the checkpointed graph and this line is never executed on the driver again. So when the driver processes a task completion at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion (DAGScheduler.scala:938), it can't find the Accumulator in the singleton's map, because the Accumulator was never re-registered during recovery.
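
      A known application-side mitigation (essentially the lazily initialized singleton pattern that the Spark Streaming programming guide later documented for using accumulators with checkpointing; the object and method names here are illustrative) is to avoid capturing the accumulator in the checkpointed graph, and instead create it on first use after every (re)start, so it gets registered on the driver again:

      import org.apache.spark.{Accumulator, SparkContext}
      import org.apache.spark.SparkContext._  // accumulator implicits in Spark 1.x

      object RecordCounter {
        @volatile private var instance: Accumulator[Long] = null

        // Creates the accumulator lazily on the driver. After a restart from
        // checkpoint the field is null again, so a fresh, properly registered
        // accumulator is created instead of reusing the stale deserialized one.
        def getInstance(sc: SparkContext): Accumulator[Long] = {
          if (instance == null) {
            synchronized {
              if (instance == null) {
                instance = sc.accumulator(0L)
              }
            }
          }
          instance
        }
      }

      // Inside the streaming job, look the accumulator up per batch instead of
      // closing over it:
      //   lines.foreachRDD { rdd =>
      //     val counter = RecordCounter.getInstance(rdd.sparkContext)
      //     rdd.foreach(_ => counter += 1L)
      //   }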


            People

              Assignee: Unassigned
              Reporter: vincent ye (vincentye38)
              Votes: 7
              Watchers: 20
