Kafka / KAFKA-1006

Consumer loses messages of a new topic with auto.offset.reset = largest

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: None
    • Labels:

      Description

      The consumer currently uses auto.offset.reset = largest by default. When a new topic is created, the consumer's topic watcher fires. The consumer first completes partition reassignment as part of the rebalance and only then starts consuming from the tail (log end offset) of each partition. Any messages the server appends to the new topic before the reassignment completes fall before that starting offset, so the consumer never consumes them. As a result, several batches of messages can be lost when a topic is newly created.

      The proposed fix is to start consuming newly created topics from the earliest offset.
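A toy model of the race described above may make the loss concrete. It is plain Java, not Kafka code, and assumes a single partition represented as an in-memory list whose index is the offset, with two messages written while the consumer is still rebalancing and two written afterwards. `NewTopicLossDemo` and `consume` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition of a newly created topic; not Kafka code.
class NewTopicLossDemo {
    // Returns the messages a consumer sees, depending on where it starts.
    static List<String> consume(boolean startFromLargest) {
        List<String> log = new ArrayList<>(); // list index == offset

        // The producer writes to the new topic while the consumer is still rebalancing.
        log.add("m0");
        log.add("m1");

        // Rebalance completes; the consumer picks its starting offset now.
        // largest = current log end offset, smallest = earliest offset (0 here).
        int start = startFromLargest ? log.size() : 0;

        // More messages arrive after the consumer has started fetching.
        log.add("m2");
        log.add("m3");

        List<String> seen = new ArrayList<>();
        for (int offset = start; offset < log.size(); offset++) {
            seen.add(log.get(offset));
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("largest:  " + consume(true));  // largest:  [m2, m3]
        System.out.println("smallest: " + consume(false)); // smallest: [m0, m1, m2, m3]
    }
}
```

With largest the consumer never sees m0 and m1; with smallest it sees all four messages.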

        Activity

        Guozhang Wang added a comment -

        Hi Eli Naeher, the new Java consumer, which will soon be released to replace the old Scala consumer, provides a rebalance callback in its API so that users can manually set the starting offset to any value for newly added topics/partitions. This should resolve the problem. Would you be willing to wait for the Java consumer and try it out? We could patch the old consumer implementation, but at this stage I think people are trying to minimize changes unless there is a severe production issue.
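The callback-based workaround could look roughly like the sketch below. `AssignmentCallback`, `startNewTopicsFromEarliest` and the "topic-N" string keys are hypothetical stand-ins so the example runs without a broker; in the Java client that later shipped, the comparable hook is `ConsumerRebalanceListener.onPartitionsAssigned`, from which a consumer can reposition partitions (for example with `KafkaConsumer.seekToBeginning`). The sketch also assumes the application itself tracks which topics it already knows about and their log end offsets.

```java
import java.util.*;

// Hypothetical stand-ins so the sketch runs without a broker; not the Kafka client API.
class RebalanceCallbackSketch {
    // Invoked after a rebalance with the partitions now owned by this consumer.
    interface AssignmentCallback {
        void onAssigned(Collection<String> partitions);
    }

    // Builds a callback that starts partitions of unknown (new) topics at offset 0
    // and partitions of known topics at their log end offset ("largest").
    static AssignmentCallback startNewTopicsFromEarliest(Set<String> knownTopics,
                                                         Map<String, Long> logEndOffsets,
                                                         Map<String, Long> positions) {
        return partitions -> {
            for (String partition : partitions) {
                // Partition keys look like "topic-N"; the topic is everything before the last '-'.
                String topic = partition.substring(0, partition.lastIndexOf('-'));
                long start = knownTopics.contains(topic) ? logEndOffsets.get(partition) : 0L;
                positions.put(partition, start);
            }
        };
    }

    public static void main(String[] args) {
        Set<String> known = new HashSet<>(Arrays.asList("clicks"));
        Map<String, Long> logEnd = new HashMap<>();
        logEnd.put("clicks-0", 500L);
        logEnd.put("orders-0", 42L); // "orders" was created while the consumer was rebalancing
        Map<String, Long> positions = new TreeMap<>();
        startNewTopicsFromEarliest(known, logEnd, positions)
                .onAssigned(Arrays.asList("clicks-0", "orders-0"));
        System.out.println(positions); // {clicks-0=500, orders-0=0}
    }
}
```

The point of the callback design is that the decision is made after assignment but before fetching starts, so messages written during the rebalance are still reachable.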

        Eli Naeher added a comment -

        This issue has caused us problems when writing automated integration tests that set up new topics. Are there any plans to address it?

        Neha Narkhede added a comment -

        Instead of adding a new property, we could treat starting from the earliest offset as the correct behavior whenever a consumer discovers a new topic. I think this is a fairly easy change with a big impact on usability, and we should probably fix it even before the consumer rewrite in 0.9.
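A minimal sketch of the rule suggested here, assuming the consumer can compare its topic set before and after a topic-watcher event. No new property is involved; `DiscoveryResetPolicy` and `resetPolicies` are hypothetical names, not part of the 0.8 consumer.

```java
import java.util.*;

// Hypothetical sketch: no new property; newly discovered topics always start from "smallest".
class DiscoveryResetPolicy {
    // Maps every topic present after a topic-watcher event to the reset policy it should use.
    static Map<String, String> resetPolicies(Set<String> topicsBefore,
                                             Set<String> topicsAfter,
                                             String autoOffsetReset) {
        Map<String, String> policies = new TreeMap<>();
        for (String topic : topicsAfter) {
            // A topic absent before the event was just discovered, so read it from the start.
            policies.put(topic, topicsBefore.contains(topic) ? autoOffsetReset : "smallest");
        }
        return policies;
    }

    public static void main(String[] args) {
        Set<String> before = new HashSet<>(Arrays.asList("clicks"));
        Set<String> after = new HashSet<>(Arrays.asList("clicks", "orders"));
        System.out.println(resetPolicies(before, after, "largest"));
        // {clicks=largest, orders=smallest}
    }
}
```

One case this sketch does not settle is consumer startup: if `topicsBefore` is empty when the consumer first starts, every topic would be treated as new and read from the beginning, which may not be what users of auto.offset.reset = largest expect.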

        Guozhang Wang added a comment -

        I propose the following fix:

        1. Add a property to ConsumerConfig alongside auto.offset.reset, named new.topic.offset.reset, which can be either largest or smallest and defaults to smallest.

        2. In handleTopicEvent, when a new topic is added, record it in a list.

        3. In handleOffsetOutOfRange, if the topic is recorded as a new topic, use the new config; otherwise use the global config (auto.offset.reset).

        4. The list is checked and cleared when offsets are committed.
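The four steps above might be sketched as follows. The class `NewTopicOffsetReset` is hypothetical and only mirrors the method names used in the comment (`handleTopicEvent`, `handleOffsetOutOfRange`); it does not reproduce the real 0.8 consumer internals or signatures. It also assumes that "checked/cleared on commit offsets" means a topic stops being treated as new once its offsets have been committed.

```java
import java.util.*;

// Hypothetical sketch of the four-step proposal; names mirror the comment, not the real 0.8 code.
class NewTopicOffsetReset {
    private final String autoOffsetReset;     // existing global setting
    private final String newTopicOffsetReset; // step 1: proposed new.topic.offset.reset
    // Step 2's "list"; synchronized because topic events and fetches are
    // assumed to happen on different threads.
    private final Set<String> newTopics = Collections.synchronizedSet(new HashSet<String>());

    NewTopicOffsetReset(Properties props) {
        autoOffsetReset = checked(props.getProperty("auto.offset.reset", "largest"));
        newTopicOffsetReset = checked(props.getProperty("new.topic.offset.reset", "smallest"));
    }

    // Step 1: only "largest" and "smallest" are accepted.
    private static String checked(String value) {
        if (!value.equals("largest") && !value.equals("smallest")) {
            throw new IllegalArgumentException("expected largest or smallest, got " + value);
        }
        return value;
    }

    // Step 2: remember topics reported as added by the topic watcher.
    void handleTopicEvent(Collection<String> addedTopics) {
        newTopics.addAll(addedTopics);
    }

    // Step 3: choose the reset policy when a fetch reports an out-of-range offset.
    String handleOffsetOutOfRange(String topic) {
        return newTopics.contains(topic) ? newTopicOffsetReset : autoOffsetReset;
    }

    // Step 4: once a topic's offsets are committed it is no longer treated as new.
    void commitOffsets(Collection<String> committedTopics) {
        newTopics.removeAll(committedTopics);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "largest");
        NewTopicOffsetReset reset = new NewTopicOffsetReset(props);
        reset.handleTopicEvent(Collections.singleton("orders"));
        System.out.println(reset.handleOffsetOutOfRange("orders")); // smallest
        System.out.println(reset.handleOffsetOutOfRange("clicks")); // largest
        reset.commitOffsets(Collections.singleton("orders"));
        System.out.println(reset.handleOffsetOutOfRange("orders")); // largest
    }
}
```

Clearing on commit keeps the set small and ensures a later out-of-range error on an established topic falls back to the global auto.offset.reset.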


          People

          • Assignee: Guozhang Wang
          • Reporter: Swapnil Ghike
          • Votes: 2
          • Watchers: 5
