Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Incomplete
-
2.4.3
-
None
Description
We are using spark 2.4.3 with mesos and with external shuffle service. External shuffle service is launched using systemd by command
/bin/bash -ce "exec /*/spark/bin/spark-class org.apache.spark.deploy.mesos.MesosExternalShuffleService"
Sometimes spark executor has connection timeout when it tries to connect to external shuffle service. When it happens spark executor throws an exception
ERROR BlockManager: Failed to connect to external shuffle server, will retry 4 more times after waiting 5 seconds...
If connection timeout happens 4 more times spark executor throws an error
ERROR CoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to Unable to register with external shuffle server due to : Failed to connect to our-host.com/10.103..:7337
After this error Spark application just hangs. On Mesos UI it goes to inactive frameworks and on Spark Driver UI I can see few failed tasks and looks like it does nothing.
External Shuffle service throws an exception
ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=4941243310586976766, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=13 cap=13]}} to /10.103.*.*:49482; closing connection
Full spark executor log is
ERROR BlockManager: Failed to connect to external shuffle server, will retry 1 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to our-host.com/10.103..:7337
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:201)
at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:142)
at org.apache.spark.storage.BlockManager.$anonfun$registerWithExternalShuffleServer$3(BlockManager.scala:295)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:291)
at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:265)
at org.apache.spark.executor.Executor.<init>(Executor.scala:118)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:102)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: our-host.com/10.103..:7337
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
... 1 more
Caused by: java.net.ConnectException: Connection timed out
... 11 more
ERROR CoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to Unable to register with external shuffle server due to : Failed to connect to our-host.com/10.103.*.*:7337
org.apache.spark.SparkException: Unable to register with external shuffle server due to : Failed to connect to our-host.com/10.103..:7337
at org.apache.spark.storage.BlockManager.$anonfun$registerWithExternalShuffleServer$3(BlockManager.scala:304)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:291)
at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:265)
at org.apache.spark.executor.Executor.<init>(Executor.scala:118)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:102)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to connect to our-host.com/10.103.*.*:7337
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:201)
at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:142)
at org.apache.spark.storage.BlockManager.$anonfun$registerWithExternalShuffleServer$3(BlockManager.scala:295)
... 12 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: our-host.com/10.103.*.*:7337
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
... 1 more
Caused by: java.net.ConnectException: Connection timed out
... 11 more
INFO DiskBlockManager: Shutdown hook called
INFO ShutdownHookManager: Shutdown hook called
I0131 16:29:25.446748 3768 executor.cpp:1039] Command exited with status 1 (pid: 3795)
I0131 16:29:26.447976 3794 process.cpp:935] Stopped the socket accept loop
Full external shuffle service log is
ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=4941243310586976766, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=13 cap=13]}} to /10.103..:49482; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:111)
at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831)
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041)
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300)
at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:288)
at org.apache.spark.network.server.TransportRequestHandler.access$000(TransportRequestHandler.java:45)
at org.apache.spark.network.server.TransportRequestHandler$1.onSuccess(TransportRequestHandler.java:183)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:102)
at org.apache.spark.deploy.mesos.MesosExternalShuffleBlockHandler.handleMessage(MesosExternalShuffleService.scala:78)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=4713235420893637000, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=13 cap=13]}} to /10.103..:49482; closing connection
java.nio.channels.ClosedChannelException