SPARK-24307 (sub-task of SPARK-6235: Address various 2G limits)

Support sending messages over 2GB from memory



    Description

      Spark's networking layer supports sending messages backed by either a FileRegion or a ByteBuf. Sending large FileRegions works, since netty supports FileRegions larger than 2 GB. A ByteBuf, however, is indexed with Java ints and is therefore limited to 2 GB. This is particularly a problem for sending large datasets that are already in memory, e.g. cached RDD blocks.

      For example, if you try to replicate a block stored in memory that is over 2 GB, you will see an exception like:

      18/05/16 12:40:57 ERROR client.TransportClient: Failed to send RPC 7420542363232096629 to xyz.com/172.31.113.213:44358: io.netty.handler.codec.EncoderException: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
      io.netty.handler.codec.EncoderException: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
              at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
              at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
              at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
              at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
              at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
              at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081)
              at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128)
              at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070)
              at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
              at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
              at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
              at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
              at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
              at io.netty.buffer.AbstractByteBuf.setIndex(AbstractByteBuf.java:129)
              at io.netty.buffer.CompositeByteBuf.setIndex(CompositeByteBuf.java:1688)
              at io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:110)
              at io.netty.buffer.Unpooled.wrappedBuffer(Unpooled.java:359)
              at org.apache.spark.util.io.ChunkedByteBuffer.toNetty(ChunkedByteBuffer.scala:87)
              at org.apache.spark.storage.ByteBufferBlockData.toNetty(BlockManager.scala:95)
              at org.apache.spark.storage.BlockManagerManagedBuffer.convertToNetty(BlockManagerManagedBuffer.scala:52)
              at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:58)
              at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:33)
              at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88)
              ... 17 more
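
      The negative writerIndex above is the overflow itself: Unpooled.wrappedBuffer (called from ChunkedByteBuffer.toNetty) builds a CompositeByteBuf, whose indices are Java ints, so any total payload above Integer.MAX_VALUE wraps around to a negative value. A quick sanity check in Scala (the ~2.8 GB block size here is inferred from the logged value, not reported in the original log):

      // CompositeByteBuf tracks writerIndex as an int, so totals past
      // Integer.MAX_VALUE wrap around when the chunk sizes are summed.
      val totalSize = 3000350005L             // hypothetical ~2.8 GB block
      assert(totalSize.toInt == -1294617291)  // the writerIndex in the log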
      
      

      A simple solution is to create a FileRegion implementation backed by a ChunkedByteBuffer (Spark's existing data structure for supporting blocks larger than 2 GB in memory), as sketched below.
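
      A minimal sketch of what such a FileRegion could look like, written against netty 4.1's FileRegion interface. The class name and the bookkeeping are illustrative, not the actual patch; it relies only on ChunkedByteBuffer.getChunks() (which returns duplicated buffers) and dispose(), and since ChunkedByteBuffer is private[spark], this would have to live inside Spark's own tree:

      import java.io.IOException
      import java.nio.ByteBuffer
      import java.nio.channels.WritableByteChannel

      import io.netty.channel.FileRegion
      import io.netty.util.AbstractReferenceCounted

      import org.apache.spark.util.io.ChunkedByteBuffer

      // A FileRegion whose bytes come from an in-memory ChunkedByteBuffer
      // rather than a file. FileRegion exposes long positions and counts,
      // so it avoids ByteBuf's int-indexed 2 GB ceiling without copying
      // the chunks into a single contiguous buffer.
      class ChunkedByteBufferFileRegion(chunkedBuffer: ChunkedByteBuffer)
        extends AbstractReferenceCounted with FileRegion {

        // getChunks() returns duplicates, so advancing their positions
        // during the transfer does not disturb the cached block.
        private val chunks: Array[ByteBuffer] = chunkedBuffer.getChunks()
        private val totalSize: Long = chunks.map(_.remaining().toLong).sum
        private var currentChunkIdx = 0
        private var _transferred = 0L

        override def position(): Long = 0L
        override def count(): Long = totalSize
        override def transferred(): Long = _transferred
        override def transfered(): Long = _transferred  // deprecated netty alias

        // netty calls transferTo repeatedly until transferred() == count();
        // each call writes what the channel will accept and then returns.
        @throws[IOException]
        override def transferTo(target: WritableByteChannel, position: Long): Long = {
          require(position == _transferred, s"unexpected position $position")
          var written = 0L
          var channelFull = false
          while (!channelFull && currentChunkIdx < chunks.length) {
            val chunk = chunks(currentChunkIdx)
            written += target.write(chunk)
            if (chunk.hasRemaining()) {
              channelFull = true   // socket buffer is full; netty will retry
            } else {
              currentChunkIdx += 1 // chunk finished, move on to the next
            }
          }
          _transferred += written
          written
        }

        override protected def deallocate(): Unit = chunkedBuffer.dispose()

        // Covariant overrides: FileRegion narrows ReferenceCounted's returns.
        override def retain(): FileRegion = { super.retain(); this }
        override def retain(increment: Int): FileRegion = { super.retain(increment); this }
        override def touch(): FileRegion = this
        override def touch(hint: AnyRef): FileRegion = this
      }

      Because the region reports 64-bit position/count and netty keeps calling transferTo until everything is written, the block is streamed chunk by chunk and never has to sit behind a single int-indexed ByteBuf.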

      A drawback to this approach is that blocks cached in memory as deserialized values would need to be serialized entirely into memory before they can be pushed. Avoiding that, however, would involve a larger change to the block manager as well, and it is not strictly necessary, so it can be handled separately as a performance improvement.

          People

            Assignee: Imran Rashid
            Reporter: Imran Rashid
