  Kafka / KAFKA-4682

Committed offsets should not be deleted if a consumer is still active (KIP-211)

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:

      Description

      Kafka will delete committed offsets that are older than offsets.retention.minutes.

      If there is an active consumer on a low-traffic partition, it is possible that Kafka will delete the committed offset for that consumer. Once the offset is deleted, a restart or a rebalance of that consumer will cause the consumer to find no committed offset and to start consuming from earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker failover (a broker restart, or a coordinator failover) might also cause you to start reading from auto.offset.reset.
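
      The settings interacting here are the broker-side offsets.retention.minutes (set in server.properties on the brokers; the default at the time of this discussion was 1 day) and the consumer-side auto.offset.reset. A minimal illustration of the consumer side, with made-up values:

          import java.util.Properties;
          import org.apache.kafka.clients.consumer.ConsumerConfig;

          Properties props = new Properties();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "low-traffic-group");
          // Only consulted when no committed offset exists for the group --
          // which is exactly what happens after the broker expires the offset.
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"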

      I think that Kafka should only delete offsets for inactive consumers. The timer should only start after a consumer group goes inactive. For example, if a consumer group goes inactive, then after 1 week, delete the offsets for that consumer group. This is a solution that Jun Rao mentioned in https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521

      The current workarounds are to:

      1. Commit an offset on every partition you own on a regular basis, making sure the commit interval is shorter than offsets.retention.minutes (a broker-side setting that a consumer might not even be aware of),
        or
      2. Turn the value of offsets.retention.minutes up really, really high. You have to make sure it is longer than the longest expected gap between messages on any low-traffic topic you want to support. For example, if you want to support a topic where someone produces once a month, you would have to set offsets.retention.minutes to at least 1 month,
        or
      3. Turn on enable.auto.commit (this is essentially #1, but easier to implement).

      None of these are ideal.

      #1 can be spammy. It requires that your consumers know something about how the brokers are configured, which is sometimes out of your control. MirrorMaker, for example, only commits offsets on partitions where it receives data. And it is logic that you would have to duplicate in all of your consumers.
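
      For illustration, a minimal sketch of workaround #1 (the class and method names are made up; it assumes a KafkaConsumer driven by an ordinary poll loop): re-commit the current position on every assigned partition, whether or not it received new records, at an interval shorter than offsets.retention.minutes.

          import java.util.HashMap;
          import java.util.Map;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          import org.apache.kafka.common.TopicPartition;

          public class PeriodicCommitter {
              // Re-commit the current position on every assigned partition, even for
              // partitions that received no new records. Call this from the poll loop
              // more often than the broker's offsets.retention.minutes.
              public static void recommitAllPositions(KafkaConsumer<?, ?> consumer) {
                  Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                  for (TopicPartition tp : consumer.assignment()) {
                      offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
                  }
                  consumer.commitSync(offsets);
              }
          }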

      #2 has a disk-space impact on the broker (in __consumer_offsets) as well as a memory impact on the broker (the offsets cache used to answer OffsetFetch requests).

      #3 has, I think, the potential for message loss (the consumer might commit offsets for messages that have not yet been fully processed).


          Activity

          Jeff Widman added a comment - edited

          Now that consumers have a background heartbeat thread, it should be much easier to identify whether a consumer is dead or alive, so this makes sense to me. However, this would make KAFKA-2000 more important, because you can't count on offsets expiring.

          We also had a production problem where a couple of topics' log files were totally cleared, but the offsets weren't cleared, so we had negative lag where the consumer offset was higher than the broker high watermark. This was with ZooKeeper offset storage, but regardless, I could envision something getting screwed up, or someone resetting a cluster without understanding what they're doing and leaving the offsets screwed up. If this were implemented, those old offsets would never go away unless manually cleaned up as well. So I'd want to make sure that's protected against somehow... like if a broker ever encounters a consumer offset that's higher than the high watermark, either an exception is thrown or those consumer offsets get reset to the broker high watermark. Probably safest to just throw an exception in case something else funky is going on.
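
          For illustration only, a hypothetical client-side version of that check, using the existing committed()/endOffsets() consumer APIs (whether the broker itself should throw or reset is exactly the open question raised here):

              import java.util.Map;
              import java.util.Set;
              import org.apache.kafka.clients.consumer.KafkaConsumer;
              import org.apache.kafka.clients.consumer.OffsetAndMetadata;
              import org.apache.kafka.common.TopicPartition;

              public class NegativeLagCheck {
                  // Fail fast if any committed offset is ahead of the broker's log end offset.
                  public static void check(KafkaConsumer<?, ?> consumer, Set<TopicPartition> partitions) {
                      Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
                      for (TopicPartition tp : partitions) {
                          OffsetAndMetadata committed = consumer.committed(tp);
                          if (committed != null && committed.offset() > endOffsets.get(tp)) {
                              throw new IllegalStateException("Committed offset " + committed.offset()
                                      + " for " + tp + " is beyond the log end offset " + endOffsets.get(tp));
                          }
                      }
                  }
              }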

          Jason Gustafson added a comment -

          I think the suggestion makes sense. If we did that, I wonder if it is still necessary to enforce each offset's retention time individually. It might make more sense to drop the retention_time field from the offset commit request (which we don't use in the new consumer anyway) and expire all offsets for the group at the same time. I'm not sure if we would then need to provide another avenue to override the collective expiration time though (seems few people need it?). A change to the protocol would require a short KIP, so another way would be to continue allowing offset expiration individually, but don't start the timeout until the group is empty.

          One minor issue is the handling of simple consumers, but perhaps we can just let the expiration timer reset after every new offset commit.

          James Cheng added a comment -

          I think that if we decide to start the timer when the partition no longer has an active consumer, then we no longer need an expiration time on the individual commit.

          You said "so another way would be to continue allowing offset expiration individually, but don't start the timeout until the group is empty". I'm not sure that will work. According to the code and protocol doc, the offset in the API call carries a commit time, as well as an expiration time (calculated upon receipt). The expiration time is an absolute "expire at so-and-so time" value, and there's no way to compare that against the "time the partition was last assigned to an active consumer".

          I suppose you could take (expiration_time - commit_time) and use that to calculate a duration, and then start the timer for that duration when the partition loses an active consumer. That would work, but it'd be very roundabout.
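
          In other words (variable names are illustrative, not actual broker code):

              // Recover the originally requested retention from the stored timestamps,
              // then restart the clock from the moment the partition lost its active consumer.
              long retentionMs = expirationTimestamp - commitTimestamp;
              long expireAt = lostActiveConsumerTimestamp + retentionMs;
              boolean expired = System.currentTimeMillis() >= expireAt;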

          What do you mean by "expire all offsets for the group at the same time"? And "collective expiration time"?

          Jason Gustafson added a comment -

          Yeah, the hack you mentioned is what I had in mind. We could also just change the message format so that instead of storing the commit expiration times, we only store the retention time. Obviously I would prefer the KIP route to take the retention time out of the OffsetCommit API (since it's not used anyway). Are you interested in taking this on?

          By "collective expiration time," I was referring to the duration before which all offsets are expired. I'm unsure whether users would want to be able to control this, or whether a single global config on the broker would be sufficient

          Vahid Hashemian added a comment -

          Jason Gustafson et al.
          I'm interested in taking this on. Just to make sure I understand the proper approach (KIP) before starting to write the draft:

          We don't want to expire group offsets while the group is active, so we would want to phase out individual offset expirations inside the group (that would mean removing the unused retention time field from the OffsetCommit protocol). On the other hand, we now seem to need some sort of expiration time per consumer group, so we can remove the group's offsets once that expiration time has passed and the group is no longer active. This expiration time is set, and takes effect, only when the group becomes empty.

          Is this a reasonable summary of what needs to happen?

          James Cheng added a comment -

          Yes, that's a reasonable summary.

          Other details to consider:

          • Jason Gustafson asked whether we have a single broker-level config for the expiration time that applies to all groups, or if we need to allow individual consumer groups to specify their own expiration time. In the discussion for KIP-186, Jason Gustafson raised the same question. I'm not sure.
          • Offsets are normally saved per (partition, groupId). Do we want to allow offsets to be expired for individual partitions separately from the group? As an example, say I have a groupId="foo" that commits for (Topic A, partition 0) and (Topic B, partition 0). And then groupId stops subscribing to Topic B, and only subscribes to (Topic A, partition 0). Should the offset for (Topic B, partition 0) stay around as long as the group is active? Or, should it be expired, since it is not part of the group anymore?
          Vahid Hashemian added a comment - edited

          James Cheng Thanks for your feedback. Regarding the other details you brought up:

          • Jason Gustafson's suggestion on KIP-186 makes sense to me. The OffsetCommit API can be used to override the default broker-level property offsets.retention.minutes for specific group/topic/partitions. This means we probably wouldn't need a group-level retention config. What a potential KIP for this JIRA would add is that the retention timer kicks off the moment the group becomes empty, and while the group is stable no offset will be removed (as the retention timer is not ticking yet).
          • Regarding your second point, I guess we could pick either method. It all would depend on the criteria for triggering the retention timer for a partition. If we trigger it when the group is empty (as in the previous bullet) then we would be expiring the offset for B-0 with all other group partitions. If, on the other hand, we decide to trigger the timer when the partition stops being consumed within the group, then B-0's offset could expire while the group is still active. I'm not sure how common this scenario is in real applications. If it's not that common perhaps it wouldn't cost a lot to keep B-0's offsets around with the rest of the group. In any case, we should be able to pick one approach or the other depending on what you and others believe is more reasonable.

          What do you think? Jason Gustafson, what are your thoughts on this?

          Jason Gustafson added a comment -

          Sorry for the late response. There seem to be a few open questions:

          1. Does the consumer need the ability to control the retention timeout or is a broker config sufficient? I am not too sure about this. There is at least one use case (ConsoleConsumer) where we might intentionally set a low value, but I'm not sure how bad it would be to let it stick with the default. It certainly would have been helpful prior to Ewen's KIP.

          2. Do we still need offset-level expiration or should we move it to the group? Personally, it feels a little odd to expire offsets at different times once a group is empty. It's a little more intuitive to expire them all at once. Another way to view this would be that we deprecate the offset retention setting and add a group metadata retention setting. Once the group has gone empty, we start its retention timer. If it expires, we clear all of its state including offsets.

          3. Do we need to change the format of the offset metadata messages? Currently the offset metadata that is stored in the log includes an expiration timestamp. This won't make much sense any more because we won't know what timestamp to use when the offset is first stored. While we're at it, we could probably also remove the commit timestamp and use the timestamp from the message itself. This also depends on the answer to the first question. (A rough before/after sketch follows this list.)

          4. Should we start the expiration timer for an individual offset if the group is no longer subscribed to the corresponding topic? My inclination is to keep it simple and say no, but I guess there is a risk that this tends to grow the cache more than existing behavior. If we're concerned about this, then we probably need to keep the individual offset expiration timer. Unfortunately because of the generic group protocol (which is also used in Connect), we don't currently have the ability to inspect subscriptions to know if a topic is still subscribed.
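
          A rough before/after sketch for question 3 (field names are assumptions drawn from this discussion, not the actual on-disk schema):

              // Today: the stored offset commit value carries its own timestamps.
              class OffsetCommitValueToday {
                  long offset;
                  String metadata;
                  long commitTimestamp;  // when the offset was committed
                  long expireTimestamp;  // meaningless if expiration only starts when the group goes empty
              }

              // Proposed: drop both timestamps; the commit time comes from the message's own
              // timestamp, and expiration is driven by a group-level retention timer.
              class OffsetCommitValueProposed {
                  long offset;
                  String metadata;
              }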

          Vahid Hashemian added a comment - edited

          Jason Gustafson Thank you for your comments. You seem to be looking at this with an inclination to get rid of the retention time from the OffsetCommit protocol. I think with my comments below I'm considering the alternative:

          1. Ewen's KIP proposes to increase the default retention from 1 day to 7 days. So, allowing consumers to set a lower timeout (for the console consumer) seems to be helpful after his KIP; the same way allowing them to set a higher timeout (for actual consumer applications) is helpful before his KIP.
          2. Even if we have offset-level expiration, all offsets in the group should expire together, because the expiration timer starts ticking for all partitions at the same time (when the group becomes empty). The only exception is when a consumer has set a non-default retention time for particular partitions (e.g. using the OffsetCommit API).
          3. Agreed. The expiration timestamp won't make sense. Perhaps the retention time should be stored, and whether to expire or not could be calculated on the fly from the time the group becomes empty plus the retention time (we would need to somehow keep the timestamp of the group becoming empty). This expiration check needs to be performed only if the group is empty; otherwise there is no need to expire at all. (See the sketch at the end of this comment.)
          4. I don't have a strong feeling about this. It's for sure simpler to let all offsets expire at the same time. And if we keep the individual offset retention it would be easier to change this in case the cache size becomes an issue.

          I think there is a risk involved in removing the individual retention from the protocol: could some requirement arise in the future that makes us bring it back to the protocol? One option is to let that field stay for now, and remove it later once we are more certain that it won't be needed back.
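
          To make point 3 above concrete, a minimal sketch of the check (names and signature are assumptions, not broker code):

              // An offset is only eligible for expiration once its group is empty, and the
              // timer is measured from the moment the group became empty, not the commit time.
              boolean shouldExpire(boolean groupIsEmpty, long groupEmptyTimestampMs,
                                   long retentionMs, long nowMs) {
                  if (!groupIsEmpty)
                      return false;  // never expire offsets of an active group
                  // retentionMs is the per-partition override from OffsetCommit if one was
                  // given, otherwise the broker's offsets.retention.minutes converted to ms.
                  return nowMs >= groupEmptyTimestampMs + retentionMs;
              }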

          Vahid Hashemian added a comment -

          Jason Gustafson I have started drafting a KIP for the changes discussed here. Could you please clarify what you mean by

          ... we could probably also remove the commit timestamp and use the timestamp from the message itself. ...

          I see that the commit timestamp is set to the time the request is processed (which supposedly is when the offset is committed). So I'm not clear what you mean by "timestamp from the message itself".
          Thanks.

          Vahid Hashemian added a comment -

          I just started a KIP discussion for this JIRA: KIP-211.

          Drew Kutcharian added a comment -

          This just happened to us and I just stumbled upon this JIRA while trying to figure out the cause. A few questions:

          1. Aren't consumer offset topics compacted? Shouldn't at least the last entry stay on disk after cleanup?

          2. Considering that they are compacted, what is the real concern with workaround 2 in the description: "2. Turn the value of offsets.retention.minutes up really really high"?

          3. As a workaround, would it make sense to set offsets.retention.minutes so it matches log.retention.ms, and auto.offset.reset to earliest? That way consumers and logs would "reset" at the same time?

          4. Is there a timeline for the release of KIP-211?

          John Crowley added a comment -

          Just found this entry - had previously commented on https://issues.apache.org/jira/browse/KAFKA-3806

          Is it possible to allow the offsets.retention.minutes to be set per groupId (in a similar way that retention.ms can be set per topic)?

          This would allow a fairly short default - 1 day as is current - to remove abandoned groupId metadata yet allow the user to indicate that a particular groupId should be handled differently. Example in 3806 was a PubSub using Kafka as a persistent, reliable store supporting multiple subscribers. Some of the source data has very low volatility - e.g. next year's holiday calendar for a company, which probably only changes once a year. A consumer must still poll in case an error update is posted, but will in the normal case not do a real commit for 12 months!


            People

            • Assignee: Vahid Hashemian
            • Reporter: James Cheng
            • Votes: 6
            • Watchers: 13
