  KAFKA-16291 (project: Kafka)

Mirrormaker2 wrong checkpoints


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.6.1
    • Fix Version/s: None
    • Component/s: mirrormaker
    • Labels: None
    • Environment: Mirrormaker2 version 3.6.1 running in Docker containers

    Description

      I am running Mirrormaker2 with the following configuration:

      clusters = fallingwaterfall, weatheredbase
      
      sync.group.offsets.interval.seconds=30
      emit.checkpoints.interval.seconds=30
      offset.lag.max=0
      
      fallingwaterfall->weatheredbase.enabled = true
      weatheredbase->fallingwaterfall.enabled = false
      
      sync.group.offsets.enabled=true
      emit.heartbeats.enabled=true
      emit.checkpoints.enabled=true
      refresh.groups.enabled=true
      refresh.groups.interval.seconds=30
      refresh.topics.enabled=true
      sync.topic.configs.enabled=true
      refresh.topics.interval.seconds=30
      sync.topic.acls.enabled = false
      
      fallingwaterfall->weatheredbase.topics = storage-demo-.*
      fallingwaterfall->weatheredbase.groups = storage-demo-.*
      group.id=mirror-maker-fallingwaterfall-weatheredbase
      consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase
      
      fallingwaterfall.consumer.isolation.level = read_committed
      
      weatheredbase.producer.enable.idempotence = true
      weatheredbase.producer.acks=all
      weatheredbase.exactly.once.source.support = enabled
      
      replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
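The `topics` and `groups` properties of the fallingwaterfall->weatheredbase flow are regular expressions. As a quick sanity check, the sketch below confirms that both the topic and the consumer group discussed in this report are covered by those filters. It assumes the property is a comma-separated list of patterns with full-match semantics (as with Java's `Matcher.matches()`); the helper `matches_filter` is hypothetical and not part of MirrorMaker 2.

```python
import re

# Patterns copied from the fallingwaterfall->weatheredbase flow above.
TOPICS_FILTER = "storage-demo-.*"
GROUPS_FILTER = "storage-demo-.*"


def matches_filter(name: str, filter_value: str) -> bool:
    """Return True if `name` fully matches any comma-separated pattern.

    Assumption: the filter property is a list of regexes and the whole
    name must match (Java Matcher.matches() semantics).
    """
    patterns = [p.strip() for p in filter_value.split(",") if p.strip()]
    return any(re.fullmatch(p, name) for p in patterns)


if __name__ == "__main__":
    print(matches_filter("storage-demo-test", TOPICS_FILTER))     # True
    print(matches_filter("storage-demo-test-cg", GROUPS_FILTER))  # True
```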
      
      

      I am experiencing issues with the consumer group offset synchronisation.

      My setup consists of a 12-partition topic named storage-demo-test, a single transactional producer writing to it, and a consumer group named storage-demo-test-cg consuming from it.

      The consumer configuration is:

      {
          'auto.offset.reset': 'earliest',
          'isolation.level': 'read_committed',
          'enable.auto.commit': False,
      }

      and I'm committing the offsets explicitly and synchronously after each poll.

      I observed that the offsets synchronised from the upstream to the downstream cluster for storage-demo-test-cg are often wrong.
      Take, for example, this checkpoint:

      (1, 1708505669764) - 6252 - CheckpointKey(consumer_group='storage-demo-test-cg', topic='storage-demo-test', partition=5) - CheckpointValue(upstream_offset=197532, downstream_offset=196300) 

      The upstream and downstream offsets in this checkpoint point to different messages:

      [fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1
      Test message 1027-0 
      [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1
      Test message 1015-9 

      The Mirrormaker2 logs contain many messages like the following (newest first):

      mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0] latestDownstreamOffset 196300 is larger than or equal to convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
      
      mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=196913, downstreamOffset=195683}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
      
      mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointConnector|task-0] Skipping Checkpoint{consumerGroupId=storage-demo-test-cg, topicPartition=storage-demo-test-5, upstreamOffset=197532, downstreamOffset=195684, metadata=} (preventing downstream rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
      
      mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:59:00,859] TRACE [MirrorCheckpointConnector|task-0] Skipping Checkpoint{consumerGroupId=storage-demo-test-cg, topicPartition=storage-demo-test-5, upstreamOffset=197532, downstreamOffset=195684, metadata=} (preventing downstream rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
      
      mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:59:00,859] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=196913, downstreamOffset=195683}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
      
      mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765, downstreamOffset=197535} applied, new state is [198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868] (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
      
      mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532, downstreamOffset=196302} applied, new state is [197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868] (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
      
      mirrormaker2-fallingwaterfall-weatheredbase-0 - mirrormaker2-server - [2024-02-21 08:54:05,030] TRACE [MirrorSourceConnector|task-0] Sync'd offsets for storage-demo-test-5: 197532==196302 (org.apache.kafka.connect.mirror.MirrorSourceTask:251)
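The DEBUG and TRACE lines above can be reproduced with a small model of the offset translation. This is a sketch based on my reading of the logs, not a copy of MirrorMaker 2's OffsetSyncStore. It assumes the store picks the newest sync whose upstream offset is at or below the group's upstream offset, returns that sync's downstream offset on an exact match and adds 1 otherwise (195683 + 1 = 195684 in the DEBUG line), and that a checkpoint for the same upstream offset is skipped when it would lower the downstream offset. The names `translate_downstream` and `is_rewind` are hypothetical; the two state lists are copied from the "new state is [...]" lines for storage-demo-test-5.

```python
from typing import List, Optional, Tuple

# (upstream_offset, downstream_offset) pairs, newest first, as printed in the
# "new state is [...]" TRACE lines for storage-demo-test-5.
State = List[Tuple[int, int]]

STATE_0854: State = [  # applied at 08:54:05
    (197532, 196302), (197530, 196300), (197529, 196299), (197521, 196291),
    (197507, 196277), (197485, 196255), (197441, 196211), (197353, 196123),
    (197265, 196035), (196913, 195683), (196209, 194979), (195505, 194275),
    (194098, 192868),
]

STATE_0858: State = [  # applied at 08:58:40
    (198765, 197535), (198764, 197534), (198762, 197532), (198761, 197531),
    (198753, 197523), (198739, 197509), (198717, 197487), (198673, 197443),
    (198585, 197355), (198497, 197267), (198321, 197091), (197617, 196387),
    (196913, 195683), (194098, 192868),
]


def translate_downstream(state: State, upstream_offset: int) -> Optional[int]:
    """Translate a group's upstream offset using the newest usable sync.

    Assumed rule: take the first (newest) sync whose upstream offset is <=
    the group's offset; use its downstream offset on an exact match,
    otherwise that downstream offset + 1.
    """
    for sync_upstream, sync_downstream in state:
        if sync_upstream <= upstream_offset:
            step = 0 if sync_upstream == upstream_offset else 1
            return sync_downstream + step
    # Simplification: no sync at or before this offset, treat as untranslatable.
    return None


def is_rewind(last_downstream: int, new_downstream: int) -> bool:
    """Assumed check behind "preventing downstream rewind": a new checkpoint
    for the same upstream offset is skipped if it would move the downstream
    offset backwards."""
    return new_downstream < last_downstream


if __name__ == "__main__":
    print(translate_downstream(STATE_0854, 197532))  # 196302 (exact match)
    print(translate_downstream(STATE_0858, 197532))  # 195684 (195683 + 1)
    print(is_rewind(196300, 195684))                 # True (checkpoint skipped)
```

Under these assumptions, the 08:54:05 state contains the exact 197532:196302 sync and yields 196302. After the 08:58:40 update that entry is no longer in the state, so the same group offset falls back to 196913:195683 and translates to 195684, which is skipped because the checkpoint already emitted for this partition (196300) is higher.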

      Looking at the OffsetSync topic, I see the correct value for this offset sync:

      (1, 1708505645010) - 3945070 - OffsetSyncKey(topic='storage-demo-test', partition=5) - OffsetSyncValue(upstream_offset=197532, downstream_offset=196302)
       
      [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196302 -c 1
      Test message 1027-0 
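The comparison made here, checking the checkpoint against the offset sync recorded for the same upstream offset, can be written as a small diagnostic. In this hypothetical sketch, `Checkpoint`, `OffsetSync` and `check` are illustrative Python names that only mirror the fields printed above (they are not the MirrorMaker 2 classes), and the rule that a sync with exactly the checkpoint's upstream offset pins the expected downstream offset is an assumption consistent with the kcat output.

```python
from typing import NamedTuple, Optional


class Checkpoint(NamedTuple):
    """Fields as printed for a checkpoint record (illustrative only)."""
    upstream_offset: int
    downstream_offset: int


class OffsetSync(NamedTuple):
    """Fields as printed for an OffsetSync record (illustrative only)."""
    upstream_offset: int
    downstream_offset: int


def expected_downstream(checkpoint: Checkpoint, sync: OffsetSync) -> Optional[int]:
    """Downstream offset implied by the sync, if it pins the same upstream offset.

    Assumption: a sync with exactly the checkpoint's upstream offset says where
    that record landed downstream; any other sync is treated as inconclusive.
    """
    if sync.upstream_offset == checkpoint.upstream_offset:
        return sync.downstream_offset
    return None


def check(checkpoint: Checkpoint, sync: OffsetSync) -> str:
    """Compare a checkpoint with an offset sync for the same partition."""
    expected = expected_downstream(checkpoint, sync)
    if expected is None:
        return "inconclusive"
    if expected == checkpoint.downstream_offset:
        return "ok"
    return (f"mismatch: checkpoint has {checkpoint.downstream_offset}, "
            f"sync implies {expected}")


if __name__ == "__main__":
    # Partition 5 values from the checkpoint and OffsetSync records above.
    checkpoint = Checkpoint(upstream_offset=197532, downstream_offset=196300)
    sync = OffsetSync(upstream_offset=197532, downstream_offset=196302)
    print(check(checkpoint, sync))
    # prints "mismatch: checkpoint has 196300, sync implies 196302"
```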

      So it seems that the offset translations and checkpoints produced by the MirrorCheckpointTask do not match the offset syncs that the MirrorSourceTask wrote to the OffsetSync topic.
      Please let me know if you need more information about my setup or additional logs.

      Thanks!

          People

            Assignee: Unassigned
            Reporter: Claudio Benfatto (claudio.benfatto)
