Details

    • Type: Umbrella
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Shuffle, Spark Core
    • Labels: None

      Description

      An umbrella ticket to track the various 2G limits we have in Spark, due to the use of byte arrays and ByteBuffers.
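      For context, a minimal sketch of where the limit comes from (plain JVM behaviour, not Spark internals; the object name is made up): byte arrays and ByteBuffers are indexed by Int, so a single buffer can never hold more than Integer.MAX_VALUE bytes, and anything larger has to be chunked.

      import java.nio.ByteBuffer

      object TwoGigLimitSketch {
        def main(args: Array[String]): Unit = {
          // The largest single buffer the JVM can express: ~2 GB.
          println(s"Max single-buffer size: ${Int.MaxValue} bytes")

          // ByteBuffer.allocate only accepts an Int, so a 3 GB buffer cannot even
          // be expressed; the line below would not compile:
          // val tooBig = ByteBuffer.allocate(3L * 1024 * 1024 * 1024)

          val ok = ByteBuffer.allocate(16) // fine: well under the limit
          println(ok.capacity())
        }
      }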

        Issue Links

          Activity

          zhpengg Zhen Peng added a comment -

          Hi Reynold Xin, is there any update on this issue?

          rgande Ram Gande added a comment -

          Any progress on this? We are seeing this issue constantly in our Spark jobs. We would really appreciate an update.

          graphex Sean McKibben added a comment -

          When reading from HBase into Spark, the HBase regions seem to dictate the Spark partitions and thus the block sizes, which makes things very difficult.
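          A hedged sketch of the usual mitigation (the table name and partition count are made up, and this does not remove the limit, it just keeps individual partitions small): repartition right after the HBase read so the Spark partitioning is no longer tied to region size.

          import org.apache.hadoop.hbase.HBaseConfiguration
          import org.apache.hadoop.hbase.client.Result
          import org.apache.hadoop.hbase.io.ImmutableBytesWritable
          import org.apache.hadoop.hbase.mapreduce.TableInputFormat
          import org.apache.spark.{SparkConf, SparkContext}

          object HBaseRepartitionSketch {
            def main(args: Array[String]): Unit = {
              val sc = new SparkContext(new SparkConf().setAppName("hbase-read-sketch"))
              val hbaseConf = HBaseConfiguration.create()
              hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table") // hypothetical table

              // One Spark partition per HBase region can mean multi-GB partitions.
              val raw = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
                classOf[ImmutableBytesWritable], classOf[Result])

              // Spread the rows over many more partitions before caching or shuffling.
              val resized = raw.repartition(2000)
              println(resized.partitions.length)
              sc.stop()
            }
          }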

          glenn.strycker@gmail.com Glenn Strycker added a comment -

          Until this issue and sub-issue tickets are solved, are there any known work-arounds? Increase number of partitions, or decrease? Split up RDDs into parts, run your command, and then union? Turn off Kryo? Use dataframes? Help!!

          I am encountering the 2GB bug on attempting to simply (re)partition by key an RDD of modest size (84GB) and low skew (AFAIK). I have my memory requests per executor, per master node, per Java, etc. all cranked up as far as they'll go, and I'm currently attempting to partition this RDD across 6800 partitions. Unless my skew is really bad, I don't see why 12MB per partition would be causing a shuffle to hit the 2GB limit, unless the overhead of so many partitions is actually hurting rather than helping. I'm going to try adjusting my partition number and see what happens, but I wanted to know if there is a standard work-around answer to this 2GB issue.
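          For what it's worth, a hedged sketch of the workarounds mentioned above (partition counts and key splits are purely illustrative; none of this removes the limit, it only keeps each shuffle block well under 2GB):

          import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

          object TwoGigWorkaroundsSketch {
            def main(args: Array[String]): Unit = {
              val sc = new SparkContext(new SparkConf().setAppName("2g-workarounds"))
              val pairs = sc.parallelize(0L until 1000000L).map(v => (v % 1000, v))

              // Workaround 1: many more partitions, so each shuffle block stays small.
              val widely = pairs.partitionBy(new HashPartitioner(6800))

              // Workaround 2: split the RDD by key range, run the command on each part,
              // then union the results.
              val parts = (0 until 4).map { i =>
                pairs.filter { case (k, _) => k % 4 == i }.reduceByKey(_ + _)
              }
              val recombined = sc.union(parts)

              println((widely.partitions.length, recombined.count()))
              sc.stop()
            }
          }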

          rxin Reynold Xin added a comment -

          Is your data skewed? i.e. maybe there is a single key that's enormous?

          glenn.strycker@gmail.com Glenn Strycker added a comment -

          I don't think so, but I can check. My RDD came from an RDD of type (K,V) that was partitioned by key and worked just fine... my new RDD that is failing is attempting to map the value V to the K, so that (V, K) is now going to be partitioned by the value (now the key) instead. So I can try running some checks of multiplicity to see if my values have some kind of skew... unfortunately most of those checks are going to involve reduceByKey-like operations that will probably result in 2GB failures themselves... I was hoping to get the mapping and partitioning of (K,V) -> (V,K) accomplished first before running such checks. Thanks for the suggestion, though!
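          One possibility, sketched here under the assumption that a rough estimate is enough (the fraction and names are made up): sample the pair RDD first and count keys on the sample, so the skew check itself doesn't shuffle the full 84GB.

          import org.apache.spark.{SparkConf, SparkContext}

          object SkewCheckSketch {
            def main(args: Array[String]): Unit = {
              val sc = new SparkContext(new SparkConf().setAppName("skew-check"))
              // Stand-in for the (V, K) pair RDD described above.
              val pairs = sc.parallelize(0L until 1000000L).map(v => (v % 97, v))

              // Count keys on a small sample instead of the full dataset.
              val sampled = pairs.sample(withReplacement = false, fraction = 0.01)
              val counts = sampled.map { case (k, _) => (k, 1L) }.reduceByKey(_ + _)

              // The heaviest keys in the sample are a reasonable proxy for skew.
              counts.sortBy(_._2, ascending = false).take(10).foreach(println)
              sc.stop()
            }
          }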

          bdolbeare Brian added a comment -

          How is it possible that Spark 2.0 comes out and this bug isn't solved? A quick Google search for "Spark 2GB limit" or "Spark Integer.MAX_VALUE" shows that this is a very real problem that affects lots of users. From the outside looking in, it seems like the Spark developers don't have an interest in solving this bug, since it's been around for years at this point (including the JIRAs this consolidated ticket replaced). Can you provide some sort of an update? Maybe if you don't plan on fixing this issue, you can close the ticket or mark it as won't fix. At least that way we'd have some insight into your plans... Thanks!

          srowen Sean Owen added a comment -

          I think the short answer is: it's very hard. I am not sure it's useful to say "I guess you all don't care". Please have a look at Imran's tickets and jump in. In practice, it's not a big limit, since hitting it usually means something else in the app could be designed better.

          gq Guoqiang Li added a comment -

          I'm doing this work and I'll put up the patch this month.

          hvanhovell Herman van Hovell added a comment -

          Guoqiang Li, it might be a good idea to share a design before pressing ahead. This seems to be a complex issue that probably needs some discussion on the approach before a PR.

          If we don't take this precaution, you might end up putting a lot of time into a very complex PR that is very difficult to review.

          gq Guoqiang Li added a comment - edited

          Herman van Hovell
          The main changes:

          1. Replace the DiskStore method def getBytes(blockId: BlockId): ChunkedByteBuffer with def getBlockData(blockId: BlockId): ManagedBuffer.

          2. Make ManagedBuffer's nioByteBuffer method return a ChunkedByteBuffer. (A rough sketch of points 1 and 2 follows after the code block below.)

          3. Add the class ChunkFetchInputStream, used for flow control; the code is as follows:

          package org.apache.spark.network.client;
          
          import java.io.IOException;
          import java.io.InputStream;
          import java.nio.channels.ClosedChannelException;
          import java.util.Iterator;
          import java.util.concurrent.LinkedBlockingQueue;
          import java.util.concurrent.atomic.AtomicBoolean;
          import java.util.concurrent.atomic.AtomicReference;
          
          import com.google.common.primitives.UnsignedBytes;
          import io.netty.buffer.ByteBuf;
          import io.netty.channel.Channel;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          
          import org.apache.spark.network.buffer.ChunkedByteBuffer;
          import org.apache.spark.network.buffer.ManagedBuffer;
          import org.apache.spark.network.protocol.StreamChunkId;
          import org.apache.spark.network.util.LimitedInputStream;
          import org.apache.spark.network.util.TransportFrameDecoder;
          
          /**
           * An InputStream over a stream of remotely fetched chunks. Incoming ByteBufs are
           * queued and consumed on demand; when too many are buffered, the channel's
           * auto-read is switched off as a simple form of flow control.
           */
          public class ChunkFetchInputStream extends InputStream {
            private final Logger logger = LoggerFactory.getLogger(ChunkFetchInputStream.class);
          
            private final TransportResponseHandler handler;
            private final Channel channel;
            private final StreamChunkId streamId;
            private final long byteCount;
            private final ChunkReceivedCallback callback;
            private final LinkedBlockingQueue<ByteBuf> buffers = new LinkedBlockingQueue<>(1024);
            public final TransportFrameDecoder.Interceptor interceptor;
          
            private ByteBuf curChunk;
            private boolean isCallbacked = false;
            private long writerIndex = 0;
          
            private final AtomicReference<Throwable> cause = new AtomicReference<>(null);
            private final AtomicBoolean isClosed = new AtomicBoolean(false);
          
            public ChunkFetchInputStream(
                TransportResponseHandler handler,
                Channel channel,
                StreamChunkId streamId,
                long byteCount,
                ChunkReceivedCallback callback) {
              this.handler = handler;
              this.channel = channel;
              this.streamId = streamId;
              this.byteCount = byteCount;
              this.callback = callback;
              this.interceptor = new StreamInterceptor();
            }
          
            @Override
            public int read() throws IOException {
              if (isClosed.get()) return -1;
              pullChunk();
              if (curChunk != null) {
                byte b = curChunk.readByte();
                return UnsignedBytes.toInt(b);
              } else {
                return -1;
              }
            }
          
            @Override
            public int read(byte[] dest, int offset, int length) throws IOException {
              if (isClosed.get()) return -1;
              pullChunk();
              if (curChunk != null) {
                int amountToGet = Math.min(curChunk.readableBytes(), length);
                curChunk.readBytes(dest, offset, amountToGet);
                return amountToGet;
              } else {
                return -1;
              }
            }
          
            @Override
            public long skip(long bytes) throws IOException {
              if (isClosed.get()) return 0L;
              pullChunk();
              if (curChunk != null) {
                int amountToSkip = (int) Math.min(bytes, curChunk.readableBytes());
                curChunk.skipBytes(amountToSkip);
                return amountToSkip;
              } else {
                return 0L;
              }
            }
          
            @Override
            public void close() throws IOException {
              if (!isClosed.get()) {
                releaseCurChunk();
                isClosed.set(true);
                resetChannel();
                Iterator<ByteBuf> itr = buffers.iterator();
                while (itr.hasNext()) {
                  itr.next().release();
                }
                buffers.clear();
              }
            }
          
            private void pullChunk() throws IOException {
              if (curChunk != null && !curChunk.isReadable()) releaseCurChunk();
              if (curChunk == null && cause.get() == null && !isClosed.get()) {
                try {
                  curChunk = buffers.take();
                // If channel.read() is not invoked automatically (auto-read has been
                // disabled), invoke it here so the next chunk keeps flowing.
                  if (!channel.config().isAutoRead()) channel.read();
                } catch (Throwable e) {
                  setCause(e);
                }
              }
              if (cause.get() != null) throw new IOException(cause.get());
            }
          
            private void setCause(Throwable e) {
              if (cause.get() == null) cause.set(e);
            }
          
            private void releaseCurChunk() {
              if (curChunk != null) {
                curChunk.release();
                curChunk = null;
              }
            }
          
            private void onSuccess() throws IOException {
              if (isCallbacked) return;
              if (cause.get() != null) {
                callback.onFailure(streamId.chunkIndex, cause.get());
              } else {
                InputStream inputStream = new LimitedInputStream(this, byteCount);
                ManagedBuffer managedBuffer = new InputStreamManagedBuffer(inputStream, byteCount);
                callback.onSuccess(streamId.chunkIndex, managedBuffer);
              }
              isCallbacked = true;
            }
          
            private void resetChannel() {
              if (!channel.config().isAutoRead()) {
                channel.config().setAutoRead(true);
                channel.read();
              }
            }
          
            private class StreamInterceptor implements TransportFrameDecoder.Interceptor {
              @Override
              public void exceptionCaught(Throwable e) throws Exception {
                handler.deactivateStream();
                setCause(e);
                logger.trace("exceptionCaught", e);
                onSuccess();
                resetChannel();
              }
          
              @Override
              public void channelInactive() throws Exception {
                handler.deactivateStream();
                setCause(new ClosedChannelException());
                logger.trace("channelInactive", cause.get());
                onSuccess();
                resetChannel();
              }
          
              @Override
              public boolean handle(ByteBuf buf) throws Exception {
                try {
                  ByteBuf frame = nextBufferForFrame(byteCount - writerIndex, buf);
                  int available = frame.readableBytes();
                  writerIndex += available;
                  mayTrafficSuspension();
                  if (!isClosed.get() && available > 0) {
                    buffers.put(frame);
                    if (writerIndex > byteCount) {
                      setCause(new IllegalStateException(String.format(
                          "Read too many bytes? Expected %d, but read %d.", byteCount, writerIndex)));
                      handler.deactivateStream();
                    } else if (writerIndex == byteCount) {
                      handler.deactivateStream();
                    }
                  } else {
                    frame.release();
                  }
                logger.trace(streamId + ", writerIndex " + writerIndex + ", byteCount " + byteCount);
                  onSuccess();
                } catch (Exception e) {
                  setCause(e);
                  resetChannel();
                }
                return writerIndex != byteCount;
              }
          
              /**
               * Takes the incoming buffer and either returns a retained slice covering the
               * whole buffer, or copies just the bytes needed for the remaining frame into
               * a newly allocated buffer.
               */
              private ByteBuf nextBufferForFrame(long bytesToRead, ByteBuf buf) {
                int slen = (int) Math.min(buf.readableBytes(), bytesToRead);
                ByteBuf frame;
                if (slen == buf.readableBytes()) {
                  frame = buf.retain().readSlice(slen);
                } else {
                  frame = buf.alloc().buffer(slen);
                  buf.readBytes(frame);
                  frame.retain();
                }
                return frame;
              }
          
              private void mayTrafficSuspension() {
                // If too many chunks are buffered, disable auto-read; pullChunk() will then
                // call channel.read() manually as chunks are consumed.
                if (channel.config().isAutoRead() && buffers.size() > 31) {
                  channel.config().setAutoRead(false);
                }
                if (writerIndex >= byteCount) resetChannel();
              }
            }
          
            private class InputStreamManagedBuffer extends ManagedBuffer {
              private final InputStream inputStream;
              private final long byteCount;
          
              InputStreamManagedBuffer(InputStream inputStream, long byteCount) {
                this.inputStream = inputStream;
                this.byteCount = byteCount;
              }
          
              public long size() {
                return byteCount;
              }
          
              public ChunkedByteBuffer nioByteBuffer() throws IOException {
                throw new UnsupportedOperationException("nioByteBuffer");
              }
          
              public InputStream createInputStream() throws IOException {
                return inputStream;
              }
          
              public ManagedBuffer retain() {
                // throw new UnsupportedOperationException("retain");
                return this;
              }
          
              public ManagedBuffer release() {
                // throw new UnsupportedOperationException("release");
                return this;
              }
          
              public Object convertToNetty() throws IOException {
                throw new UnsupportedOperationException("convertToNetty");
              }
            }
          }
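
          And a rough Scala sketch of points 1 and 2 from the list above (my reading of the description, not the actual patch; BlockId, ChunkedByteBuffer and the method body are simplified placeholders):

          import java.io.InputStream
          import java.nio.ByteBuffer

          // Placeholder stand-ins for the Spark classes referenced above.
          case class BlockId(name: String)
          class ChunkedByteBuffer(val chunks: Array[ByteBuffer]) // many chunks, each < 2GB

          abstract class ManagedBuffer {
            def size: Long
            def nioByteBuffer(): ChunkedByteBuffer // point 2: returns chunks, not one ByteBuffer
            def createInputStream(): InputStream
          }

          class DiskStore {
            // point 1: was def getBytes(blockId: BlockId): ChunkedByteBuffer
            def getBlockData(blockId: BlockId): ManagedBuffer = {
              // placeholder body: stream the block's file in chunks rather than
              // materializing the whole block in a single buffer
              throw new UnsupportedOperationException("sketch only")
            }
          }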
          
          
          srowen Sean Owen added a comment -

          How does this relate to the existing subtasks and their design/content? This looks like a piece of managing chunked data, but that isn't the hard part at all.

          apachespark Apache Spark added a comment -

          User 'witgo' has created a pull request for this issue:
          https://github.com/apache/spark/pull/14647

          gq Guoqiang Li added a comment -

          Yes, it contains a lot of minor changes, e.g. replacing ByteBuffer with ChunkedByteBuffer.

          gq Guoqiang Li added a comment -

          Preliminary Design Document.

          apachespark Apache Spark added a comment -

          User 'witgo' has created a pull request for this issue:
          https://github.com/apache/spark/pull/14977

          apachespark Apache Spark added a comment -

          User 'witgo' has created a pull request for this issue:
          https://github.com/apache/spark/pull/14995

          gq Guoqiang Li added a comment -

          Reynold Xin, any comments?

          gq Guoqiang Li added a comment -

          ping Reynold Xin

          jamiehutton Jamie Hutton added a comment -

          Hi there, is there any update on when this will be included in a Spark release?


            People

            • Assignee: Unassigned
            • Reporter: rxin Reynold Xin
            • Votes: 46
            • Watchers: 87

              Dates

              • Created:
                Updated:

                Development