Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: producer
    • Labels:
      None
    Attachments

    1. compression-fix.patch
      6 kB
      Jay Kreps
    2. KAFKA-1253_2014-02-21_16:15:21.patch
      21 kB
      Guozhang Wang
    3. KAFKA-1253_2014-02-21_17:55:52.patch
      26 kB
      Guozhang Wang
    4. KAFKA-1253_2014-02-24_13:31:50.patch
      44 kB
      Guozhang Wang
    5. KAFKA-1253_2014-02-26_17:31:30.patch
      47 kB
      Guozhang Wang
    6. KAFKA-1253_2014-03-06_17:48:11.patch
      56 kB
      Guozhang Wang
    7. KAFKA-1253_2014-03-07_16:34:33.patch
      61 kB
      Guozhang Wang
    8. KAFKA-1253_2014-03-10_14:35:56.patch
      67 kB
      Guozhang Wang
    9. KAFKA-1253_2014-03-10_14:39:58.patch
      68 kB
      Guozhang Wang
    10. KAFKA-1253_2014-03-10_15:27:47.patch
      68 kB
      Guozhang Wang
    11. KAFKA-1253_2014-03-14_13:46:40.patch
      65 kB
      Guozhang Wang
    12. KAFKA-1253_2014-03-14_17:39:53.patch
      391 kB
      Guozhang Wang
    13. KAFKA-1253_2014-03-17_15:56:04.patch
      69 kB
      Guozhang Wang
    14. KAFKA-1253_2014-03-18_17:10:10.patch
      70 kB
      Guozhang Wang
    15. KAFKA-1253_2014-03-19_16:31:39.patch
      72 kB
      Guozhang Wang
    16. KAFKA-1253_2014-03-22_17:53:44.patch
      70 kB
      Guozhang Wang
    17. KAFKA-1253_2014-03-25_13:47:54.patch
      71 kB
      Guozhang Wang
    18. KAFKA-1253_2014-03-26_10:59:00.patch
      71 kB
      Guozhang Wang
    19. KAFKA-1253.patch
      7 kB
      Guozhang Wang

      Activity

      Jay Kreps created issue -
      Jay Kreps made changes -
      Field Original Value New Value
      Component/s producer [ 12320325 ]
      Guozhang Wang added a comment -

      It seems incremental compression would not be possible with nio.ByteBuffer, so we cannot really do that in RecordAccumulator's append call. Instead, I am proposing the following:

      1. Make the compression type global to the producer client by adding COMPRESSION_TYPE_CONFIG to ProducerConfig. All messages ever sent by this client would then use the same compression type.

      2. In the Sender.collate function, compress the messages in the buffer when creating the InFlightRequest. This way the buffer pool would always contain uncompressed data.

      Thoughts?
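
      A minimal sketch of what the proposed client-wide setting looks like from a user's point of view, written against the released Java client API as an assumption (the exact config constant and accepted values were still under discussion here):

      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerRecord;

      public class CompressedProducerExample {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");
              props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
              // the producer-wide compression setting: one codec for everything this client sends
              props.put("compression.type", "snappy");

              Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
              producer.send(new ProducerRecord<>("test-topic", "hello".getBytes()));
              producer.close();
          }
      }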

      Jay Kreps added a comment -

      This will be tricky but is possible.

      Here are a couple pointers:
      1. ByteBuffer.array() will give the backing array for heap ByteBuffers, so we can work with APIs that only accept arrays.
      2. GZIPOutputStream requires a stream. Two options:
      a. Make an OutputStream implementation based on ByteBuffer (see the sketch after this list). ByteArrayOutputStream would work, but it would be tricky: you would have to do new ByteArrayOutputStream(size), then use the toByteArray() method to get the backing array and use ByteBuffer.wrap() on that array to create the ByteBuffer.
      b. Directly use the Deflater compression code Java provides, which GZIPOutputStream uses under the covers. This is a better API, but there are some subtle differences that GZIPOutputStream hacks around, and we would have to do similar hacking.
      3. There are two snappy libraries: we currently use the JNI wrapper for the Google native code, but there is also a pure Java impl. Either way, snappy should ideally not be a runtime dependency unless you enable snappy compression. This will mean not instantiating the classes in the snappy jar unless they are needed.
      4. The desired end result here is that our performance on compressed messages is comparable to the underlying compression codec and not artificially limited by lots and lots of byte copying (e.g. see http://grokbase.com/t/kafka/users/1383bcfkym/compression-performance). For example, snappy claims performance on the order of hundreds of MB/sec. So it would be good to write a stand-alone main method that runs the message compression to create compressed messages and benchmark the performance, as well as look at it in hprof to ensure the time is actually going to compression. This performance will be particularly important on the server side, where we need to both decompress and recompress and where compression is a big bottleneck.
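
      To make option (a) under point 2 concrete, here is a minimal sketch of an OutputStream backed by a ByteBuffer that grows by reallocation when it runs out of space; the class and method names are illustrative assumptions, not necessarily what the patch uses. GZIPOutputStream (or a snappy stream) can then wrap it so compressed bytes land directly in the record buffer.

      import java.io.OutputStream;
      import java.nio.ByteBuffer;

      public class ByteBufferOutputStream extends OutputStream {
          private ByteBuffer buffer;

          public ByteBufferOutputStream(ByteBuffer buffer) {
              this.buffer = buffer;
          }

          @Override
          public void write(int b) {
              ensureRemaining(1);
              buffer.put((byte) b);
          }

          @Override
          public void write(byte[] bytes, int off, int len) {
              ensureRemaining(len);
              buffer.put(bytes, off, len);
          }

          // Expose the (possibly reallocated) buffer so the records can be sent from it.
          public ByteBuffer buffer() {
              return buffer;
          }

          // Grow by roughly 1.1x plus the bytes needed when the current buffer is too small.
          private void ensureRemaining(int needed) {
              if (needed > buffer.remaining()) {
                  ByteBuffer grown = ByteBuffer.allocate((int) (buffer.capacity() * 1.1) + needed);
                  buffer.flip();
                  grown.put(buffer);
                  buffer = grown;
              }
          }
      }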

      Jay Kreps added a comment -

      Also the high-level idea I had for implementation was to instantiate a compressor instance with each empty InMemoryRecords we create. A compressor would be something like
      public interface Compressor {
          public void write(byte[] bytes, int offset, byte[] key, byte[] value);
      }

      For the non-compressed case we can either have a no-op compressor or just have special case logic.

      This compressor would be initialized lazily and stored with the InMemoryRecords instance, and would encapsulate the GZIP or snappy compression dictionary (e.g. the Deflater instance or whatever).
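
      As an illustration only, here is a sketch of that lazy-compressor idea under assumed names, with a simplified "append bytes, then close" interface rather than the exact signature above: a no-op pass-through for the uncompressed case, and a GZIP-backed compressor whose Deflater-backed stream is created lazily on the first append.

      import java.io.IOException;
      import java.io.OutputStream;
      import java.util.zip.GZIPOutputStream;

      interface RecordCompressor {
          void put(byte[] bytes, int offset, int length) throws IOException;
          void close() throws IOException;
      }

      // Uncompressed case: pass the bytes straight through to the destination stream.
      class NoOpCompressor implements RecordCompressor {
          private final OutputStream out;
          NoOpCompressor(OutputStream out) { this.out = out; }
          public void put(byte[] bytes, int offset, int length) throws IOException { out.write(bytes, offset, length); }
          public void close() throws IOException { out.flush(); }
      }

      // GZIP case: the GZIPOutputStream (and its Deflater) is created lazily on first use,
      // so the compression state is shared across all appends to the same records instance.
      class GzipCompressor implements RecordCompressor {
          private final OutputStream out;
          private GZIPOutputStream gzip;
          GzipCompressor(OutputStream out) { this.out = out; }
          public void put(byte[] bytes, int offset, int length) throws IOException {
              if (gzip == null)
                  gzip = new GZIPOutputStream(out);
              gzip.write(bytes, offset, length);
          }
          public void close() throws IOException {
              if (gzip != null)
                  gzip.finish();
          }
      }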

      The previous compression code was quite complex, so really simplifying that logic will be one of the core challenges. One thing we can do is avoid supporting arbitrarily nested messages. The Scala code currently allows any amount of recursive message nesting. This is a bit complex and not really needed.

      Also feel free to change any of this stuff as it definitely isn't fully thought out.

      Guozhang Wang made changes -
      Attachment KAFKA-1253.patch [ 12629984 ]
      Guozhang Wang added a comment -

      Created reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang added a comment -

      Uploaded a first-step patch which illustrates the idea. Could anyone give some comments on the proposal before I go further?

      Guozhang Wang added a comment -

      One note is that in this preliminary patch I follow the existing consumer behavior, which decompresses the whole message set first, puts the messages into a non-compressed buffer, and then iterates over them one by one; but it is possible to also do in-place decompression here, which would save one copy.

      Jay Kreps added a comment -

      I think this approach may work but we have to work out a few issues to know.

      I like that the compressed message set uses a different impl than the non-compressed but I wonder if this will work? On the producer side data is either compressed or not so I think this works great. But on the consumer side you may get a mixture of compressed and non-compressed records and you won't know ahead of time so I'm not sure if you can choose impls.

      We also need to avoid the double copying of data. We should ideally find a way to refactor to not duplicate code too. But regardless double allocating all our memory and double writing is a non-starter.

      Guozhang Wang added a comment -

      Thanks for the comments, I think I should elaborate a little bit more on the proposed approach:

      1. On the producer end, with compression each MemoryRecords instance will be written to the channel as a single compressed message.

      2. The iterator of the MemoryRecords, when it encounters a compressed message, will create a second-level CompressedMemoryRecords.iterator to iterate over the inner messages if shallow == false. This also partially enforces that nested compression is not used (a sketch of this two-level iteration follows at the end of this comment).

      3. As I said in the previous comment, we could do in-place decompression just like in-place compression to avoid the double copy. It is just that today we do the double copy, and I was following that mechanism.

      About double-allocation, I will try to refactor the code a bit to avoid that.
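
      A sketch of the two-level iteration described in point 2, using hypothetical Record/Codec types as stand-ins for the MemoryRecords/CompressedMemoryRecords classes in the patch:

      import java.util.Collections;
      import java.util.Iterator;
      import java.util.NoSuchElementException;

      enum Codec { NONE, GZIP, SNAPPY }

      interface Record {
          Codec codec();
          Iterator<Record> innerIterator(); // decompresses the wrapper's value and iterates the inner records
      }

      class DeepIterator implements Iterator<Record> {
          private final Iterator<Record> shallow; // wrapper messages laid out in the ByteBuffer
          private Iterator<Record> inner;         // second-level iterator over one decompressed wrapper

          DeepIterator(Iterator<Record> shallow) {
              this.shallow = shallow;
          }

          @Override
          public boolean hasNext() {
              // Drain the current inner iterator first, then move on to the next wrapper message.
              while (inner == null || !inner.hasNext()) {
                  if (!shallow.hasNext())
                      return false;
                  Record wrapper = shallow.next();
                  inner = (wrapper.codec() == Codec.NONE)
                          ? Collections.singletonList(wrapper).iterator() // plain record: hand it out as-is
                          : wrapper.innerIterator();                      // compressed wrapper: iterate its payload
              }
              return true;
          }

          @Override
          public Record next() {
              if (!hasNext())
                  throw new NoSuchElementException();
              return inner.next();
          }
      }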

      Jay Kreps added a comment -

      Hey Guozhang, I think this patch fixes the bug in that CRC check that you were seeing. It also adds test coverage for this case in MemoryRecordsTest, which uses a non-zero array offset (since the records are backed by the shared byte buffer).

      Jay Kreps made changes -
      Attachment compression-fix.patch [ 12630395 ]
      Guozhang Wang added a comment -

      Thanks Jay, will incorporate into the patch.

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-02-21_16:15:21.patch [ 12630422 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-02-21_17:55:52.patch [ 12630441 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-02-24_13:31:50.patch [ 12630805 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-02-26_17:31:30.patch [ 12631423 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang made changes -
      Assignee Guozhang Wang [ guozhang ]
      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-06_17:48:11.patch [ 12633286 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-07_16:34:33.patch [ 12633498 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-10_14:35:56.patch [ 12633782 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-10_14:39:58.patch [ 12633784 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang added a comment - edited

      Some interesting performance numbers (1000 messages, 1K message size, ack=1):

      codec     new-producer-performance    old-producer-performance (new producer)    old-producer-performance (old producer)
      none      4329.00                     3246.7532                                  2183.4061
      gzip      3134.80                     1620.7455                                  1754.3860
      snappy    3731.34                     2247.1910                                  1890.3592

      The difference between the old and new producer performance tools is that the old tool creates a new byte array for each message, while the new one reuses the same byte array for all messages.

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-10_15:27:47.patch [ 12633798 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang added a comment -

      With the new code, new performance numbers (1M messages, 1K message size, ack=1):

      Two cases: totally random bits in the message (R), all 1's in the message (A). Note that with the old producer the block size for GZIP is 512 bytes, while for the new producer it is 16 KB. According to my tests this does not seem to affect the compression ratio, but it may affect compression performance.

      linger-time 500ms:
      -----------------------------------

      R-NewProducer-NONE: 94215.1875
      R-NewProducer-GZIP: 8750.2844
      R-NewProducer-SNAPPY: 38133.0079
      R-OldProducer-NONE: 64226.0758
      R-OldProducer-GZIP: 6073.1204
      R-OldProducer-SNAPPY: 28178.5392

      A-NewProducer-NONE: 64754.2576
      A-NewProducer-GZIP: 34665.6498
      A-NewProducer-SNAPPY: 155062.8004
      A-OldProducer-NONE: 59948.4443
      A-OldProducer-GZIP: 13014.3939
      A-OldProducer-SNAPPY: 45945.3251
      -----------------------------------

      linger-time 0ms (this only affects the new producer):
      -----------------------------------

      R-NewProducer-NONE: 84459.4595
      R-NewProducer-GZIP: 8008.2645
      R-NewProducer-SNAPPY: 37605.2948
      R-OldProducer-NONE: 59084.1950
      R-OldProducer-GZIP: 6128.0142
      R-OldProducer-SNAPPY: 27643.4001

      A-NewProducer-NONE: 68189.5670
      A-NewProducer-GZIP: 34461.3688
      A-NewProducer-SNAPPY: 116022.7405
      A-OldProducer-NONE: 61214.4956
      A-OldProducer-GZIP: 13536.7455
      A-OldProducer-SNAPPY: 44529.5454

      Neha Narkhede added a comment -

      Guozhang Wang, what metric does the above test measure? Messages/sec?

      Guozhang Wang added a comment -

      Yes. I also dug a little deeper into the reallocation effect: for linger-time = 0, no reallocation is triggered at all, since the records are wrapped up in each iteration of the Sender thread and are far from getting full; for linger-time = 500 ms, both gzip and snappy see about 5 reallocations per 10K messages, so reallocation/data copying does not play an important role in the difference between GZIP and SNAPPY.

      Changing the GZIP block size back to 512 and re-running the experiments, here is what we get:

      R-linger-500ms: 8842.9058
      R-linger-0ms: 8582.4386
      A-linger-500ms: 36333.2486
      A-linger-0ms: 34940.6010

      From the results it seems the block size does not affect the performance of GZIP either.
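
      For reference, the "block size" being varied here is the buffer size handed to GZIPOutputStream; a minimal sketch of the two settings compared (illustrative only, not the patch code):

      import java.io.ByteArrayOutputStream;
      import java.io.IOException;
      import java.util.zip.GZIPOutputStream;

      class GzipBlockSizes {
          static GZIPOutputStream defaultBlock() throws IOException {
              return new GZIPOutputStream(new ByteArrayOutputStream());            // 512-byte default buffer
          }

          static GZIPOutputStream largeBlock() throws IOException {
              return new GZIPOutputStream(new ByteArrayOutputStream(), 16 * 1024); // 16 KB buffer
          }
      }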

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-14_13:46:40.patch [ 12634836 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Jay Kreps added a comment -

      What is the performance difference with and without this patch in the new producer with no compression?

      Guozhang Wang added a comment -

      messages/sec from trunk: 68390.0971

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-14_17:39:53.patch [ 12634878 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Jay Kreps added a comment -

      As a reference, the Unix gzip command on random data seems to be about 28 MB/sec, and Java's gzip seems to be comparable.

      Jay Kreps added a comment - edited

      Found this formatting helpful:

      Linger = 500ms, All 1s                    
      Comp           New          Old         %     MB/sec
      None      64,754.0     59,948.0       8.0     63.2
      GZIP      34,665.0     13,014.0     166.4     33.9
      Snappy   155,062.0     45,945.0     237.5     151.4
                          
      Linger = 500ms, Random                    
      Comp          New          Old        %     MB/sec
      None     94,215.0     64,226.0     46.7     92.0
      GZIP      8,750.0      6,073.0     44.1      8.5
      Snappy   38,133.0     28,178.0     35.3     37.2
                          
      Linger = 0 ms, All 1s                    
      Comp     New               Old         %    MB/sec
      None     68,189.0     61,214.0      11.4      66.6
      GZIP     34,461.0     13,536.0     154.6      33.7
      Snappy  116,022.0     44,529.0     160.6     113.3
                          
      Linger = 0 ms, Random                    
      Comp         New          Old        %     MB/sec
      None    84,459.0     59,084.0     42.9     82.5
      GZIP     8,008.0      6,128.0     30.7      7.8
      Snappy  37,605.0     27,643.0     36.0     36.7
      
      
      Jay Kreps added a comment -

      Also, in case it's helpful, here is the Java code I used to test gzip performance:

      package org.apache.kafka;
      
      import java.io.BufferedInputStream;
      import java.io.FileInputStream;
      import java.io.FileOutputStream;
      import java.util.zip.GZIPOutputStream;
      
      public class Gzip {
          public static void main(String[] args) throws Exception {
              GZIPOutputStream output = new GZIPOutputStream(new FileOutputStream("/dev/null"));
              BufferedInputStream input = new BufferedInputStream(new FileInputStream(args[0]));
              byte[] bytes = new byte[64 * 1024];
              while (true) {
                  int read = input.read(bytes);
                  if (read < 0) {
                      // finish and close the gzip stream so the final deflate block is flushed
                      output.close();
                      return;
                  }
                  // write only the bytes actually read from the file
                  output.write(bytes, 0, read);
              }
          }
      }
      
      Guozhang Wang added a comment -

      Thanks a lot Jay, I think these numbers explain why gzip is quite slow compared with snappy.

      In the meantime I will also run some tests on the effectiveness of the heuristics in reducing the probability of reallocation.

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-17_15:56:04.patch [ 12635188 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang added a comment -

      Here are the experimental results for various heuristics to reduce the chance of reallocation. First, the heuristics we compared (a sketch of these estimates in code follows the list):

      0. Estimate the compressed data written as the uncompressed data appended so far * compression rate; compute another estimate as the compressed data already written to the underlying buffer + the compressor block size * compression rate; take the smaller value.

      1. Estimate as the compressed data already written to the underlying buffer + the compressor block size * compression rate.

      2. Estimate the compressed data written as the uncompressed data appended so far * compression rate.

      3. Estimate the compressed data simply as the uncompressed data appended so far (i.e. assuming a compression rate of 1).

      4. Estimate the compressed data simply as the data already written to the underlying compressed buffer.
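
      A sketch of these size estimates in code, with illustrative names (written = compressed bytes already in the underlying buffer, appended = uncompressed bytes appended so far, blockSize = the compressor's block size, rate = estimated compression rate):

      final class SizeEstimates {
          // Heuristic 0: take the smaller of the two estimates below.
          static long heuristic0(long written, long appended, int blockSize, double rate) {
              return Math.min(heuristic1(written, blockSize, rate), heuristic2(appended, rate));
          }

          // Heuristic 1: compressed bytes already written plus one compressor block, scaled by the rate.
          static long heuristic1(long written, int blockSize, double rate) {
              return written + (long) (blockSize * rate);
          }

          // Heuristic 2: uncompressed bytes appended so far, scaled by the rate.
          static long heuristic2(long appended, double rate) {
              return (long) (appended * rate);
          }

          // Heuristic 3: uncompressed bytes appended so far (compression rate assumed to be 1).
          static long heuristic3(long appended) {
              return appended;
          }

          // Heuristic 4: only the bytes already written to the compressed buffer.
          static long heuristic4(long written) {
              return written;
          }
      }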

      The first experiment was done with 10K random-bit messages (which give a compression rate near 1), batch size = 16K, recording the number of reallocations. Note that each message append triggers at most one reallocation:

      GZIP:

      message size    1K    10K     100K
      heuristic0      0     0       10K
      heuristic1      0     0       10K
      heuristic2      0     0       10K
      heuristic3      0     0       10K
      heuristic4      4     4999    10K

      SNAPPY:

      message size    1K    10K     100K
      heuristic0      0     0       10K
      heuristic1      0     0       10K
      heuristic2      0     0       10K
      heuristic3      0     0       10K
      heuristic4      1     4993    10K
      Jay Kreps added a comment -

      What is the memory usage efficiency for each approach? That is, if we allocate batches of size B, what percent of B is actually used?

      Guozhang Wang added a comment - edited

      The second experiment was done with 10K messages, half of which are random bits and the other half all 1's, batch size = 16K, linger time 5 seconds, recording the number of reallocations.

      GZIP:

      message size    1K     10K     100K
      heuristic0      312    0       0
      heuristic1      0      0       0
      heuristic2      0      0       0
      heuristic3      0      0       0
      heuristic4      312    2500    0

      SNAPPY:

      message size    1K     10K    100K
      heuristic0      1      0      0
      heuristic1      1      0      0
      heuristic2      1      0      0
      heuristic3      0      0      0
      heuristic4      312    0      0

      Memory usage:

      GZIP:

      message size    1K                     10K                    100K
      heuristic0      0.5187830764937806     0.6361027327647318     0.5027070816956634
      heuristic1      0.9398563032097622     0.6360584137828137     0.5027141148860828
      heuristic2      0.9397809434478939     0.6361234861388692     0.5026772336472551
      heuristic3      0.48929535651283906    0.32104673640806436    0.5027114615581034
      heuristic4      0.5188796489295215     0.6345498820519015     0.5027165618824355

      SNAPPY:

      message size    1K                    10K                    100K
      heuristic0      0.9534642084279842    0.6618060085311569     0.5244664440669362
      heuristic1      0.9532791947655151    0.6622619777918128     0.5244664440669362
      heuristic2      0.9533003355592538    0.6618328095700078     0.5244664440669362
      heuristic3      0.511382939911297     0.33245849609718725    0.5244664440669362
      heuristic4      0.5445814384269451    0.6618052651191988     0.5244664440669362
      Guozhang Wang made changes -
      Comment [ So it seems except the baseline heuristics #4, all others are doing pretty well in reducing possibility of reallocation.

      In terms of memory efficiency, I re-run the second experiment and the memory usage (= #.bytes written / #.bytes as buffer capacity) is the following:

      The first experiments are done with 10K messages half of which are random bits and the other half all 1's, batch size = 16K, recorded #.reallocations, linger time 1 second.

      GZIP:

      || message size || 1K || 10K || 100K ||
      |heuristic0|0.16393931370995588|0.6360426060498783|0.5027185390566682|
      |heuristic1|0.08041873550981989|0.6361295505149862|0.5026977360002329|
      |heuristic2|0.037097550186846885|0.6360425366609646|0.5027069612634141|
      |heuristic3|0.07121086649677909|0.321025252352334|0.5027315390995597|
      |heuristic4|0.07140996394426881|0.634673472157968|0.5027017302210279|

      SNAPPY:

      || message size || 1K || 10K || 100K ||
      |heuristic0|0.6140103928071738|0.6618065853371036|0.5244664440669362|
      |heuristic1|0.3794715113367395|0.6618042001500875|0.524466444066936|
      |heuristic2|0.44042297446570144|0.661811600542026|0.524466444066936|
      |heuristic3|0.5112405714891525|0.33291628770530224|0.5244664440669362|
      |heuristic4|0.4887840474881868|0.6618042070775481|0.5244664440669362|

      ]
      Guozhang Wang added a comment -

      According to these numbers, I think using just heuristic1 will be good enough.

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-18_17:10:10.patch [ 12635441 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-19_16:31:39.patch [ 12635680 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-22_17:53:44.patch [ 12636235 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang added a comment - edited

      The comparison of the latest patch with trunk with no compression, 1M messages with num.acks = 1 (I ran each experiment three times and took the average):

      message size (bytes)         100            200            400            800           1600
      trunk (msg. per sec)         347584.2892    260620.2763    180570.6031    84274.3974    40049.6616
      KAFKA-1253 (msg. per sec)    353982.3009    266951.4148    176429.0755    89549.5657    40125.1906
      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-25_13:47:54.patch [ 12636775 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Guozhang Wang added a comment -

      Just realized the old producer performance tool has some overhead that limits the perf numbers, so I re-ran the same experiments with the new producer performance tool as follows (to accommodate high variance we take the max instead of the average over 5 runs):

      message size (bytes)    100      200       400       800
      trunk (MB/sec)          70.04    109.76    170.24    203.30
      KAFKA-1253 (MB/sec)     78.65    119.70    170.68    202.35
      Guozhang Wang made changes -
      Attachment KAFKA-1253_2014-03-26_10:59:00.patch [ 12636961 ]
      Guozhang Wang added a comment -

      Updated reviewboard https://reviews.apache.org/r/18299/
      against branch origin/trunk

      Neha Narkhede added a comment -

      Thanks for the patches and your patience, Guozhang Wang! Committed to trunk.

      Neha Narkhede made changes -
      Status Open [ 1 ] Resolved [ 5 ]
      Resolution Fixed [ 1 ]

        People

        • Assignee:
          Guozhang Wang
        • Reporter:
          Jay Kreps
        • Votes:
          0
        • Watchers:
          3

          Dates

          • Created:
            Updated:
            Resolved:
