  1. Flink
  2. FLINK-4618

FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.2
    • Fix Version/s: 1.2.0, 1.1.3
    • Component/s: Kafka Connector
    • Labels:
      None
    • Environment:

      Flink 1.1.2
      Kafka Broker 0.10.0
      Hadoop 2.7.0

      Description

      *Original reported ticket title: Last kafka message gets consumed twice when restarting job*

      There seems to be an issue with the offset management in Flink. When a job is stopped and started again, the message at the previously committed offset is read again.
      I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new consumer group and emitted one record.

      The log below clearly shows that the consumer waits for a new record at offset 4848911, which is correct. After the restart, it consumes the record at offset 4848910 again, so that record is consumed more than once.

      I checked the offset with the Kafka command-line tools; the committed offset in ZooKeeper is 4848910.

      Here is my log output:

      10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 2147482646 at hdp1:6667.
      10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching committed offsets for partitions: [myTopic-0]
      10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 2147482646
      10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No committed offset for partition myTopic-0
      10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition myTopic-0 to latest offset.
      10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetched offset 4848910 for partition myTopic-0
      10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
      10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
      10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
      
      -- Inserting a new event here
      
      10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
      10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
      10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1473841823887
      10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 6 (in 96 ms)
      10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
      10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed offset 4848910 for partition myTopic-0
      10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:2946
      
      -- Restarting job
      
      10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 2147482646 at hdp1:6667.
      10:32:01,673 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching committed offsets for partitions: [myTopic-0]
      10:32:01,677 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 2147482646
      // See below! Shouldn't the offset be 4848911?
      10:32:01,682 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition myTopic-0 to the committed offset 4848910
      10:32:01,683 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848910
      10:32:01,685 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 1001 at hdp1:6667.
      10:32:01,687 DEBUG org.apache.kafka.clients.NetworkClient                        - Completed connection to node 1001
      // Here record 4848910 gets consumed again!
      10:32:01,707 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
      10:32:01,708 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
      10:32:03,721 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:32:04,224 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:32:04,726 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Added fetch request for partition myTopic-0 at offset 4848911
      10:32:04,894 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
      10:32:04,903 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:3079
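
The off-by-one visible in the log can be illustrated with a toy model of a single partition. This is only a sketch under stated assumptions: `OffsetCommitSketch`, `offsetToCommit`, and `readAfterRestart` are hypothetical names invented for illustration, and nothing here is Flink or Kafka code. It relies on the Kafka consumer convention that a committed offset denotes the position of the next record to read, so committing the offset of the last processed record (4848910 in the log) makes a restarted consumer fetch that record again.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition; hypothetical names, not Flink or Kafka code. */
public class OffsetCommitSketch {

    /** Kafka convention: the committed offset is the position of the NEXT record to read. */
    public static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    /**
     * Offsets a consumer would fetch after restarting at committedOffset,
     * given that the log currently ends at logEndOffset (exclusive).
     */
    public static List<Long> readAfterRestart(long committedOffset, long logEndOffset) {
        List<Long> offsets = new ArrayList<>();
        for (long offset = committedOffset; offset < logEndOffset; offset++) {
            offsets.add(offset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        long lastProcessed = 4848910L; // record consumed before the restart (from the log)
        long logEnd = 4848911L;        // no newer record was produced in the meantime

        // Behaviour in the log: the last processed offset itself was committed.
        System.out.println("observed: " + readAfterRestart(lastProcessed, logEnd));

        // Behaviour the reporter expects: commit lastProcessed + 1, nothing is re-read.
        System.out.println("expected: " + readAfterRestart(offsetToCommit(lastProcessed), logEnd));
    }
}
```

Running `main` prints `observed: [4848910]` and `expected: []`, matching the duplicate read reported above and the behaviour the reporter expected. Either side can absorb the `+ 1` (when committing, or when resuming from a committed value), as long as both agree on what the stored number means.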
      

              People

              • Assignee:
                Unassigned
              • Reporter:
                melmoth static-max
              • Votes:
                0
              • Watchers:
                6