Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-12468

Initial offsets are copied from source to target cluster

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.7.0
    • 3.5.0, 3.4.1, 3.3.3
    • mirrormaker
    • None

    Description

      We have an active-passive setup where  the 3 connectors from mirror maker 2 (heartbeat, checkpoint and source) are running on a dedicated Kafka connect cluster on the target cluster.

      Offset syncing is enabled as specified by KIP-545. But when activated, it seems the offsets from the source cluster are initially copied to the target cluster without translation. This causes a negative lag for all synced consumer groups. Only when we reset the offsets for each topic/partition on the target cluster and produce a record on the topic/partition in the source, the sync starts working correctly. 

      I would expect that the consumer groups are synced but that the current offsets of the source cluster are not copied to the target cluster.

      This is the configuration we are currently using:

      Heartbeat connector

       

      {
        "name": "mm2-mirror-heartbeat",
        "config": {
          "name": "mm2-mirror-heartbeat",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
          "source.cluster.alias": "eventador",
          "target.cluster.alias": "msk",
          "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
          "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
          "topics": ".*",
          "groups": ".*",
          "tasks.max": "1",
          "replication.policy.class": "CustomReplicationPolicy",
          "sync.group.offsets.enabled": "true",
          "sync.group.offsets.interval.seconds": "5",
          "emit.checkpoints.enabled": "true",
          "emit.checkpoints.interval.seconds": "30",
          "emit.heartbeats.interval.seconds": "30",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
        }
      }
      

      Checkpoint connector:

      {
        "name": "mm2-mirror-checkpoint",
        "config": {
          "name": "mm2-mirror-checkpoint",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
          "source.cluster.alias": "eventador",
          "target.cluster.alias": "msk",
          "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
          "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
          "topics": ".*",
          "groups": ".*",
          "tasks.max": "40",
          "replication.policy.class": "CustomReplicationPolicy",
          "sync.group.offsets.enabled": "true",
          "sync.group.offsets.interval.seconds": "5",
          "emit.checkpoints.enabled": "true",
          "emit.checkpoints.interval.seconds": "30",
          "emit.heartbeats.interval.seconds": "30",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
        }
      }
      

       Source connector:

      {
        "name": "mm2-mirror-source",
        "config": {
          "name": "mm2-mirror-source",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
          "source.cluster.alias": "eventador",
          "target.cluster.alias": "msk",
          "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
          "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
          "topics": ".*",
          "groups": ".*",
          "tasks.max": "40",
          "replication.policy.class": "CustomReplicationPolicy",
          "sync.group.offsets.enabled": "true",
          "sync.group.offsets.interval.seconds": "5",
          "emit.checkpoints.enabled": "true",
          "emit.checkpoints.interval.seconds": "30",
          "emit.heartbeats.interval.seconds": "30",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
        }
      }
      

       

      Attachments

        Issue Links

          Activity

            People

              gharris1727 Greg Harris
              bdeneuter Bart De Neuter
              Votes:
              3 Vote for this issue
              Watchers:
              11 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: