Hi Reynold Xin,
I've been adding scattered notes across the various tickets, which I think has led to a lot of the confusion – let me try to summarize things here.
I completely agree about the relative importance of the various cases. Caching large blocks is by far the most important one. However, I think it's worth exploring the other cases now, for two reasons. (a) I still think they need to be solved eventually for a consistent user experience. E.g., if caching locally works but reading from a remote cache doesn't, a user will be baffled when on run 1 of their job everything works fine, but on run 2, with the same data & same code, tasks get scheduled slightly differently, require a remote fetch, and KABOOM! That's the kind of experience that makes the average user want to throw Spark out the window. (This is actually what I thought you were pointing out in your comments on the earlier JIRA – that we can forget about uploading at this point, but need to make sure remote fetches work.) (b) We should make sure that whatever approach we take at least leaves the door open for solutions to all the problems. I wasn't initially sure myself that this approach would work for everything, but exploring the options convinced me it's all possible. (Which gets to your question about large blocks vs. multi-blocks.)
The proposal isn't exactly "read-only"; it also supports writing, via LargeByteBufferOutputStream. It turns out that's all we need. The BlockManager currently exposes ByteBuffers, but it doesn't actually need to. For example, local shuffle fetches currently only expose a FileInputStream over the data – that's why there isn't a 2GB limit on local shuffles. (It gets wrapped in a FileSegmentManagedBuffer and eventually read here: https://github.com/apache/spark/blob/55c4831d68c8326380086b5540244f984ea9ec27/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L300) It also makes sense that we only need stream access, since RDDs & broadcast vars are immutable – e.g., we never say "treat bytes 37-40 as an int, and increment its value".
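To make the "stream access is all we need" point concrete, here's a minimal sketch – the names are mine, not the actual proposed API – of the surface a block representation has to support if BlockManager stops handing out ByteBuffers:

```scala
import java.io.{InputStream, OutputStream}

// Hypothetical sketch (illustrative names, not the proposed API): the only
// operations a block representation really needs. There is no random access
// and no ByteBuffer anywhere, so nothing here caps a block's size at 2GB.
trait LargeBlockData {
  def size: Long                   // may exceed Int.MaxValue
  def inputStream(): InputStream   // read the block back, e.g. for deserialization
}

trait LargeBlockWriter {
  def outputStream(): OutputStream // the sink that Serializer#serializeStream writes to
}
```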
Fundamentally, blocks are always created via serialization – more specifically, via Serializer#serializeStream. Obviously there isn't any limit when writing to a FileOutputStream; we just need a way to write to an in-memory output stream that can grow past 2GB. We can already create an Array[Array[Byte]] with ByteArrayChunkOutputStream https://github.com/apache/spark/blob/55c4831d68c8326380086b5540244f984ea9ec27/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala (currently used by TorrentBroadcast to create multiple blocks). We can use that to write out more than 2GB, e.g. as many chunks of max size 64K.
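As a rough illustration of the idea (a simplified stand-in, not the real ByteArrayChunkOutputStream code), an in-memory output stream that accumulates fixed-size chunks looks something like this:

```scala
import java.io.OutputStream
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the ByteArrayChunkOutputStream idea: bytes accumulate
// into fixed-size chunks, so the total written can exceed 2GB even though
// each individual chunk is a plain Array[Byte].
class ChunkedOutputStream(chunkSize: Int) extends OutputStream {
  private val chunks = new ArrayBuffer[Array[Byte]]()
  private var posInChunk = chunkSize  // forces allocation on the first write

  override def write(b: Int): Unit = {
    if (posInChunk == chunkSize) {    // current chunk is full; start a new one
      chunks += new Array[Byte](chunkSize)
      posInChunk = 0
    }
    chunks.last(posInChunk) = b.toByte
    posInChunk += 1
  }

  // The last chunk may be partially filled, so trim it on the way out.
  def toChunks: Array[Array[Byte]] = {
    val result = chunks.toArray
    if (result.nonEmpty && posInChunk < chunkSize) {
      result(result.length - 1) = result.last.take(posInChunk)
    }
    result
  }
}
```

Since Serializer#serializeStream just needs some OutputStream, pointing it at a stream like this is enough to serialize a block of arbitrary size into memory.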
Similarly, we need a way to convert the various representations of large blocks back into InputStreams. File-based input streams have no problem (DiskStore only fails b/c the code currently tries to convert to a ByteBuffer, though conceptually this is unnecessary). For in-memory large blocks, represented as Array[Array[Byte]], we can again do the same as TorrentBroadcast. The final case is network transfer. This involves changing the netty frame decoder to handle frames that are > 2GB – then we just reuse the same input stream as the in-memory case. That was the last piece I was prototyping and mentioned in my latest comments; I have an implementation available here: https://github.com/squito/spark/blob/5e83a55daa30a19840214f77681248e112635bf6/network/common/src/main/java/org/apache/spark/network/protocol/FixedChunkLargeFrameDecoder.java
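For the in-memory case, turning the Array[Array[Byte]] back into a single InputStream only needs stock java.io pieces – essentially the same trick TorrentBroadcast already uses. A sketch:

```scala
import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import scala.collection.JavaConverters._

// Read an in-memory large block back as one continuous stream: wrap each
// chunk in a ByteArrayInputStream and chain them with SequenceInputStream.
def chunksToInputStream(chunks: Array[Array[Byte]]): InputStream = {
  val streams = chunks.iterator.map(new ByteArrayInputStream(_))
  new SequenceInputStream(streams.asJavaEnumeration)
}
```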
It's a good question whether we should allow large blocks, or instead cap blocks at 2GB and have another layer stitch multiple blocks together. I don't know that I have very clear objective arguments for one vs. the other, but I did consider both, and this version felt much simpler to implement. Especially given the limited API that is actually needed (only stream access), the changes proposed here really aren't that big. It keeps the changes nicely contained to the layers underneath BlockManager (with mostly cosmetic / naming changes required in outer layers, since we'd no longer be returning ByteBuffers). Going down this road certainly doesn't prevent us from later deciding to fragment blocks – then it's just a question of naming: are "blocks" the smallest units we work with in the internals, with some new logical unit wrapping them? Or are "blocks" the logical unit that is exposed, with some new smaller unit used by the internals? The proposed solution pushes us towards the latter. (To some extent it already creates smaller units by making Array[Array[Byte]], though as proposed these aren't chunks you'd work with directly, and in fact the sending & receiving sides can "chunk" in different ways if they like.)
Your questions about resource management are good, but IMO they are orthogonal to this change. To me, those questions apply just as well to a 1.99GB block as to a 2.01GB block, so we might consider making changes for those purposes whether or not we support blocks > 2GB. I was just discussing this w/ Marcelo, and he pointed out that (if my understanding of the network layers is correct) with the current implementation, if you fetch a 1.99GB block (shuffle or non-shuffle), the entire block is downloaded before any processing is done, because the netty layer creates one large ByteBuf for the data before making it available. That doesn't make a lot of sense, given that we only need to expose it as an InputStream. The goal of these changes is not to optimize the performance of large blocks, it's just to make them work (though of course we need reasonable efficiency). Even with my proposed changes, it probably still makes sense to tune a Spark app so that blocks are significantly smaller than 2GB. But that's very different from a hard failure at 2GB – especially in production systems that you expect to run regularly. I think optimizing performance for large blocks (e.g. changing to a streaming interface on the receiving side of the network) would be a great addition to Spark, but it should be tackled separately.
To me, the main remaining question is: of the assorted 2GB limits that come up, at what granularity can we check in fixes? E.g., is it an acceptable user experience to have local caching of blocks over 2GB work, but not remote fetches of those cached blocks? As you noted, there are 4 areas we could address:
(a) caching large blocks locally
(b) fetching non-shuffle blocks (mostly for reading cached data remotely)
(c) fetching shuffle blocks
(d) uploading (aka replicating) blocks
(a) was already solved by my previous PR (the code is not ready for merge, but I think conceptually it is a good solution; it just needs polish). (b) is what my latest prototyping for sending large blocks over the network was trying to address. Though (b) probably occurs far less often than (a), I feel a little uneasy about solving (a) without (b), because of the confusing user experience it would lead to, as I mentioned earlier – remote fetches from the cache are not that unlikely when there is load. I'm OK with delaying (c) & (d), though I think they will actually be relatively straightforward given a solution to (b). (The earlier PR had a solution to (d), but now that I see how to transfer over 2GB directly, I think that solution was too complicated; I definitely do not want to merge it.)
Whew, that was a long spiel, but hopefully writing this all down collects everything together. Thanks for reading this far!