Details

    • Type: Improvement
    • Status: Closed
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels:
      None
    • Environment:

      all

Description

      The underlying abstraction for blocks in Spark is a ByteBuffer, which limits the size of a block to 2GB.
      This has implications not just for managed blocks in use, but also for shuffle blocks (memory-mapped blocks are limited to 2GB even though the API allows a long), ser/deser via byte-array-backed output streams (SPARK-1391), etc.

      This is a severe limitation when Spark is used on non-trivial datasets.
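
      As a rough illustration of where the limit comes from (an editor's sketch, not part of the original report): java.nio.ByteBuffer addresses its contents with an Int, so a single buffer, including a memory-mapped one, cannot exceed Integer.MAX_VALUE (~2GB) bytes. The file path below is hypothetical.

      {code:scala}
      import java.io.RandomAccessFile
      import java.nio.channels.FileChannel

      // ByteBuffer capacities and positions are Ints, so ~2GB is the ceiling for one buffer:
      // java.nio.ByteBuffer.allocate(3L * 1024 * 1024 * 1024)  // does not compile: allocate takes an Int

      // FileChannel.map accepts a Long size, but still fails past the Int range at runtime:
      val channel = new RandomAccessFile("/tmp/big-block.dat", "r").getChannel
      // throws IllegalArgumentException ("Size exceeds Integer.MAX_VALUE") if the file is larger than ~2GB
      val mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
      {code}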

      Attachments

        1. 2g_fix_proposal.pdf (76 kB, Mridul Muralidharan)

        Issue Links

          • duplicates SPARK-6235

          Activity

          mridulm80 Mridul Muralidharan added a comment -

          WIP Proposal:

          • All references to ByteBuffer will need to be replaced with Seq[ByteBuffer].
            This applies to the definition of a block, memory-mapped file segments for a shuffle block, etc.
          • All use of byte-array-backed output streams will need to be replaced with an aggregating output stream which writes to multiple ByteArrayOutputStreams as and when the array limits are hit (a rough sketch follows).
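
          A minimal sketch of what such an aggregating output stream could look like (an editor's illustration of the idea above, not code from the actual patch); it rolls over to a fresh ByteArrayOutputStream whenever the current chunk reaches a configurable limit, and can hand the result back as a Seq[ByteBuffer]:

          {code:scala}
          import java.io.{ByteArrayOutputStream, OutputStream}
          import java.nio.ByteBuffer
          import scala.collection.mutable.ArrayBuffer

          // Writes into a sequence of ByteArrayOutputStreams, starting a new one whenever
          // the current chunk reaches chunkSize, so the total written can exceed 2GB.
          class ChunkedByteArrayOutputStream(chunkSize: Int) extends OutputStream {
            private val chunks = ArrayBuffer(new ByteArrayOutputStream())

            private def current: ByteArrayOutputStream = {
              if (chunks.last.size() >= chunkSize) chunks += new ByteArrayOutputStream()
              chunks.last
            }

            override def write(b: Int): Unit = current.write(b)

            override def write(b: Array[Byte], off: Int, len: Int): Unit = {
              var written = 0
              while (written < len) {
                val target = current
                val toWrite = math.min(len - written, chunkSize - target.size())
                target.write(b, off + written, toWrite)
                written += toWrite
              }
            }

            // Expose the data as a Seq[ByteBuffer], matching the proposed block representation.
            def toByteBuffers: Seq[ByteBuffer] = chunks.map(c => ByteBuffer.wrap(c.toByteArray)).toSeq
          }
          {code}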
          pwendell Patrick Wendell added a comment - edited

          This says it's a "severe limitation" - but why not just use more, smaller blocks? I think Spark's design assumes in various places that blocks are not extremely large. Think of it like, e.g., the HDFS block size... it can't be arbitrarily large. The answer here might be to use multiple blocks in the case of something like a shuffle where the size can get really large.

          matei Matei Zaharia added a comment -

          I agree, it would be good to understand what kinds of operations this arises in. Do you have cached RDD partitions that are this large, or is it shuffle blocks? Is it skew in the shuffle data?

          The main concern I see with this is that it would complicate the deserializer and block sender code paths, but maybe it's worth it.

          mridulm80 Mridul Muralidharan added a comment - edited

          There are multiple issues at play here:

          a) If a block goes beyond 2G, everything fails - this is the case for shuffle, cached and 'normal' blocks, irrespective of storage level and/or other config.
          In a lot of cases, particularly when the data generated 'increases' after a map/flatMap/etc, the user has no control over the size increase in the output block for a given input block (think iterations over datasets).

          b) Increasing the number of partitions is not always an option (even for the subset of use cases where it can be done):
          1) It has an impact on the number of intermediate files created while doing a shuffle (ulimit constraints, IO performance issues, etc.).
          2) It does not help when there is skew anyway.

          c) The compression codec, serializer used, etc. also have an impact.

          d) 2G is an extremely low limit to have on modern hardware: this is a particularly severe limitation when we have nodes running with 32G to 64G of RAM and TBs of disk space available for Spark.

          To address specific points raised above:

          A) Patrick Wendell MapReduce jobs don't fail when the block size of files increases - it might be inefficient, but it still runs (not that I know of any case where it actually does, but theoretically I guess it can become inefficient).
          So the analogy does not apply.
          Also, 2G is not really an unlimited increase in block size - and in MR, the output of a map can easily go a couple of orders of magnitude above 2G, whether or not it is followed by a reduce.

          B) Matei Zaharia In the specific cases where it was failing, the users were not caching the data but going directly to shuffle.
          There was no skew from what I can see: just the data size per key is high, and there are a lot of keys too btw (as iterations increase and nnz increases).
          Note that it was an implementation detail that it was not being cached - it could have been too.
          Additionally, compression and/or serialization also apply implicitly in this case, since it was impacting shuffle - the 2G limit was observed on both the map and reduce sides (in two different jobs).

          In general, our effort is to make Spark a drop-in replacement for most use cases which are currently handled via MR/Pig/etc.
          Limitations of this sort make it difficult to position Spark as a credible alternative.

          The current approach we are exploring is to remove all direct references to ByteBuffer from Spark (except for the ConnectionManager, etc. parts) and rely on a BlockData or similar data structure which encapsulates the data corresponding to a block. By default a single ByteBuffer should suffice, but in case it does not, the class will automatically take care of splitting the data across multiple buffers.
          Similarly, all references to byte-array-backed streams will need to be replaced with a wrapper stream which multiplexes over multiple byte array streams.
          The performance impact for all 'normal' use cases should be minimal, while allowing Spark to be used in cases where the 2G limit is being hit.
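
          A rough sketch of what such a BlockData wrapper could look like (an editor's illustration of the idea in this comment; the class name and methods are only assumptions, not a final API). The single-ByteBuffer case stays the common one, and larger blocks simply carry more chunks:

          {code:scala}
          import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
          import java.nio.ByteBuffer
          import scala.collection.JavaConverters._

          // Encapsulates the bytes of one block as one or more ByteBuffers,
          // so the total size can exceed Integer.MAX_VALUE.
          class BlockData(val buffers: Seq[ByteBuffer]) {
            // Total size as a Long, summed across chunks.
            def size: Long = buffers.map(_.remaining().toLong).sum

            // Stream view over all chunks, e.g. for deserialization paths.
            def toInputStream: InputStream = {
              val streams = buffers.map { buf =>
                val bytes = new Array[Byte](buf.remaining())
                buf.duplicate().get(bytes) // copies for simplicity; a real implementation would avoid this
                new ByteArrayInputStream(bytes): InputStream
              }
              new SequenceInputStream(streams.iterator.asJavaEnumeration)
            }
          }

          object BlockData {
            // Common case: a single buffer, no overhead beyond the wrapper.
            def apply(buf: ByteBuffer): BlockData = new BlockData(Seq(buf))
          }
          {code}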

          The only unknown here is Tachyon integration, where the interface is a ByteBuffer - I am not knowledgeable enough to comment on what the issues there would be.

          pwendell Patrick Wendell added a comment - edited

          Mridul Muralidharan I think the proposed change would benefit from a design doc to explain exactly the cases we want to fix and what trade-offs we are willing to make in terms of complexity.

          Agreed that there is definitely room for improvement in the out-of-the-box behavior here.

          Right now the limits as I understand them are: (a) the shuffle output from one mapper to one reducer cannot be more than 2GB, and (b) a partition of an RDD cannot exceed 2GB.

          I see (a) as the bigger of the two issues. It would be helpful to have specific examples of workloads where this causes a problem, the associated data sizes, etc. For instance, say I want to do a 1 terabyte shuffle. Right now the number of (mappers * reducers) needs to be > ~1000 for this to work (e.g. 100 mappers and 10 reducers), assuming uniform partitioning. That doesn't seem too crazy an assumption, but if you have skew this would be a much bigger problem.
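
          (Spelling out that arithmetic under the same uniform-partitioning assumption: each map-to-reduce output block must stay under ~2 GB, so a 1 TB shuffle needs at least 1 TB / 2 GB ≈ 512 mapper-reducer pairs; 100 mappers x 10 reducers gives 1000 pairs of roughly 1 GB each, comfortably under the limit.)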

          Would it be possible to improve (a) but not (b) with a much simpler design? I'm not sure (maybe they reduce to the same problem), but it's something a design doc could help flesh out.

          Popping up a bit - I think our goal should be to handle reasonable workloads, not to be 100% compliant with the semantics of Hadoop MapReduce. After all, in-memory RDDs are not even a concept in MapReduce. And keep in mind that MapReduce became so bloated and complex a project that it is no longer possible to make substantial changes to it today. That's something we definitely want to avoid by keeping Spark internals as simple as possible.

          matei Matei Zaharia added a comment -

          Hey Mridul, the one thing I'd add as an alternative is whether we could have splitting happen at a higher level than the block manager. For example, maybe a map task is allowed to create 2 output blocks for a given reducer, or maybe a cached RDD partition gets stored as 2 blocks. This might be slightly easier to implement than replacing all instances of ByteBuffers. But I agree that this should be addressed somehow, since 2 GB will become more and more limiting over time. Anyway, I'd love to see a more detailed design. I think even the replace-ByteBuffers approach you proposed can be made to work with Tachyon.

          mridulm80 Mridul Muralidharan added a comment -

          Patrick Wendell IMO both are limitations (shuffle block < 2G vs any RDD block < 2G), though the former is slightly trickier to work around at times (when there is skew, etc.).
          Note that trying to work around the latter will mean more partitions, which also has an impact on functionality and shuffle performance (enforced ulimit limits will mean the job fails again).
          Ideally though, I would prefer that we did not need to work around these issues at all.

          This is one of two JIRAs we want to work on for the next release - the other being improving shuffle performance, particularly in the context of the number of files which are created and/or opened.
          Since that is more involved work, Thomas Graves and I will file it later.

          Agreed about the need for a design document - unfortunately, we are flying blind right now since we don't know everything that will need to change (the Tachyon integration I mentioned above was something we hit recently). The basic idea is known - but we are not sure what else this will impact.
          The current plan is to do a POC, iron out the issues seen, and run it on one of the jobs which is failing right now; address the performance and/or functionality issues seen - and then submit a PR based on the conclusions.
          I filed this JIRA to get basic input from other developers and/or users, and to use it as a sounding board in case we hit particularly thorny issues we can't resolve due to lack of sufficient context - crowdsourcing design/implementation issues.

          Matei Zaharia Interesting that you should mention splitting the output of a map into multiple blocks.
          We are actually thinking about that in a different context - akin to MultipleOutputs in Hadoop or SPLIT in Pig: without needing to cache the intermediate output, but directly emitting values to different blocks/RDDs based on the output of a map or some such.
          In the context of splitting the output into multiple blocks, I was not so sure - particularly given the need for compression, custom serialization, etc.
          Also, we have degenerate cases where the value for a single key actually becomes > 2G (a list of sparse vectors which become denser as iterations increase, etc.).

          pwendell Patrick Wendell added a comment - edited

          Okay sounds good - a POC like that would be really helpful. We've run some very large shuffles recently and couldn't isolate any problems except for SPARK-1239, which should only be a small change.

          If there are smaller fixes you run into, then by all means submit them directly. If it requires large architectural changes, I'd recommend having a design doc before submitting a pull request, because people will want to discuss the overall approach. I.e., we should avoid being "break-fix" and think about the long-term design implications of changes.

          mridulm80 Mridul Muralidharan added a comment -

          Matei Zaharia We are having some issues porting the Netty shuffle copier code to support > 2G, since only ByteBuf seems to be exposed.
          Before I dig into Netty more, I wanted to know if you or someone else among the Spark developers knows how to add support for large buffers in our Netty code. Thanks!

          mridulm80 Mridul Muralidharan added a comment -

          Attached a proposal detailing the work we have done on this effort.
          Looking forward to feedback before a PR is submitted based on it.

          rxin Reynold Xin added a comment -

          Mridul Muralidharan can you post an update on this?

          I think it is a great idea to provide a buffer abstraction that can be backed by various buffer implementations (nio.ByteBuffer, Netty ByteBuf, on disk file region). I would like to make this happen for 1.2.

          mridulm80 Mridul Muralidharan added a comment -

          Based on discussions we had with others, apparently 1.1 was not a good vehicle for this proposal.
          Further, since there was no interest in this JIRA or comments on the proposal, we put the effort on the back burner.

          We plan to push at least some of the bugs fixed as part of this effort - consolidated shuffle did get resolved in 1.1, and a few more might be contributed back in 1.2, time permitting (disk-backed map output tracking, for example, looks like a good candidate).
          But the bulk of the change is pervasive, at times a bit invasive, and at odds with some of the other changes (for example, zero-copy); shepherding it might be a bit time-consuming for me given other deliverables.

          If there is renewed interest in getting this integrated into a Spark release, I can try to push for it to be resurrected and submitted.

          rxin Reynold Xin added a comment - edited

          Let's work together to get something for 1.2 or 1.3. At the very least, I would like to have a buffer abstraction layer that can support this in the future.

          mridulm80 Mridul Muralidharan added a comment -

          A WIP version was pushed to https://github.com/mridulm/spark/tree/2g_fix - about 2 weeks before the feature freeze in 1.1, IIRC.

          Note that the 2g fixes are functionally complete, but this branch also includes a large number of other fixes.
          Some of these have been pushed to master, while others have not yet been: primarily fixes for alleviating memory pressure and for resource leaks.

          This branch has been shared for reference purposes - it is not meant to be actively worked on for merging into master.
          We will need to cherry-pick the changes and do that manually.

          irashid Imran Rashid added a comment -

          Based on discussion on the dev list, Mridul Muralidharan isn't actively working on this. I'd like to start on it, with the following very minimal goals:

          1. Make it possible for blocks to be bigger than 2GB
          2. Maintain performance on smaller blocks

          I.e., I'm not going to try to do anything fancy to optimize performance of the large blocks. To that end, my plan is to:

          1. create a LargeByteBuffer interface, which just has the same methods we use on ByteBuffer
          2. have one implementation that just wraps one ByteBuffer, and another which wraps a completely static set of ByteBuffers (e.g., if you map a 3 GB file, it'll just immediately map it to 2 ByteBuffers; nothing fancy like only mapping the first half of the file until the second is needed, etc.) - see the sketch after this list
          3. change ByteBuffer to LargeByteBuffer in ShuffleBlockManager and BlockStore
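
          A minimal sketch of what that interface and the two implementations could look like (an editor's illustration of the plan above; LargeByteBuffer is the name from this comment, while the two implementation class names are just placeholders, not a final API). Reads at a Long position are routed to the right underlying ByteBuffer by offset:

          {code:scala}
          import java.nio.ByteBuffer

          // Long-addressable view over one or more ByteBuffers.
          trait LargeByteBuffer {
            def size: Long
            def get(position: Long): Byte
          }

          // Trivial case: a single buffer, no extra indirection.
          class SingleLargeByteBuffer(buf: ByteBuffer) extends LargeByteBuffer {
            def size: Long = buf.remaining()
            def get(position: Long): Byte = buf.get(position.toInt)
          }

          // A completely static set of chunks, e.g. a 3 GB file mapped as two MappedByteBuffers.
          class WrappedLargeByteBuffer(chunks: Array[ByteBuffer]) extends LargeByteBuffer {
            private val chunkSizes = chunks.map(_.remaining().toLong)
            val size: Long = chunkSizes.sum

            def get(position: Long): Byte = {
              // Walk the cumulative chunk sizes to find the right chunk, then index within it.
              var idx = 0
              var offset = position
              while (offset >= chunkSizes(idx)) {
                offset -= chunkSizes(idx)
                idx += 1
              }
              chunks(idx).get(offset.toInt)
            }
          }
          {code}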

          I see that about a year back there was a lot of discussion on this, and some alternate proposals. I'd like to push forward with a POC to try to move the discussion along again. I know there was some discussion about how important this is, and whether or not we want to support it. IMO this is a big limitation and results in a lot of frustration for users; we really need a solution for this.

          vanzin Marcelo Vanzin added a comment -

          Hi Imran Rashid,

          Approach sounds good. It would be nice to measure whether the optimization for smaller blocks actually makes a difference; from what I can tell, supporting multiple ByteBuffer instances just means having an array and picking the right ByteBuffer based on an offset, both of which should be pretty cheap.

          irashid Imran Rashid added a comment -

          I spent a little time with Sandy Ryza on this today, and I realized that the shuffle limit and the cache limit are actually quite distinct. (Sorry if this was already obvious to everyone else.) I've made another issue, SPARK-5928, to deal with the shuffle issue. Then I say we make SPARK-1391 focus more on the cache limit (and broadcast limit, etc.). I'm going to make this issue require both of those.

          I'm going to pursue a solution to only SPARK-1391 (basically what I outlined above); I'll move further discussion of the particulars of what I'm doing over there.

          tgraves Thomas Graves added a comment -

          We have a lot of JIRAs about the 2G limit. I'm going to dup this to the umbrella JIRA https://issues.apache.org/jira/browse/SPARK-6235

          If someone thinks something is missing from that, let's add another item there.


            People

            • Assignee:
              Unassigned
            • Reporter:
              mridulm80 Mridul Muralidharan
            • Votes:
              16
            • Watchers:
              56
