Kafka / KAFKA-406

Gzipped payload is a fully wrapped Message (with headers), not just payload

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 0.7.1
    • Fix Version/s: None
    • Component/s: core
    • Labels: None
    • Environment: N/A

      Description

      When creating a gzipped MessageSet, the collection of Messages is passed to CompressionUtils.compress(), where each message is serialised [1] into a buffer (not just the payload: the full Message, with headers, uncompressed), then gzipped, and finally wrapped into another Message [2].

      In other words, the consumer has to unwrap the Message flagged as gzipped, unzip the payload, and unwrap the unzipped payload again as a non-compressed Message.

      Is this double-wrapping the intended behaviour?

      [1] messages.foreach(m => m.serializeTo(messageByteBuffer))

      [2] new Message(outputStream.toByteArray, compressionCodec)
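
      For readers without the code handy, here is a minimal sketch of that flow. The types are simplified stand-ins, not the actual kafka.message classes (the real Message header also carries magic, attributes and CRC fields):

        import java.io.ByteArrayOutputStream
        import java.util.zip.GZIPOutputStream

        // Simplified stand-in for kafka.message.Message: the "headers" are
        // reduced here to the single codec byte.
        case class Message(payload: Array[Byte], compressionCodec: Int = 0) {
          // Stand-in for Message.serializeTo(): headers + payload, uncompressed.
          def serialized: Array[Byte] = compressionCodec.toByte +: payload
        }

        def compress(messages: Seq[Message]): Message = {
          val bytes = new ByteArrayOutputStream()
          val gzip  = new GZIPOutputStream(bytes)
          messages.foreach(m => gzip.write(m.serialized)) // full Message, not just payload
          gzip.close()
          Message(bytes.toByteArray, compressionCodec = 1) // wrapped again, flagged GZIP
        }

        // The consumer reverses the three steps: unwrap the outer Message,
        // gunzip its payload, and parse the result as an uncompressed set of
        // full Messages.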

        Activity

        Neha Narkhede added a comment -

        Yes, this was intentional. The idea was to represent a compressed set of messages just like a Kafka Message. For this, we compress several messages into an array of bytes and treat it as the payload of a Message with compression codec GZIP or Snappy. This makes it easier to reason about compressed messages, since everything is a Message with a byte in its header telling us whether it is compressed or not.
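
        As a hedged sketch of what "a byte in its header" looks like to a reader of the buffer (the exact layout is an assumption inferred from this thread; verify against Message.scala before relying on it):

          import java.nio.ByteBuffer

          // Assumed layout: byte 0 = magic, byte 1 = attributes, then CRC and
          // payload, with the codec in the low bits of the attributes byte.
          def compressionCodec(message: ByteBuffer): Int =
            message.get(1) & 0x03 // 0 = none, 1 = GZIP, 2 = Snappy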

        Lorenzo Alberton added a comment -

        I see. It needs to be documented somewhere though, as people writing client libs might have trouble reverse-engineering the protocol. The fact that only gzipped messages are double-wrapped (requiring different handling depending on a flag) can be confusing.

        This might be a stupid question, but if a Message can contain a MessageSet with more than one Message, how is the consumer supposed to iterate through them? Children of a MessageSet might be Message objects OR MessageSet objects?
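
        For what it's worth, one way a client can hide the nesting is to flatten while iterating, sketched below with the simplified Message type from the description; decompress is a hypothetical helper that gunzips the payload and parses it back into full Messages:

          // Hypothetical helper: gunzip m.payload, parse it into inner Messages.
          def decompress(m: Message): Seq[Message] = ???

          def flatten(set: Seq[Message]): Seq[Message] =
            set.flatMap { m =>
              if (m.compressionCodec == 0) Seq(m) // plain Message: yield as-is
              else flatten(decompress(m))         // compressed: descend into the inner set
            }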

        Lorenzo Alberton added a comment -

        After sleeping on this, I think it's a really bad design decision. I appreciate that gzipping multiple messages together can lead to significant space savings, but I'm not convinced this is the right way. Since a compressed message can contain a collection of messages, the symmetry with the non-compressed message interface is broken, and a linear log is turned into an odd tree structure. This can't even be classified as normal iterator polymorphism.

        Three more very good reasons to rethink this design decision:

        • as Michal also noted on the kafka-dev mailing list [1], the compression flag of a child of a compressed message could easily slip to 1, leading to endless recursive calls.
        • the collection within a compressed message can't be partially consumed, i.e. you can't save the offset within the inner collection, as it would result in an invalid offset for the Kafka log; the inner collection has to be consumed as a whole and the offset advanced to the next Message in the outer collection, breaking another important Kafka property (see the sketch after this list).
        • even if we only allow a single message (instead of a collection) as the compressed payload of an outer Message, I don't see the need for the extra wrapping: the outer message has a CRC to verify that the gzipped payload is valid, and gzip itself has a CRC on the content, so there's no need for a third CRC on the uncompressed message (a waste of space and CPU).
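
        To illustrate the second point with made-up sizes: the only committable positions are the byte offsets of outer entries, so there is nowhere to checkpoint mid-batch:

          // A made-up log: a compressed batch of 3 messages, then a plain message.
          case class OuterEntry(offset: Long, sizeInBytes: Int, innerCount: Int)
          val log = Seq(OuterEntry(0L, 512, 3), OuterEntry(512L, 200, 1))

          // The broker only accepts the outer byte boundaries: 0, 512, 712.
          val validOffsets = log.map(_.offset) :+ (log.last.offset + log.last.sizeInBytes)
          // After consuming 2 of the 3 inner messages of the first batch, there is
          // no committable offset strictly between 0 and 512: replay from 0, or
          // advance to 512 and lose the third inner message.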

        Thoughts?

        Best,

        Lorenzo Alberton
        Chief Tech Architect
        DataSift, Inc.

        [1] http://mail-archives.apache.org/mod_mbox/incubator-kafka-dev/201207.mbox/%3CCAP5ZrEiDjUyhYuNpmh7Xck1dzdCROG_pEgdKbZdDV2yXrXxQAg%40mail.gmail.com%3Ec

        Jay Kreps added a comment -

        I actually don't see how the compression flag can slip to 1 unnoticed in a checksummed message sent over TCP. I mean you could incorrectly set it to 1, but you could incorrectly set all kinds of things in a client implementation, including the message contents. Not sure if I am missing something...

        I actually share your distaste for some of the details of the compression implementation. But I think batch compression is fundamentally an invasive feature for a message-at-a-time system, so I am not sure if we can do better. I think we would be open to hearing alternative approaches, if fully thought through, though it would likely be a big change.

        Here are the basic requirements from my point of view:
        1. It must be batch compression. Single message compression doesn't buy much for concise serialization formats and in any case can be implemented in the client.
        2. Compression must be maintained both in the on-disk format and over the network. On disk, compression effectively triples the per-node cache size in our usage. We always produce to a local cluster, which then replicates cross-datacenter, so for us it is the consumer path that really must be compressed over the network. However, I am not sure that is a universal design, so ideally both producer and consumer should allow compression, although I think it is okay if the server decompresses and re-compresses in a different batch size.
        3. It should not break the message-at-a-time API.

        The alternative we considered was a paged log (e.g. stuffing messages into fixed-size pages). I am not sure if this is better or worse, but we ended up rejecting it due to the complexity of implementation, which would require splitting messages across pages on overflow, etc.

        Jay Kreps added a comment -

        Oh yes, and the other design requirement we had was that messages not be re-compressed on a fetch request. A simple implementation without this requirement would be to have the consumer request N messages and specify whether or not to compress; the server would read these into memory, decompress if its local log format is compressed, and then batch-compress exactly the messages the client asked for and send just that. The problem with this is that we have about a 5x read-to-write ratio, so recompressing on each read means recompressing the same stuff 5 times on average. This makes consumption way more expensive. I don't think this is a hard requirement, but to make that approach fly we would have to demonstrate that the CPU overhead of compression would not become a serious bottleneck. I know this won't work with GZIP, but it might be possible with Snappy or a faster compression algo.

        Jun Rao added a comment -

        The current code actually supports arbitrary levels of nesting within a message (i.e., a message with compressed messages, each of which has compressed messages, etc.). I'd agree that this is not a very useful feature and is potentially dangerous. We can probably add some code to limit the recursion to 2 levels.
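
        A sketch of that limit, reusing the hypothetical flatten/decompress from earlier in the thread (illustrative only, not an actual patch):

          def flattenLimited(set: Seq[Message], depth: Int = 1): Seq[Message] =
            set.flatMap { m =>
              if (m.compressionCodec == 0) Seq(m)
              else {
                // Fail fast instead of recursing without bound.
                require(depth < 2, "compressed message nested beyond 2 levels")
                flattenLimited(decompress(m), depth + 1)
              }
            }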

        Lorenzo Alberton added a comment -

        Hi Jay, thanks for your comments. When I wrote that the compression flag could "slip" to 1, of course I meant that there's nothing in the code to prevent nested levels of compressed messages, as Jun noted. I'd add some checks to prevent this at write time.
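
        A write-time guard of that shape might look like this (illustrative only, reusing the sketched compress from the description):

          def compressChecked(messages: Seq[Message]): Message = {
            // Reject already-compressed inputs so a wrapper can never nest a wrapper.
            require(messages.forall(_.compressionCodec == 0),
                    "refusing to compress an already-compressed Message")
            compress(messages)
          }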

        I agree that batch compression and a message-at-a-time system can't play nicely with each other, so I don't have a better design to propose at this time.
        The only alternative I could see would be to disallow mixing compressed and non-compressed messages in the same topic, and to have two different message set iterators.

        Anyway, for now I completely revamped the PHP client library to work with the current design (tested against code in trunk), @see KAFKA-419, so at least we can proceed.


          People

          • Assignee: Unassigned
          • Reporter: Lorenzo Alberton
          • Votes: 0
          • Watchers: 5
