KAFKA-79: Introduce the compression feature in Kafka

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6
    • Fix Version/s: 0.7
    • Component/s: None
    • Labels: None

      Description

      With this feature, we can enable end-to-end block compression in Kafka. The idea is to enable compression on the producer for some or all topics, write the data in compressed format on the server, and make the consumers compression-aware; the data will be decompressed only on the consumer side. Ideally, there should be a choice of compression codecs to be used by the producer, which means a change to the message header as well as the network byte format. On the consumer side, the state maintenance behavior of the ZooKeeper consumer changes: for compressed data, the consumed offset will be advanced one compressed message at a time, while for uncompressed data it will be advanced one message at a time.
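
      As a rough sketch of the codec bookkeeping this implies (illustrative Scala only; the CompressionCodec/getCompressionCodec names echo the discussion below, not necessarily the actual source):

          sealed trait CompressionCodec { def id: Int }
          case object NoCompressionCodec extends CompressionCodec { val id = 0 }
          case object GZIPCompressionCodec extends CompressionCodec { val id = 1 }

          object CompressionCodec {
            // The id is what travels in the message header, so the wire format
            // only needs to reserve a field for the codec choice.
            def getCompressionCodec(id: Int): CompressionCodec = id match {
              case 0 => NoCompressionCodec
              case 1 => GZIPCompressionCodec
              case other => throw new IllegalArgumentException("Unknown codec id " + other)
            }
          }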

        Activity

        Chris Burroughs added a comment -
        • Have you done any performance comparisons?
        • Do you think you could produce a diff (link to github is fine) that shows all of the compression changes?
        • My reading of CompressionCodec is that getCompressionCodec would have to be edited to add a new codec (so it would not be user-pluggable). Do you think that's something we should support (and in that case I guess a separate ticket)?

        Jay Kreps added a comment -

        We have some performance comparisons; we should include that information on the performance page at least by the time this is released. Of course, our primary concern is inter-datacenter bandwidth rather than performance per se. We see a ~30% compression ratio on our Avro tracking data.

        Neha should be able to give a diff. I think it was the last checkin on github before the cutover.

        It is important that decompression always happen with the codec used for compression, so it can't just be the case that there is some property compression.codec=org.apache.kafka.GzipCompressor in the config, because a mismatch between producer and consumer would lead to unreadable data, and if two people send messages with different codecs you would be totally screwed. This means the codec used must be maintained with the message set. We do this by having a compression id where 0=none, 1=gzip, etc. This doesn't lend itself to extensibility since that list has to be predetermined, but we could reserve a codec id for a "user defined" codec and leave it up to the user to configure it right.
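
        As a sketch of that invariant (illustrative JDK-stream code only, not the actual message set format, which would carry the id in the header rather than as a leading byte; readAllBytes needs Java 9+):

            import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
            import java.util.zip.{GZIPInputStream, GZIPOutputStream}

            // The codec id is stored with the data, so the consumer dispatches
            // on what the producer actually used, not on its own config.
            def compress(codecId: Int, payload: Array[Byte]): Array[Byte] = {
              val bos = new ByteArrayOutputStream()
              bos.write(codecId) // 0 = none, 1 = gzip, ...
              codecId match {
                case 0 => bos.write(payload)
                case 1 => val gz = new GZIPOutputStream(bos); gz.write(payload); gz.close()
              }
              bos.toByteArray
            }

            def decompress(bytes: Array[Byte]): Array[Byte] = {
              val in = new ByteArrayInputStream(bytes)
              in.read() match { // dispatch on the stored id
                case 0 => in.readAllBytes()
                case 1 => new GZIPInputStream(in).readAllBytes()
                case id => throw new IllegalArgumentException("Unknown codec id " + id)
              }
            }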

        My intuition is that most people just want a good compression implementation included out of the box and don't want to fiddle with it, so I think it would be best to get that right. I think even in the long run there are really only 2-3 algorithms that have a reasonable CPU/size compression/decompression tradeoff, so it makes sense to just implement and fully test those for perf and correctness and include those in a way that can't break.

        Neha Narkhede added a comment -

        The github diff URL is here - https://github.com/nehanarkhede/kafka/compare/8db50143d7362750225e...4c49c4e2a9490255c35e21f89a6f4545de87cc5a

        This diff URL captures the major changes, though some more minor fixes went into the Apache SVN repo after this.

        I started writing a wiki page for the compression feature here - https://cwiki.apache.org/confluence/display/KAFKA/Compression

        Comments and suggestions are welcome.

        C. Scott Andreas added a comment -

        This looks like an excellent feature, Neha - thanks for working on it. We push a lot of highly compressible data into Kafka. Trading a bit of CPU for reduced disk and network activity sounds excellent.

        Would you be willing to accept a patch that implements support for http://code.google.com/p/snappy in addition to (or instead of) GZip? When consuming high-data-rate streams, we quickly peg the core on GZip decoding and have switched to Snappy (specifically, this implementation: http://code.google.com/p/snappy-java/) as a result.

        If you have a chance, take a quick look at this JVM de/compressor throughput comparison: https://github.com/ning/jvm-compressor-benchmark/wiki – these results mirror ours pretty closely. On a 36GB dataset of serialized data, we see an 89% compression ratio out of Snappy and 95% out of GZip. At least in our case, the slightly lower compression ratio still left us with a huge win in terms of codec throughput (and reducing the CPU burden on consuming / producing applications).
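
        For reference, the stream-level change is small; a round trip through snappy-java looks roughly like this sketch (assuming the org.xerial.snappy artifact on the classpath and Java 9+ for readAllBytes):

            import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
            import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

            // Drop-in analogue of the GZIP{Input,Output}Stream pair, which is
            // what makes Snappy cheap to slot in behind a new compression id.
            val bos = new ByteArrayOutputStream()
            val out = new SnappyOutputStream(bos)
            out.write("some highly compressible payload".getBytes("UTF-8"))
            out.close()

            val in = new SnappyInputStream(new ByteArrayInputStream(bos.toByteArray))
            val roundTripped = new String(in.readAllBytes(), "UTF-8")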

        – Scott

        Jay Kreps added a comment -

        Scott, we would gladly take that feature! The compression is pluggable, so adding that should not be too hard; we just need another compression id for snappy (gzip=1, so snappy=2). We should still keep gzip, though; I think it has a place. For example, in our usage the biggest bottleneck is inter-datacenter bandwidth, so we will probably stay with gzip, plus it has no native dependencies, so it is a little better "out-of-the-box" experience.

        One thing we should think through is how this is packaged. Typically we make these things contrib/ packages to avoid adding to the main dependencies. It doesn't appear that the snappy jar has any external dependencies, though, and it looks like it packages up the .so/.dll files for all platforms in its jar, which is nice. I think it might be better just to add it in the main code and ensure that none of the snappy classes are loaded unless the user selects snappy as the compression type (this makes the snappy jar optional--you only need it if you use it).
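
        A sketch of that optional-dependency idea (illustrative only; reflection just makes the on-demand loading explicit, and the snappy id of 2 is the one proposed above):

            import java.io.OutputStream
            import java.util.zip.GZIPOutputStream

            // Nothing from org.xerial.snappy is touched unless codec id 2 is
            // selected, so the snappy jar can stay off everyone else's classpath.
            def wrap(codecId: Int, out: OutputStream): OutputStream = codecId match {
              case 0 => out
              case 1 => new GZIPOutputStream(out)
              case 2 => Class.forName("org.xerial.snappy.SnappyOutputStream")
                          .getConstructor(classOf[OutputStream])
                          .newInstance(out)
                          .asInstanceOf[OutputStream]
              case id => throw new IllegalArgumentException("Unknown codec id " + id)
            }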

        Any other thoughts from people?

        Chris Burroughs added a comment -
        • I think we should have a clear convention for ids. For example: core < 10000, contrib < 20000, HERE-BE-DRAGONS > 20000.
        • I think there is room for gzip, and something else in the LZF/Snappy area in the default kafka install.
        • I'm mildly uncomfortable with native code dependencies, but the Hadoop guys seem to have gotten something working.

        Jun Rao added a comment -

        Chris, which ids are you referring to?

        Chris Burroughs added a comment -

        What Jay called "compression ids" (i.e., 1 == gzip).

        Neha Narkhede added a comment -

        Scott,

        Thanks for pointing us to Snappy. I took a brief look at the Snappy benchmarks, and it does look promising to me. As Jay mentioned, GZIP buys us increased throughput and better utilization of the network bandwidth, due to its relatively high compression ratio. However, its decompression cost, in terms of both TPS and CPU usage, is not low. According to preliminary Kafka compression performance benchmarks, with a fetch size of 1MB, consumer throughput doubled when consuming a GZIP-compressed topic; when the consumer is fully caught up, its CPU usage is ~45%, compared to ~12% when the same consumer is consuming uncompressed data. On the producer side, for a batch size of 200 and a message size of 200, the throughput for producing compressed data is half that of producing uncompressed data. That is the cost of compression for GZIP. Though this is tolerable for inter-DC replication, we could do better for more real-time applications that care about TPS more than compression ratio. I see Snappy fitting well here (http://ning.github.com/jvm-compressor-benchmark/results/canterbury-roundtrip-2011-07-28/index.html).

        The compression ratio that we see (for a producer batch size of 200) is 3x for GZIP on our typical tracking data set. I wonder how low this will be for Snappy. It will be good to check.
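
        A standalone way to eyeball that number on synthetic data (plain java.util.zip, mirroring the batch size of 200 and message size of 200 quoted above; this sketch is not the Kafka perf harness, and the sample payload is made up):

            import java.io.ByteArrayOutputStream
            import java.util.zip.GZIPOutputStream

            // Compress one synthetic batch and report the original/compressed ratio.
            val msg = "id=42 page=/index.html status=200 ts=1311111111111  ".getBytes("UTF-8")
            val batch: Array[Byte] = Array.fill(200)(msg).flatten
            val bos = new ByteArrayOutputStream()
            val gz = new GZIPOutputStream(bos)
            gz.write(batch)
            gz.close()
            println(f"compression ratio: ${batch.length.toDouble / bos.size()}%.1fx")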

        It will be great to see a Snappy integration patch with some Kafka performance benchmarks that measure compression/decompression overhead, compression ratio, and the effect on producer/consumer throughput.

        – Neha

        C. Scott Andreas added a comment -

        Jay and Neha,

        Fantastic - thanks for taking a look. Your GZip choice makes sense - for inter-DC transport, totally agree with pushing the CPU a little harder to drive down the wire size. Most of our current use is within the same rack, so we've been optimizing for codec throughput. That said, I agree and like the idea of pluggable compressors.

        I'll clone, apply Neha's patch, and drop that in this weekend. I don't expect that it will be too complicated (it's just an InputStream; our local use is just a couple lines).

        (Apologies for the slow reply; I had mistakenly disabled e-mail notifications).

        – Scott


          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 0
          • Watchers: 2
