Details
-
Bug
-
Status: Open
-
Critical
-
Resolution: Unresolved
-
3.2.0, 3.3.0, 3.4.0, 3.5.0
-
None
Description
One possible situation that has been found is that, during the process of requesting mergedBlockMeta, when the channel is closed, it may trigger two callback callbacks and result in duplicate data for the original shuffle blocks.
- The first time is when the channel is inactivated, the responseHandler will execute the callback for all outstandingRpcs.
- The second time is when the listener corresponding to shuffleClient.writeAndFlush executes the callback after the channel is closed.
Some Error Logs:
23/09/08 09:22:21 ERROR shuffle-client-7-1 TransportResponseHandler: Still have 1 requests outstanding when connection from host/ip:prot is closed 23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get the meta of push-merged block for (3, 54) from host:port java.io.IOException: Connection from host:port closed at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147) at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:745) 23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get the meta of push-merged block for (3, 54) from host:port java.io.IOException: Failed to send RPC RPC 8079698359363123411 to host/ip:port: java.nio.channels.ClosedChannelException at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:433) at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:409) at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993) at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790) at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758) at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790) at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758) at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767) at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:745) Caused by: java.nio.channels.ClosedChannelException at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) ... 18 more