Kafka / KAFKA-987

Avoid checkpointing offsets in Kafka consumer that have not changed since the last commit

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      We need to fix the Kafka zookeeper consumer to avoid checkpointing offsets that have not changed since the last offset commit. This will help reduce the write load on zookeeper.
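
      A minimal sketch of the idea, with hypothetical names (the real logic lives in the ZooKeeper consumer's commitOffsets path): keep a per-partition cache of the last offset written to ZooKeeper, and skip the write when the offset is unchanged.

          import scala.collection.mutable

          // Hypothetical stand-in for Kafka's partition identifier.
          case class TopicAndPartition(topic: String, partition: Int)

          class OffsetCheckpointer(writeToZk: (TopicAndPartition, Long) => Unit) {
            // Last offset committed to ZooKeeper for each partition.
            private val checkpointedOffsets = mutable.Map.empty[TopicAndPartition, Long]

            def commitOffsets(consumedOffsets: Map[TopicAndPartition, Long]): Unit = {
              for ((tp, offset) <- consumedOffsets) {
                // Skip the ZooKeeper write when the offset has not moved since the
                // last commit. An inequality check (rather than <) also covers an
                // offset that legitimately moved backwards, e.g. after an unclean
                // leader election.
                if (checkpointedOffsets.get(tp) != Some(offset)) {
                  writeToZk(tp, offset)
                  checkpointedOffsets(tp) = offset
                }
              }
            }
          }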

      1. kafka-987.patch
        2 kB
        Swapnil Ghike
      2. kafka-987-v2.patch
        2 kB
        Swapnil Ghike

        Activity

        Swapnil Ghike added a comment -

        The only case in which the code does not have a bug and yet the consumer could end up consuming from an offset lower than the previously committed offset is an unclean leader election. Hence, this patch commits new offsets as long as they differ from the previously committed offsets.

        Neha Narkhede added a comment -

        Thanks for the patch, Swapnil. Good thinking about not limiting the check to offset < committed offset. Just one question about your patch -

        In commitOffsets, should the map update move inside the try block, to ensure that the map is updated only if the ZK write succeeds?
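
        A minimal sketch of the change being discussed, with hypothetical names (the actual fix is in the v2 patch): the cache update moves inside the try block, after the ZooKeeper write, so it only runs when the write succeeds.

            // Update the cached offset only after the ZooKeeper write succeeds,
            // so a failed write is retried on the next commit cycle.
            def commitOne(path: String,
                          offset: Long,
                          writeToZk: (String, String) => Unit,
                          cache: scala.collection.mutable.Map[String, Long]): Unit =
              try {
                writeToZk(path, offset.toString) // may throw, e.g. on a ZK failure
                cache(path) = offset             // only reached if the write succeeded
              } catch {
                case e: Exception =>
                  // The cache still holds the old offset, so the next commitOffsets
                  // pass will see a difference and retry the write.
                  println("offset commit failed, will retry: " + e.getMessage)
              }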

        Swapnil Ghike added a comment -

        Yes, thank you. Attached v2.

        Neha Narkhede added a comment -

        +1 on v2.

        Neha Narkhede added a comment -

        committed v2 to 0.8

        Jun Rao added a comment -

        It seems that we don't need to update the offset map in addPartitionTopicInfo(). In fact, currently, if there are no new messages coming in, we won't checkpoint the first offset.

        Swapnil Ghike added a comment -

        I think that any call to createMessageStreams will trigger a rebalance, which will fill up the topicRegistry, and the checkpointing of offsets will start regardless of whether new messages are being consumed. Hence, we should probably update the cached checkpointedOffsets map in addPartitionTopicInfo(), as in the sketch below.

        Maybe I have missed something?
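
        A minimal sketch of what this would look like, with hypothetical names: during a rebalance, addPartitionTopicInfo reads the offset already committed in ZooKeeper (possibly by another consumer instance) and seeds the cache with it, so the commit loop does not immediately re-write an unchanged offset.

            import scala.collection.mutable

            class TopicRegistrySketch(readOffsetFromZk: String => Option[Long]) {
              // Mirrors the committed offsets stored in ZooKeeper.
              val checkpointedOffsets = mutable.Map.empty[String, Long]

              def addPartitionTopicInfo(partitionPath: String): Unit =
                // Seed the cache with whatever is already committed, if anything.
                readOffsetFromZk(partitionPath).foreach { offset =>
                  checkpointedOffsets(partitionPath) = offset
                }
            }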

        Neha Narkhede added a comment -

        Jun,

        I think what you are suggesting makes sense on startup, before the consumer has consumed any messages. However, since the offset map is a cache of what's in ZooKeeper, the safest approach is to keep it in sync with the ZooKeeper data. Before the consumer can pull any data, it has to rebalance, and while rebalancing we read the offsets from ZK anyway. So I think it is correct to update the offset cache in addPartitionTopicInfo().

        Jun Rao added a comment -

        1. The issue on startup is the following. If a consumer starts up from the end of the log and no new messages come in, no offset will be checkpointed to ZK. This will affect tools like ConsumerOffsetChecker.

        2. During rebalance, a consumer may pick up offsets committed by other consumer instances. If we don't update the offset cache in addPartitionTopicInfo(), we will do an extra, unnecessary offset update to ZK.

        It seems to me that the impact of #1 is bigger than the slight performance impact of #2. Another way to handle #1 is to always force the very first offset write (per partition) to ZK. However, I am not sure it's worth the complexity.

        Neha Narkhede added a comment -

        I'm trying to see if I understand what you are saying here.

        1. The basic logic is that as long as the consumer rebalances before starting consumption, the offset cache will be updated; this is how the ZooKeeper consumer behaves today. Given that, it doesn't matter much where the consumer starts consuming from. If it hasn't read any messages, there is no need to update the offsets in ZooKeeper. If it reads messages, the offsets will differ from what's in the cache, so they will get checkpointed.

        2. I don't think this is worth doing, since it only reduces one ZooKeeper write.

        Swapnil Ghike added a comment -

        I discussed this yesterday with Jun. If there is no offset already present in ZooKeeper, we set the offset value to -1 in the offset cache in addPartitionTopicInfo(). Later, even if no message is consumed, the real offset will be checkpointed. Jun said that he was OK with this patch.
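
        A minimal sketch of the agreed behavior, written as a standalone Scala script with hypothetical names: seeding the cache with -1 when ZooKeeper has no offset guarantees that the first real offset differs from the cached value, so it is checkpointed even if no message is consumed.

            import scala.collection.mutable

            val checkpointedOffsets = mutable.Map.empty[String, Long]

            // Seed the cache when a partition is added; -1 marks "never committed".
            def seedCache(partition: String, offsetInZk: Option[Long]): Unit =
              checkpointedOffsets(partition) = offsetInZk.getOrElse(-1L)

            // An offset is committed whenever it differs from the cached value.
            def shouldCommit(partition: String, currentOffset: Long): Boolean =
              checkpointedOffsets.get(partition) != Some(currentOffset)

            // Fresh consumer group starting at the end of the log (offset 42): the
            // -1 sentinel forces the first checkpoint even with nothing consumed.
            seedCache("mytopic/0", None)
            assert(shouldCommit("mytopic/0", 42L))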


  People

  • Assignee: Swapnil Ghike
  • Reporter: Swapnil Ghike
  • Votes: 0
  • Watchers: 3
