Spark / SPARK-24989

BlockFetcher should retry while getting OutOfDirectMemoryError


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Shuffle, Spark Core
    • Labels: None

    Description


      In our practice, this problem can be reproduced reliably with a highly parallel job migrated from MapReduce to Spark; the relevant metrics are listed below:

      Item                                     Value
      spark.executor.instances                 1000
      spark.executor.cores                     5
      Number of tasks in shuffle write stage   18038
      Number of tasks in shuffle read stage    80000

      While the shuffle write stage finished successfully, the shuffle read stage started and kept failing with FetchFailedException. Each fetch request requires the Netty server to allocate a 16 MB buffer (detailed stack attached below), so the huge number of fetch requests uses up the default maxDirectMemory rapidly; for scale, at 16 MB per allocation the 20 GB limit reported in the stack trace (21474836480 bytes) is exhausted after only 1280 concurrent allocations. The failures persisted even after we bumped io.netty.maxDirectMemory up to 50 GB.

      org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
      	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
      	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
      	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
      	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119)
      	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      	at org.apache.spark.scheduler.Task.run(Task.scala:108)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480)
      	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530)
      	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:484)
      	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:711)
      	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:700)
      	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
      	at io.netty.buffer.PoolArena.allocate(PoolArena.java:221)
      	at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
      	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:296)
      	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
      	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
      	at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
      	at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
      	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
      	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
      	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
      	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
      	... 1 more
      

      Solution

      Adding retry support in the block fetcher, together with bumping up the Java option io.netty.maxDirectMemory and using a larger spark.shuffle.io.retryWait, lets the job pass (both are sketched below). I think we also need more discussion about load balancing of fetch requests, but retry support is probably the necessary first step.
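
      To make the retry proposal concrete, here is a minimal Scala sketch of the behaviour we have in mind: treat io.netty.util.internal.OutOfDirectMemoryError as a retryable failure and back off before re-issuing the fetch, so that in-flight buffers have a chance to be released. This is not the existing Spark fetch code; fetchWithRetry and doFetch are hypothetical names for illustration only.

      import io.netty.util.internal.OutOfDirectMemoryError

      /**
       * Runs `fetch`, retrying up to `maxRetries` times when Netty cannot
       * allocate direct memory, with a linearly growing back-off so that
       * other in-flight fetches can complete and free direct buffers.
       */
      def fetchWithRetry[T](maxRetries: Int, retryWaitMs: Long)(fetch: => T): T = {
        var attempt = 0
        while (true) {
          try {
            return fetch
          } catch {
            case _: OutOfDirectMemoryError if attempt < maxRetries =>
              attempt += 1
              Thread.sleep(retryWaitMs * attempt)
          }
        }
        // Unreachable: the loop either returns a result or rethrows the error
        // once `attempt` reaches `maxRetries`.
        throw new IllegalStateException("unreachable")
      }

      // Hypothetical usage around a single block fetch:
      // val block = fetchWithRetry(maxRetries = 3, retryWaitMs = 5000) { doFetch(blockId) }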
       
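
      For the workaround itself, the configuration we used can be summarised as a spark-defaults.conf sketch. The concrete values are only what worked on our cluster, not general recommendations; io.netty.maxDirectMemory is a Netty system property expressed in bytes (53687091200 = 50 GB), the snippet assumes blocks are served by the executors themselves (no external shuffle service), and spark.shuffle.io.maxRetries is shown only as a related knob, not something tuned in the original report.

      # Raise Netty's direct-memory ceiling on the executors (value in bytes, here 50 GB).
      spark.executor.extraJavaOptions    -Dio.netty.maxDirectMemory=53687091200

      # Wait longer between fetch retries (and optionally allow more of them), so that
      # direct memory has a chance to be released before the next attempt.
      spark.shuffle.io.retryWait         30s
      spark.shuffle.io.maxRetries        10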

      Attachments

        1. FailedStage.png (363 kB, Yuanjian Li)
      People

        Assignee: Unassigned
        Reporter: Yuanjian Li
