KAFKA-232

ConsumerConnector has no access to "getOffsetsBefore"

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      kafka.javaapi.SimpleConsumer has "getOffsetsBefore". I would like this ability in KafkaMessageStream or in ConsumerConnector, so that clients can access their current position.

        Activity

        Neha Narkhede added a comment -

        I think this is how that could work:

        Say we enable a "custom.beginning.offsets" option in the zk consumer. What that means is that this particular group id has chosen to work with custom beginning offsets.

        The requirement is that only one consumer in the group specifies this option. If more than one consumer in a group tries to do so, we throw an exception. This can be implemented via ZooKeeper locks: the first consumer id that specifies this option writes the custom offsets to the /consumers/groups/[group_id]/custom_offsets path. After that, any other consumer trying to do the same gets back an exception naming the consumer id that currently holds the lock.

        On the first successful rebalance after startup, the consumer ids can check for this path and reset the consumed offset for the partitions they own to that value.

        This is just a cursory explanation of the idea and many details would have to be worked out. This is very tricky with the current implementation of the consumer rebalancing logic. I feel that this will be much easier to implement if we move to the co-ordinator approach for the consumer rebalancing. I've been meaning to write up a proposal for that. Will write up my ideas and add some ideas for this JIRA to it.
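The first-writer-wins rule described in this comment can be sketched as follows. This is only an illustrative model, not Kafka or ZooKeeper client code: `FakeZk` is a hypothetical in-memory stand-in for a create-if-absent node, and `claim_custom_offsets`, `offsets_after_rebalance`, and `OffsetClaimError` are names invented for this example. Only the path layout comes from the comment.

```python
class OffsetClaimError(Exception):
    """Raised when another consumer already wrote the custom offsets."""


class FakeZk:
    """Hypothetical in-memory stand-in for a create-if-absent node store."""

    def __init__(self):
        self._nodes = {}

    def create(self, path, data):
        # Creating a node that already exists fails, which is what makes
        # the first writer the lock holder.
        if path in self._nodes:
            raise KeyError(path)
        self._nodes[path] = data

    def get(self, path):
        return self._nodes.get(path)


def custom_offsets_path(group_id):
    return f"/consumers/groups/{group_id}/custom_offsets"


def claim_custom_offsets(zk, group_id, consumer_id, offsets):
    """Write custom beginning offsets; fail if another consumer got there first."""
    path = custom_offsets_path(group_id)
    try:
        zk.create(path, {"owner": consumer_id, "offsets": dict(offsets)})
    except KeyError:
        owner = zk.get(path)["owner"]
        raise OffsetClaimError(
            f"custom offsets already set by consumer {owner}"
        ) from None


def offsets_after_rebalance(zk, group_id, owned_partitions, current_offsets):
    """After the first successful rebalance, reset offsets of owned partitions."""
    node = zk.get(custom_offsets_path(group_id))
    result = dict(current_offsets)
    if node is not None:
        for partition in owned_partitions:
            if partition in node["offsets"]:
                result[partition] = node["offsets"][partition]
    return result
```

A second consumer calling `claim_custom_offsets` for the same group gets an `OffsetClaimError` that names the first consumer. The sketch leaves out the parts the comment calls tricky, such as when the claim is released and how it interacts with later rebalances.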

        Jun Rao added a comment -

        We discussed on the mailing list before about allowing the ZK-based consumer to consume from an arbitrary offset at startup. Here is the main complexity: multiple consumers in the same group can consume a topic jointly. When they start up, which consumer sets the offset for which partitions? How do we prevent two consumers from setting the offset for the same partition?

        Edward Capriolo added a comment -

        Interesting idea. I am not sure that will work well: many producers will be logging, and I would expect some messages to arrive out of order, especially as the minute turns over. In the worst case, a producer might shut down unexpectedly and wake up some time later. Ideally I would like the producers not to have to write special headers, etc. I thought of recording the last message seen, assuming that each record is more or less unique, but there are edge cases here as well.

        I opened this issue up because I am wondering why the two consumers have different functionality. Which one should be used for which cases? It is not entirely clear to a new user like myself.

        From a mile-high view, it seems that if the ConsumerConnector has access to commitOffset(), it should be able to call getOffset(). Is there some technical complexity in implementing this?

        Jun Rao added a comment -

        Do you think you can add metadata to the message itself so that you know whether the consumed data has moved to the next minute? If so, you can turn off auto commit and call commitOffset() directly.
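A minimal sketch of this suggestion, assuming each message carries a timestamp in seconds as its metadata. The message tuples and the `commit` callback are hypothetical stand-ins for the real stream and for a manual commit call with auto commit turned off. Messages that arrive late for an earlier minute are simply ignored for commit purposes here, which is exactly the out-of-order case raised in the reply to this comment.

```python
def minute_of(timestamp_seconds):
    """Map a timestamp in seconds to its minute bucket."""
    return timestamp_seconds // 60


def consume_with_minute_commits(messages, commit):
    """Commit only when the stream moves on to a later minute.

    messages: iterable of (offset, timestamp_seconds, payload) tuples.
    commit:   callable taking the offset to resume from after a restart.
    """
    current_minute = None
    for offset, timestamp, _payload in messages:
        minute = minute_of(timestamp)
        if current_minute is None:
            current_minute = minute
        elif minute > current_minute:
            # First message of a later minute: commit its offset so that a
            # restart replays from the beginning of this minute.
            commit(offset)
            current_minute = minute
        # A message stamped with an earlier minute does not move the
        # commit point; handling it properly is out of scope here.
```

For example, passing `commits.append` as `commit` collects the offsets at which each new minute began, and those are the only points a restart would resume from.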

        Edward Capriolo added a comment -

        Sure. I am using Kafka to build aggregations of logs. These aggregations are going to be per-minute. I want to record the clientid and my log position externally. If my consumer goes down 30 seconds into the minute, I want to replay the entire minute. I might also want to bring this client up at position x on another node.
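The external bookkeeping described here might look roughly like the sketch below. Everything in it is an assumption made for illustration: `CheckpointStore` is a hypothetical JSON file keyed by client id, the log is modeled as a plain list indexed by offset, and `resume` stands in for starting a consumer at a stored position. None of it is part of the Kafka API.

```python
import json
import os


class CheckpointStore:
    """Hypothetical external store: client id -> offset at the start of the current minute."""

    def __init__(self, path):
        self.path = path

    def _read_all(self):
        try:
            with open(self.path) as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def save(self, client_id, offset):
        data = self._read_all()
        data[client_id] = offset
        # Write to a temporary file and rename it, so a crash mid-write
        # does not leave a truncated checkpoint behind.
        tmp = self.path + ".tmp"
        with open(tmp, "w") as f:
            json.dump(data, f)
        os.replace(tmp, self.path)

    def load(self, client_id, default=0):
        return self._read_all().get(client_id, default)


def resume(store, client_id, log):
    """Return the messages to replay, starting at the stored position."""
    start = store.load(client_id)
    return log[start:]
```

Because the position lives outside the consumer, any node that can read the store can call `resume` for the same client id and replay the interrupted minute from its first message.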

        Jun Rao added a comment -

        Could you explain in a bit more detail how you plan to use this method? Today, ConsumerConnector doesn't support consuming from an arbitrary offset.


          People

          • Assignee: Unassigned
          • Reporter: Edward Capriolo
          • Votes: 0
          • Watchers: 1

            Dates

            • Created:
              Updated: