Flink / FLINK-4618

FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.2
    • Fix Version/s: 1.2.0, 1.1.3
    • Component/s: Kafka Connector
    • Labels:
      None
    • Environment:

      Flink 1.1.2
      Kafka Broker 0.10.0
      Hadoop 2.7.0

      Description

      *Original reported ticket title: Last kafka message gets consumed twice when restarting job*

      There seems to be an issue with the offset management in Flink. When a job is stopped and started again, a message from the previous offset is read again.
      I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new consumer group and emitted one record.
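
      For reference, here is a minimal sketch of the job setup described above (hedged: the checkpoint interval, state backend path, and consumer group id are illustrative placeholders; the broker address and topic are taken from the log below):

      import java.util.Properties;

      import org.apache.flink.runtime.state.filesystem.FsStateBackend;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
      import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

      public class Flink4618Repro {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Checkpointing with EXACTLY_ONCE and a filesystem state backend, as described above.
              env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
              env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")); // placeholder path

              Properties props = new Properties();
              props.setProperty("bootstrap.servers", "hdp1:6667");
              props.setProperty("group.id", "my-new-consumer-group"); // placeholder group id

              // Consume the topic from the log output below and print each record.
              FlinkKafkaConsumer09<String> consumer =
                      new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), props);
              env.addSource(consumer).print();

              env.execute("FLINK-4618 reproduction");
          }
      }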

      You can clearly see that the consumer waits for a new record at offset 4848911, which is correct. After restarting, it consumes the record at offset 4848910 again, causing that record to be consumed more than once.

      I checked the offset with the Kafka command line tools; the committed offset in ZooKeeper is 4848910.

      Here is my log output:

      10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 2147482646 at hdp1:6667.
      10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching committed offsets for partitions: [myTopic-0]
      10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 2147482646
      10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No committed offset for partition myTopic-0
      10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition myTopic-0 to latest offset.
      10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetched offset 4848910 for partition myTopic-0
      10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
      10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
      10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
      
      -- Inserting a new event here
      
      10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
      10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
      10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1473841823887
      10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 6 (in 96 ms)
      10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
      10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed offset 4848910 for partition myTopic-0
      10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:2946
      
      -- Restarting job
      
      10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 2147482646 at hdp1:6667.
      10:32:01,673 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching committed offsets for partitions: [myTopic-0]
      10:32:01,677 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 2147482646
      // See below! Shouldn't the offset be 4848911?
      10:32:01,682 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition myTopic-0 to the committed offset 4848910
      10:32:01,683 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
      10:32:01,685 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 1001 at hdp1:6667.
      10:32:01,687 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 1001
      // Here record 4848910 gets consumed again!
      10:32:01,707 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
      10:32:01,708 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
      10:32:03,721 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:32:04,224 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:32:04,726 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:32:04,894 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
      10:32:04,903 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:3079
      


          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment - edited

          Hi static-max,
          Flink achieves exactly-once guarantees by working with offsets that are checkpointed internally in Flink, not the offsets that are committed back to ZK / Kafka. Committing offsets back to ZK is done either periodically or on Flink checkpoints, depending on the consumer configuration, and merely serves to expose a measure of progress to the outside world (with respect to Flink).

          On a "fresh" startup of a job ("fresh" startup meaning that the execution of the job is not an automatic restore from previous failure - a manual restart of a job is a fresh startup), the Kafka consumer respects any existing offsets committed in ZK as starting points.
          So, if I am correct, what is actually happening is that, on your second execution of the job, the Kafka consumer is simply just starting from the offsets it finds in ZK.

          If you want exactly-once for manual job restarts, you should use Flink savepoints. See https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html for more detail.
          Otherwise, the exactly-once guarantee refers to automatic restores across job failures.

          melmoth static-max added a comment - edited

          Hi Gordon,

          what if I update/change the job and re-submit it? Why can't the correct offset from ZK be used for that?
          It seems rather easy to fix by incrementing the offset by one, or am I missing a point? I don't want to use the command line or add additional de-duplication in my downstream apps. That would make it very complex for my use case.
          Or am I using Flink wrong?

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          I'm answering your questions on SO, so that we can keep the discussion over there.

          melmoth static-max added a comment -

          No bug; use savepoints before canceling a task.
          See Gordon's comments.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Hi static-max,
          I've revisited this issue, and I think this may actually be a bug.
          Like you mentioned on SO, the Kafka consumer is either committing one offset too few back to ZK, or it is supposed to start from the next record when starting from offsets found in ZK.
          Note that this still has nothing to do with Flink's exactly-once guarantee, though; it is simply that the consumer isn't starting at the right place when starting from offsets found in ZK. The savepoint / exactly-once description I gave before still holds. I'm sorry for drawing a wrong conclusion on this bug in the first place.

          I'll update information in this ticket once I confirm the bug!

          tzulitai Tzu-Li (Gordon) Tai added a comment - edited

          I've confirmed that the problem only exists in the 0.9 consumer.
          static-max and Matthew Barlocker, since you were the original reporters of the bug, would either of you like to work on fixing it and open a PR? You can ping me to help review when you're ready.
          Otherwise I can also pick it up. In any case, let me know.

          mbarlocker Matthew Barlocker added a comment -

          Nice find static-max and Tzu-Li (Gordon) Tai - I don't know the code well enough to feel confident making the change or testing the fix. Please feel free. I started learning Flink about a week ago, and have put a solid 3 hours into it.

          melmoth static-max added a comment -

          I will try to fix the bug, but I'm also a Flink newbie.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          static-max Great to hear, thanks for picking up the issue!
          I've been working on the Kafka connector recently and am quite familiar with the code, so if you need any input or help, feel free to just tag / ping me.

          melmoth static-max added a comment -

          The offset handling seems to be done by the Kafka client library outside of Flink (org.apache.kafka.clients.consumer.KafkaConsumer<K, V>). The committed offset seems to be correct; AFAIK the last read offset gets committed to ZK.

          However, the 0.9 consumer seems to have a workaround in the run() method that increments the offset by one, but that code does not get called as partition.isOffsetDefined() returns false.

          (line 194):

          // seek the consumer to the initial offsets
          for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
          	if (partition.isOffsetDefined()) {
          		consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
          	}
          }
          

          I don't think the Kafka library is broken. Any ideas?

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          partition.isOffsetDefined() in this part of the code only returns true when the job is started from a checkpoint / savepoint.
          The bug here seems to be that on a fresh startup (not from a checkpoint/savepoint), when the consumer uses offsets committed in ZK, it does not start from the next unread record.
          AFAIK, the 0.9 KafkaConsumer handles fetching offsets from ZK and using them as starting points as part of the client library. So, on a fresh startup, consumer.seek(...) isn't called at all, and it seems like we're simply letting the KafkaConsumer client do the work.

          tzulitai Tzu-Li (Gordon) Tai added a comment - edited

          I'm not entirely sure whether the KafkaConsumer starts "at" or "after" the offsets found in ZK, though. Correctly, it should be "after". Perhaps something works unexpectedly here, and we need to work around the behaviour?

          tzulitai Tzu-Li (Gordon) Tai added a comment - edited

          Hi static-max,

          I just had a look at the Kafka 0.9 API, and it seems that when committing offsets using the new `KafkaConsumer` API, the correct value to commit back to Kafka is lastProcessedOffset + 1 (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync(java.util.Map)).
          I believe correcting this should fix the issue. Let me know if you bump into any other problems.
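
          For illustration, here is a hedged sketch of that commit semantic using the plain 0.9 KafkaConsumer API (this is not the actual Flink fix; the broker address and topic come from the logs in this ticket, and the group id is a placeholder):

          import java.util.Collections;
          import java.util.Map;
          import java.util.Properties;

          import org.apache.kafka.clients.consumer.ConsumerRecord;
          import org.apache.kafka.clients.consumer.ConsumerRecords;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          import org.apache.kafka.common.TopicPartition;

          public class CommitNextOffsetExample {
              public static void main(String[] args) {
                  Properties props = new Properties();
                  props.setProperty("bootstrap.servers", "hdp1:6667");
                  props.setProperty("group.id", "my-group"); // placeholder group id
                  props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                  props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

                  try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                      TopicPartition partition = new TopicPartition("myTopic", 0);
                      consumer.assign(Collections.singletonList(partition));

                      ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                      long lastProcessedOffset = -1L;
                      for (ConsumerRecord<byte[], byte[]> record : records) {
                          lastProcessedOffset = record.offset(); // process the record here
                      }

                      if (lastProcessedOffset >= 0) {
                          // Per the 0.9 Javadoc, the committed value must be the offset of the
                          // *next* record to read, i.e. lastProcessedOffset + 1. Committing the
                          // last processed offset itself causes that record to be read again by
                          // a consumer that starts from the committed offset.
                          Map<TopicPartition, OffsetAndMetadata> toCommit = Collections.singletonMap(
                                  partition, new OffsetAndMetadata(lastProcessedOffset + 1));
                          consumer.commitSync(toCommit);
                      }
                  }
              }
          }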

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Hi static-max,

          I'd like to make sure this bug is fixed in the upcoming 1.1.3 bugfix release.
          Let me know if you'd like to continue working on this. Otherwise I can pick it up soon.

          melmoth static-max added a comment - edited

          Hi Tzu-Li (Gordon) Tai,

          I will give it a try this weekend.

          githubbot ASF GitHub Bot added a comment -

          GitHub user static-max opened a pull request:

          https://github.com/apache/flink/pull/2579

          FLINK-4618 FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

          This PR addresses https://issues.apache.org/jira/browse/FLINK-4618, which causes the last message to be read again from Kafka after a fresh start of the job.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/static-max/flink flink-connector-kafka-0.9-fix-duplicate-messages

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2579.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2579


          commit 0b564203cdae3b21b00bb499b85feb799136e29b
          Author: static-max <max.kuklinski@live.de>
          Date: 2016-09-30T19:45:38Z

          Merge pull request #1 from apache/master

          Pull from origin

          commit 3618f5053e0ffb0ec1f789c56d878ed400e27056
          Author: Max Kuklinski <max.kuklinski@live.de>
          Date: 2016-09-30T21:03:30Z

          FLINK-4618 Incremented the commited offset by one to avoid duplicate read message.


          melmoth static-max added a comment -

          I made a PR: https://github.com/apache/flink/pull/2579

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2579

          Thank you for working on this @static-max! Changes look good, will merge this.

          I'll also add an IT test when merging to ensure that the Kafka consumer is starting at the right place.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2579

          Btw, just a small tip: the Flink community usually uses git rebase on the current master before submitting PRs, to avoid unnecessary merge commits.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2579

          Merging ...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2579

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for master via https://git-wip-us.apache.org/repos/asf/flink/commit/9dbd1e3f
          Resolved for 1.1.3 via https://git-wip-us.apache.org/repos/asf/flink/commit/400c49ca

          Thank you for your contribution static-max!

            People

            • Assignee: Unassigned
            • Reporter: melmoth static-max
            • Votes: 0
            • Watchers: 6
