SPARK-1391

BlockManager cannot transfer blocks larger than 2G in size

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.0.0
    • Fix Version/s: None
    • Component/s: Block Manager, Shuffle
    • Labels:
      None

      Description

      If a task tries to remotely access a cached RDD block, I get an exception when the block size is > 2G. The exception is pasted below.

      Memory capacities are huge these days (> 60G), and many workflows depend on having large blocks in memory, so it would be good to fix this bug.

      I don't know if the same thing happens on shuffles if one transfer (from mapper to reducer) is > 2G.

      14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer message
      java.lang.ArrayIndexOutOfBoundsException
              at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96)
              at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134)
              at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164)
              at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
              at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
              at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
              at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
              at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38)
              at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93)
              at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26)
              at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913)
              at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922)
              at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102)
              at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348)
              at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323)
              at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
              at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
              at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
              at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.Iterator$class.foreach(Iterator.scala:727)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
              at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
              at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
              at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
              at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44)
              at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
              at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
              at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:661)
              at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:503)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:744)
      
      Attachments

      1. BlockLimitDesign.pdf
        75 kB
        Imran Rashid
      2. SPARK-1391.diff
        4 kB
        Min Zhou

        Issue Links

        This issue is closed as a duplicate of SPARK-6235.

          Activity

          coderplay Min Zhou added a comment - edited

          Hi Shivaram

          Is that the entire exception stack? It seems some information is missing from it.

          Which revision did you run on?

          coderplay Min Zhou added a comment -

          There is supposed to be a detail message after "java.lang.ArrayIndexOutOfBoundsException", something like:

          java.lang.ArrayIndexOutOfBoundsException: -2147483648
          srowen Sean Owen added a comment -

          It is possible that it constructs the AIOOBE without the line number. I may be stating the obvious, but it is all but surely negative and due to int overflow; 2GB strongly suggests it anyway. Would be good to confirm.
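
          (Editor's aside, not part of the original comment: a minimal sketch of the suspected overflow. Once a 32-bit offset passes Integer.MAX_VALUE, the sum wraps to a large negative value, which is exactly the kind of index an AIOOBE would carry.)

          object IntOverflowDemo {
            def main(args: Array[String]): Unit = {
              // A stream position just under the 2 GB boundary...
              val position: Int = Int.MaxValue
              val len: Int = 1024
              // ...plus one more write length wraps the 32-bit sum negative,
              // and that negative value ends up being used as an array index.
              println(position + len) // prints -2147482625
            }
          }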

          coderplay Min Zhou added a comment - edited

          Sean Owen

          Yes, that's a possibility. It's not a line number; it should instead carry an index. Otherwise it would have to be a user-defined exception or a native exception. I grepped the fastutil source code, and it never throws an AIOOBE with an empty message.

          Also, the line number does not correspond to v6.4.4.

          See line 96 of FastByteArrayOutputStream:
          http://grepcode.com/file/repo1.maven.org/maven2/it.unimi.dsi/fastutil/6.4.4/it/unimi/dsi/fastutil/io/FastByteArrayOutputStream.java

          Here is where an AIOOBE can be thrown; the position can become negative because of this line:

          public void write( final byte[] b, final int off, final int len ) throws IOException {
          ...
              if ( position + len > length ) length = position += len;
          ...
          }
          

          Here is a simulation with fastutil 6.4.4:

          import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
          
          public class ArrayOutofIndex {
          
            public static void main(String[] args) throws Exception {
              FastByteArrayOutputStream outputStream = new FastByteArrayOutputStream(4096);
              outputStream.position(-1);
              outputStream.write('a');
              outputStream.close();
            }
          }
          
          Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
          	at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:92)
          	at mzhou.shuffle.perf.ArrayOutofIndex.main(ArrayOutofIndex.java:29)
          
          import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
          
          public class ArrayOutofIndex {
            public static void main(String[] args) throws Exception {
              FastByteArrayOutputStream outputStream = new FastByteArrayOutputStream(4096);
              outputStream.position(-1);
              outputStream.write(new byte[1024], 0, 1024);
              outputStream.close();
            }
          }
          
          Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException
          	at java.lang.System.arraycopy(Native Method)
          	at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:98)
          	at mzhou.shuffle.perf.ArrayOutofIndex.main(ArrayOutofIndex.java:29)
          

          The line number and stack info are not the same as those reported.

          srowen Sean Owen added a comment -

          Oops, yes, I meant offset of course. Good investigation there. I am also not sure why the index would not show.

          coderplay Min Zhou added a comment -

          Sean Owen Yes, neither of the two simulations exactly matches the stack info Shivaram gave us, so I suspected that something was missing there.

          coderplay Min Zhou added a comment -

          From the line number, the fastutil version should be 6.5.7.
          The error should be thrown by this line in FastByteArrayOutputStream.java:

          System.arraycopy( b, off, array, position, len );
          
          coderplay Min Zhou added a comment - edited

          Finally, I can explain the whole thing. Apologies to Shivaram, you didn't miss anything.

          From the above, we can see that the position can be negative.
          Running the code below under fastutil 6.5.7 gives:

          import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
          public class ArrayIndex {
            public static void main(String[] args) throws Exception {
              FastByteArrayOutputStream outputStream = new FastByteArrayOutputStream(4096);
              outputStream.position(Integer.MAX_VALUE);
              outputStream.write(new byte[1024], 0, 1024);
              outputStream.close();
            }
          }
          
          Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException
          	at java.lang.System.arraycopy(Native Method)
          	at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96)
          	at mzhou.shuffle.perf.ArrayOutofIndex.main(ArrayOutofIndex.java:29)
          

          We can see the line number FastByteArrayOutputStream.java:96 is correct, the same as Shivaram's. The only difference is that the stack frame "at java.lang.System.arraycopy(Native Method)" does not appear in Shivaram's report.

          After some investigation of the JDK source code, I got the answer: System.arraycopy gets Just-in-Time compiled. Here is a simulation:

          public class ArrayCopy {
          
            public static void main(String[] args) throws Exception {
              byte[] src = new byte[8];
              byte[] dst = new byte[8];
              for(int i = 0; i < 100000; i++) {
                System.arraycopy(src,0,dst,0, dst.length);
              }
              System.arraycopy(src,0,dst,-1, dst.length);
            }
          }
          
          Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException
          	at mzhou.shuffle.perf.ArrayOutofIndex.main(ArrayOutofIndex.java:36)
          

          See? That stack frame is gone.

          coderplay Min Zhou added a comment -

          The JIT intrinsic replaces the original JNI implementation of System.arraycopy. The HotSpot JVM can't get the symbol of this native method, which is why that stack frame is missing.

          shivaram Shivaram Venkataraman added a comment -

          I just copied the stack trace from the stderr of the executor, but this was towards the end of a long job, so JIT could definitely be an explanation.

          In case this motivates somebody to pick up this issue, let me add some context. This is really painful in situations where you have a long chain of transformations cached in memory. Any non-local task launched after that will try to recompute the entire transformation chain as block transfers fail.

          In terms of solutions, is fastutil the problem here? Sean, will https://github.com/apache/spark/pull/266 address this by any chance?

          srowen Sean Owen added a comment -

          No, fastutil has nothing to do with it, if in fact the problem is int overflow in the offset. It's basically a limit on how big an array can be in Java.

          coderplay Min Zhou added a comment -

          Shivaram Venkataraman

          Sean Owen, if you don't mind, I will take this issue. Actually, I was invited by Reynold to solve this problem. IMHO, fastutil at the very least can't prevent the "position" field from overflowing. Maybe my solution will end up like yours, replacing fastutil with other libraries, so we could collaborate. May I?

          srowen Sean Owen added a comment -

          Of course! Hardly my issue. Well, you could try my patch that replaces fastutil with alternatives. I doubt the standard ByteArrayOutputStream behaves differently, though.

          But we are always going to have a problem in that a Java byte array can only be so big because of the size of an int, regardless of stream position issues. This one could be deeper.
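
          (Editor's aside: a small sketch of the array-length ceiling mentioned here. Array lengths in Java and Scala are ints, so a single byte array cannot describe more than roughly 2 GB regardless of how stream positions are handled; narrowing a larger Long size to an Int simply truncates it.)

          object ArrayLengthLimit {
            def main(args: Array[String]): Unit = {
              val wanted: Long = 3L * 1024 * 1024 * 1024 // a 3 GB block
              // new Array[Byte](wanted) does not even compile: array lengths are ints.
              // Forcing the Long into an Int silently truncates it to a negative number.
              println(wanted.toInt) // prints -1073741824
            }
          }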

          shivaram Shivaram Venkataraman added a comment -

          Min Zhou That sounds great to me – I don't seem to have permissions to assign issues though.

          shivaram Shivaram Venkataraman added a comment -

          Yeah, I think the right solution is not to create one large array, but to somehow either stream the data or break it up into smaller chunks.

          coderplay Min Zhou added a comment -

          Thank you guys.

          Shivaram Venkataraman Which fastutil version did you use? Is it 6.5.7, as I analyzed?

          I will submit a quick and dirty patch in the next few days, not expecting it to be committed, just to prove my analysis above in your real-world workload, since I have no Spark deployment here to test on. Shivaram, would you test it for me? Thanks in advance.

          shivaram Shivaram Venkataraman added a comment -

          I am not using any fastutil version explicitly. I am just using Spark's master branch from around March 23rd. (The exact commit I am synced to is https://github.com/apache/spark/commit/8265dc7739caccc59bc2456b2df055ca96337fe4)

          shivaram Shivaram Venkataraman added a comment -

          Oh and yes, I'd be happy to test out any patch / WIP

          coderplay Min Zhou added a comment -

          Shivaram Venkataraman Yeah, I meant the fastutil pulled in by Spark. From your revision, it's weird that the version is 6.4.4...

          rxin Reynold Xin added a comment -

          I took a quick look into this. We are using a bunch of ByteBuffers throughout the block manager and the communication layer. We need to replace that ByteBuffer with a different interface that can handle larger arrays.

          It is fortunate that the underlying communication in Connection.scala actually breaks messages down into smaller chunks, so that's one less place to change.

          coderplay Min Zhou added a comment -

          Yes. The communication layer uses a ByteBuffer array to transfer messages, but the receiver converts them back into BlockMessages, where each block corresponds to one ByteBuffer, which can't be larger than 2GB. Those BlockMessages are consumed by the connection callers everywhere, which we can't control.

          One approach is to write a CompositeByteBuffer to overcome the 2GB limitation, but that still can't get around some other limitations of the ByteBuffer interface, like ByteBuffer.position(), ByteBuffer.capacity() and ByteBuffer.remaining(), whose return values are still ints.
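
          (Editor's sketch of the point above, not a proposed implementation: a composite over several ByteBuffers can describe more than 2GB, but it cannot present itself as a java.nio.ByteBuffer, because position(), capacity() and remaining() all return Int; callers would need a new Long-based interface.)

          import java.nio.ByteBuffer

          class CompositeByteBufferSketch(chunks: Seq[ByteBuffer]) {
            // These have to be Longs, which is exactly what ByteBuffer's API cannot express.
            def capacity: Long = chunks.map(_.capacity.toLong).sum
            def remaining: Long = chunks.map(_.remaining.toLong).sum
          }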

          coderplay Min Zhou added a comment - edited

          Shivaram Venkataraman

          It will take a long time to solve the problem fundamentally: we need a ByteBuffer and an OutputStream that support more than 2GB of data, or we need to change the data structure inside a block, for example replacing ByteBuffer with Array[ByteBuffer].

          A short-term approach is to make Kryo serialization the default serializer instead of Java serialization, which inflates the data. I am attaching a patch following this approach. Since I didn't test it on a real cluster, I am not sending a pull request for now.

          Shivaram, please apply this patch and test it for me, thanks!
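
          (Editor's note, not the attached SPARK-1391.diff: the standard, user-facing way to switch the default serializer to Kryo, which usually produces far more compact blocks than Java serialization and so keeps cached partitions further from the 2GB limit.)

          import org.apache.spark.{SparkConf, SparkContext}
          import org.apache.spark.storage.StorageLevel

          object KryoDefaultSerializer {
            def main(args: Array[String]): Unit = {
              val conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("kryo-default-serializer")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
              val sc = new SparkContext(conf)
              // MEMORY_ONLY_SER stores the partition in serialized (now Kryo) form.
              val cached = sc.parallelize(1 to 1000)
                .map(i => new Array[Byte](1024))
                .persist(StorageLevel.MEMORY_ONLY_SER)
              println(cached.count())
              sc.stop()
            }
          }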

          shivaram Shivaram Venkataraman added a comment -

          Thanks for the patch. I will try this out in the next couple of days and get back.

          coderplay Min Zhou added a comment -

          Any update on your test, Shivaram Venkataraman?

          shivaram Shivaram Venkataraman added a comment -

          Sorry, I didn't get a chance to try this yet. Will try to do it tomorrow.

          shivaram Shivaram Venkataraman added a comment -

          I tried to run with this patch yesterday, but unfortunately I don't think the non-local jobs were triggered during my run. I will try to synthetically force non-local tasks the next time around to verify this.

          mridulm80 Mridul Muralidharan added a comment -

          Another place where this is relevant is here:

          java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
          at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)
          at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
          at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:413)
          at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:339)
          at org.apache.spark.storage.BlockManager.get(BlockManager.scala:506)
          at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:39)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:233)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
          at org.apache.spark.scheduler.Task.run(Task.scala:52)
          at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
          at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
          at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
          at java.security.AccessController.doPrivileged(Native Method)
          at javax.security.auth.Subject.doAs(Subject.java:415)
          at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1262)
          at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:722)

          So we might want to change the abstraction from a single ByteBuffer to a sequence of ByteBuffers ...
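
          (Editor's sketch of that "sequence of ByteBuffers" idea, not Spark code: FileChannel.map rejects any region larger than Integer.MAX_VALUE, so a big on-disk block has to be mapped as several smaller regions rather than one.)

          import java.io.RandomAccessFile
          import java.nio.MappedByteBuffer
          import java.nio.channels.FileChannel

          object ChunkedMapping {
            // Map a file of arbitrary length as a sequence of <= 2 GB mapped regions.
            def mapInChunks(path: String, chunkSize: Long = Int.MaxValue.toLong): Seq[MappedByteBuffer] = {
              val file = new RandomAccessFile(path, "r")
              try {
                val channel = file.getChannel
                val length = channel.size()
                (0L until length by chunkSize).map { offset =>
                  val size = math.min(chunkSize, length - offset)
                  channel.map(FileChannel.MapMode.READ_ONLY, offset, size)
                }
              } finally {
                file.close() // the mapped regions stay valid after the channel is closed
              }
            }
          }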

          soulmachine Frank Dai added a comment -

          Min Zhou Have you solved this issue yet? Or do you have any temporary solution to this problem?

          gtinjr Gilberto Tin added a comment - edited

          I am having the same issue on Spark 1.1.0: a 6-node cluster, testing a small 1.3GB file before moving to a bigger cluster and bigger files. It fails on a flatMap operation.

          14/10/06 14:31:25 ERROR storage.BlockManagerWorker: Exception handling buffer message
          java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
          at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
          at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:104)
          at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:379)
          at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100)
          at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79)
          at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
          at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
          at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
          at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
          at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
          at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
          at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
          at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:745)

          irashid Imran Rashid added a comment -

          Min Zhou, I assume you are no longer looking at this, right? I'm going to take a crack at this issue if you don't mind. Here is my plan, copied from SPARK-1476 (now that I've untangled those issues a little bit):

          I'd like to start on it, with the following very minimal goals:

          1. Make it possible for blocks to be bigger than 2GB
          2. Maintain performance on smaller blocks

          i.e., I'm not going to try to do anything fancy to optimize performance of the large blocks. To that end, my plan is to

          1. create a LargeByteBuffer interface, which just has the same methods we use on ByteBuffer
          2. have one implementation that just wraps one ByteBuffer, and another which wraps a completely static set of ByteBuffers (e.g., if you map a 3 GB file, it'll just immediately map it to 2 ByteBuffers, nothing fancy with only mapping the first half of the file until the second is needed, etc.)
          3. change ByteBuffer to LargeByteBuffer in BlockStore

          I see that about a year back there was a lot of discussion on this in SPARK-1476, and some alternate proposals. I'd like to push forward with a POC to try to move the discussion along again. I know there was some discussion about how important this is, and whether or not we want to support it. IMO this is a big limitation and results in a lot of frustration for users; we really need a solution for this.

          I could still be missing something, but I believe this should also solve SPARK-3151
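
          (Editor's rough sketch of what the interface in steps 1-2 might look like; the name LargeByteBuffer comes from the plan above, but the method set and class names are illustrative only and not the actual pull request.)

          import java.nio.ByteBuffer

          trait LargeByteBuffer {
            def size: Long
            def get(index: Long): Byte
          }

          // Implementation 1: wrap a single ByteBuffer (the common, small-block case).
          class SingleBufferLargeByteBuffer(buf: ByteBuffer) extends LargeByteBuffer {
            def size: Long = buf.capacity.toLong
            def get(index: Long): Byte = buf.get(index.toInt)
          }

          // Implementation 2: wrap a completely static set of ByteBuffers (e.g. a 3 GB
          // file mapped as two regions), addressed by a Long offset across all chunks.
          class ChunkedLargeByteBuffer(chunks: Array[ByteBuffer]) extends LargeByteBuffer {
            def size: Long = chunks.map(_.capacity.toLong).sum
            def get(index: Long): Byte = {
              var i = 0
              var offset = index
              while (offset >= chunks(i).capacity) {
                offset -= chunks(i).capacity
                i += 1
              }
              chunks(i).get(offset.toInt)
            }
          }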

          irashid Imran Rashid added a comment -

          Here is a minimal program to demonstrate the problem:

          sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](2.2e3.toInt)}.persist(StorageLevel.DISK_ONLY).count()
          

          This only demonstrates the problem with DiskStore, but a solution should apply to the other cases if done correctly. (We probably need to come up with more test cases.)

          irashid Imran Rashid added a comment - edited

          The one complication here comes from the network transfer required by replication. If we ignore NioBlockTransferService for now and just look at NettyBlockTransferService, the existing behavior is:

          1. replication results in a request to NettyBlockTransferService#uploadBlocks, which sends an UploadBlock msg to a peer. The UploadBlock message contains the full payload, which is limited to 2GB currently.

          2. The message is received by NettyBlockRpcServer where it is simply deserialized and inserted into the local block manager.

          I'm thinking we could break a block apart into multiple messages, e.g. UploadPartialBlock, with each message limited to 2GB (or even less). Then NettyBlockRpcServer would queue up all the messages, and once it had received them all it would put them together and insert the block locally.

          My concern with that approach is robustness – what happens if some of the UploadPartialBlocks never make it, for whatever reason? We wouldn't want NettyBlockRpcServer to simply hold on to those partial msgs in memory indefinitely. Does it make sense to introduce a timeout? When the first UploadPartialBlock is received, it would only wait for the rest of the msgs for a limited time before dumping those partial blocks.
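
          (Editor's sketch of the reassembly-with-timeout idea, in plain Scala rather than Spark's NettyBlockRpcServer; all names are hypothetical. Chunks for a block are buffered until they have all arrived or a deadline passes, after which the partial state is dropped.)

          import scala.collection.mutable

          class PartialBlockAssembler(timeoutMs: Long) {
            private case class Pending(chunks: mutable.Map[Int, Array[Byte]],
                                       totalChunks: Int,
                                       deadline: Long)
            private val pending = mutable.Map.empty[String, Pending]

            /** Returns the reassembled block once the last chunk arrives, None otherwise. */
            def offer(blockId: String, chunkIndex: Int, totalChunks: Int,
                      data: Array[Byte]): Option[Array[Byte]] = {
              dropExpired()
              val p = pending.getOrElseUpdate(blockId,
                Pending(mutable.Map.empty[Int, Array[Byte]], totalChunks,
                        System.currentTimeMillis() + timeoutMs))
              p.chunks(chunkIndex) = data
              if (p.chunks.size == p.totalChunks) {
                pending.remove(blockId)
                Some((0 until totalChunks).flatMap(i => p.chunks(i)).toArray)
              } else None
            }

            // Forget blocks whose remaining chunks never arrived within the timeout.
            private def dropExpired(): Unit = {
              val now = System.currentTimeMillis()
              val expired = pending.collect { case (id, p) if p.deadline <= now => id }
              expired.foreach(pending.remove)
            }
          }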

          irashid Imran Rashid added a comment -

          Attached a design doc (BlockLimitDesign.pdf).

          apachespark Apache Spark added a comment -

          User 'squito' has created a pull request for this issue:
          https://github.com/apache/spark/pull/4857

          rxin Reynold Xin added a comment - edited

          Imran Rashid if you want to attempt something this large and core to the whole engine, it would be better to do this incrementally, especially when this is a part of the code that you are less familiar with.

          I'd suggest breaking this task down in the following way, and getting feedback incrementally as well.

          1. LargeByteBuffer interface. (This alone would deserve its own design doc and focus on how it integrates with ManagedBuffer)
          2. Block storage
          3. Block fetching
          4. Block upload

          Uploading blocks > 2G is extremely rare, since block upload is rarely used outside of streaming, and streaming data blocks are usually small. It would also be much simpler to deal with upload if we change it to sending a msg to the other end and letting the other end download the blocks instead.

          irashid Imran Rashid added a comment -

          Reynold Xin Sure thing, I can break it into multiple pieces. Though honestly, if we don't think we'll bother fixing some of these 2GB limits, step 0 should be putting sane error messages on all the different cases where you can run into the 2GB limit. Right now the errors are extremely confusing.

          If we want to support only some very limited set of functionality, then we might not even need to have LargeByteBuffer interact at all with ManagedBuffer – if we only want to support cached partitions, with no replication and no remote fetches, then I'm pretty sure the relevant paths in BlockManager never involve a ManagedBuffer.

          I'm not sure I understand what you mean that "Uploading blocks > 2G is extremely rare". Are you saying that nobody would want to cache a partition > 2GB? That nobody uses replication when caching? Or that we don't need to support the combination? Also, if you have a broadcast variable over 2GB, TorrentBroadcast will store it all in one block on the driver. It breaks it into smaller blocks when sending it to executors, but not on the driver – perhaps that could be changed to always break into smaller blocks on the driver as well.

          rxin Reynold Xin added a comment -
          • I absolutely agree that better error messages are critical.
          • I'm saying it is rare that somebody wants to cache > 2gb partition and also use replication.
          • We can fix TorrentBroadcast without fixing block upload, because TorrentBroadcast splits data into chunks anyway.
          tgraves Thomas Graves added a comment -

          Duping this to SPARK-6235. If something is missing, let's add it there.


            People

            • Assignee:
              Unassigned
              Reporter:
              shivaram Shivaram Venkataraman
            • Votes:
              6
              Watchers:
              22
