Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-5293 Enable Spark user applications to use different versions of Akka
  3. SPARK-7174

Move calling `TaskScheduler.executorHeartbeatReceived` to another thread to avoid blocking the Akka thread pool

    XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 1.2.2, 1.3.1
    • 1.4.0
    • Spark Core
    • None

    Description

      HeartbeatReceiver will call TaskScheduler.executorHeartbeatReceived, which is a blocking operation because "TaskScheduler.executorHeartbeatReceived" will call

          blockManagerMaster.driverEndpoint.askWithReply[Boolean](
            BlockManagerHeartbeat(blockManagerId), 600 seconds)
      

      finally. Even if it asks from a local Actor, it may block the current Akka thread. E.g., the reply may be dispatched to the same thread of the ask operation. So the reply cannot be processed. An extreme case is setting the thread number of Akka dispatch thread pool to 1.

      jstack log:

      "sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x00007f2a8c02d000 nid=0x725 waiting on condition [0x00007f2b1d6d0000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000006197a0868> (a scala.concurrent.impl.Promise$CompletionLatch)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
      	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
      	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
      	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
      	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
      	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
      	at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
      	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
      	at scala.concurrent.Await$.result(package.scala:107)
      	at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
      	at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
      	at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
      	at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
      	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
      	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
      	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
      	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
      	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      	at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

      Thanks for yhuai providing the helpful jstack log.

      Attachments

        Activity

          People

            zsxwing Shixiong Zhu
            zsxwing Shixiong Zhu
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: