Spark / SPARK-16702

Driver hangs after executors are lost


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.6.1, 1.6.2, 2.0.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      It's my first time, please be kind.

      I'm still trying to debug this error locally - at this stage I'm pretty convinced that it's a weird deadlock/livelock problem due to the use of scheduleAtFixedRate within ExecutorAllocationManager. This problem is possibly tangentially related to the issues discussed in SPARK-1560 around the use of blocking calls within locks.

      Observed Behavior

      When running a Spark job, if executors are lost, the job occasionally goes into a state where it makes no progress on tasks. Most commonly the issue seems to occur when executors are preempted by YARN, but I'm not confident enough to state that it's restricted to just this scenario.

      Upon inspecting a thread dump from the driver, the following stack traces seem noteworthy (a full thread dump is attached):

      Thread 178: spark-dynamic-executor-allocation (TIMED_WAITING)
      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
      java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
      scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
      scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
      scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
      scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
      scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
      scala.concurrent.Await$.result(package.scala:190)
      org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
      org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
      org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
      org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doRequestTotalExecutors(YarnSchedulerBackend.scala:59)
      org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:447)
      org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1423)
      org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:359)
      org.apache.spark.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget(ExecutorAllocationManager.scala:310)
      org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:264)
      org.apache.spark.ExecutorAllocationManager$$anon$2.run(ExecutorAllocationManager.scala:223)
      java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
      java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
      java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      java.lang.Thread.run(Thread.java:745)
      
      Thread 22: dispatcher-event-loop-10 (BLOCKED)
      org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.disableExecutor(CoarseGrainedSchedulerBackend.scala:289)
      org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint$$anonfun$onDisconnected$1.apply(YarnSchedulerBackend.scala:121)
      org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint$$anonfun$onDisconnected$1.apply(YarnSchedulerBackend.scala:120)
      scala.Option.foreach(Option.scala:257)
      org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint.onDisconnected(YarnSchedulerBackend.scala:120)
      org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:142)
      org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
      org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
      org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      java.lang.Thread.run(Thread.java:745)
      
      Thread 640: kill-executor-thread (BLOCKED)
      org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors(CoarseGrainedSchedulerBackend.scala:488)
      org.apache.spark.SparkContext.killAndReplaceExecutor(SparkContext.scala:1499)
      org.apache.spark.HeartbeatReceiver$$anonfun$org$apache$spark$HeartbeatReceiver$$expireDeadHosts$3$$anon$3$$anonfun$run$3.apply$mcV$sp(HeartbeatReceiver.scala:206)
      org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1219)
      org.apache.spark.HeartbeatReceiver$$anonfun$org$apache$spark$HeartbeatReceiver$$expireDeadHosts$3$$anon$3.run(HeartbeatReceiver.scala:203)
      java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      java.util.concurrent.FutureTask.run(FutureTask.java:262)
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      java.lang.Thread.run(Thread.java:745)
      
      Thread 21: dispatcher-event-loop-9 (TIMED_WAITING)
      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
      java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
      scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
      scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
      scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
      scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
      scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
      scala.concurrent.Await$.result(package.scala:190)
      org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
      org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
      org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
      org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:370)
      org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:176)
      org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
      org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
      org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
      org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      java.lang.Thread.run(Thread.java:745)
      

      My theory is that the following components attempting to call and/or message each other simultaneously are causing a deadlock/livelock scenario (a minimal sketch of the shape of this cycle follows the list):

      • CoarseGrainedSchedulerBackend (RequestExecutors) -> YarnSchedulerEndpoint
      • YarnSchedulerEndpoint (RemoveExecutor) -> DriverEndpoint
      • DriverEndpoint (disableExecutor) -> CoarseGrainedSchedulerBackend
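
      For concreteness, here is a minimal, self-contained sketch of the shape of that cycle. This is not Spark code; the names backendLock, requestTotalExecutors and handleRemoveExecutor are only illustrative stand-ins. One thread holds a lock and blocks waiting for a reply, while the thread that would produce the reply is blocked on the same lock:

      import java.util.concurrent.TimeoutException
      import scala.concurrent.{Await, Promise}
      import scala.concurrent.duration._

      object DeadlockShape {
        // Stands in for the monitor on CoarseGrainedSchedulerBackend.
        private val backendLock = new Object
        // Stands in for the RPC reply that RequestExecutors is waiting on.
        private val reply = Promise[Boolean]()

        // "spark-dynamic-executor-allocation": takes the backend lock, then
        // makes a blocking ask and waits for the reply while holding the lock.
        def requestTotalExecutors(): Unit = backendLock.synchronized {
          try Await.result(reply.future, 5.seconds)
          catch { case _: TimeoutException => println("ask timed out, releasing lock") }
        }

        // "dispatcher-event-loop": producing that reply first requires running
        // disableExecutor, which needs the same backend lock, so this thread BLOCKs.
        def handleRemoveExecutor(): Unit = backendLock.synchronized {
          reply.trySuccess(true)
        }

        def main(args: Array[String]): Unit = {
          val allocation = new Thread(new Runnable { def run(): Unit = requestTotalExecutors() }, "allocation")
          val dispatcher = new Thread(new Runnable { def run(): Unit = handleRemoveExecutor() }, "dispatcher")
          allocation.start(); Thread.sleep(100); dispatcher.start()
          // The dispatcher stays BLOCKED until the allocation thread's Await
          // times out, which is the same shape as the thread dump above.
          allocation.join(); dispatcher.join()
        }
      }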

      This is where the use of scheduleAtFixedRate comes into play: the deadlock should presumably be broken when the blocking call to YarnSchedulerEndpoint times out. However, as soon as lock contention causes a single execution of ExecutorAllocationManager.schedule to take longer than the hard-coded 100 milliseconds, that thread can release and then immediately reacquire the lock on CoarseGrainedSchedulerBackend, leaving the blocked dispatcher threads with little chance to ever acquire it.
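
      A rough illustration of that interaction (again not Spark code; the 500 ms sleep simply stands in for a schedule() invocation held up by lock contention): scheduleAtFixedRate starts the next run as soon as an overrunning execution completes, so the scheduled thread can drop and reacquire the monitor with almost no window for the BLOCKED dispatcher thread to win it.

      import java.util.concurrent.{Executors, TimeUnit}

      object FixedRateStarvation {
        private val lock = new Object

        def main(args: Array[String]): Unit = {
          val scheduler = Executors.newSingleThreadScheduledExecutor()

          // The allocation-manager stand-in: scheduled every 100 ms, but each
          // run overruns its period while holding the shared lock, so
          // scheduleAtFixedRate fires the next run back-to-back.
          scheduler.scheduleAtFixedRate(new Runnable {
            def run(): Unit = lock.synchronized {
              Thread.sleep(500) // stands in for schedule() delayed by contention
            }
          }, 0, 100, TimeUnit.MILLISECONDS)

          // The disableExecutor stand-in: because JVM monitor acquisition is
          // not fair, this thread may wait through many back-to-back runs
          // before it ever acquires the lock.
          new Thread(new Runnable {
            def run(): Unit = lock.synchronized {
              println("dispatcher finally acquired the lock")
            }
          }, "dispatcher").start()
        }
      }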

      Proposed Solution

      A simple solution would be to have YarnSchedulerBackend.doRequestTotalExecutors not make blocking calls, similar to SPARK-15606. However, I think it would also be wise to refactor ExecutorAllocationManager to not use scheduleAtFixedRate, and instead sleep for some interval of time between passes.
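
      A rough sketch of what the second part could look like, assuming a dedicated loop that sleeps between passes (the names intervalMillis and schedule() are placeholders, not the real ExecutorAllocationManager API):

      import java.util.concurrent.Executors

      object SleepLoopAllocation {
        private val intervalMillis = 100L
        @volatile private var running = true

        // Placeholder for ExecutorAllocationManager.schedule().
        private def schedule(): Unit = ()

        def start(): Unit = {
          val executor = Executors.newSingleThreadExecutor()
          executor.submit(new Runnable {
            def run(): Unit = {
              while (running) {
                schedule()
                // Sleeping after each pass guarantees a gap in which other
                // threads can take the backend lock, even when schedule()
                // overran the interval; scheduleAtFixedRate offers no such
                // guarantee.
                Thread.sleep(intervalMillis)
              }
            }
          })
        }

        def stop(): Unit = running = false
      }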

      That's all I've got; I hope it's been helpful. I plan to start working on my proposed solution, and would welcome any feedback on the direction I've suggested.

      Attachments

        1. SparkThreadsBlocked.txt (299 kB, attached by Angus Gerry)

      People

        Assignee: Unassigned
        Reporter: Angus Gerry (angolon@gmail.com)
        Votes: 0
        Watchers: 2
