SPARK-31664: Race in YARN scheduler shutdown leads to uncaught SparkException "Could not find CoarseGrainedScheduler"


Details

    • Type: Bug
    • Status: In Progress
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.0.0, 3.0.1, 3.1.0
    • Fix Version/s: None
    • Component/s: Scheduler, Spark Core, YARN
    • Labels: None

    Description

      I used the following command to run SparkPi on a YARN cluster with dynamic allocation enabled: "$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ./spark-examples.jar 1000", and every run produced the error log below.


      20/05/06 16:31:44 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
      org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
      	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:169)
      	at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
      	at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:684)
      	at org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:66)
      	at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:253)
      	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
      	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
      	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
      	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
      	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
      	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
      	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      	at java.lang.Thread.run(Thread.java:748)
      20/05/06 16:31:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
      20/05/06 16:31:45 INFO MemoryStore: MemoryStore cleared
      20/05/06 16:31:45 INFO BlockManager: BlockManager stopped
      


      After some investigation, I found that this issue was likely introduced by https://github.com/apache/spark/pull/25964: there is a race between the driver backend and the executor backend that can occur during driver shutdown.


      PR #25964 added a new message type, LaunchedExecutor, and changed the executor-launch handshake between the executor backend and the driver backend to the following:

      1. The executor backend sends "RegisterExecutor" to the driver backend.
      2. The driver backend replies "true".
      3. The executor backend instantiates the executor once it receives "true" from the driver backend.
      4. After the executor is instantiated, the executor backend sends "LaunchedExecutor" to the driver backend.
      5. The driver backend makes offers for the executor when it receives "LaunchedExecutor".

      The issue arises between steps 3 and 4. If the driver backend is stopped during step 3 (so the driver endpoint is removed from the dispatcher), then in step 4, when the executor backend tries to send "LaunchedExecutor" to the driver backend, the RPC dispatcher throws a SparkException: "Could not find CoarseGrainedScheduler". These exception logs are verbose and somewhat misleading.
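      To make the window concrete, here is a minimal, self-contained Scala simulation of the failure mode (hypothetical names throughout; this is not Spark source code): the "CoarseGrainedScheduler" endpoint disappears from a name-based dispatcher between steps 3 and 4, so the late "LaunchedExecutor" send fails the same way Dispatcher.postMessage does in the log above.

      import scala.collection.concurrent.TrieMap

      // Minimal stand-in for Spark's name-based RPC dispatcher. Illustrative
      // only; IllegalStateException stands in for org.apache.spark.SparkException.
      object DispatcherRaceSketch {
        private val endpoints = TrieMap.empty[String, String => Unit]

        // Mirrors the shape of Dispatcher.postMessage: a lookup miss for an
        // unregistered endpoint surfaces as "Could not find <name>".
        def send(name: String, msg: String): Unit =
          endpoints.get(name) match {
            case Some(handler) => handler(msg)
            case None          => throw new IllegalStateException(s"Could not find $name.")
          }

        def main(args: Array[String]): Unit = {
          endpoints.put("CoarseGrainedScheduler", msg => println(s"driver received: $msg"))

          // Steps 1-2: registration succeeds while the driver endpoint is alive.
          send("CoarseGrainedScheduler", "RegisterExecutor")

          // Step 3 is still in flight (the executor is being instantiated) when
          // the driver backend stops and its endpoint leaves the dispatcher.
          endpoints.remove("CoarseGrainedScheduler")

          // Step 4: "LaunchedExecutor" now has no receiver, so the send fails --
          // the same failure TransportRequestHandler logs as an uncaught error.
          send("CoarseGrainedScheduler", "LaunchedExecutor")
        }
      }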


      This race can be fixed, or greatly alleviated, with the following changes (a code sketch follows the two lists below):

      When stop() is called on CoarseGrainedSchedulerBackend:

      1. A boolean flag, stopping, is set to true.
      2. driverEndpoint is not stopped at this point. (The dispatcher will stop it at the end, when the RpcEnv shuts down.)

      And while stopping is true, the driver backend will:

      1. reply with a failure (sendFailure) to the executor backend when it receives "RegisterExecutor".
      2. reply with "StopExecutor" to the executor backend (or send "RemoveExecutor" to itself) when it receives "LaunchedExecutor".
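
      A rough sketch of what these changes could look like inside CoarseGrainedSchedulerBackend and its DriverEndpoint (an illustration of the idea, not the actual patch: RegisterExecutor, LaunchedExecutor, StopExecutor, driverEndpoint, and executorDataMap are real Spark names, but the exact guards and wiring shown here are assumptions):

      // Sketch only: guard late messages instead of stopping the endpoint early.
      @volatile private var stopping = false

      override def stop(): Unit = {
        stopping = true
        // Deliberately do NOT stop driverEndpoint here; the dispatcher stops all
        // remaining endpoints when the RpcEnv shuts down, so late executor
        // messages still find a live endpoint instead of raising
        // "Could not find CoarseGrainedScheduler".
      }

      // Inside DriverEndpoint:
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case _: RegisterExecutor if stopping =>
          // Refuse registrations that arrive after shutdown has begun.
          context.sendFailure(new IllegalStateException("Scheduler backend is stopping"))
        // ... existing cases unchanged ...
      }

      override def receive: PartialFunction[Any, Unit] = {
        case LaunchedExecutor(executorId) if stopping =>
          // The executor finished instantiating after shutdown began: tell it to
          // stop (or remove it locally) rather than making offers for it.
          executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
        // ... existing cases unchanged ...
      }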

People

    Assignee: Unassigned
    Reporter: Baohe Zhang
