Description
I found an issue while running MM2 using Kconnect Framework. Based on the configuration provided in this https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
source : localhost:9092, Alias : A
target : localhost:9093, Alias : B
Issue Faced:
Topics from the source cluster are created at target cluster but instead of producing data of source topics to target it is producing to source cluster topics and those topics are getting created at source because the allowAutoTopicCreation=true.
How the issue arrises:
Kconnect is running on source cluster and connectors are configured from A->B. Connect framework uses connect-distributed.properties to initialise producerConfig, consumerConfig. So, producer and consumer are initialised with source bootstrap servers i.e., with localhost:9092("source.cluster.producer.bootstrap.servers" : "localhost:9092”, "source.cluster.consumer.bootstrap.servers" : "localhost:9092”).
This configuration works perfectly fine with all traditional connectors because they produce and consume from that cluster itself. But MM2 connectors are contrary to this. Let's see this example
Ideally MM2 should work like this
A B
test-mm2 A.test-mm2
Indeed it is working like this and creating topics recursively. Please refer to the attachments.
A B
A.test-mm2 A.A.test-mm2
test-mm2 A.test-mm2
This applies to all the topics where data need to be copied. Suppose if it doesn't have any data it won't cause this issue.
Fixing the issue:
MirrorSourceConnector should produce to destination cluster topics while copying data. This issue can be fixed by overriding bootstrap.servers. To do this connector.client.config.override.policy should be set to "All" in connect-distributed.properties & (producer.override.bootstrap.servers, consumer.override.bootstrap.servers) should match with target.cluster.bootstrap.servers and this need to be provided in connectors payload.So, connectors now produce the data to target cluster.
A B
test-mm2 A.test-mm2
List of the topics in both cases: list_of_topics.txt
You can find the config in both cases here:
MirrorSourceConnector-config.json - Config that matches with MirrorConnectorConfig
MirrorSourceConnector-override-config.jsonConfig that works.
To imitate the scenario: Start the kconnect at source cluster, Configure connector with MirrorSourceConnector-config.json, create a topic and produce data to it. Check the list of topics in 5-10seconds.