Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 2.4.0
None
None
Description
I have MM2 configured as follows:
{
  "name": "mm2-from-1-to-2",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "topics": "foo",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "sync.topic.configs.enabled": "true",
    "sync.topic.configs.interval.seconds": 60,
    "sync.topic.acls.enabled": "false",
    "replication.factor": 1,
    "offset-syncs.topic.replication.factor": 1,
    "heartbeats.topic.replication.factor": 1,
    "checkpoints.topic.replication.factor": 1,
    "target.cluster.alias": "dest",
    "target.cluster.bootstrap.servers": "dest.example.com:9092",
    "source.cluster.alias": "src",
    "source.cluster.bootstrap.servers": "src.example.com:9092",
    "tasks.max": 1
  }
}
Topic "foo" is configured with "cleanup.policy=compact" on the source cluster. But after waiting for 15 minutes, "src.foo" in the destination cluster still does not have "cleanup.policy=compact".
I ran the Connect node at TRACE level and could not find any calls to describeConfigs (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327). This implies it never actually gets a list of topics whose configs it needs to fetch.
I suspect this code always returns an empty Set (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):
private Set<String> topicsBeingReplicated() {
    return knownTopicPartitions.stream()
        .map(x -> x.topic())
        .distinct()
        .filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
        .collect(Collectors.toSet());
}
knownTopicPartitions contains topic-partitions from the source cluster.
knownTargetTopics contains topic-partitions from the target cluster, whose topic names already contain the source alias.
So, why is topicsBeingReplicated (built from the list of topic-partitions from the source cluster) being filtered using knownTargetTopics (list of topic-partitions from the target cluster)?
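To make the question concrete, here is a minimal, self-contained sketch of the same filter outside of Kafka. It is not the MirrorSourceConnector code: the class name, the method parameters, and the sample data are assumptions made for illustration, partitions are reduced to their topic names, and formatRemoteTopic is assumed to prefix the topic with the source alias and a "." separator, as the "src.foo" naming above suggests.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class TopicsBeingReplicatedSketch {

    // Hypothetical stand-in for the connector's formatRemoteTopic:
    // assumes remote topics are named "<source alias>.<topic>".
    public static String formatRemoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // Same shape as the quoted filter, with the connector state passed in
    // as arguments. sourceTopics holds one entry per source partition.
    public static Set<String> topicsBeingReplicated(List<String> sourceTopics,
                                                    Set<String> knownTargetTopics,
                                                    String sourceAlias) {
        return sourceTopics.stream()
            .distinct()
            .filter(t -> knownTargetTopics.contains(formatRemoteTopic(sourceAlias, t)))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Two partitions of "foo" and one of "bar" on the source cluster.
        List<String> sourceTopics = Arrays.asList("foo", "foo", "bar");

        // Case 1: the target topic set already lists "src.foo".
        Set<String> populated = Collections.singleton("src.foo");
        System.out.println(topicsBeingReplicated(sourceTopics, populated, "src"));

        // Case 2: the target topic set was never populated.
        Set<String> empty = Collections.emptySet();
        System.out.println(topicsBeingReplicated(sourceTopics, empty, "src"));
    }
}
```

In this sketch the filter only yields "foo" when the target-side set contains "src.foo". If that set is empty, or holds names in a different form than formatRemoteTopic produces, the result is empty, which would be consistent with describeConfigs never being called.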