Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Fixed
- Affects Version/s: 3.3.1
Description
We have observed this issue several times in our production environment: some executors get stuck at BlockTransferService#fetchBlockSync().
After some investigation, the issue seems to be caused by an unhandled edge case in RetryingBlockTransferor.
1. A shuffle block transfer fails for some reason; in our case, an IOException while writing the downloaded data to disk:
java.io.IOException: Cannot allocate memory
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:51)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
    at org.apache.spark.network.shuffle.SimpleDownloadFile$SimpleDownloadWritableChannel.write(SimpleDownloadFile.java:78)
    at org.apache.spark.network.shuffle.OneForOneBlockFetcher$DownloadCallback.onData(OneForOneBlockFetcher.java:340)
    at org.apache.spark.network.client.StreamInterceptor.handle(StreamInterceptor.java:79)
    at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
2. The above exception is caught by AbstractChannelHandlerContext#invokeChannelRead() and propagated to the exception handler.
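Both this step and step 5 go through the same Netty plumbing: an exception raised while handling inbound data is routed to the pipeline's exceptionCaught() callbacks, and if such a callback throws in turn, Netty only logs the secondary error. Below is a minimal sketch of that propagation, assuming a simplified handler and callback (FetchHandlerSketch and FailureCallback are illustrative names, not Spark's actual TransportFrameDecoder/StreamInterceptor classes):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

class FetchHandlerSketch extends ChannelInboundHandlerAdapter {

  // Simplified stand-in for the fetch failure callback chain.
  interface FailureCallback {
    void onFailure(Throwable cause);
  }

  private final FailureCallback callback;

  FetchHandlerSketch(FailureCallback callback) {
    this.callback = callback;
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // In the real decoder, feeding inbound bytes to the stream interceptor here is
    // where the IOException from step 1 surfaces; Netty catches it in
    // invokeChannelRead() and fires exceptionCaught() down the pipeline (step 2).
    ctx.fireChannelRead(msg);
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    // If the failure callback throws in turn (e.g. the OutOfMemoryError raised
    // while initiating the retry), invokeExceptionCaught() only logs the
    // secondary error; nothing else in the application is notified (step 5).
    callback.onFailure(cause);
  }
}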
3. The exception reaches RetryingBlockTransferor#initiateRetry(), which tries to initiate a retry:
23/08/09 16:58:37 shuffle-client-4-2 INFO RetryingBlockTransferor: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
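The retry is scheduled on a separate executor so that the Netty event-loop thread is not blocked while backing off. A rough sketch of that pattern follows; the names RetrySketch, retryExecutor, retryWaitMs, and transferAllOutstanding are illustrative rather than the exact RetryingBlockTransferor internals. The important point is that submitting the retry task can itself throw, which is exactly what happens in the next step:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class RetrySketch {
  // A cached pool creates a fresh worker thread when no idle thread is available.
  private final ExecutorService retryExecutor = Executors.newCachedThreadPool();

  void initiateRetry(long retryWaitMs) {
    // submit() may itself fail with
    // "java.lang.OutOfMemoryError: unable to create new native thread"
    // when the JVM cannot start another thread (step 4).
    retryExecutor.submit(() -> {
      try {
        TimeUnit.MILLISECONDS.sleep(retryWaitMs);   // back off before retrying
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      transferAllOutstanding();                     // re-issue the outstanding fetches
    });
  }

  void transferAllOutstanding() {
    // re-issue fetch requests for the blocks that have not completed (omitted)
  }
}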
4. Retry initiation fails (in our case, it fails to create a new thread)
5. The new exception is caught by AbstractChannelHandlerContext#invokeExceptionCaught() and is not processed any further:
23/08/09 16:58:53 shuffle-client-4-2 DEBUG AbstractChannelHandlerContext: An exception
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:719)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at org.apache.spark.network.shuffle.RetryingBlockTransferor.initiateRetry(RetryingBlockTransferor.java:182)
    at org.apache.spark.network.shuffle.RetryingBlockTransferor.access$500(RetryingBlockTransferor.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferFailure(RetryingBlockTransferor.java:230)
    at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockFetchFailure(RetryingBlockTransferor.java:260)
    at org.apache.spark.network.shuffle.OneForOneBlockFetcher.failRemainingBlocks(OneForOneBlockFetcher.java:318)
    at org.apache.spark.network.shuffle.OneForOneBlockFetcher.access$300(OneForOneBlockFetcher.java:55)
    at org.apache.spark.network.shuffle.OneForOneBlockFetcher$DownloadCallback.onFailure(OneForOneBlockFetcher.java:357)
    at org.apache.spark.network.client.StreamInterceptor.exceptionCaught(StreamInterceptor.java:56)
    at org.apache.spark.network.util.TransportFrameDecoder.exceptionCaught(TransportFrameDecoder.java:231)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
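The net effect is that the OutOfMemoryError thrown while submitting the retry task escapes through the listener callbacks back into Netty, where it is only logged, so the original fetch is neither retried nor failed. One possible way to guard against this is sketched below: catch failures of the retry initiation itself and report them to the caller's listener. This is only a sketch of the defensive pattern, not necessarily the exact change made in Spark; RetryGuardSketch, retryOrFail, and FetchListener are illustrative names (FetchListener stands in for a BlockFetchingListener-style callback).

import java.util.List;

class RetryGuardSketch {
  // Simplified stand-in for a BlockFetchingListener-style callback.
  interface FetchListener {
    void onBlockFetchFailure(String blockId, Throwable cause);
  }

  private final FetchListener listener;

  RetryGuardSketch(FetchListener listener) {
    this.listener = listener;
  }

  void retryOrFail(List<String> outstandingBlockIds, Throwable cause) {
    try {
      initiateRetry(cause);           // scheduling the retry may itself throw,
    } catch (Throwable t) {           // e.g. OutOfMemoryError from submit()
      // Fail every outstanding block so the thread blocked in fetchBlockSync()
      // is released with an error instead of waiting forever.
      for (String blockId : outstandingBlockIds) {
        listener.onBlockFetchFailure(blockId, t);
      }
    }
  }

  void initiateRetry(Throwable cause) {
    // schedule the retry on the retry executor (omitted)
  }
}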
6. As a result, the retry never happens and the executor thread ends up stuck at BlockTransferService#fetchBlockSync(), waiting forever for the transfer to complete or fail:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:293)
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1154)
org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1098)
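For completeness, this is why the thread hangs: fetchBlockSync() blocks, via ThreadUtils.awaitResult, on a promise that is completed only by the fetch listener's success or failure callback, so if neither callback ever fires, the wait never returns. Below is a rough Java analogue of that wait (the real code awaits a Scala Promise; FetchSyncSketch and startAsyncFetch are illustrative names, and the callback comments mirror the listener-style API):

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class FetchSyncSketch {
  byte[] fetchBlockSync() {
    CompletableFuture<byte[]> result = new CompletableFuture<>();

    startAsyncFetch(
        data -> result.complete(data),                    // success callback
        error -> result.completeExceptionally(error));    // failure callback

    // Parks the calling thread until one of the callbacks completes the future.
    // If neither callback ever runs, this blocks forever, which is the stuck
    // state shown in the thread dump above.
    return result.join();
  }

  void startAsyncFetch(Consumer<byte[]> onSuccess, Consumer<Throwable> onFailure) {
    // issue the asynchronous block fetch (omitted)
  }
}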