Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
Description
If a shuffle block is over 2GB, the shuffle fails with an uninformative exception. The tasks get retried a few times, and then the job eventually fails.
Here is an example program that can trigger the exception:
val rdd = sc.parallelize(1 to 1e6.toInt, 1).map { ignore =>
  val n = 3e3.toInt
  val arr = new Array[Byte](n)
  // need to make sure the array doesn't compress to something small
  scala.util.Random.nextBytes(arr)
  arr
}
rdd.map { x => (1, x) }.groupByKey().count()
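All 1e6 records (~3KB each, roughly 3GB in total) share the single key 1, so they all land in one shuffle block on the reduce side; that single block is what blows past the 2GB limit (note the 3021252889-byte frame length in the first trace below).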
Note that you can't trigger this exception in local mode; it only happens on remote fetches. I triggered these exceptions by running:
MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m
15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    ... 1 more
)
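The 2147483647 in this message is Integer.MAX_VALUE: netty's LengthFieldBasedFrameDecoder takes its maxFrameLength as a Java int, so no configuration can let a single frame reach the 3021252889 bytes requested here. A minimal sketch of such a decoder follows; the field-layout parameters are illustrative and not necessarily Spark's exact settings:

import io.netty.handler.codec.LengthFieldBasedFrameDecoder

object FrameLimitSketch {
  // maxFrameLength is declared as `int` in netty, so the hard ceiling for any
  // single decoded frame is Integer.MAX_VALUE = 2147483647 bytes.
  val decoder = new LengthFieldBasedFrameDecoder(
    Integer.MAX_VALUE, // maxFrameLength: cannot be configured any higher
    0,                 // lengthFieldOffset: length prefix at the start of the frame
    8,                 // lengthFieldLength: 8-byte (long) length field
    -8,                // lengthAdjustment: assumes the length counts the prefix itself
    8)                 // initialBytesToStrip: drop the prefix before passing the frame on
  // A frame advertising more than maxFrameLength bytes is discarded with
  // TooLongFrameException: "Adjusted frame length exceeds 2147483647: ..."
}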
If instead you use "spark.shuffle.blockTransferService=nio", you get:
15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
    at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
    at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
    at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
    at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
    at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
    at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
    at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
    at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
    at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
    at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
    at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
    at org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
    at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
    at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
    ... 3 more
)
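The nio path hits the same underlying int limit from a different direction: FileChannel.map returns a MappedByteBuffer, and every java.nio.ByteBuffer is indexed by Int, so the JDK rejects any single mapping larger than Integer.MAX_VALUE bytes. A self-contained sketch (the file path is made up for illustration; the size matches the frame length from the first trace):

import java.io.RandomAccessFile
import java.nio.channels.FileChannel

object MapLimitSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical >2GB shuffle file, standing in for the real shuffle output.
    val channel = new RandomAccessFile("/tmp/shuffle-block.data", "r").getChannel
    try {
      // FileChannel.map takes a long size but returns a MappedByteBuffer, whose
      // capacity is an Int, so this call throws
      // java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
      channel.map(FileChannel.MapMode.READ_ONLY, 0L, 3021252889L)
    } finally {
      channel.close()
    }
  }
}

When keys are well distributed, raising the number of reduce partitions (e.g. groupByKey(n)) keeps each shuffle block under 2GB; it cannot help the repro above, though, since everything there is sent to a single key.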
Issue Links
- duplicates
  - SPARK-6238 Support shuffle where individual blocks might be > 2G (Resolved)
- is duplicated by
  - SPARK-20308 org.apache.spark.shuffle.FetchFailedException: Too large frame (Resolved)
  - SPARK-5739 Size exceeds Integer.MAX_VALUE in File Map (Resolved)
- is required by
  - SPARK-1476 2GB limit in spark for blocks (Closed)
- relates to
  - SPARK-6190 create LargeByteBuffer abstraction for eliminating 2GB limit on blocks (Resolved)