Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: java, spec
    • Labels:

      Description

      Avro nicely supports chunking of container types into multiple frames. We need to expose this to the RPC layer to facilitate use cases like the Hadoop Datanode, where a single "RPC" can yield far more data than should be buffered in memory.

          Activity

          Todd Lipcon added a comment -

          Here's one proposal I discussed a bit with Philip yesterday:

          An RPC can only be considered "streamable" if at least one of the following is true:

          1. The response type is array<X> and marked "streamed" in the protocol.
          2. The last request parameter is array<X> and marked "streamed" in the protocol.

          The "streamed" marking is probably best suited as a mixin property (see AVRO-404) since this should be entirely backwards-compatible for RPC clients/servers that don't support streaming. It just specifies to the RPC client/server that a particular API should be used.

          Let's take the following example RPCs for discussion:

          record Chunk {
            fixed checksum(4);
            binary data;
          }
          streamed array<Chunk> getBlock(int blockId);
          PutResult putBlock(int blockId, streamed array<Chunk> chunks);
          

          The non-streamed java interfaces would look like:

          List<Chunk> getBlock(int blockId);
          PutResult putBlock(int blockId, List<Chunk> chunks);
          

          If streaming is enabled for these chunks, it would change to:

          Iterable<Chunk> getBlock(int blockId);
          PutResult putBlock(int blockId, Iterable<Chunk> chunks);
          

          In this case, the iterable would stream the data in from the network as it is iterated for putBlock. And getBlock would be responsible for returning an iterable that generates response packets until the block has been entirely sent.
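
A minimal sketch of what the getBlock side could look like under this proposal: an Iterable that produces each chunk only when the caller asks for it. `Chunk` and `LazyChunks` are hypothetical stand-ins written for illustration, not generated Avro classes, and CRC32 is an assumed choice for the 4-byte checksum.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.zip.CRC32;

// Hypothetical stand-in for the Chunk record above.
class Chunk {
  final int checksum;  // CRC32 of data; stands in for `fixed checksum(4)`
  final byte[] data;

  Chunk(int checksum, byte[] data) {
    this.checksum = checksum;
    this.data = data;
  }
}

// Single-pass Iterable: each step reads one more chunk from the source,
// so the iterator itself holds at most one chunk at a time.
class LazyChunks implements Iterable<Chunk> {
  private final InputStream in;
  private final int chunkSize;

  LazyChunks(InputStream in, int chunkSize) {
    if (chunkSize < 1) throw new IllegalArgumentException("chunkSize must be positive");
    this.in = in;
    this.chunkSize = chunkSize;
  }

  @Override
  public Iterator<Chunk> iterator() {
    return new Iterator<Chunk>() {
      private Chunk pending;
      private boolean done;

      @Override
      public boolean hasNext() {
        if (pending == null && !done) {
          pending = readChunk();
          done = (pending == null);
        }
        return pending != null;
      }

      @Override
      public Chunk next() {
        if (!hasNext()) throw new NoSuchElementException();
        Chunk c = pending;
        pending = null;
        return c;
      }
    };
  }

  // Reads up to chunkSize bytes; returns null at end of stream.
  private Chunk readChunk() {
    try {
      byte[] buf = new byte[chunkSize];
      int n = in.read(buf);  // may return fewer bytes than requested
      if (n < 0) return null;
      CRC32 crc = new CRC32();
      crc.update(buf, 0, n);
      return new Chunk((int) crc.getValue(), Arrays.copyOf(buf, n));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A server-side getBlock could return `new LazyChunks(blockInputStream, 64 * 1024)`, and the RPC layer would then write each chunk as it pulls it from the iterator.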

          Users of this would probably do something like the following most often:

          record ReadHeader { int statusCode; ... }
          union ReadPacket { ReadHeader, Chunk };
          streamed array<ReadPacket> getBlock(int blockId);

          and then document that it will always send one ReadHeader followed by an unspecified number of chunks.
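
The "one header, then chunks" convention could be enforced on the receiving side roughly as follows. This is only a sketch: `ReadHeader`, `DataChunk` and `PacketConsumer` are hypothetical names, and treating status code 0 as success is an assumption that the thread does not specify.

```java
import java.util.Iterator;

// Hypothetical stand-ins for the two branches of the ReadPacket union.
class ReadHeader {
  final int statusCode;
  ReadHeader(int statusCode) { this.statusCode = statusCode; }
}

class DataChunk {
  final byte[] data;
  DataChunk(byte[] data) { this.data = data; }
}

class PacketConsumer {
  // Consumes one header followed by any number of chunks and returns the
  // total payload size. Assumes statusCode 0 means success.
  static long consume(Iterable<?> packets) {
    Iterator<?> it = packets.iterator();
    if (!it.hasNext()) throw new IllegalStateException("empty response");
    Object first = it.next();
    if (!(first instanceof ReadHeader)) {
      throw new IllegalStateException("expected ReadHeader first");
    }
    int status = ((ReadHeader) first).statusCode;
    if (status != 0) throw new IllegalStateException("read failed, status " + status);

    long total = 0;
    while (it.hasNext()) {
      Object packet = it.next();
      if (!(packet instanceof DataChunk)) {
        throw new IllegalStateException("expected only chunks after the header");
      }
      total += ((DataChunk) packet).data.length;
    }
    return total;
  }
}
```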

          The streaming parameters and streaming response could certainly be used together (e.g., to provide a putBlock() that "acks" sequence numbers as they're written to disk).

          For an event-driven server or client, the APIs would probably be more callback-oriented, but let's start here.

          Doug Cutting added a comment -

          The existing API to minimize copies is Decoder#readBytes(ByteBuffer). When writing data, if a value is larger than 4k or so, it is written as a separate frame. If, when reading, one calls Decoder#readBytes(ByteBuffer) at a frame boundary, it should read directly from the socket or file handle without intermediate buffering.

          For the datanode case, I had imagined that one would make RPC requests to read a chunk at a time, and each would be returned with its checksum. To avoid round-trip delays, the RPC client would need to be able to submit requests without blocking. It could then submit multiple requests, then read responses until the number of outstanding requests fell below some threshold, then submit more requests. If the transport can return responses out of order then the client would need to re-order them. Alternately, if a transport like HTTP is used, multiple requests can be sent over a connection and their responses are guaranteed to be returned in order.
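
The request-per-chunk pattern described here might be sketched as a sliding window of outstanding requests. `PipelinedReader` and the `requestChunk` callback are hypothetical; the callback stands in for a non-blocking RPC stub. As a simplification, the window is topped up after every response rather than after dropping below a separate threshold.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

class PipelinedReader {
  // Reads chunks 0..totalChunks-1, keeping at most maxOutstanding requests in flight.
  // requestChunk is a hypothetical non-blocking RPC stub, one call per chunk.
  static <T> List<T> readAll(int totalChunks, int maxOutstanding,
                             IntFunction<CompletableFuture<T>> requestChunk) {
    if (maxOutstanding < 1) throw new IllegalArgumentException("maxOutstanding must be positive");
    Deque<CompletableFuture<T>> inFlight = new ArrayDeque<>();
    List<T> results = new ArrayList<>();
    int nextToRequest = 0;
    while (results.size() < totalChunks) {
      // Top up the window without waiting for any response.
      while (nextToRequest < totalChunks && inFlight.size() < maxOutstanding) {
        inFlight.addLast(requestChunk.apply(nextToRequest++));
      }
      // Wait only for the oldest request. Taking responses in submission order
      // also re-orders them if the transport completes them out of order.
      results.add(inFlight.removeFirst().join());
    }
    return results;
  }
}
```

Note that each response is still fully materialized before it is handed back, so this bounds memory only if individual chunks are small.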

          Todd Lipcon added a comment -

          Sending an RPC per chunk seems like a significant amount of overhead. A streaming RPC response also makes good sense for scan or multiget APIs in HBase - if you want to perform a scan of a few thousand rows with minimal latency, it's best not to have to worry about sending 1000 separate RPCs and congealing the responses. That would add up to a significant CPU overhead, and probably transfer overhead as well - imagine a multiget where each cell is only a few bytes, but you want several thousand of them.

          Doug Cutting added a comment -

          I don't have a problem with streaming RPC responses. The Encoder and Decoder API do support streaming, but currently with the assumption that no single binary value is bigger than can be buffered. Have you looked at the BlockingEncoder?

          Philip Zeyliger added a comment -

          We support blocking for complex types (arrays and maps) on the wire. But, the way all the APIs are right now, the RPC client would have to be able to put the whole thing in memory, which often doesn't make sense. That's what Todd's approach is suggesting. You should be able to have a meaningful "sendMeFiveHundredMegabytes()" command that can be used by a client with only a modicum of clever handling.

          Doug Cutting added a comment -

          > You should be able to have a meaningful "sendMeFiveHundredMegabytes()" command that can be used by a client with only a modicum of clever handling.

          A way of doing this without changing anything in the spec is to transmit an array of chunks, since arrays are already blocked and may be arbitrarily long. Todd's API above makes the array of chunks explicit, so perhaps we're okay with that approach?

          BlockingBinaryEncoder intelligently breaks arrays when they're too big for a block, but not otherwise, minimizing block overhead. Applications can stream writes and reads of arbitrarily-large complex objects using this class. One option is thus to code directly to the Encoder/Decoder API and rely on ValidatingEncoder and ValidatingDecoder to ensure that calls conform to a schema: Encoder/Decoder is Avro's streaming API.

          We don't yet have a higher-level API that permits streaming arbitrarily large items. The Iterable<Chunk> approach Todd proposes should work in some cases. To my thinking it is only applicable when the final parameter of a method is an array type and/or when the return type is an array type. Does that sound right? One could annotate the schema somehow, so that the compiler generates this alternate API, or perhaps the compiler could simply be modified to always generate both styles of methods when these conditions are met.

          The Transceiver interface would also need to change from using List<ByteBuffer> to Iterator<ByteBuffer>, so that it can return a response before it has been entirely read and accept a request before it has been entirely written.

          Todd Lipcon added a comment -

          > A way of doing this without changing anything in the spec is to transmit an array of chunks, since arrays are already blocked and may be arbitrarily long

          > To my thinking it is only applicable when the final parameter of a method is an array type and/or when the return type is an array type

          Agreed on both fronts. Having multiple streamed parameters would be very tricky to understand since you can't start streaming the second parameter until the first is complete. The same goes for streamed record members - I don't think we should allow it since we can't ensure that the client will consume them in the correct order.

          > One could annotate the schema somehow, so that the compiler generates this alternate API

          This was my "streamed array<Chunk>" notation above. In JSON syntax it might look something like

          
          
          { "type": "array", "items": "Chunk", "streamed": "true" }

          This would modify the interfaces for both client and server to generate the streaming-style APIs. This is a good example use case for mixin annotations (AVRO-404), since some users might want the "simple API" instead. The call would be completely wire-compatible between streamed and non-streamed implementations, of course.

          Doug Cutting added a comment -

          > This was my "streamed array<Chunk>" notation above

          Right, but I'm not yet convinced we need an optional notation. Above I meant to suggest that the compiler could always emit both APIs when the last parameter is an array, i.e., both 'putChunks(List<Chunk>)' and 'putChunks(Iterable<Chunk>)'. However, that wouldn't work for return values, since Java can't overload on return type, so we'd have to mangle names, which is ugly.

          Instead, perhaps the compiler and runtime could always use Iterable for Array parameters and return values? Then, folks can pass either a list or something that's lazier and materialized on the fly. The runtime, when the return type is an array that uses more than one block, could return an iterator that reads remaining responses directly from the connection.

          Todd Lipcon added a comment -

          > Above I meant to suggest that the compiler could always emit both APIs when the last parameter is an array

          I think this makes sense on the client side, but for the server side interface, you wouldn't want to have to implement both, right?

          > Then, folks can pass either a list or something that's lazier and materialized on the fly

          The disadvantage here of treating everything like an Iterable is that we won't be able to check .size() ahead of time to set up the right size array block, and we'll end up chunking all arrays in single-element blocks. We could of course do something like if (ret instanceof Collection) { int size = ret.size(); } but it seems kind of evil.

          Doug Cutting added a comment -

          > The disadvantage here of treating everything like an Iterable is that we won't be able to check .size() ahead of time to set up the right size array block, and we'll end up chunking all arrays in single-element blocks.

          BlockingBinaryEncoder already solves this for small objects, buffering them until a block is full. And if an element is a large ByteBuffer, then it can be immediately written as a single-element block without copying and without significant overhead.

          ryan rawson added a comment -

          The HBase API is really returning a 2 dimensional array of byte arrays... Each cell (row/col/version) is one byte array, and then each row is an array of those, and each result set is multiple rows. Perhaps another way to think of this is an endless array of cells with inferred 'next row' from watching the row key between cells.

          The optimization available to us here is that each hbase RPC call spends very little time actually in hbase but ends up waiting on Datanode to get the data back. Some kind of async framework on the server side could help chain daemons that make multiple RPCs and avoid busy-waiting threads.

          One other thought, right now we use a block-oriented loader, since right now you have to store the entire value in RAM at least once (during RPC and memstore times), but if someone wanted to store massive values in hbase we could use the DN streaming API and stream those chunks back to the client. Right now everything is modelled as arrays of bytes, so that might not be so hard to do. I'm a little wary of large object APIs, since you might as well store the data in HDFS directly.

          Right now the return type might be:
          array of array of byte

          If you say only the first enclosing array is 'streaming', that means the sub-array is NOT streamed, right?

          If so, then streaming excessively large objects in the process of streaming normal and other associated objects might not be the right thing to do.

          Todd Lipcon added a comment -

          > if you say only the first enclosing array is 'streaming' that means the sub-array is NOT streamed, right?

          Correct. To be really technical, on a wire level we could stream any structure that is "tail streamable"... by which I mean array<foo>, or array<array<foo>>, or array<array<MyRecord>> where MyRecord's last field is "tail streamable". However, it will be impossible to enforce that clients or servers consume/provide the values in the correct order. For example:

          void doStuff(Iterable<Iterable<Foo>> inputFoos) {
            for (Iterable<Foo> fooIter : inputFoos) {
              for (Foo foo : fooIter) {
                // do something with foo
              }
            }
          }
          

          could work, since the user is consuming the input in the same order it's being serialized on the wire. However, if the outer iterator were moved before all of the inner iterator's data was consumed, it would no longer work (the second array<Foo> isn't available until the first array<Foo> is done). Granted, we could "skip ahead" at this point, but I think this complexity would be very bad, and probably not clear for framework users either.

          For your use case, could you get by with a bit more application-level logic and change your array<array<Cell>> to something more like:

          record ResponseChunk {
            boolean continuingPreviousRow;
            array<Cell> cells;
          } 
          array<ResponseChunk> getCells(...)
          

          where you'd send a few cells at a time in a ResponseChunk, and unwrap them on the other side into whatever user-level API you want?
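
The unwrapping step on the receiving side might look roughly like this. It is a sketch under assumptions: `RowAssembler` is a hypothetical helper, `String` stands in for the Cell type, and a chunk with `continuingPreviousRow` set to false is taken to start a new row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the ResponseChunk record above.
class ResponseChunk {
  final boolean continuingPreviousRow;
  final List<String> cells;  // String stands in for Cell

  ResponseChunk(boolean continuingPreviousRow, List<String> cells) {
    this.continuingPreviousRow = continuingPreviousRow;
    this.cells = cells;
  }
}

class RowAssembler {
  // Regroups a stream of chunks into rows, handing each completed row to rowSink.
  // Only the row currently being assembled is buffered.
  static void forEachRow(Iterable<ResponseChunk> chunks, Consumer<List<String>> rowSink) {
    List<String> current = null;
    for (ResponseChunk chunk : chunks) {
      if (current != null && !chunk.continuingPreviousRow) {
        rowSink.accept(current);  // the previous row is complete
        current = null;
      }
      if (current == null) current = new ArrayList<>();
      current.addAll(chunk.cells);
    }
    if (current != null) rowSink.accept(current);  // flush the final row
  }
}
```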

          > If so, then streaming excessively large objects in the process of streaming normal and other associated objects might not be the right thing to do.

          Sorry, I couldn't parse this sentence. Can you explain further what you mean? I guess you're referring to streaming large binary values? If so, I think it will be impossible to do it in a general way from the API even if the wire protocol supports it. The large binary values can always be "chunked" as above and it shouldn't be a big hassle for developers, right?

          (It should be noted that this is probably an "advanced feature" that only a few hardcore apps will need to use... in particular HBase and Hadoop.)

          Todd Lipcon added a comment -

          I think it's worth noting that I'm essentially saying we should expect people to write "mini protocols" at the application level on top of Avro when they want to do really complicated stuff like this. It's a bit annoying, but I think it still provides good benefit over "do it yourself transport" - the protocol will be way easier to evolve, and all the other transport features will be leveraged.

          Would like to see arguments against this, though, if someone has one!

          ryan rawson added a comment -

          I was just musing about the idea of mixing large objects and non-large objects in the same result stream, and streaming where necessary and not. But like you said if you are an array<array<byte>> then it'd be hard to do tail streaming.

          If we make the hard easy and the impossible merely difficult, I think the job will be a success.

          Doug Cutting added a comment -

          > if you say only the first enclosing array is 'streaming' that means the sub-array is NOT streamed, right?

          The current Avro binary data format supports streaming of nested arrays. For example, BlockingBinaryEncoder implements this so that, if an inner array has millions of elements, it is blocked. Each block is prefixed not just with the number of elements, but also with the number of bytes in the block. So arbitrarily large, nested data structures may already be efficiently streamed through Avro.
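
To make the block layout concrete, here is a hand-rolled sketch of the array encoding from the Avro specification: each block is written as a negative item count, then the block's size in bytes, then the items, and a zero count ends the array. `BlockedArraySketch` is a hypothetical illustration for an array of longs and does not use BlockingBinaryEncoder or any other Avro class.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

class BlockedArraySketch {
  // Avro's variable-length zig-zag encoding for int and long values.
  static void writeLong(ByteArrayOutputStream out, long n) {
    long z = (n << 1) ^ (n >> 63);  // zig-zag: small magnitudes become small unsigned values
    while ((z & ~0x7FL) != 0) {
      out.write((int) ((z & 0x7F) | 0x80));  // low 7 bits, with the continuation bit set
      z >>>= 7;
    }
    out.write((int) z);
  }

  // Encodes items as an array split into blocks of at most itemsPerBlock items.
  static byte[] encode(List<Long> items, int itemsPerBlock) {
    if (itemsPerBlock < 1) throw new IllegalArgumentException("itemsPerBlock must be positive");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int start = 0; start < items.size(); start += itemsPerBlock) {
      int end = Math.min(start + itemsPerBlock, items.size());
      ByteArrayOutputStream block = new ByteArrayOutputStream();
      for (Long item : items.subList(start, end)) {
        writeLong(block, item);
      }
      writeLong(out, -(long) (end - start));  // negative count: a byte size follows
      writeLong(out, block.size());           // lets a reader skip the block unread
      out.write(block.toByteArray(), 0, block.size());
    }
    writeLong(out, 0);  // a zero count terminates the array
    return out.toByteArray();
  }
}
```

Because the byte size precedes the items, a reader that does not want a block can skip it without decoding any element.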

          As I think about it more, I believe the Iterator<Iterator<Foo>> approach has merit. Avro's runtime supports the notion of efficient skipping. It doesn't seem overly complex for the outer iterator to know whether the inner iterator has completed or not, and, if it's not, skip accordingly. I believe this can be implemented with a call to ParsingDecoder#skipTo(level), where level is the parser's stack level of the outer iterator. Note that, since AVRO-388, all Java Binary decoders are now parser-based, and hence support this. I find the consistency and generality of this model attractive.

          Thiru, does this sound plausible?

          Todd Lipcon added a comment -

          > As I think about it more, I believe the Iterator<Iterator<Foo>> approach has merit. Avro's runtime supports the notion of efficient skipping

          Skipping is one thing, but there's no way to rewind a socket. For example, what do we do about the following user code:

          void myStreamedRpc(Iterable<Iterable<Foo>> myArg) {
            Iterator<Iterable<Foo>> outerIter = myArg.iterator();
            Iterator<Foo> firstIter = outerIter.next().iterator();
            Iterator<Foo> secondIter = outerIter.next().iterator();
            Foo a = secondIter.next();
            Foo b = firstIter.next();
          }
          

          This code tries to read the data off the stream in the opposite order from which they arrive. When assigning 'a' we could certainly skip all of firstIter's data, but then we'd be screwed when it comes time to assign 'b' since we can't skip back. We could buffer all of firstIter as soon as we access secondIter, but then we're not being very transparent at all to end users. I don't like the idea that users of this API would have to really understand its workings under the hood or else risk potentially unbounded memory buffering.

          Keeping the API restricted to "you may have one streamed input and one streamed output" has its downsides in loss of generality, but at least it is very transparent to implementors, and slight deviations in processing order can't cause huge swings in performance.

          Doug Cutting added a comment -

          Todd, I think that last line could throw an exception. I don't think it's unreasonable to define the lifetime of an inner iterator to end when next() is called on the outer iterator.
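
          A minimal sketch of that lifetime rule, assuming plain in-memory Iterables stand in for the decoder. The class name ForwardOnlyNested and its skipRest() helper are hypothetical and are not part of Avro; a real implementation would presumably skip on the decoder rather than drain elements one by one.

```java
import java.util.Iterator;

/** Sketch only: inner iterators expire as soon as the outer iterator advances. */
final class ForwardOnlyNested<T> implements Iterator<Iterator<T>> {
  // Stands in for the network decoder; data can only be consumed front to back.
  private final Iterator<? extends Iterable<T>> source;
  // The only inner iterator that may still be read.
  private Inner current;

  ForwardOnlyNested(Iterable<? extends Iterable<T>> source) {
    this.source = source.iterator();
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public Iterator<T> next() {
    if (current != null) {
      current.skipRest();     // discard whatever the caller left unread
      current.expired = true; // any later use of the old iterator fails fast
    }
    current = new Inner(source.next().iterator());
    return current;
  }

  private final class Inner implements Iterator<T> {
    private final Iterator<T> items;
    private boolean expired;

    Inner(Iterator<T> items) {
      this.items = items;
    }

    void skipRest() {
      while (items.hasNext()) {
        items.next();
      }
    }

    private void checkAlive() {
      if (expired) {
        throw new IllegalStateException("outer iterator has already advanced");
      }
    }

    @Override
    public boolean hasNext() {
      checkAlive();
      return items.hasNext();
    }

    @Override
    public T next() {
      checkAlive();
      return items.next();
    }
  }
}
```

          With this rule, the out-of-order example above fails on the call to firstIter.next() with an IllegalStateException instead of forcing the runtime to buffer the first array.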

          Todd Lipcon added a comment -

          So would the restriction be that you can only have nested array<array<array<...>>> type streamables?

          Do we allow something like:

          record MyRecord {
            array<Chunk> chunks;
            array<Blahs> blahs;
            StatusCode status;
          }
          

          ? In this case, it's not "tail streamable" to use my definition above. Do we force people to use accessors and then throw an exception if getStatus() is called before chunks and blahs have both been fully iterated? Do we enforce that you can't start iterating blahs until chunks have been iterated? I think this will be very confusing.

          Doug Cutting added a comment -

          > Do we enforce that you can't start iterating blahs until chunks have been iterated?

          I'm just exploring whether and how we might use a uniform, iterator-based API. We might optimize the runtime in particular cases, and these optimizations might result in runtime restrictions. These optimizations might be switchable at runtime. So, tail-streamable might be enabled by default, and all others would be fully buffered. But you might be able to call a #setStreamNested(boolean) method that permits nested things to be streamed but requires that data structures be accessed in order.

          Philip Zeyliger added a comment -

          I'm wary of using Iterators all the time. Users typically know at design time whether they have a small array, where they'd like the advantages of random access and .size() or whether they have an iterator representing a possibly big stream. Using an annotation in the schema to specify this makes quite a bit of sense.

          It might also be simpler to only do the iteration if it's top-level for return values.

          This also reminds me a bit of AVRO-391 (DoS possible). It would be a neat feature to put in "maximum_allowable_size" in schemas, saying that, hey, this record should never exceed 10K.

          – Philip

          Todd Lipcon added a comment -

          We just had a bit of a discussion about this and determined that the primary difficulty is going to be flow control when multiplexing several streamed RPCs over the same TCP pipe.

          For example, if there are two block writes streaming into one Hadoop DN, going to different disks, we need a way of applying backpressure on one write without backpressure on the other. This implies that we either (a) must implement our own flow control mechanism for streamed calls, or (b) cannot multiplex multiple streamed calls on the same TCP stream, thus allowing normal TCP backpressure to work here.

          Unfortunately, option (a) is a pain, since we'd end up reimplementing a sliding window. Option (b) is a pain because we'd be throwing away the other benefits of keeping a low number of sockets open (e.g. avoiding resetting TCP slow start for each block read).
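
          To make the cost of option (a) concrete, here is a rough sketch of per-call credits, assuming each streamed call has an integer id and the receiver sends some acknowledgement message when it has drained frames. The class StreamCredits, its method names, and the ack message are all hypothetical; nothing like this exists in Avro's RPC layer today.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/** Sketch only: one credit window per streamed call sharing a connection. */
final class StreamCredits {
  private final int initialWindow;
  private final Map<Integer, Semaphore> windows = new ConcurrentHashMap<>();

  StreamCredits(int initialWindow) {
    this.initialWindow = initialWindow;
  }

  private Semaphore window(int callId) {
    // Each call starts with a full window of frame credits.
    return windows.computeIfAbsent(callId, id -> new Semaphore(initialWindow));
  }

  /** Sender side: returns true if one more frame may be written for this call now. */
  boolean tryAcquire(int callId) {
    return window(callId).tryAcquire();
  }

  /** Sender side: blocks until the receiver has granted a credit for this call. */
  void acquire(int callId) throws InterruptedException {
    window(callId).acquire();
  }

  /** Receiver side: called when the consumer of this call has drained some frames. */
  void grant(int callId, int frames) {
    window(callId).release(frames);
  }

  /** Drops the window once the call has completed. */
  void close(int callId) {
    windows.remove(callId);
  }
}
```

          A write to a slow disk then exhausts only its own window, so frames for the other call keep flowing over the shared socket. The sketch leaves out everything that makes this hard in practice, such as sizing windows in bytes and handling lost or delayed acknowledgements.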

          André Cruz added a comment -

          Has streaming support been definitely abandoned? It would be very useful for Cassandra.


            People

            • Assignee:
              Unassigned
              Reporter:
              Todd Lipcon
            • Votes:
              2
              Watchers:
              26
