SPARK-27991

ShuffleBlockFetcherIterator should take Netty constant-factor overheads into account when limiting number of simultaneous block fetches


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: Shuffle
    • Labels: None

      Description

      ShuffleBlockFetcherIterator has logic to limit the number of simultaneous block fetches. By default, this logic tries to keep the total size of outstanding block fetches beneath a data size limit (maxBytesInFlight). However, this limiting does not take fixed overheads into account: even though a remote block might be, say, 4 KB, there are fixed-size internal overheads due to Netty buffer allocation which can make the actual memory required to fetch it considerably larger.
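
      For a rough sense of the gap (illustrative numbers only; the 16 MiB figure matches the allocation size in the stack trace below, the rest are made up):

      val blockSize        = 4L * 1024              // a tiny 4 KB shuffle block
      val nettyChunkSize   = 16L * 1024 * 1024      // 16 MiB pooled Netty allocation
      val maxBytesInFlight = 48L * 1024 * 1024      // default spark.reducer.maxSizeInFlight

      // Counting raw block sizes, ~12,288 such blocks are allowed in flight at once...
      val blocksAllowed = maxBytesInFlight / blockSize
      // ...but if each concurrent fetch can pin a fresh pooled chunk, the real
      // direct-memory footprint can be orders of magnitude larger than 48 MiB.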

      As a result, if a map stage produces a huge number of extremely tiny blocks, then we may see errors like:

      org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304)
      at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
      at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
      [...]
      Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304)
      at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
      at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
      at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
      at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
      at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
      at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
      at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
      at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
      [...]

      SPARK-24989 is another report of this problem (but with a different proposed fix).

      This problem can currently be mitigated by setting spark.reducer.maxReqsInFlight to a non-IntMax value (SPARK-6166), but this additional manual configuration step is cumbersome.
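
      For example, a workaround along these lines (the value 256 is only an illustrative guess, not a recommended setting) could look like:

      import org.apache.spark.SparkConf

      // Workaround sketch: cap the number of simultaneous fetch requests so that
      // Netty's fixed per-request buffer overhead cannot exhaust direct memory.
      val conf = new SparkConf()
        .set("spark.reducer.maxReqsInFlight", "256")   // default is Int.MaxValue
        .set("spark.reducer.maxSizeInFlight", "48m")   // the existing maxBytesInFlight limit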

      Instead, I think that Spark should take these fixed overheads into account in the maxBytesInFlight calculation: instead of using blocks' actual sizes, use Math.max(blockSize, minimumNettyBufferSize). There might be some tricky details involved to make this work across all configurations (e.g. using a different minimum when direct buffers are disabled), but I think the core idea behind the fix is pretty simple.
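
      A minimal sketch of this idea (not actual ShuffleBlockFetcherIterator code; minimumNettyBufferSize and the helper names are hypothetical, and the 64 KB value is only a placeholder):

      // Hypothetical constant approximating Netty's smallest direct-buffer allocation.
      val minimumNettyBufferSize: Long = 64L * 1024  // placeholder value

      // Charge each block at least the minimum buffer size when accounting
      // against maxBytesInFlight, so tiny blocks don't undercount direct memory.
      def effectiveFetchSize(blockSize: Long): Long =
        math.max(blockSize, minimumNettyBufferSize)

      def fitsInFlight(bytesInFlight: Long, blockSizes: Seq[Long], maxBytesInFlight: Long): Boolean =
        bytesInFlight + blockSizes.map(effectiveFetchSize).sum <= maxBytesInFlight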

      This would improve Spark's stability and remove configuration / tuning burden from end users.

      People

      • Assignee: Unassigned
      • Reporter: Josh Rosen (joshrosen)
      • Votes: 0
      • Watchers: 2
