KAFKA-1755: Improve error handling in log cleaner

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0.0
    • Component/s: None
    • Labels: None

      Description

      The log cleaner is a critical process when using compacted topics.
      However, if there is any error in any topic (notably if a key is missing) then the cleaner exits and all other compacted topics will also be adversely affected - i.e., compaction stops across the board.

      This can be improved by aborting compaction only for the affected topic on an error, and keeping the cleaner thread from exiting.

      Another improvement would be to reject messages without keys that are sent to compacted topics, although this is not sufficient by itself.
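The per-topic abort idea can be sketched as follows. This is a hypothetical illustration, not Kafka's actual cleaner (which is written in Scala); the class and partition names are made up:

```python
# Hypothetical sketch: a cleaner loop that aborts compaction only for the
# failing partition instead of letting the whole thread die, so other
# compacted topics keep getting cleaned.

class CleanerThread:
    def __init__(self, partitions):
        self.partitions = partitions   # partition name -> clean() callable
        self.failed = set()            # partitions paused due to errors

    def run_once(self):
        for partition, clean in self.partitions.items():
            if partition in self.failed:
                continue               # skip partitions already in error state
            try:
                clean()
            except Exception:
                # Abort compaction for this partition only; the thread
                # survives and the remaining partitions are still cleaned.
                self.failed.add(partition)
```

A failed partition stays paused on subsequent passes, while healthy partitions continue to be compacted.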

      Attachments

      1. KAFKA-1755_2015-02-23_14:29:54.patch
        39 kB
        Joel Koshy
      2. KAFKA-1755_2015-02-26_10:54:50.patch
        43 kB
        Joel Koshy
      3. KAFKA-1755.patch
        36 kB
        Joel Koshy


          Activity

          Chris Riccomini added a comment -

          It might also be desirable to allow the log compaction to continue on the topic in question, and simply keep all messages without keys without doing any compaction on them.

          Joel Koshy added a comment -

          There are a couple of issues that I was thinking of in scope for this jira:

          • Log cleaner threads quitting on errors (which may be a non-issue, as discussed further below).
          • Dealing with cleaner failures due to unkeyed messages.
          • Other cleaner failures are possible as well (e.g., compressed message sets, until KAFKA-1374 is reviewed and checked in).

          This jira was filed because the log cleaner compacts all compacted topics, so one problematic topic should (ideally) not affect the others. Any practical deployment would need to set up alerts on the cleaner thread dying. Right now, I think the most reliable way to alert (with the currently available metrics) would be to monitor the max dirty ratio. If we set up this alert, then allowing the cleaner to continue would in practice only delay the alert. So one can argue that it is better to fail fast - i.e., let the log cleaner die, because a problematic topic is something that needs to be looked into immediately. However, I think there are further improvements and alternatives worth considering. It would be helpful if others could share their thoughts/preferences on these:

          • Introduce a new LogCleaningState: LogCleaningPausedDueToError
          • Introduce a metric for the number of live cleaner threads
          • If the log cleaner encounters any uncaught error, there are a couple of options:
            • Don't let the thread die, but move the partition to LogCleaningPausedDueToError. Other topics-partitions can still be compacted. Alerts can be set up on the number of partitions in state LogCleaningPausedDueToError.
            • Let the cleaner die and decrement live cleaner count. Alerts can be set up on the number of live cleaner threads.
          • If the cleaner encounters un-keyed messages:
            • Delete those messages and do nothing else - i.e., ignore them (or just log the count in the log cleaner stats).
            • Keep the messages and move the partition to LogCleaningPausedDueToError. The motivation for this is accidental misconfiguration - i.e., it may be important not to lose those messages. The error log cleaning state can be cleared only by deleting and then recreating the topic.
          • Additionally, I think we should reject producer requests containing un-keyed messages to compacted topics.
          • With all of the above, a backup alert can also be set up on the max-dirty-ratio.
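The proposed state and metric could look roughly like the following. This is a hedged sketch using the names from the comment above; the real state machine lives in Kafka's Scala LogCleanerManager and is not shown here:

```python
# Hypothetical sketch of the proposed LogCleaningPausedDueToError state and a
# metric counting partitions stuck in it, which operators could alert on.
from enum import Enum

class LogCleaningState(Enum):
    NONE = "none"
    IN_PROGRESS = "in-progress"
    PAUSED_DUE_TO_ERROR = "LogCleaningPausedDueToError"  # name from the proposal

class CleanerManager:
    def __init__(self):
        self.states = {}   # (topic, partition) -> LogCleaningState

    def mark_failed(self, partition):
        # On an uncaught cleaning error, park the partition in the error state
        # instead of killing the cleaner thread.
        self.states[partition] = LogCleaningState.PAUSED_DUE_TO_ERROR

    def partitions_in_error(self):
        # The metric to alert on: partitions paused due to error.
        return sum(1 for s in self.states.values()
                   if s is LogCleaningState.PAUSED_DUE_TO_ERROR)
```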
          Guozhang Wang added a comment -

          Here are my two cents:

          1. At the end of the day, Kafka will have two types of topics: one type only accepts keyed messages and log compaction is used; the other accepts any message and log cleaning is used. Those two types of topics never interchange - i.e., once a topic is created with one of the two types, it will never change its type until deletion.

          2. Compressed messages will be supported with log compaction, which will de-serialize the message set and re-serialize it.

          3. With these two points in mind, I would suggest for now:
          a. The broker rejects non-keyed messages for compacted topics.
          b. The broker rejects compressed messages for compacted topics (this restriction will be lifted after KAFKA-1374 is checked in).
          c. With this, the compactor thread should never encounter a non-keyed / compressed message (again, until KAFKA-1374); if it happens, this would be a FATAL error and we should throw an exception and halt the server. It indicates that operator attention and code fixes are needed before the server can be restarted.

          Joel Koshy added a comment -

          Thanks for the comments. The issue with 3a is that once we do have compression support for compacted topics it will be very ugly to implement that check on message arrival. This is because we need to do a deep iteration on incoming messages to look at the key field. The only time we do that currently on the broker is when assigning offsets. However, this code is in ByteBufferMessageSet which is fairly low-level and has no log-config information associated with it. We would have to "leak" some flag indicating whether non-keyed messages are allowed or not which is ugly. That is why I prefer not doing that check on message arrival and just have the log cleaner drop/ignore non-keyed messages with a warning. Ultimately, the effect is the same. However, the benefit of rejecting is that the producer is made aware of it. So I guess I changed my mind with regard to my earlier comment - i.e., I would recommend against doing this unless we can think of an elegant implementation. 3b is easy to do and we can implement that until KAFKA-1374 is in place.

          Jay Kreps added a comment -

          Rejecting messages without a key doesn't actually solve the problem, I think, since you can change the retention setting of a topic to compaction later, at which point there may already be null keys.

          Perhaps the most consistent thing to do would actually be to treat null as a key value. So the cleaner would retain a single null-keyed message and remove the others.
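That suggestion can be illustrated with a toy compaction pass (a sketch only; real compaction works over log segments and offsets, which this deliberately ignores):

```python
# Sketch of "treat null as a key value": the cleaner keeps the latest value
# for every key, and None is handled like any other key, so a single
# null-keyed message survives and earlier null-keyed ones are removed.
def compact(messages):
    """messages: list of (key, value) pairs in offset order; key may be None."""
    latest = {}                       # None is a perfectly valid dict key
    for key, value in messages:
        latest[key] = value           # later offsets overwrite earlier ones
    return list(latest.items())
```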

          Joel Koshy added a comment -

          Created reviewboard https://reviews.apache.org/r/31306/
          against branch origin/trunk

          Joel Koshy added a comment -

          I thought a bit more about this and here is a patch that summarizes my thoughts.

          This patch does message validation on arrival, and drops unkeyed messages during log compaction.

          I actually think it is better to reject invalid messages (unkeyed and for now compressed) up front as opposed to accepting those messages and only dropping/warning during compaction. This way the producer is given early indication via a client-side error that it is doing something wrong which is better than just a broker-side warning/invalid metric. We still need to deal with unkeyed messages that may already be in the log but that is orthogonal I think - this includes the case when you change a non-compacted topic to be compacted. That is perhaps an invalid operation - i.e., you should ideally delete the topic before doing that, but in any event this patch handles that case by deleting invalid messages during log compaction.

          Case in point: at LinkedIn we use Kafka-based offset management for some of our consumers. We recently discovered compressed messages in the offsets topic which caused the log cleaner to quit. We saw this issue in the past with Samza checkpoint topics and suspected that Samza was doing something wrong. However, after seeing it in the __consumer_offsets topic it is more likely to be an actual bug in the broker - either in the log cleaner itself, or even at the lower-level byte-buffer message-set API. We currently do not know. If we at least reject invalid messages on arrival we can rule out clients as being the issue.
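The validation-on-arrival idea amounts to roughly the following check. This is a hypothetical Python sketch, not the patch itself; the function and exception names are invented for illustration:

```python
# Sketch of up-front validation for a compacted topic: reject unkeyed (and,
# until compressed-message compaction lands via KAFKA-1374, compressed)
# messages so the producer gets a client-side error immediately, instead of
# the message being silently dropped or warned about during compaction later.
class InvalidMessageError(Exception):
    pass

def validate_for_compacted_topic(key, is_compressed):
    if key is None:
        raise InvalidMessageError("compacted topics require keyed messages")
    if is_compressed:
        raise InvalidMessageError(
            "compressed messages are not yet supported on compacted topics")
```

Unkeyed messages already in the log (e.g., from a topic whose cleanup policy was switched to compaction later) still have to be handled separately, which is why the patch also deletes invalid messages during compaction.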

          Joel Koshy added a comment -

          Also, I have an incremental patch that prevents the log cleaner from quitting due to uncaught errors while cleaning a specific partition. It basically moves that partition to a permanent failed state and allows the cleaner to continue compacting other partitions. It continues to include the failed partition when computing the max dirty ratio so you can still accurately alert on that metric. We can discuss whether we want to add that or not.
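The point about the metric can be made concrete with a small sketch (hypothetical names and data layout, not the actual patch):

```python
# Sketch: compute the max dirty ratio across partitions WITHOUT excluding
# partitions in the failed state. A failed partition's dirty ratio keeps
# growing, so including it keeps the existing max-dirty-ratio alert accurate.
def max_dirty_ratio(logs):
    """logs: partition -> {"dirty": bytes, "total": bytes, "failed": bool}."""
    ratios = [v["dirty"] / v["total"]
              for v in logs.values() if v["total"] > 0]
    return max(ratios, default=0.0)
```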

          Joel Koshy added a comment -

          Updated reviewboard https://reviews.apache.org/r/31306/diff/
          against branch origin/trunk

          Joel Koshy added a comment -

          Updated reviewboard https://reviews.apache.org/r/31306/diff/
          against branch origin/trunk

          Gwen Shapira added a comment -

          This was in fact committed to trunk and is in 0.9.0.0:

          commit 1cd6ed9e2c07a63474ed80a8224bd431d5d4243c Joel Koshy committed on Mar 3
          https://github.com/apache/kafka/commit/1cd6ed9e2c07a63474ed80a8224bd431d5d4243c#diff-d7330411812d23e8a34889bee42fedfe


            People

            • Assignee: Joel Koshy
            • Reporter: Joel Koshy
            • Reviewer: Jay Kreps
            • Votes: 0
            • Watchers: 7
