KAFKA-158: Go consumer & producer to support compression

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: None
    • Component/s: clients
    • Labels:

      Description

      As related to KAFKA-79, the Go consumer and producer need to support the compression attribute per https://cwiki.apache.org/confluence/display/KAFKA/Compression.

      Can someone assign this to me? I'll add support and create a patch.

      thanks

      1. kafka_158_go_compress_working_2.patch (28 kB, Jeffrey Damick)
      2. kafka_158_go_compress_working.patch (28 kB, Jeffrey Damick)
      3. kafka_158_go_compress.patch (22 kB, Jeffrey Damick)
      4. KAFKA-158-try2.patch (7 kB, AaronR)
      5. KAFKA-158.patch (4 kB, AaronR)

        Activity

        Neha Narkhede added a comment -

        Jeffrey, I tried assigning this to you, but your name doesn't show up in the JIRA list for Kafka.

        AaronR added a comment -

        tested against the kafka/bin/producershell and consumer.

        Jeffrey Damick added a comment -

        OK, but the tests weren't updated and no new ones were added, it doesn't support either compressed or uncompressed messages, and it doesn't transparently decompress the messages or have a way to plug in different compression codecs. Let me just finish up this patch...

        AaronR added a comment -

        Take 2 on the patch, with updated tests

        Jeffrey Damick added a comment -

        Patch to add support for compressed messages.

        Seeing strange behavior from kafka when sending the compressed messages:

        go client sends this:

        00 00 00 36 00 00 00 04 74 65 73 74 00 00 00 00 00 00 00 26 00 00 00 22 01 01 F0 17 43 A5 1F 8B 08 00 00 00 00 00 04 FF 4A CE CF 2D 00 04 00 00 FF FF 3A 6F 0A CB 04 00 00 00

        and the server gives:

        [2011-10-20 11:51:50,106] DEBUG Listening to new connection from /127.0.0.1:59861 (kafka.network.Processor)
        [2011-10-20 11:51:50,107] TRACE 54 bytes read from /127.0.0.1:59861 (kafka.network.Processor)
        [2011-10-20 11:51:50,115] TRACE Handling produce request from /127.0.0.1:59861 (kafka.request.logger)
        [2011-10-20 11:51:50,119] TRACE Producer request ProducerRequest(test,0,38) (kafka.request.logger)
        [2011-10-20 11:51:50,119] DEBUG makeNext() in deepIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
        [2011-10-20 11:51:50,119] TRACE Remaining bytes in iterator = 34 (kafka.message.ByteBufferMessageSet)
        [2011-10-20 11:51:50,119] TRACE size of data = 34 (kafka.message.ByteBufferMessageSet)
        [2011-10-20 11:51:50,120] DEBUG Message is compressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
        [2011-10-20 11:51:50,133] DEBUG makeNext() in deepIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
        [2011-10-20 11:51:50,133] TRACE Remaining bytes in iterator = 0 (kafka.message.ByteBufferMessageSet)
        [2011-10-20 11:51:50,133] TRACE size of data = 1668246896 (kafka.message.ByteBufferMessageSet)
        [2011-10-20 11:51:50,134] ERROR Error processing ProduceRequest on test:0 (kafka.server.KafkaRequestHandlers)
        kafka.common.InvalidMessageSizeException: invalid message size: 1668246896 only received bytes: 0 at 0( possible causes (1) a single message larger than the fetch size; (2) log corruption )
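
One possible reading of the size in that error, sketched in Go. It assumes the bytes left after decompression were the raw text 'comp' (the text the reporter says he sent, later in this thread) and that the broker read the next four bytes as a big-endian 32-bit length, as the sizes in the dump above suggest. The helper name `sizeField` is made up for this illustration.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// sizeField interprets the first four bytes of b as a big-endian
// 32-bit length. The name is hypothetical, chosen for this sketch.
func sizeField(b []byte) uint32 {
	return binary.BigEndian.Uint32(b[:4])
}

func main() {
	// Assumption: the decompressed payload was the raw text "comp",
	// with no message header in front of it.
	fmt.Println(sizeField([]byte("comp")))
}
```

Under those assumptions the four ASCII bytes 63 6f 6d 70 read as 1668246896, the same number reported by InvalidMessageSizeException.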

        Jun Rao added a comment -

        It seems that you are sending a message marked as compressed. However, after the payload is decompressed, the content is not well-formatted. Likely causes include: (1) the message is actually not compressed; (2) the compression codec is not gzip.

        Jeffrey Damick added a comment -

        the curious part is that the scala producer writes this:

        0000003c0000000474657374000000000000002c000000280101891b70861f8b0800000000000000636060e0626438cd956f959c9f5b0000dce317a70e000000

        And after breaking it down, the message part is:

        1f 8b 08 00 00 00 00 00 00 00 63 60 60 e0 62 64 38 cd 95 6f 95 9c 9f 5b 00 00 dc e3 17 a7 0e 00 00 00

        after ungzip'ing it becomes (gzip cmd line to hexdump):

        00 00 00 0a 01 00 cb 0a 6f 3a 63 6f 6d 70 |........o:comp|

        Did something else in the message format change when it's compressed?

        I see the same result when I decompress it in Go in my consumer.

        Jeffrey Damick added a comment -

        I should add that the text I sent from the producer was: 'comp'

        Jun Rao added a comment -

        The go producer and the scala producer should send the same bytes for the same message, right?

        Jeffrey Damick added a comment - - edited

        Not necessarily, the gzip implementation is different. But I'm wondering why, when I use gzip (command line, not from Go) to ungzip that message, it has this on the front (as seen above): 00 00 00 0a 01 00 cb 0a 6f 3a

        This happens to match what I see in Go (the extra bytes on the front of 'comp'), so I must be parsing it wrong somehow...

        Jun Rao added a comment -

        In the broker, we need to unzip compressed messages to verify the CRC. Can unzip decode messages encoded by compress?

        Jeffrey Damick added a comment -

        Are you saying you don't use gzip to decompress the messages in the broker? I don't understand the question.

        Jeffrey Damick added a comment -

        I think I figured out my mistake: inside the gzip is yet another message. I misunderstood and thought it was only the payload that was compressed. So it's a compressed message with an uncompressed message inside it; is that really what it's supposed to be?

        Jun Rao added a comment -

        That's right. We take one or more uncompressed messages and compress them using gzip and store the compressed bytes in the payload of a single message. This way, we can recursively iterate into a compressed message. See CompressionUtils.compress for details.

        Jeffrey Damick added a comment -

        Thanks, I'll make the appropriate updates. But it seems like this compression flag would be a better fit on the 'message set', and then use that to encapsulate all messages.

        Jun Rao added a comment -

        There is a compression flag on ByteBufferMessageSet, with the following signature:

        def this(compressionCodec: CompressionCodec, messages: Message*)

        Jeffrey Damick added a comment -

        Never mind, I see.

        Jeffrey Damick added a comment -

        Working patch for dealing with compressed & uncompressed messages. Updated tests & added a pluggable interface for other future payload codecs (compression or other).

        Neha Narkhede added a comment -

        Jeffrey, thanks for the patch. It looks good, though I wasn't able to build the go code and run the unit tests. The instructions in the README seem to be outdated ?

        Joel Koshy added a comment -

        I get the following - are these files missing from your patch?

        Thanks,

        Joel

        clients/go]$ GOROOT=. make install
        Makefile:1: src/Make.inc: No such file or directory
        Makefile:14: src/Make.pkg: No such file or directory
        make: *** No rule to make target `src/Make.pkg'. Stop.

        Jeffrey Damick added a comment -

        Joel: just a guess, but it looks like your GOROOT isn't set right. It needs to point to the location where you installed Go; mine points to /opt/go, for example. You may want to double-check: http://golang.org/doc/install.html#install

        Neha: I'm glad to help. What error did you get?

        Neha Narkhede added a comment -

        I saw the same error that Joel mentioned. Let me follow the installation for go and see if that helps.

        Joel Koshy added a comment -

        Yes - that was the issue. It is probably obvious to go users. Otherwise, it would be good to mention this in the readme.

        +1

        Neha Narkhede added a comment -

        +1 on updating the README. Thanks for the patch !

        Jeffrey Damick added a comment -

        Who commits the patch then? I would if I could. Can I have access to that part of the tree?

        Neha Narkhede added a comment -

        One of the committers can accept the patch. Please can you update the README and upload an updated patch ?

        Jeffrey Damick added a comment -

        updated the README, including links to the incubator website & go installation.

        Neha Narkhede added a comment -

        Thanks for being responsive. Just committed this !


          People

          • Assignee: Unassigned
          • Reporter: Jeffrey Damick