Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-9459

MM2 sync topic config does not work

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 2.4.0
    • None
    • mirrormaker
    • None

    Description

      I have MM2 configured as follow:

      {
              "name": "mm2-from-1-to-2",
              "config": {
                "connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
                "topics":"foo",
                "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
                "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
                "sync.topic.configs.enabled":"true",
                "sync.topic.configs.interval.seconds": 60,
                "sync.topic.acls.enabled": "false",
                "replication.factor": 1,
                "offset-syncs.topic.replication.factor": 1,
                "heartbeats.topic.replication.factor": 1,
                "checkpoints.topic.replication.factor": 1,
      
                "target.cluster.alias":"dest",
                "target.cluster.bootstrap.servers":"dest.example.com:9092",
      
                "source.cluster.alias":"src",
                "source.cluster.bootstrap.servers":"src.example.com:9092",
      
                "tasks.max": 1}
      }
      

      Topic "foo" is configured with "cleanup.policy=compact". But after waiting for 15 minutes, I still don't see "src.foo" in the destination cluster has "cleanup.policy=compact".

      I had the connect node to run in TRACE level and I could not find any calls to describeConfigs (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327). This implies it never actually get a list of topics that it needs to get topic configs from.

      And I am suspecting this code always return empty Set (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):

          private Set<String> topicsBeingReplicated() {
              return knownTopicPartitions.stream()
                  .map(x -> x.topic())
                  .distinct()
                  .filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
                  .collect(Collectors.toSet());
          }
      

      knownTopicPartitions contains topic-partitions from the source cluster.
      knownTargetTopics contains topic-partitions from the target cluster, whose topic names contain source alias already.

      So, why is topicsBeingReplicated (list of topic-partitions from source cluster) being filtered using knownTopicPartitions (list of topic-partitions from target cluster)?

      Attachments

        Activity

          People

            Unassigned Unassigned
            badai Badai Aqrandista
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: