SPARK-38856

Fix a RejectedExecutionException error when push-based shuffle is enabled


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.2.0, 3.2.1
    • Fix Version/s: None
    • Component/s: Shuffle
    • Labels: None

    Description

      When push-based shuffle is enabled in our production environment, tasks fail with a RejectedExecutionException. This happens because the shuffle pusher thread pool has already been shut down by the time a push callback tries to submit a new task to it. A minimal sketch of the race follows the stack trace below.

      This is the RejectedExecutionException error:

      FetchFailed(BlockManagerId(26,xxxxx.hadoop.jd.local, 7337, None), shuffleId=0, mapIndex=6424, mapId=4177, reduceId=1031, message=
      org.apache.spark.shuffle.FetchFailedException
      at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1181)
      at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:919)
      at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:81)
      at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
      at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
      at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
      at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
      at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
      at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:815)
      at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
      at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
      at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
      at org.apache.spark.scheduler.Task.run(Task.scala:133)
      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2$$Lambda$1045/583658475@3492bd6f rejected from java.util.concurrent.ThreadPoolExecutor@2e63bad5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 243134]
      at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
      at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
      at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
      at org.apache.spark.shuffle.ShuffleBlockPusher.submitTask(ShuffleBlockPusher.scala:147)
      at org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.handleResult(ShuffleBlockPusher.scala:235)
      at org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockPushSuccess(ShuffleBlockPusher.scala:245)
      at org.apache.spark.network.shuffle.BlockPushingListener.onBlockTransferSuccess(BlockPushingListener.java:42)
      at org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockTransferSuccess(ShuffleBlockPusher.scala:224)
      at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:258)
      at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockPushSuccess(RetryingBlockTransferor.java:304)
      at org.apache.spark.network.shuffle.OneForOneBlockPusher$BlockPushCallback.onSuccess(OneForOneBlockPusher.java:97)
      at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:197)
      at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
      at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
      at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
      at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
      at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      ... 1 more

      )
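
      The following is a minimal, self-contained Scala sketch of the race. It is not Spark's actual ShuffleBlockPusher implementation; the BlockPusher class, the stopped flag, and the submitTask signature here are illustrative stand-ins. It shows how a push-result callback that submits work to an already-terminated pool hits the pool's AbortPolicy and throws RejectedExecutionException, and how guarding the submission path lets the task continue instead of failing:

{code:scala}
import java.util.concurrent.{Executors, ExecutorService, RejectedExecutionException}

// Illustrative only: BlockPusher, stopped, and submitTask are hypothetical
// stand-ins for the ShuffleBlockPusher internals referenced in the trace above.
class BlockPusher(pool: ExecutorService) {
  @volatile private var stopped = false

  // Called when the executor is shutting down.
  def stop(): Unit = {
    stopped = true
    pool.shutdown()
  }

  // Without the `stopped` guard, a push-result callback that arrives after
  // stop() submits to a terminated pool, and the pool's AbortPolicy throws
  // RejectedExecutionException, failing the task.
  def submitTask(task: Runnable): Unit = {
    if (!stopped) {
      try {
        pool.execute(task)
      } catch {
        // The check above still races with stop(), so also tolerate a
        // rejection that happens while we are shutting down.
        case _: RejectedExecutionException if stopped =>
          // dropped deliberately: the pusher is stopping, nothing to do
      }
    }
  }
}

object PusherPoolRaceDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1)
    val pusher = new BlockPusher(pool)
    pusher.stop()                            // simulates the pool shutting down first
    pusher.submitTask(() => println("push")) // dropped quietly instead of throwing
  }
}
{code}

      The check-then-act guard alone still races with stop(), which is why the sketch also tolerates a rejection that arrives while the pusher is shutting down. The actual patch that resolved this issue may differ in detail.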



      People

        Assignee: weixiuli
        Reporter: weixiuli
        Votes: 0
        Watchers: 2

      Dates

        Created:
        Updated:
        Resolved: