      Description

      Spark's networking layer supports sending messages backed by either a FileRegion or a ByteBuf. Sending large FileRegions works, since netty supports FileRegions larger than 2GB; a ByteBuf, however, is limited to 2GB because its indices are 32-bit ints. This is particularly a problem for sending large datasets that are already in memory, e.g. cached RDD blocks.

      For example, if you try to replicate a block stored in memory that is over 2GB, you will see an exception like:

      18/05/16 12:40:57 ERROR client.TransportClient: Failed to send RPC 7420542363232096629 to xyz.com/172.31.113.213:44358: io.netty.handler.codec.EncoderException: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
      io.netty.handler.codec.EncoderException: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
              at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
              at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
              at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
              at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
              at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
              at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081)
              at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128)
              at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070)
              at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
              at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
              at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
              at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
              at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
              at io.netty.buffer.AbstractByteBuf.setIndex(AbstractByteBuf.java:129)
              at io.netty.buffer.CompositeByteBuf.setIndex(CompositeByteBuf.java:1688)
              at io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:110)
              at io.netty.buffer.Unpooled.wrappedBuffer(Unpooled.java:359)
              at org.apache.spark.util.io.ChunkedByteBuffer.toNetty(ChunkedByteBuffer.scala:87)
              at org.apache.spark.storage.ByteBufferBlockData.toNetty(BlockManager.scala:95)
              at org.apache.spark.storage.BlockManagerManagedBuffer.convertToNetty(BlockManagerManagedBuffer.scala:52)
              at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:58)
              at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:33)
              at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88)
              ... 17 more
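
      The negative writerIndex in the trace is the signature of 32-bit overflow: netty's ByteBuf tracks its indices and capacity as Java ints, so wrapping a block larger than Int.MaxValue bytes into a single CompositeByteBuf wraps the size negative. A quick Scala check (the block size below is inferred from the wrapped value in the trace, purely for illustration):

      // ByteBuf indices are 32-bit ints, so any size past Int.MaxValue (~2GB) wraps.
      val wrapped = -1294617291
      val actualBlockSize = wrapped.toLong + (1L << 32)
      println(actualBlockSize)  // 3000350005 bytes, i.e. a ~2.8GiB block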
      
      

      A simple solution is to create a FileRegion backed by a ChunkedByteBuffer (Spark's existing data structure for supporting blocks over 2GB in memory), as sketched below.
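
      A minimal sketch of the idea in Scala, assuming netty 4.1's FileRegion and AbstractReferenceCounted APIs. The class and parameter names here are illustrative (the chunks array stands in for what ChunkedByteBuffer.getChunks() provides), not a final implementation:

      import java.nio.ByteBuffer
      import java.nio.channels.WritableByteChannel

      import io.netty.channel.FileRegion
      import io.netty.util.AbstractReferenceCounted

      // Sketch: a FileRegion that writes an in-memory block to the channel one
      // chunk at a time, so no single buffer (and no 32-bit index) ever sees > 2GB.
      class ChunkedByteBufferFileRegion(chunks: Array[ByteBuffer])
          extends AbstractReferenceCounted with FileRegion {

        private val totalSize: Long = chunks.map(_.remaining().toLong).sum
        private var bytesTransferred = 0L
        private var currentChunk = 0

        override def position(): Long = 0
        override def transferred(): Long = bytesTransferred
        override def transfered(): Long = bytesTransferred  // deprecated spelling
        override def count(): Long = totalSize

        // netty calls this repeatedly until transferred() == count().
        override def transferTo(target: WritableByteChannel, position: Long): Long = {
          assert(position == bytesTransferred, "netty resumes where it left off")
          var written = 0L
          var channelFull = false
          while (currentChunk < chunks.length && !channelFull) {
            val chunk = chunks(currentChunk)
            written += target.write(chunk)
            if (chunk.hasRemaining) {
              channelFull = true  // socket buffer is full; netty will call again
            } else {
              currentChunk += 1   // chunk fully written, move on to the next one
            }
          }
          bytesTransferred += written
          written
        }

        // The underlying buffers are owned by the block manager; nothing to free.
        override def deallocate(): Unit = {}

        // FileRegion narrows ReferenceCounted's return types, so re-override these.
        override def retain(): FileRegion = { super.retain(); this }
        override def retain(increment: Int): FileRegion = { super.retain(increment); this }
        override def touch(): FileRegion = this
        override def touch(hint: Object): FileRegion = this
      }

      With something like this in place, ChunkedByteBuffer.toNetty (ChunkedByteBuffer.scala:87 in the trace above) could return the FileRegion instead of calling Unpooled.wrappedBuffer over all the chunks at once, which is the call that overflows.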

      A drawback to this approach is that a block cached in memory as deserialized values would need to be entirely serialized into memory before it could be pushed. Avoiding that (by serializing incrementally) would involve a larger change to the block manager as well, and is not strictly necessary, so it can be handled separately as a performance improvement.

      People

      Assignee: Imran Rashid (irashid)
      Reporter: Imran Rashid (irashid)