Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Not A Problem
- Affects Version/s: 2.2.0
- Fix Version/s: None
- Component/s: None
Description
In our practice, this problem can be reproduced reliably with a high-parallelism job migrated from MapReduce to Spark; some of its metrics are listed below:
| Item | Value |
| --- | --- |
| spark.executor.instances | 1000 |
| spark.executor.cores | 5 |
| task number of shuffle writer stage | 18038 |
| task number of shuffle reader stage | 80000 |
While the shuffle writer stage completed successfully, the shuffle reader stage started and kept failing with FetchFailedException. Each fetch request needs Netty to allocate a 16 MB buffer (detailed stack attached below; a rough memory estimate is sketched after it), so the huge number of fetch requests uses up the default maxDirectMemory rapidly, even after we bumped io.netty.maxDirectMemory up to 50 GB.
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480)
	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530)
	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:484)
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:711)
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:700)
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:221)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:296)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
	at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
	at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	... 1 more
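For a sense of scale, here is a rough back-of-envelope sketch (not Spark code; the per-task channel count is a made-up assumption) of why even a 20 GiB direct-memory budget can be exhausted. The 16777216-byte allocation in the trace is Netty's default 16 MiB pool chunk (8 KiB page size * 2^11 maxOrder), requested by AdaptiveRecvByteBufAllocator while fetch responses are being read.

```scala
// Back-of-envelope estimate of shuffle-fetch direct-memory pressure.
// Numbers come from the metrics table above; the busy-channel count is a
// hypothetical assumption for illustration only.
object FetchMemorySketch {
  def main(args: Array[String]): Unit = {
    val mapTasks     = 18038L
    val reduceTasks  = 80000L
    val coresPerExec = 5L                   // concurrent reduce tasks per executor
    val chunkBytes   = 16L * 1024 * 1024    // Netty's default PoolArena chunk size

    // Each reduce task fetches a block from every map output, so the stage
    // issues on the order of mapTasks * reduceTasks block fetches in total.
    println(s"total shuffle block fetches ~= ${mapTasks * reduceTasks}")

    // Assumption: each running reduce task keeps many remote channels busy,
    // and each busy channel can pin at least one fresh 16 MiB chunk.
    val assumedBusyChannelsPerTask = 200L
    val worstCasePerExecutor = coresPerExec * assumedBusyChannelsPerTask * chunkBytes
    println(f"worst-case pinned direct memory per executor ~= ${worstCasePerExecutor / math.pow(1024, 3)}%.2f GiB")
    // 5 * 200 * 16 MiB ~= 15.6 GiB, already close to the 20 GiB
    // (21474836480 bytes) limit shown in the stack trace.
  }
}
```

With roughly 1.4 billion block fetches spread over 1000 executors, the Netty event loops stay busy enough that many chunks are pinned at once, which is consistent with the OutOfDirectMemoryError above.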
Solution
Adding retry support, bumping up the Java option io.netty.maxDirectMemory, and using a larger spark.shuffle.io.retryWait can help the job pass. I think we need more discussion about load balancing of fetch requests, but retry support is probably needed first.
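As a hedged illustration of those mitigations, the knobs involved look roughly like the sketch below; the specific values are assumptions, not the settings used in the job reported here.

```scala
import org.apache.spark.SparkConf

// Sketch of the mitigations described above, expressed as SparkConf settings.
// All values are illustrative assumptions.
object ShuffleFetchMitigations {
  def conf: SparkConf = new SparkConf()
    // Raise Netty's direct-memory cap (roughly the 50 GB mentioned above).
    .set("spark.executor.extraJavaOptions", "-Dio.netty.maxDirectMemory=53687091200")
    // Retry failed fetches instead of failing the task immediately,
    // and wait longer between retries so retry storms don't pile up.
    .set("spark.shuffle.io.maxRetries", "10")    // default 3
    .set("spark.shuffle.io.retryWait", "30s")    // default 5s
    // Optionally throttle the reducer side so fewer fetches are in flight at once.
    .set("spark.reducer.maxSizeInFlight", "24m") // default 48m
    .set("spark.reducer.maxReqsInFlight", "256") // default Int.MaxValue
}
```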
Attachments
Issue Links
- duplicates
  - SPARK-21243 Limit the number of maps in a single shuffle fetch (Resolved)
- is related to
  - SPARK-27991 ShuffleBlockFetcherIterator should take Netty constant-factor overheads into account when limiting number of simultaneous block fetches (Resolved)