Spark / SPARK-7563

OutputCommitCoordinator.stop() should only be executed in driver


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.3.1
    • Fix Version/s: 1.3.2, 1.4.0
    • Component/s: Spark Core
    • Labels:
      None
    • Environment:

      Red Hat Enterprise Linux Server release 7.0 (Maipo)
      Spark 1.3.1 Release

      Description

      I am from the IBM Platform Symphony team, and we are integrating Spark 1.3.1 with EGO (a resource management product).

      In EGO we use a fine-grained dynamic allocation policy, under which each executor exits once all of its tasks are done. When testing spark-shell, we found that when the executor of the first job exits, it stops the OutputCommitCoordinator, which causes all subsequent jobs to fail. Details are as follows:
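
      For illustration, a minimal spark-shell sequence of the kind that triggers this (a hypothetical reproduction sketch; the RDD and actions are made up and not taken from our test, but any two consecutive jobs behave the same way):

        // Assumes fine-grained dynamic allocation, i.e. an executor exits as soon as
        // its tasks for the current job are finished.
        val rdd = sc.parallelize(1 to 100, 4)

        rdd.count()   // first job: succeeds; afterwards its executor exits and its
                      // SparkEnv.stop() sends StopCoordinator to the driver's actor

        rdd.count()   // second job: the newly launched executor cannot resolve
                      // .../user/OutputCommitCoordinator on the driver and fails
                      // with the ActorNotFound error shown below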

      We got the following error on the executor when submitting a job in spark-shell for the second time (the first job submission succeeds):

      15/05/11 04:02:31 INFO spark.util.AkkaUtils: Connecting to OutputCommitCoordinator: akka.tcp://sparkDriver@whlspark01:50452/user/OutputCommitCoordinator
      Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@whlspark01:50452/), Path(/user/OutputCommitCoordinator)]
              at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
              at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
              at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
              at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
              at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
              at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
              at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
              at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
              at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
              at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
              at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
              at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
              at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
              at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
              at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
              at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
              at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
              at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
              at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
              at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
              at akka.actor.ActorCell.invoke(ActorCell.scala:487)
              at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
              at akka.dispatch.Mailbox.run(Mailbox.scala:220)
              at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
              at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
              at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
              at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
              at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

      And on the driver side, we see a log message indicating that the OutputCommitCoordinator was stopped after the first submission:

      15/05/11 04:01:23 INFO spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
      

      We examined the code of OutputCommitCoordinator and found that executors reuse the reference to the driver's OutputCommitCoordinatorActor (a sketch of how this shared reference is wired up follows the two code excerpts below). So when an executor exits, it eventually calls SparkEnv.stop():

        private[spark] def stop() {
          isStopped = true
          pythonWorkers.foreach { case(key, worker) => worker.stop() }
          Option(httpFileServer).foreach(_.stop())
          mapOutputTracker.stop()
          shuffleManager.stop()
          broadcastManager.stop()
          blockManager.stop()
          blockManager.master.stop()
          metricsSystem.stop()
          outputCommitCoordinator.stop()      // <-- also runs on exiting executors, stopping the driver's coordinator actor
          actorSystem.shutdown()
          ......
      

      and in OutputCommitCoordinator.stop():

        def stop(): Unit = synchronized {
          coordinatorActor.foreach(_ ! StopCoordinator)
          coordinatorActor = None
          authorizedCommittersByStage.clear()
        }
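
      For context, a rough sketch of how this shared reference comes about (simplified and paraphrased from SparkEnv.create in 1.3.x; exact names may differ slightly): the driver creates and registers the OutputCommitCoordinatorActor, while executors merely look up the driver's actor and hand that remote reference to their local OutputCommitCoordinator. Calling stop() on an executor therefore sends StopCoordinator to the driver's actor:

        // Simplified sketch of the wiring in SparkEnv.create (not exact source)
        val outputCommitCoordinator = new OutputCommitCoordinator(conf)
        val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
          new OutputCommitCoordinatorActor(outputCommitCoordinator))
        // registerOrLookup: actorOf(...) on the driver, remote lookup of the driver's
        // actor on executors -- so both sides end up holding the same actor reference
        outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)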
      

      We currently work around this problem by adding an "isDriver" flag to OutputCommitCoordinator and checking whether the stop() call comes from the driver or an executor:

      diff SparkEnv.scala
      360c360
      <       new OutputCommitCoordinator(conf, isDriver)
      ---
      >       new OutputCommitCoordinator(conf)
      
      
      diff OutputCommitCoordinator.scala
      43c43
      < private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean = false) extends Logging {
      ---
      > private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
      137,141c137,139
      <     if (isDriver) {
      <       coordinatorActor.foreach(_ ! StopCoordinator)
      <       coordinatorActor = None
      <       authorizedCommittersByStage.clear()
      <     }
      ---
      >     coordinatorActor.foreach(_ ! StopCoordinator)
      >     coordinatorActor = None
      >     authorizedCommittersByStage.clear()
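
      For readability, the patched method from the diff above, written out as plain Scala rather than diff hunks (isDriver is the new constructor flag passed in from SparkEnv.create; an exiting executor's SparkEnv.stop() then becomes a no-op here, and only the driver actually tears down the actor):

        private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean = false)
          extends Logging {
          // ...
          def stop(): Unit = synchronized {
            if (isDriver) {
              coordinatorActor.foreach(_ ! StopCoordinator)
              coordinatorActor = None
              authorizedCommittersByStage.clear()
            }
          }
        }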
      

      We propose applying this fix in a future release, since the issue may affect all spark-shell use under the dynamic allocation model.

            People

            • Assignee:
              joshrosen Josh Rosen
            • Reporter:
              wenhailong1988 Hailong Wen