SPARK-31664

Race in YARN scheduler shutdown leads to uncaught SparkException "Could not find CoarseGrainedScheduler"

    Details

    • Type: Bug
    • Status: In Progress
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.0.0, 3.0.1, 3.1.0
    • Fix Version/s: None
    • Component/s: Scheduler, Spark Core, YARN
    • Labels:
      None

      Description

      I ran SparkPi on a YARN cluster with dynamic allocation enabled using the command "$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ./spark-examples.jar 1000" and got the error log below on every run.

       

      20/05/06 16:31:44 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
      org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
      	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:169)
      	at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
      	at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:684)
      	at org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:66)
      	at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:253)
      	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
      	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
      	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
      	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
      	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
      	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
      	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      	at java.lang.Thread.run(Thread.java:748)
      20/05/06 16:31:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
      20/05/06 16:31:45 INFO MemoryStore: MemoryStore cleared
      20/05/06 16:31:45 INFO BlockManager: BlockManager stopped
      

       

      After some investigation, I found that this issue may have been introduced by https://github.com/apache/spark/pull/25964. There is a race between the driver backend and the executor backend that can occur when the driver shuts down.

       

      PR#25964 added a new message type, LaunchedExecutor, and changed the handshake between the executor and the driver during executor launch to the following:

      1. The executor backend sends "RegisterExecutor" to the driver backend.
      2. The driver backend replies "true".
      3. The executor backend instantiates the executor once it receives "true" from the driver backend.
      4. After the executor is instantiated, the executor backend sends "LaunchedExecutor" to the driver backend.
      5. The driver backend makes offers for the executor when it receives "LaunchedExecutor".

      The issue arises between steps 3 and 4. If the driver backend is stopped (and the driver endpoint is therefore removed from the dispatcher) during step 3, then in step 4, when the executor backend tries to send "LaunchedExecutor" to the driver backend, the RPC dispatcher throws a SparkException with the message "Could not find CoarseGrainedScheduler". These exception logs are verbose and somewhat misleading.
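The interleaving described above can be illustrated with a toy model. This is a sketch, not Spark code: `ToyDispatcher`, `ToyDriverBackend`, `EndpointNotFound`, and `launch_executor` are hypothetical stand-ins for the RPC dispatcher, the driver backend, the SparkException, and the executor-side handshake. The handshake runs single-threaded, and the shutdown is injected during step 3 so that the interleaving is deterministic. In this model the error is raised to the caller; in Spark it is thrown and logged on the driver side.

```python
class EndpointNotFound(Exception):
    """Hypothetical stand-in for SparkException("Could not find <name>.")."""


class ToyDispatcher:
    """Routes messages to handlers registered under an endpoint name."""

    def __init__(self):
        self.endpoints = {}

    def register(self, name, handler):
        self.endpoints[name] = handler

    def unregister(self, name):
        self.endpoints.pop(name, None)

    def post(self, name, message):
        handler = self.endpoints.get(name)
        if handler is None:
            # The endpoint was already removed: this is the failure in the log.
            raise EndpointNotFound(f"Could not find {name}.")
        return handler(message)


class ToyDriverBackend:
    NAME = "CoarseGrainedScheduler"

    def __init__(self, dispatcher):
        self.dispatcher = dispatcher
        self.launched = []
        dispatcher.register(self.NAME, self.receive)

    def receive(self, message):
        kind, executor_id = message
        if kind == "RegisterExecutor":
            return True  # step 2: driver replies "true"
        if kind == "LaunchedExecutor":
            self.launched.append(executor_id)  # step 5: make offers
            return None
        raise ValueError(f"unexpected message: {kind}")

    def stop(self):
        # Behaviour described in the issue: the endpoint is removed right away.
        self.dispatcher.unregister(self.NAME)


def launch_executor(dispatcher, executor_id, during_instantiation=lambda: None):
    # Steps 1 and 2: register and wait for the driver's reply.
    ok = dispatcher.post(ToyDriverBackend.NAME, ("RegisterExecutor", executor_id))
    if ok is not True:
        return
    # Step 3: instantiate the executor; the hook lets a caller inject a shutdown.
    during_instantiation()
    # Step 4: tell the driver that the executor is ready.
    dispatcher.post(ToyDriverBackend.NAME, ("LaunchedExecutor", executor_id))


dispatcher = ToyDispatcher()
driver = ToyDriverBackend(dispatcher)

launch_executor(dispatcher, "1")  # normal launch, driver still running

try:
    # The driver stops while executor "2" is being instantiated (step 3).
    launch_executor(dispatcher, "2", during_instantiation=driver.stop)
    error = None
except EndpointNotFound as exc:
    error = str(exc)

print(error)  # Could not find CoarseGrainedScheduler.
```

Executor "1" completes all five steps, while executor "2" passes registration and then fails at step 4 because the endpoint no longer exists.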

       

      This race can be fixed or greatly alleviated with the following changes:

      When stop() in CoarseGrainedSchedulerBackend is called:

      1. A boolean variable, stopping, is set to true.
      2. driverEndpoint is not stopped at this point. (The dispatcher will stop it at the end.)

      While stopping is true, the driver backend will:

      1. reply with sendFailure to the executor backend when it receives "RegisterExecutor".
      2. reply with "StopExecutor" to the executor backend (or send "RemoveExecutor" to itself) when it receives "LaunchedExecutor".
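The proposed behaviour can be sketched with a similar toy model. This is only an illustration under stated assumptions, not a patch: `ToyStoppingBackend` and the tuple-shaped replies are hypothetical, and only the `stopping` flag and the message names come from the description above. It shows the "StopExecutor" variant of the second rule.

```python
class ToyStoppingBackend:
    """Hypothetical driver backend that stays reachable while stopping."""

    def __init__(self):
        self.stopping = False
        self.launched = []

    def stop(self):
        # Only set the flag; the endpoint itself stays registered so that
        # late messages still find a receiver.
        self.stopping = True

    def receive(self, kind, executor_id):
        if kind == "RegisterExecutor":
            if self.stopping:
                # Rule 1: refuse new registrations during shutdown.
                return ("failure", "driver is stopping")
            return ("ok", True)
        if kind == "LaunchedExecutor":
            if self.stopping:
                # Rule 2: tell the late executor to shut down.
                return ("StopExecutor", executor_id)
            self.launched.append(executor_id)
            return ("ok", None)
        raise ValueError(f"unexpected message: {kind}")


backend = ToyStoppingBackend()
first = backend.receive("RegisterExecutor", "1")   # accepted before stop()
backend.stop()                                     # driver begins shutting down
late_launch = backend.receive("LaunchedExecutor", "1")
late_register = backend.receive("RegisterExecutor", "2")

print(first, late_launch, late_register)
```

Because the endpoint is still present when the late "LaunchedExecutor" arrives, the message gets an explicit reply and no lookup fails.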

              People

              • Assignee: Unassigned
              • Reporter: Baohe Zhang