Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-5928

Remote Shuffle Blocks cannot be more than 2 GB

Log workAgile BoardRank to TopRank to BottomAttach filesAttach ScreenshotBulk Copy AttachmentsBulk Move AttachmentsVotersWatch issueWatchersCreate sub-taskConvert to sub-taskMoveLinkCloneLabelsUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete CommentsDelete
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Duplicate
    • None
    • None
    • Spark Core
    • None

    Description

      If a shuffle block is over 2GB, the shuffle fails, with an uninformative exception. The tasks get retried a few times and then eventually the job fails.

      Here is an example program which can cause the exception:

          val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
            val n = 3e3.toInt
            val arr = new Array[Byte](n)
            //need to make sure the array doesn't compress to something small
            scala.util.Random.nextBytes(arr)
            arr
          }
          rdd.map { x => (1, x)}.groupByKey().count()
      

      Note that you can't trigger this exception in local mode, it only happens on remote fetches. I triggered these exceptions running with MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m

      15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
      org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
      	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
      	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
      	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
      	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
      	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
      	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
      	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
      	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
      	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
      	at org.apache.spark.scheduler.Task.run(Task.scala:56)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
      	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
      	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
      	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
      	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
      	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
      	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
      	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
      	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
      	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
      	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
      	... 1 more
      
      )
      

      or if you use "spark.shuffle.blockTransferService=nio", then you get:

      15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
      org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
      	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
      	at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
      	at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
      	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
      	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
      	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
      	at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
      	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
      	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
      	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
      	at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
      	at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      
      	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
      	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
      	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
      	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
      	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
      	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
      	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
      	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
      	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
      	at org.apache.spark.scheduler.Task.run(Task.scala:56)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.io.IOException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
      	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
      	at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
      	at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
      	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
      	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
      	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
      	at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
      	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
      	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
      	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
      	at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
      	at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      
      	at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
      	at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
      	at org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
      	at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
      	at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
      	... 3 more
      
      )
      
      

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            Unassigned Unassigned Assign to me
            irashid Imran Rashid
            Votes:
            32 Vote for this issue
            Watchers:
            63 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment