  Kafka / KAFKA-1374

LogCleaner (compaction) does not support compressed topics

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0.0
    • Component/s: None
    • Labels:

      Description

      This is a known issue, but opening a ticket to track.

      If you try to compact a topic that has compressed messages, you will run into
      various exceptions, typically because during iteration we advance the
      position based on the decompressed size of the message rather than its size
      on disk. I have a bunch of stack traces, but it should be straightforward to
      reproduce.
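
      A minimal Scala sketch of the failure mode, assuming a toy segment layout rather than Kafka's real message format: each hypothetical `WrapperEntry` has a stored (compressed) size on disk plus the decompressed sizes of its inner messages. Stepping by the stored size visits every entry boundary; stepping by the decompressed size lands between entries, which is where a real reader would start parsing garbage.

```scala
object CompressedIterationSketch {
  // Hypothetical model of one segment entry: a wrapper message whose
  // compressed payload holds several inner messages.
  final case class WrapperEntry(storedSize: Int, innerSizes: Seq[Int]) {
    def decompressedSize: Int = innerSizes.sum
  }

  // Byte position at which each entry starts on disk.
  def entryStarts(entries: Seq[WrapperEntry]): Seq[Int] =
    entries.scanLeft(0)(_ + _.storedSize).init

  // Walk the segment, advancing by `step(entry)` bytes after each entry.
  // Returns the positions visited; the walk stops at the first position
  // that is not a real entry boundary (a real reader would fail there).
  def walk(entries: Seq[WrapperEntry], step: WrapperEntry => Int): List[Int] = {
    val starts = entryStarts(entries)
    val totalBytes = entries.map(_.storedSize).sum
    val visited = scala.collection.mutable.ListBuffer.empty[Int]
    var pos = 0
    var aligned = true
    while (aligned && pos < totalBytes) {
      visited += pos
      val idx = starts.indexOf(pos)
      if (idx < 0) aligned = false // misaligned: not an entry start
      else pos += step(entries(idx))
    }
    visited.toList
  }
}
```

      With wrappers of stored sizes 40, 50 and 20 bytes (the first holding two 30-byte inner messages), stepping by stored size visits positions 0, 40 and 90, while stepping by decompressed size jumps from 0 to 60 and stops there.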

      Attachments

      1. KAFKA-1374_2015-05-19_17:20:44.patch
        40 kB
        Joel Koshy
      2. KAFKA-1374.patch
        20 kB
        Joel Koshy
      3. KAFKA-1374_2015-05-18_22:55:48.patch
        20 kB
        Manikumar
      4. KAFKA-1374_2015-01-18_00:19:21.patch
        18 kB
        Manikumar
      5. KAFKA-1374_2014-10-03_19:17:17.patch
        17 kB
        Manikumar
      6. KAFKA-1374_2014-10-03_18:49:16.patch
        17 kB
        Manikumar
      7. KAFKA-1374_2014-09-23_21:47:12.patch
        17 kB
        Manikumar
      8. KAFKA-1374_2014-08-12_22:23:06.patch
        13 kB
        Manikumar
      9. KAFKA-1374_2014-08-09_16:18:55.patch
        12 kB
        Manikumar
      10. KAFKA-1374.patch
        10 kB
        Manikumar

          Activity

          jjkoshy Joel Koshy added a comment -

          I had started on this a while ago, but did not finish. Here's a WIP patch
          that doesn't quite work yet:
          https://gist.github.com/jjkoshy/4657a44e52e3f88be1c1

          Another nuance with compression and compaction: which compression codec
          should we use when writing out the compacted data?

          We could adopt a broker-side compression config that can be overridden on a
          per-topic basis. This would not only enable a consistent compression codec
          for each topic, but it will also make the above decision more
          straightforward - i.e., write out compacted messages in the configured
          compression codec for that topic (or broker-default if the topic does not
          have any override).
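
          The override-then-default rule proposed above amounts to a simple lookup. A minimal Scala sketch with hypothetical names (`CompressionConfig`, `codecFor`, a three-value `Codec` type); it is not an existing Kafka API:

```scala
object CodecResolutionSketch {
  // Hypothetical codec set for illustration only.
  sealed trait Codec
  case object NoCompression extends Codec
  case object Gzip extends Codec
  case object Snappy extends Codec

  // A per-topic override takes precedence over the broker-wide default.
  final case class CompressionConfig(brokerDefault: Codec, topicOverrides: Map[String, Codec]) {
    def codecFor(topic: String): Codec = topicOverrides.getOrElse(topic, brokerDefault)
  }
}
```

          The cleaner would call `codecFor(topic)` when rewriting retained messages, so the output codec depends only on configuration rather than on whichever codec each producer used.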

          Will file a separate JIRA for the above.

          omkreddy Manikumar added a comment - edited

          I am looking into the issue and the WIP patch.

          In the WIP patch, the following code is used to traverse segment offsets:

              var currOffset = segment.baseOffset
              while (currOffset < segment.index.lastOffset) {
                // `entry` is the message read at currOffset (its lookup is elided here)
                currOffset = entry.nextOffset
              }
          

          In my testing, segment.index.lastOffset does not return the last offset of a given segment.
          For a segment with startingOffset=0 and lastOffset=7140, segment.index.lastOffset returns 7118.
          This breaks the loop above, which stops before reaching the end of the segment.

          Any idea why segment.index.lastOffset does not return the segment's actual last offset?
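
          One likely explanation: Kafka's offset index is sparse. The broker adds an index entry only after roughly `index.interval.bytes` of messages have been appended since the previous entry, so `index.lastOffset` is the offset of the last indexed message, which can be well before the last message in the segment. The Scala sketch below models this with hypothetical types (`Msg`, `buildIndex`, `boundedByIndex`); it is not Kafka's `OffsetIndex` code. A loop bounded by the last indexed offset stops early, whereas iterating over the segment's messages reaches every offset.

```scala
object SparseIndexSketch {
  // Hypothetical model of a message in a segment: its offset and byte size.
  final case class Msg(offset: Long, size: Int)

  // Sparse index: add an entry for a message once more than `intervalBytes`
  // bytes have been appended since the previous index entry.
  def buildIndex(msgs: Seq[Msg], intervalBytes: Int): List[Long] = {
    val indexed = scala.collection.mutable.ListBuffer.empty[Long]
    var bytesSinceLastEntry = 0
    for (m <- msgs) {
      if (bytesSinceLastEntry > intervalBytes) {
        indexed += m.offset
        bytesSinceLastEntry = 0
      }
      bytesSinceLastEntry += m.size
    }
    indexed.toList
  }

  // Offsets processed by a loop bounded by the last indexed offset, as in the
  // WIP patch. In this model an empty index falls back to the base offset.
  def boundedByIndex(msgs: Seq[Msg], index: Seq[Long], baseOffset: Long): List[Long] = {
    val lastIndexed = index.lastOption.getOrElse(baseOffset)
    msgs.map(_.offset).takeWhile(_ < lastIndexed).toList
  }

  // Iterating over the messages themselves covers the whole segment.
  def allOffsets(msgs: Seq[Msg]): List[Long] = msgs.map(_.offset).toList
}
```

          With eleven 100-byte messages and a 250-byte interval, the model indexes offsets 3, 6 and 9, so a loop bounded by the last indexed offset never processes offsets 9 and 10.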

          omkreddy Manikumar added a comment -

          Created reviewboard https://reviews.apache.org/r/24214/diff/
          against branch origin/trunk

          omkreddy Manikumar added a comment -

          In this patch, LogCleaner decompresses the compressed messages and writes the retained messages back in compressed form.

          I tested the following scenarios:
          1. Topic with non-compressed messages
          2. Topic with compressed messages
          3. Topic with both compressed and non-compressed messages.
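
          The approach described above (decompress, keep only the retained messages, write them back compressed) can be sketched in Scala as follows. This is a toy model, not the patch: hypothetical `Inner` records are serialized as tab-separated lines and gzipped with `java.util.zip`, retention uses a plain `Map` from key to latest offset in place of the cleaner's offset map, and delete (tombstone) handling is omitted.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.io.Source

object CompactCompressedSketch {
  // Hypothetical inner message; keys and values must not contain newlines.
  final case class Inner(offset: Long, key: String, value: String)

  // Toy wrapper payload: one "offset<TAB>key<TAB>value" line per message, gzipped.
  def compress(msgs: Seq[Inner]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bytes)
    try gz.write(msgs.map(m => s"${m.offset}\t${m.key}\t${m.value}").mkString("\n").getBytes(UTF_8))
    finally gz.close()
    bytes.toByteArray
  }

  def decompress(payload: Array[Byte]): List[Inner] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(payload))
    val text = try Source.fromInputStream(in, "UTF-8").mkString finally in.close()
    if (text.isEmpty) Nil
    else text.split("\n").toList.map { line =>
      line.split("\t", 3) match {
        case Array(o, k, v) => Inner(o.toLong, k, v)
        case _              => throw new IllegalArgumentException(s"malformed line: $line")
      }
    }
  }

  // Decompress a wrapper, keep inner messages that are the latest for their
  // key, and re-compress the survivors. None means nothing was retained.
  def cleanWrapper(payload: Array[Byte], latest: Map[String, Long]): Option[Array[Byte]] = {
    val retained = decompress(payload).filter(m => latest.get(m.key).forall(m.offset >= _))
    if (retained.isEmpty) None else Some(compress(retained))
  }
}
```

          A wrapper whose inner messages are all superseded yields `None` and can be dropped entirely; otherwise the survivors are re-compressed into a single new wrapper.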

          nehanarkhede Neha Narkhede added a comment -

          Assigning to Joel Koshy for review

          omkreddy Manikumar added a comment -

          Updated reviewboard https://reviews.apache.org/r/24214/diff/
          against branch origin/trunk

          omkreddy Manikumar added a comment -

          Updated reviewboard https://reviews.apache.org/r/24214/diff/
          against branch origin/trunk

          omkreddy Manikumar added a comment -

          Updated reviewboard https://reviews.apache.org/r/24214/diff/
          against branch origin/trunk

          nehanarkhede Neha Narkhede added a comment -

          Bump. This is marked for 0.8.2. Feel free to reassign for review.

          omkreddy Manikumar added a comment -

          Updated reviewboard https://reviews.apache.org/r/24214/diff/
          against branch origin/trunk

          omkreddy Manikumar added a comment -

          Updated reviewboard https://reviews.apache.org/r/24214/diff/
          against branch origin/trunk

          omkreddy Manikumar added a comment -

          Updated reviewboard https://reviews.apache.org/r/24214/diff/
          against branch origin/trunk

          omkreddy Manikumar added a comment -

          Updated reviewboard https://reviews.apache.org/r/24214/diff/
          against branch origin/trunk

          jkreps Jay Kreps added a comment -

          Hey guys, the test kafka.tools.TestLogCleaning is a very aggressive test that runs against a Kafka cluster configured for log compaction. It produces a bunch of messages, compacts them continuously, and then does an out-of-band comparison of the two. It would be good to ensure that it still works with this patch on really large cleaner runs with deletes.
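
          The out-of-band comparison boils down to de-duplicating both sides by key (last write wins) and comparing the final values. A minimal Scala sketch with hypothetical types, where `value = None` stands for a delete; the actual `kafka.tools.TestLogCleaning` works on its own log files and record format:

```scala
object OutOfBandCheckSketch {
  // Hypothetical record: `value = None` stands for a delete (tombstone).
  final case class Rec(key: String, value: Option[String])

  // Last write wins per key; keys whose final write is a delete disappear.
  def dedupe(records: Seq[Rec]): Map[String, String] =
    records.foldLeft(Map.empty[String, String]) { (acc, r) =>
      r.value match {
        case Some(v) => acc.updated(r.key, v)
        case None    => acc - r.key
      }
    }

  // Keys whose final value differs between what was produced and what was consumed.
  def mismatches(produced: Seq[Rec], consumed: Seq[Rec]): Set[String] = {
    val p = dedupe(produced)
    val c = dedupe(consumed)
    (p.keySet ++ c.keySet).filter(k => p.get(k) != c.get(k))
  }
}
```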

          omkreddy Manikumar added a comment -

          Yes, we are using the TestLogCleaning tool to test the changes.

          TestLogCleaning stress test output for compressed messages:

          Producing 100000 messages...
          Logging produce requests to /tmp/kafka-log-cleaner-produced-6014466306002699464.txt
          Sleeping for 120 seconds...
          Consuming messages...
          Logging consumed messages to /tmp/kafka-log-cleaner-consumed-177538909590644701.txt
          100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
          De-duplicating and validating output files...
          Validated 9005 values, 0 mismatches.
          
          Producing 1000000 messages...
          Logging produce requests to /tmp/kafka-log-cleaner-produced-3298578695475992991.txt
          Sleeping for 120 seconds...
          Consuming messages...
          Logging consumed messages to /tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
          1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
          De-duplicating and validating output files...
          Validated 89947 values, 0 mismatches.
          
          Producing 10000000 messages...
          Logging produce requests to /tmp/kafka-log-cleaner-produced-3336255463347572934.txt
          Sleeping for 120 seconds...
          Consuming messages...
          Logging consumed messages to /tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
          10000000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
          De-duplicating and validating output files...
          Validated 899853 values, 0 mismatches.
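
          The reported reductions are consistent with reduction = 1 - consumed/produced; for example, 1 - 13165/100000 = 0.86835, printed as 86.8%. A quick Scala check of that assumed formula (the tool's own computation is not shown here):

```scala
object ReductionSketch {
  // Assumed formula: percentage of produced rows that did not survive compaction.
  def reductionPercent(produced: Long, consumed: Long): Double =
    100.0 * (1.0 - consumed.toDouble / produced.toDouble)

  // One decimal place, locale-independent, as in the tool output above.
  def formatted(produced: Long, consumed: Long): String =
    "%.1f%%".formatLocal(java.util.Locale.ROOT, reductionPercent(produced, consumed))
}
```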
          
          jjkoshy Joel Koshy added a comment -

          I can review this next week. However, as far as checking in is concerned, I would strongly prefer to get KAFKA-1755 done first (I have a patch for it almost ready). This patch is a significant change to the log cleaner, and I would rather get some defensive code in first, since log cleaner health is critical for offset management as well as for Samza use cases.

          jkreps Jay Kreps added a comment -

          Great!

          jjkoshy Joel Koshy added a comment -

          Sorry, I dropped this. I just reviewed the patch. I think it looks good, but it needs a rebase. Let me know if you are swamped, though, and we can help with it.

          omkreddy Manikumar added a comment -

          Updated reviewboard https://reviews.apache.org/r/24214/diff/
          against branch origin/trunk

          jjkoshy Joel Koshy added a comment -

          Created reviewboard https://reviews.apache.org/r/34397/diff/
          against branch origin/trunk

          jjkoshy Joel Koshy added a comment -

          Updated reviewboard https://reviews.apache.org/r/34397/diff/
          against branch origin/trunk

          guozhang Guozhang Wang added a comment -

          Committed to trunk.

          I think we need some integration tests before we can be confident this issue is resolved, so let's leave it open for now.

          jjkoshy Joel Koshy added a comment -

          Please file a separate JIRA for that. Also, I'm going to amend this commit with the correct author information.

          jjkoshy Joel Koshy added a comment -

          I just wanted the commit to record Manikumar as the author. Unfortunately, our git repo does not seem to let me force-push an amended commit, so never mind.

          granthenke Grant Henke added a comment -

          With this being resolved, should we update this section of the docs?
          http://kafka.apache.org/documentation.html#design_compactionlimitations

          omkreddy Manikumar added a comment -

          Grant Henke: yes, I will update the docs.


            People

            • Assignee:
              omkreddy Manikumar
              Reporter:
              jjkoshy Joel Koshy
              Reviewer:
              Guozhang Wang
            • Votes:
              0
              Watchers:
              10

              Dates

              • Created:
                Updated:
                Resolved:
