Spark / SPARK-15074

Spark shuffle service bottlenecked while fetching large amount of intermediate data


    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.1
    • Fix Version/s: 2.1.0
    • Component/s: Shuffle
    • Labels: None

      Description

      While running a job that produces more than 90 TB of intermediate data, we found that about 10-15% of reducer execution time is spent in shuffle fetch.

      A jstack dump of the shuffle service shows that it spends most of its time reading the index files generated by the mappers:

      java.lang.Thread.State: RUNNABLE
      	at java.io.FileInputStream.readBytes(Native Method)
      	at java.io.FileInputStream.read(FileInputStream.java:255)
      	at java.io.DataInputStream.readFully(DataInputStream.java:195)
      	at java.io.DataInputStream.readLong(DataInputStream.java:416)
      	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:277)
      	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:190)
      	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:85)
      	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:72)
      	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
      	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
      	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
      	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
      	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
      	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
      	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
      	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
      	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
      	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
      	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
      	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
      	at java.lang.Thread.run(Thread.java:745)
      

      The problem is that every shuffle fetch reopens the same index file and reads it again. It would be much more efficient to avoid opening the same file repeatedly and to cache its contents instead. We can use an LRU cache to hold the index file information, which also lets us cap the number of entries so that memory use does not grow without bound.
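
The caching idea above can be sketched as follows. This is a minimal illustration and not the patch that resolved this issue: the class `IndexFileCache` and its methods are hypothetical names, and the sketch assumes that an index file is a sequence of 8-byte offsets (consistent with the `readLong` calls in the trace) in which the block for reducer `i` spans `offsets[i]` to `offsets[i + 1]`. It uses the JDK's `LinkedHashMap` in access order as a simple LRU structure.

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: a bounded LRU cache of parsed shuffle index files. */
final class IndexFileCache {
    private final int maxEntries;
    private final Map<File, long[]> cache;
    private int diskReads = 0; // counts how often an index file was actually opened

    IndexFileCache(int maxEntries) {
        this.maxEntries = maxEntries;
        // accessOrder = true keeps entries ordered from least to most recently used.
        this.cache = new LinkedHashMap<File, long[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<File, long[]> eldest) {
                // Evict the least recently used entry once the cap is exceeded.
                return size() > IndexFileCache.this.maxEntries;
            }
        };
    }

    /** Returns {offset, length} of the block for reduceId, reading the file only on a cache miss. */
    synchronized long[] getBlockRange(File indexFile, int reduceId) throws IOException {
        long[] offsets = cache.get(indexFile);
        if (offsets == null) {
            offsets = readOffsets(indexFile);
            cache.put(indexFile, offsets);
        }
        long start = offsets[reduceId];
        return new long[] { start, offsets[reduceId + 1] - start };
    }

    /** Reads every offset in one pass (assumed layout: consecutive 8-byte longs). */
    private long[] readOffsets(File indexFile) throws IOException {
        diskReads++;
        long[] offsets = new long[(int) (indexFile.length() / 8)];
        try (DataInputStream in = new DataInputStream(Files.newInputStream(indexFile.toPath()))) {
            for (int i = 0; i < offsets.length; i++) {
                offsets[i] = in.readLong();
            }
        }
        return offsets;
    }

    synchronized int diskReads() {
        return diskReads;
    }
}
```

With such a cache, repeated fetches against the same map output cost one file open instead of one per request. The sketch assumes an index file never changes once it has been written. A production version would probably also bound the cache by memory footprint rather than by entry count alone.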

            People

            • Assignee: Sital Kedia (sitalkedia@gmail.com)
            • Reporter: Sital Kedia (sitalkedia@gmail.com)
            • Votes: 0
            • Watchers: 14