  1. Kafka
  2. KAFKA-16798

Mirrormaker2 dedicated mode - sync.group.offsets.interval not working


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Bug
    • Affects Version/s: 3.7.0
    • Fix Version/s: None
    • Component/s: mirrormaker
    • Labels: None

    Description

      Single-instance MirrorMaker 2 in dedicated mode, with active-passive replication.

      The sync.group.offsets.interval.seconds=2 setting is configured and picked up, as the startup logs show:

      [root@xxxxx-xxxxxxxxx ~]# docker logs cc-mm 2>&1 -f | grep -i "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds"
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              sync.group.offsets.interval.seconds = 2
              sync.group.offsets.interval.seconds = 2
              auto.commit.interval.ms = 5000
              sync.group.offsets.interval.seconds = 2
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              sync.group.offsets.interval.seconds = 2
              auto.commit.interval.ms = 5000
              auto.commit.interval.ms = 5000
              sync.group.offsets.interval.seconds = 2
              sync.group.offsets.interval.seconds = 2
              auto.commit.interval.ms = 5000
      

      However, it does not seem to take effect: offsets are always committed every 60 seconds, as the logs show:

      [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:33:44,895] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:34:44,852] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:34:44,857] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:34:44,880] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:34:44,886] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:34:44,897] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:35:44,855] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:35:44,860] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:35:44,883] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:35:44,888] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:35:44,901] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:36:44,860] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:36:44,863] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:36:44,886] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:36:44,893] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:36:44,904] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:37:44,863] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:37:44,866] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:37:44,888] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:37:44,895] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:37:44,906] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:38:44,866] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:38:44,869] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:38:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:38:44,898] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:38:44,910] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:39:44,869] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:39:44,872] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:39:44,892] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:39:44,900] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
      [2024-05-20 09:39:44,915] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
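
      To check the cadence without reading timestamps by eye, the gaps between consecutive commit lines can be computed per task. This is a minimal sketch and not part of MirrorMaker 2; the function name `commit_intervals` and the regular expression are assumptions fitted to the log format shown above.

```python
import re
from collections import defaultdict
from datetime import datetime

# Assumed line shape, taken from the log excerpt above:
# [2024-05-20 09:32:44,847] INFO ... WorkerSourceTask{id=<task>} Committing offsets ...
COMMIT_RE = re.compile(
    r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\] INFO "
    r".*?WorkerSourceTask\{id=([^}]+)\} Committing offsets"
)


def commit_intervals(lines):
    """Map each task id to the seconds elapsed between its consecutive commit lines."""
    stamps = defaultdict(list)
    for line in lines:
        match = COMMIT_RE.search(line)
        if match is None:
            continue  # not an offset-commit line
        # %f accepts the three-digit millisecond field and pads it to microseconds
        when = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
        stamps[match.group(2)].append(when)
    return {
        task: [round((b - a).total_seconds(), 3) for a, b in zip(times, times[1:])]
        for task, times in stamps.items()
    }
```

      For the first three MirrorSourceConnector-0 lines above this gives gaps of 60.003 and 60.002 seconds. MirrorHeartbeatConnector-0 logs twice per cycle in this excerpt, so its gaps alternate between a few milliseconds and roughly 60 seconds.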
      

       

      MM2 Configuration:

       

      clusters = cl01,cl02
      cl01.bootstrap.servers = cl01-kafka-01:9092,cl01-kafka-02:9092,cl01-kafka-03:9092
      cl02.bootstrap.servers = cl02-kafka-01:9092,cl02-kafka-02:9092,cl02-kafka-03:9092
      
      ##############################
      ### First cluster config
      ##############################
      
      # source.cluster.alias = cl01
      # target.cluster.alias = cl02
      cl01->cl02.enabled=true
      cl01->cl02.enable.auto.commit=true
      cl01->cl02.sync.group.offsets.enabled=true
      cl01->cl02.refresh.groups.enabled=true
      cl01->cl02.refresh.topics.enabled=true
      cl01->cl02.sync.topics.enabled=true
      cl01->cl02.sync.topic.acls.enabled=false
      cl01->cl02.sync.acls.enabled=false
      cl01->cl02.emit.heartbeats.enabled=true
      cl01->cl02.emit.checkpoints.enabled=true
      cl01->cl02.sync.topic.configs.enabled=true
      cl01->cl02.emit.offset-syncs.enabled=true
      cl01.consumer.auto.offset.reset=earliest
      
      ##############################
      ### Second cluster (DR) config
      ##############################
      
      # source.cluster.alias = cl01
      # target.cluster.alias = cl02
      cl02->cl01.enabled=false
      cl02->cl01.enable.auto.commit=false
      cl02->cl01.emit.offset-syncs.enabled=false
      cl02->cl01.sync.group.offsets.enabled=false
      cl02->cl01.sync.topic.acls.enabled=false
      cl02->cl01.sync.acls.enabled=false
      cl02->cl01.refresh.groups.enabled=true
      cl02->cl01.refresh.topics.enabled=true
      cl02->cl01.sync.topics.enabled=true
      cl02->cl01.emit.heartbeats.enabled=true
      cl02->cl01.emit.checkpoints.enabled=true
      cl02->cl01.sync.topic.configs.enabled=true
      #cl02.consumer.auto.offset.reset=earliest
      
      ##############################
      ### CC Configs
      ##############################
      
      group.id=crosscluster-consumer-group
      client.id=crosscluster-client
      groups=.*
      #groups='.*cc-consumer-group.*'
      offset-syncs.topic.whitelist='cl.*\.test-topic'
      #topics='cl.*\.test-topic'
      topics.blacklist='[.*[\-\.]internal, .*\.replica, __.*]'
      key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
      value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
      emit.heartbeats.interval.seconds=5
      emit.checkpoints.interval.seconds=5
      emit.offset-syncs.interval.seconds=2
      auto.commit.interval.seconds=5
      checkpoint.interval.seconds=5
      consumer.commit.interval.seconds=5
      offset.flush.interval.seconds=5
      sync.topics.interval.seconds=2
      sync.group.offsets.interval.seconds=2
      sync.acls.interval.seconds=5
      refresh.groups.interval.seconds=5
      refresh.topics.interval.seconds=5
      max.poll.interval.seconds=5
      heartbeats.topic.retention.hours=1
      checkpoints.topic.retention.hours=1
      offset.syncs.topic.retention.hours=1
      errors.log.enable=true
      errors.log.include.messages=true
      dedicated.mode.enable.internal.rest=true
      cluster.producer.enable.idempotence=true
      allow.auto.create.topics=true
      tasks.max=48
      replication.factor=1
      config.storage.replication.factor=1
      offset.storage.replication.factor=1
      status.storage.replication.factor=1
      offset-syncs.topic.replication.factor=3
      heartbeats.topic.replication.factor=1
      checkpoints.topic.replication.factor=1
      offsets.storage.replication.factor=1
      
      
      

       

      I see that the offset flush interval is always logged as offset.flush.interval.ms = 60000, whatever value I set for offset.flush.interval.seconds in the MM2 properties file. I suspect this is the culprit, but it needs more investigation.

      docker logs cc-mm 2>&1 -f | grep -i "offset.flush.interval"
              offset.flush.interval.ms = 60000
              offset.flush.interval.ms = 60000
              offset.flush.interval.ms = 60000
              offset.flush.interval.ms = 60000
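
      One thing worth checking here is a name mismatch: the properties file sets offset.flush.interval.seconds, while the setting that shows up in the logs is offset.flush.interval.ms (Kafka Connect documents that worker property with a default of 60000). The sketch below compares key names only, under the assumption that both inputs are plain `key = value` lines; `property_keys` and `unmatched_keys` are hypothetical helper names, and the code does not know which properties MirrorMaker 2 or Kafka Connect actually accept.

```python
def property_keys(lines):
    """Collect the key of every `key = value` line, skipping blanks and # comments."""
    keys = []
    for line in lines:
        text = line.strip()
        if not text or text.startswith("#") or "=" not in text:
            continue
        keys.append(text.split("=", 1)[0].strip())
    return keys


def unmatched_keys(configured, logged):
    """For each configured key that is absent from the logged config, list the
    logged keys that differ only in their final segment (such as a unit suffix)."""
    logged = set(logged)
    report = {}
    for key in configured:
        if key in logged:
            continue  # the configured name shows up in the logs as written
        stem = key.rsplit(".", 1)[0]
        report[key] = sorted(k for k in logged if k.rsplit(".", 1)[0] == stem)
    return report
```

      Run against the properties file and the logged config values, a configured key that maps to a sibling with a different suffix (for example `.seconds` in the file but `.ms` in the logs) is a candidate for a setting that was never picked up under the name used in the file.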
      

       
      The docker-compose service spec:

        cc-mm:
          image: 'bitnami/kafka:3.7.0'
          container_name: cc-mm
          hostname: cc-mm
          ports:
            - "9912:8083"
          command: /opt/bitnami/kafka/bin/connect-mirror-maker.sh /opt/bitnami/kafka/config/mm2.properties
          environment:
            <<: *kafka-cc-env-common
          volumes:
            - ./files/configs-compose/jmx-exporter:/opt/jmx-exporter
            - ./files/configs-compose/mm2/single-cluster/cc-single-mm.properties:/opt/bitnami/kafka/config/mm2.properties
            - ./files/configs-compose/kafka/log4j.properties:/opt/bitnami/kafka/config/log4j.properties:ro
          networks:
            - kafka-1
            - kafka-2
          depends_on:
            cl01-kafka-01:
              condition: service_healthy
            cl01-kafka-02:
              condition: service_healthy
            cl01-kafka-03:
              condition: service_healthy
            cl02-kafka-01:
              condition: service_healthy
            cl02-kafka-02:
              condition: service_healthy
            cl02-kafka-03:
              condition: service_healthy
              

       


          People

            Assignee: Unassigned
            Reporter: Thanos Athanasopoulos (sektor.coder)