ShuffleBlockFetcherIterator has logic to limit the number of simultaneous block fetches. By default, this logic tries to keep the number of outstanding block fetches beneath a data size limit (maxBytesInFlight). However, this limiting does not take fixed overheads into account: even though a remote block might be, say, 4KB, there are certain fixed-size internal overheads due to Netty buffer sizes which may cause the actual space requirements to be larger.
As a result, if a map stage produces a huge number of extremely tiny blocks, then we may see errors like
SPARK-24989 is another report of this problem (but with a different proposed fix).
This problem can currently be mitigated by setting spark.reducer.maxReqsInFlight to some value lower than its Int.MaxValue default (SPARK-6166), but this additional manual configuration step is cumbersome.
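As a rough sketch of the workaround, the request-count cap can be set alongside the existing byte limit at submission time. The specific values below are illustrative only; the right cap is workload-dependent (48m is the documented default for spark.reducer.maxBytesInFlight, while spark.reducer.maxReqsInFlight defaults to Int.MaxValue, i.e. effectively unlimited):

```shell
# Illustrative workaround: cap the number of concurrent fetch requests
# so that fixed per-request Netty buffer overhead cannot blow up memory.
spark-submit \
  --conf spark.reducer.maxReqsInFlight=64 \
  --conf spark.reducer.maxBytesInFlight=48m \
  ...
```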
Instead, I think that Spark should take these fixed overheads into account in the maxBytesInFlight calculation: instead of using blocks' reported sizes directly, use Math.max(blockSize, minimumNettyBufferSize), so that each block is counted as occupying at least one minimum-size buffer. There might be some tricky details involved to make this work on all configurations (e.g. using a different minimum when direct buffers are disabled), but I think the core idea behind the fix is pretty simple.
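The core idea can be sketched in a few lines. Note that effectiveFetchSize and MINIMUM_NETTY_BUFFER_SIZE are hypothetical names for illustration, not actual Spark identifiers, and the 64 KB figure is an assumed placeholder for whatever minimum Netty actually allocates on a given configuration:

```java
public class EffectiveFetchSize {
    // Assumed minimum Netty buffer allocation; illustrative only.
    // The real minimum depends on Netty allocator settings and whether
    // direct buffers are enabled.
    static final long MINIMUM_NETTY_BUFFER_SIZE = 64 * 1024;

    // Count each block as occupying at least one minimum-size buffer
    // when tracking bytes in flight against maxBytesInFlight.
    static long effectiveFetchSize(long blockSize) {
        return Math.max(blockSize, MINIMUM_NETTY_BUFFER_SIZE);
    }

    public static void main(String[] args) {
        // A 4 KB block is accounted as 64 KB of in-flight memory,
        // while a 1 MB block is accounted at its actual size.
        System.out.println(effectiveFetchSize(4 * 1024));
        System.out.println(effectiveFetchSize(1024 * 1024));
    }
}
```

With this accounting, a stage fetching millions of tiny blocks hits the maxBytesInFlight ceiling at a realistic point instead of vastly underestimating its true memory footprint.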
This will improve Spark's stability and remove a configuration / tuning burden from end users.