SPARK-13704: TaskSchedulerImpl.createTaskSetManager can be expensive and result in lost executors due to blocked heartbeats


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.1, 1.4.1, 1.5.2, 1.6.0
    • Fix Version/s: 3.0.0
    • Component/s: Spark Core
    • Labels: None

    Description

      In some cases, TaskSchedulerImpl.createTaskSetManager can be expensive. For example, in a YARN cluster, it may call the topology script for rack awareness. When submitting a very large job on a very large YARN cluster, the topology script may take a significant amount of time to run, and this blocks the processing of executors' heartbeats, which may result in lost executors.
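
      For illustration only, below is a minimal, self-contained Scala sketch of the contention pattern described above. ToyScheduler, resolveRack, and the latencies are hypothetical stand-ins, not Spark code; the point is that a slow per-task topology lookup inside a synchronized submitTasks keeps the heartbeat handler waiting on the same lock.

      import java.util.concurrent.TimeUnit

      // Toy stand-in for TaskSchedulerImpl: submitTasks and
      // executorHeartbeatReceived synchronize on the same object, so a slow
      // topology lookup inside submitTasks stalls heartbeat handling.
      class ToyScheduler {
        // Stand-in for the external topology script; the real call shells out
        // to a cluster-specific executable and can be slow.
        private def resolveRack(host: String): String = {
          TimeUnit.MILLISECONDS.sleep(100)   // simulated script latency per host
          s"/rack-of-$host"
        }

        def submitTasks(taskHosts: Seq[String]): Unit = synchronized {
          // One lookup per pending task while the scheduler lock is held.
          taskHosts.foreach(resolveRack)
        }

        def executorHeartbeatReceived(executorId: String): Boolean = synchronized {
          // Cannot run until submitTasks releases the lock; if the wait exceeds
          // the heartbeat timeout, the driver may mark the executor as lost.
          true
        }
      }

      object ContentionDemo {
        def main(args: Array[String]): Unit = {
          val sched = new ToyScheduler
          val submitter = new Thread(() => sched.submitTasks((1 to 50).map(i => s"host-$i")))
          submitter.start()
          Thread.sleep(50)                   // let submitTasks grab the lock first

          val start = System.nanoTime()
          sched.executorHeartbeatReceived("exec-1")
          val waitedMs = (System.nanoTime() - start) / 1000000
          println(s"heartbeat waited $waitedMs ms for the scheduler lock")
          submitter.join()
        }
      }

      With 50 hosts at 100 ms per lookup, the simulated heartbeat waits roughly five seconds for the lock, mirroring how a slow topology script can push real heartbeat handling past its timeout.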

      Stack traces we observed that are related to this issue:

      "dag-scheduler-event-loop" daemon prio=10 tid=0x00007f8392875800 nid=0x26e8 runnable [0x00007f83576f4000]
         java.lang.Thread.State: RUNNABLE
              at java.io.FileInputStream.readBytes(Native Method)
              at java.io.FileInputStream.read(FileInputStream.java:272)
              at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
              at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
              - locked <0x00000000f551f460> (a java.lang.UNIXProcess$ProcessPipeInputStream)
              at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
              at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
              at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
              - locked <0x00000000f5529740> (a java.io.InputStreamReader)
              at java.io.InputStreamReader.read(InputStreamReader.java:184)
              at java.io.BufferedReader.fill(BufferedReader.java:154)
              at java.io.BufferedReader.read1(BufferedReader.java:205)
              at java.io.BufferedReader.read(BufferedReader.java:279)
              - locked <0x00000000f5529740> (a java.io.InputStreamReader)
              at org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:728)
              at org.apache.hadoop.util.Shell.runCommand(Shell.java:524)
              at org.apache.hadoop.util.Shell.run(Shell.java:455)
              at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
              at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251)
              at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188)
              at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
              at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
              at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
              at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38)
              at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:210)
              at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:189)
              at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
              at org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:189)
              at org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:158)
              at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
              at org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:157)
              at org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:187)
              at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:161)
              - locked <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler)
              at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:872)
              at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
              at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
              at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
              at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
              at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      
      "sparkDriver-akka.actor.default-dispatcher-15" daemon prio=10 tid=0x00007f829c020000 nid=0x2737 waiting for monitor entry [0x00007f8355ebd000]
         java.lang.Thread.State: BLOCKED (on object monitor)
              at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:362)
              - waiting to lock <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler)
              at org.apache.spark.HeartbeatReceiver$$anonfun$receiveWithLogging$1.applyOrElse(HeartbeatReceiver.scala:46)
              at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
              at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
              at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
              at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
              at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
              at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
              at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
              at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
              at akka.actor.ActorCell.invoke(ActorCell.scala:456)
              at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
              at akka.dispatch.Mailbox.run(Mailbox.scala:219)
              at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
              at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
              at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
              at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
              at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
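
      The two traces show the dag-scheduler-event-loop thread holding the YarnScheduler monitor inside submitTasks while running the topology script, and the Akka dispatcher thread BLOCKED on that same monitor in executorHeartbeatReceived. One possible mitigation, sketched below purely for illustration (BatchedRackResolver and resolveRacks are hypothetical names, not the actual Spark patch), is to resolve the racks of all distinct preferred hosts in a single batch rather than invoking the topology script once per task; Hadoop's DNSToSwitchMapping.resolve already accepts a list of hosts for this purpose.

      // Hypothetical sketch: batch rack resolution so the topology script runs
      // once per task set instead of once per pending task.
      object BatchedRackResolver {
        // Stand-in for a single topology-script invocation that resolves many
        // hosts at once (Hadoop's DNSToSwitchMapping.resolve takes a list).
        private def runTopologyScript(hosts: Seq[String]): Seq[String] =
          hosts.map(h => s"/rack-of-$h")     // placeholder results

        // Resolve racks for every distinct preferred host with one call, then
        // serve per-task lookups from the in-memory map.
        def resolveRacks(taskPreferredHosts: Seq[Seq[String]]): Map[String, String] = {
          val distinctHosts = taskPreferredHosts.flatten.distinct
          distinctHosts.zip(runTopologyScript(distinctHosts)).toMap
        }
      }

      With N tasks spread over H distinct hosts, this replaces N external script invocations with a single batched call plus H in-memory lookups, shortening the time the scheduler lock is held during createTaskSetManager.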
      

            People

              Assignee: Lantao Jin (cltlfcjin)
              Reporter: Zhong Wang (zwang)
              Votes: 0
              Watchers: 6
