Kafka / KAFKA-2894

WorkerSinkTask doesn't handle rewinding offsets on rebalance

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.9.0.0
    • Fix Version/s: 0.10.1.0
    • Component/s: KafkaConnect
    • Labels:
      None

      Description

      rewind() is only invoked at the beginning of each poll(). This means that if a rebalance occurs during the poll, it's possible to get data that doesn't match a request to change offsets made during the rebalance. I think the consumer will hold on to consumer data across the rebalance if it is reassigned the same offset, so there may already be data ready to be delivered. Additionally, we may already have data in an incomplete messageBatch that should be discarded when the rewind is requested.

      While connectors that care about this (i.e. ones that manage their own offsets) can handle this correctly by tracking the offsets they're expecting to see, it's a hassle, error-prone, and pretty unintuitive.
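The connector-side workaround mentioned above (tracking the offsets the connector expects to see) can be sketched roughly as follows. This is a standalone illustration, not code from Kafka Connect: the class ExpectedOffsetTracker, its methods, and the use of a plain "topic-partition" string as the key are all hypothetical, and it assumes offsets within a partition are contiguous (no gaps from compaction or transactions).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: tracks, per partition, the next offset a sink expects to see. */
class ExpectedOffsetTracker {
    // Key is an illustrative "topic-partition" string; value is the next expected offset.
    private final Map<String, Long> expected = new HashMap<>();

    /** Record a rewind request: the next accepted record must carry exactly this offset. */
    void rewind(String partition, long offset) {
        expected.put(partition, offset);
    }

    /**
     * Returns true if the record should be processed, or false if it does not match
     * the expected offset (for example, data buffered before the rewind took effect)
     * and should be dropped.
     */
    boolean accept(String partition, long offset) {
        Long next = expected.get(partition);
        if (next != null && offset != next) {
            return false; // unexpected offset: skip it and keep waiting
        }
        // Either nothing was expected yet or the offset matched: advance by one.
        expected.put(partition, offset + 1);
        return true;
    }
}
```

A task would call rewind() whenever it requests new offsets and accept() on each incoming record. Having to carry this bookkeeping in every connector is the hassle the description refers to.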

          Activity

          Ewen Cheslack-Postava added a comment -

          More generally, I think we need to handle cases like rewind() after any connector methods are invoked. The rewind() in poll() handles Connector.start() and Connector.put(), but we also need to handle the rebalance callbacks (where we can ignore Connector.close() and only do this after Connector.open()) and Connector.flush().
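One way to picture the suggestion above is a worker that applies pending rewinds immediately after every callback into the task, rather than only at the start of poll(). The sketch below is a self-contained simulation under stated assumptions, not the actual WorkerSinkTask code: RewindAwareWorker, invoke(), and the string partition keys are hypothetical names, and for simplicity it discards the whole buffered batch instead of only the records for rewound partitions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of a worker that rewinds after every task callback. */
class RewindAwareWorker {
    private final Map<String, Long> position = new HashMap<>();       // simulated consumer positions
    private final Map<String, Long> pendingRewinds = new HashMap<>(); // offsets requested by the task
    private final List<String> messageBatch = new ArrayList<>();      // buffered, undelivered records

    /** Called by the task (e.g. from open(), put() or flush()) to request a new offset. */
    void requestRewind(String partition, long offset) {
        pendingRewinds.put(partition, offset);
    }

    /** Simulates a record that was fetched but not yet delivered to the task. */
    void buffer(String record) {
        messageBatch.add(record);
    }

    /** Applies pending rewinds and drops buffered data; returns true if anything was applied. */
    boolean applyRewinds() {
        if (pendingRewinds.isEmpty()) {
            return false;
        }
        position.putAll(pendingRewinds); // stands in for seeking the consumer
        pendingRewinds.clear();
        messageBatch.clear();            // buffered data predates the seek
        return true;
    }

    /** Runs a task callback, then applies any rewind it requested right away. */
    void invoke(Runnable taskCallback) {
        taskCallback.run();
        applyRewinds();
    }

    /** Current simulated position, or -1 if the partition was never positioned. */
    long position(String partition) {
        return position.getOrDefault(partition, -1L);
    }

    int batchSize() {
        return messageBatch.size();
    }
}
```

Routing every callback through a single invoke() wrapper means rebalance callbacks and flush() get the same treatment as the poll loop, which is the generalization proposed in the comment.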

          Lucas Ariel Martinez added a comment -

          I apologize if I am asking in the wrong place, but I believe this issue could solve the problem I am having, and which I’ll explain in a bit.
          First, how do connectors actually manage their own offsets? I tried creating a sink connector extending SinkTask, and:

          1) I tried assigning a Map of <TopicPartition, Offset> to the context obtained in initialize() by doing context.offset(offsetMap). But this results in an exception “No current assignment for partition…” because the partitions were not assigned yet.

          2) I tried doing the same in the open() method by saving the context from initialize() in a variable and passing it a map built from the Collection<TopicPartition> parameter. As a result, the task fetched all the messages from the last offset committed to Kafka and then, again, all the messages from the offsets I had assigned manually to the partitions!
          I also tried setting the worker property “auto.offset.reset=latest” as a workaround, but apparently it is not supported, and the default value of earliest was used anyway.

          Am I wrong to think that rewinding offsets on rebalance would solve this issue and I would be able to manually assign offsets on the open() method?

          ASF GitHub Bot added a comment -

          GitHub user kkonstantine opened a pull request:

          https://github.com/apache/kafka/pull/1771

          KAFKA-2894: WorkerSinkTask should rewind offsets on rebalance

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kkonstantine/kafka KAFKA-2894-rewind-offsets-on-rebalance

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/1771.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1771


          commit d14eff6084bafcf5014c8309703faafd96fe7071
          Author: Konstantine Karantasis <k.karantasis@gmail.com>
          Date: 2016-08-22T23:30:27Z

          KAFKA-2894: WorkerSinkTask should rewind offsets on rebalance


          Ewen Cheslack-Postava added a comment -

          Issue resolved by pull request 1771
          https://github.com/apache/kafka/pull/1771

          ASF GitHub Bot added a comment -

          GitHub user asfgit closed the pull request at:

          https://github.com/apache/kafka/pull/1771


            People

            • Assignee: Liquan Pei
            • Reporter: Ewen Cheslack-Postava
            • Votes: 1
            • Watchers: 4
