Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
Affects Version/s: 1.3.1
Labels: None
Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo), Spark 1.3.1 Release
Description
I am from the IBM Platform Symphony team, and we are integrating Spark 1.3.1 with EGO (a resource management product).
In EGO we use a fine-grained dynamic allocation policy, under which each executor exits once all of its tasks are done. When testing spark-shell, we found that when an executor of the first job exits, it stops the OutputCommitCoordinator, which causes all subsequent jobs to fail. Details are as follows:
We get the following error on the executor when submitting a job in spark-shell a second time (the first job submission succeeds):
15/05/11 04:02:31 INFO spark.util.AkkaUtils: Connecting to OutputCommitCoordinator: akka.tcp://sparkDriver@whlspark01:50452/user/OutputCommitCoordinator
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@whlspark01:50452/), Path(/user/OutputCommitCoordinator)]
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
	at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
	at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
	at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
And on the driver side, we see a log message indicating that the OutputCommitCoordinator was stopped after the first submission:
15/05/11 04:01:23 INFO spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
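This can be reproduced with any two consecutive jobs once the first job's executors have exited. The spark-shell session below is only an illustrative sketch; the RDD operations and output paths are arbitrary placeholders, not the exact commands we ran:

// Job 1: succeeds. Under the fine-grained policy, its executors exit once all
// of their tasks have finished.
sc.parallelize(1 to 1000).map(_ * 2).saveAsTextFile("/tmp/repro-out-1")

// ... wait until the executors of job 1 have exited ...

// Job 2: the freshly launched executor tries to connect to the driver's
// OutputCommitCoordinator actor, which has already been stopped, and fails
// with the ActorNotFound error shown above.
sc.parallelize(1 to 1000).map(_ + 1).saveAsTextFile("/tmp/repro-out-2")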
We examined the code of OutputCommitCoordinator and found that the executor reuses a reference to the driver's OutputCommitCoordinatorActor. So when an executor exits, it eventually calls SparkEnv.stop():
private[spark] def stop() {
  isStopped = true
  pythonWorkers.foreach { case (key, worker) => worker.stop() }
  Option(httpFileServer).foreach(_.stop())
  mapOutputTracker.stop()
  shuffleManager.stop()
  broadcastManager.stop()
  blockManager.stop()
  blockManager.master.stop()
  metricsSystem.stop()
  outputCommitCoordinator.stop()    <---------------
  actorSystem.shutdown()
  ......
and in OutputCommitCoordinator.stop():
def stop(): Unit = synchronized {
  coordinatorActor.foreach(_ ! StopCoordinator)
  coordinatorActor = None
  authorizedCommittersByStage.clear()
}
Because on an executor coordinatorActor holds a remote reference to the driver's OutputCommitCoordinatorActor, the StopCoordinator message above terminates the driver-side actor for the whole application, and any executor started afterwards can no longer resolve it. We currently work around this by adding an "isDriver" attribute to OutputCommitCoordinator and checking whether the stop() call comes from the driver or from an executor:
diff SparkEnv.scala
360c360
<     new OutputCommitCoordinator(conf, isDriver)
---
>     new OutputCommitCoordinator(conf)

diff OutputCommitCoordinator.scala
43c43
< private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean = false) extends Logging {
---
> private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
137,141c137,139
<     if (isDriver) {
<       coordinatorActor.foreach(_ ! StopCoordinator)
<       coordinatorActor = None
<       authorizedCommittersByStage.clear()
<     }
---
>     coordinatorActor.foreach(_ ! StopCoordinator)
>     coordinatorActor = None
>     authorizedCommittersByStage.clear()
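For readability, after applying the patch OutputCommitCoordinator.stop() reads roughly as follows (assembled from the hunks above; the rest of the class is unchanged and omitted here):

private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean = false) extends Logging {
  ......
  def stop(): Unit = synchronized {
    // Only the driver actually shuts down the coordinator actor; on executors
    // stop() becomes a no-op, so the driver-side actor keeps serving later jobs.
    if (isDriver) {
      coordinatorActor.foreach(_ ! StopCoordinator)
      coordinatorActor = None
      authorizedCommittersByStage.clear()
    }
  }
}

SparkEnv then constructs the coordinator with new OutputCommitCoordinator(conf, isDriver), so only the driver-side SparkEnv passes isDriver = true.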
We propose applying this fix in a future release, since the bug may affect all spark-shell functionality under the dynamic allocation model.