Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
3.6.1
-
None
-
None
-
Mirrormaker2 version 3.6.1 running on docker containers
Description
I am running Mirrormaker2 with the following configuration:
clusters = fallingwaterfall, weatheredbase
sync.group.offsets.interval.seconds=30
emit.checkpoints.interval.seconds=30
offset.lag.max=0
fallingwaterfall->weatheredbase.enabled = true
weatheredbase->fallingwaterfall.enabled = false
sync.group.offsets.enabled=true
emit.heartbeats.enabled=true
emit.checkpoints.enabled=true
emit.checkpoints.interval.seconds=30
refresh.groups.enabled=true
refresh.groups.interval.seconds=30
refresh.topics.enabled=true
sync.topic.configs.enabled=true
refresh.topics.interval.seconds=30
sync.topic.acls.enabled = false
fallingwaterfall->weatheredbase.topics = storage-demo-.*
fallingwaterfall->weatheredbase.groups = storage-demo-.*
group.id=mirror-maker-fallingwaterfall-weatheredbase
consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase
fallingwaterfall.consumer.isolation.level = read_committed
weatheredbase.producer.enable.idempotence = true
weatheredbase.producer.acks=all
weatheredbase.exactly.once.source.support = enabled
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
I am experiencing issues with the consumer group offset synchronisation.
I have a setup with a 12-partition topic, named storage-demo-test, a single transactional producer to this topic and a consumer group, named storage-demo-test-cg, consuming from it.
The consumer configuration is:
'auto.offset.reset': 'earliest', 'isolation.level': 'read_committed', 'enable.auto.commit': False,
and I'm committing the offsets explicitly and synchronously after each poll.
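For reference, the consume-and-commit pattern described above can be sketched roughly as follows. This is a minimal sketch, not the actual test client: it assumes the confluent-kafka Python client (suggested by the config style), and the `bootstrap.servers` value, the `consume_and_commit` helper, and its parameters are hypothetical names introduced for illustration.

```python
# Consumer settings quoted in the report. bootstrap.servers is a placeholder
# and group.id is taken from the group name mentioned in the description.
CONSUMER_CONFIG = {
    "bootstrap.servers": "localhost:9092",  # assumption, not from the report
    "group.id": "storage-demo-test-cg",
    "auto.offset.reset": "earliest",
    "isolation.level": "read_committed",
    "enable.auto.commit": False,
}


def consume_and_commit(consumer, handle, max_polls, timeout=1.0):
    """Poll up to max_polls times; commit synchronously after each message.

    `consumer` is expected to behave like confluent_kafka.Consumer
    (poll() returning a message or None, commit(message=..., asynchronous=...)).
    Returns the number of messages handled.
    """
    handled = 0
    for _ in range(max_polls):
        msg = consumer.poll(timeout)
        if msg is None or msg.error():
            continue  # nothing fetched, or an error event: nothing to commit
        handle(msg)
        # asynchronous=False blocks until the commit has been acknowledged.
        consumer.commit(message=msg, asynchronous=False)
        handled += 1
    return handled
```

With confluent-kafka installed, this would be driven by creating `confluent_kafka.Consumer(CONSUMER_CONFIG)`, subscribing it to `storage-demo-test`, and passing it to `consume_and_commit`.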
What I observed is that the offsets synchronised from the upstream to the downstream cluster for the storage-demo-test-cg group are often wrong.
For example, in the case of this checkpoint:
(1, 1708505669764) - 6252 - CheckpointKey(consumer_group='storage-demo-test-cg', topic='storage-demo-test', partition=5) - CheckpointValue(upstream_offset=197532, downstream_offset=196300)
We have a mismatch in the replicated messages:
[fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1
Test message 1027-0
[weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1
Test message 1015-9
In the Mirrormaker2 logs I see many of these messages:
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0] latestDownstreamOffset 196300 is larger than or equal to convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=196913, downstreamOffset=195683}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointConnector|task-0] Skipping Checkpoint{consumerGroupId=storage-demo-test-cg, topicPartition=storage-demo-test-5, upstreamOffset=197532, downstreamOffset=195684, metadata=} (preventing downstream rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:59:00,859] TRACE [MirrorCheckpointConnector|task-0] Skipping Checkpoint{consumerGroupId=storage-demo-test-cg, topicPartition=storage-demo-test-5, upstreamOffset=197532, downstreamOffset=195684, metadata=} (preventing downstream rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:59:00,859] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=196913, downstreamOffset=195683}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765, downstreamOffset=197535} applied, new state is [198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868] (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532, downstreamOffset=196302} applied, new state is [197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868] (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
mirrormaker2-fallingwaterfall-weatheredbase-0 - mirrormaker2-server - [2024-02-21 08:54:05,030] TRACE [MirrorSourceConnector|task-0] Sync'd offsets for storage-demo-test-5: 197532==196302 (org.apache.kafka.connect.mirror.MirrorSourceTask:251)
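To make the log lines easier to follow, here is a simplified model of the translation they appear to describe: pick the newest stored sync whose upstream offset is not beyond the group's offset, and return its downstream offset, plus one when the match is not exact. This is an assumption inferred from the DEBUG line ("Translated 195684" relative to downstreamOffset=195683), not a copy of the OffsetSyncStore code; `translate_downstream` and the two state lists are illustrative names, with the pairs copied from the "new state is" log entries.

```python
def translate_downstream(syncs, upstream_offset):
    """Translate an upstream offset using stored (upstream, downstream) syncs.

    Returns None when no stored sync is at or below upstream_offset.
    """
    candidates = [s for s in syncs if s[0] <= upstream_offset]
    if not candidates:
        return None
    sync_up, sync_down = max(candidates)  # newest sync not past the offset
    # Exact match: use the synced offset. Otherwise step one past the sync.
    step = 0 if sync_up == upstream_offset else 1
    return sync_down + step


# State logged at 08:54:05,030 (contains the exact sync for 197532).
STATE_08_54 = [
    (197532, 196302), (197530, 196300), (197529, 196299), (197521, 196291),
    (197507, 196277), (197485, 196255), (197441, 196211), (197353, 196123),
    (197265, 196035), (196913, 195683), (196209, 194979), (195505, 194275),
    (194098, 192868),
]

# State logged at 08:58:40,812 (the sync for 197532 is no longer present).
STATE_08_58 = [
    (198765, 197535), (198764, 197534), (198762, 197532), (198761, 197531),
    (198753, 197523), (198739, 197509), (198717, 197487), (198673, 197443),
    (198585, 197355), (198497, 197267), (198321, 197091), (197617, 196387),
    (196913, 195683), (194098, 192868),
]

print(translate_downstream(STATE_08_54, 197532))  # 196302
print(translate_downstream(STATE_08_58, 197532))  # 195684
```

Under this model the later state can only translate 197532 relative to the 196913:195683 sync, which matches the 195684 value in the skipped checkpoints.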
And looking in the OffsetSync topic, I see the correct value for the offset sync:
(1, 1708505645010) - 3945070 - OffsetSyncKey(topic='storage-demo-test', partition=5) - OffsetSyncValue(upstream_offset=197532, downstream_offset=196302)
[weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196302 -c 1
Test message 1027-0
So it seems that the offset conversions and checkpoints produced in the MirrorCheckpointTask are not matching the information committed to the OffsetSync topic by the MirrorSourceTask.
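The size of the discrepancy can be read directly from the two records quoted above. The snippet below is only a small arithmetic check on those values; `checkpoint_shortfall` is a hypothetical helper name.

```python
# Values copied from the checkpoint and the offset-sync record in this report.
CHECKPOINT = {"upstream": 197532, "downstream": 196300}
OFFSET_SYNC = {"upstream": 197532, "downstream": 196302}


def checkpoint_shortfall(checkpoint, offset_sync):
    """Return how many offsets the checkpoint sits behind the offset sync."""
    if checkpoint["upstream"] != offset_sync["upstream"]:
        raise ValueError("records refer to different upstream offsets")
    return offset_sync["downstream"] - checkpoint["downstream"]


print(checkpoint_shortfall(CHECKPOINT, OFFSET_SYNC))  # 2
```

So for the same upstream offset, the checkpoint points two offsets earlier on weatheredbase than the offset sync does, which is consistent with kcat returning "Test message 1015-9" at 196300 and "Test message 1027-0" at 196302.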
Please let me know if you need additional information about my setup or would like me to collect more logs.
Thanks!
Attachments
Issue Links
- is related to
-
KAFKA-12468 Initial offsets are copied from source to target cluster
- Resolved
- relates to
-
KAFKA-16364 MM2 High-Resolution Offset Translation
- Open