Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
3.8.0
-
None
-
Unknown
Description
When using camel-kafka there will typically be many routes that produce to multiple Kafka topics with different schemas associated.
When using the camel-kafka client with apicurio, explicit schema artifacts set on Kafka producers override each other with the same settings despite them having different schema artifacts set through the camel additionalProperties settings.
This results in kafka client serialization exceptions stating a matching protobuf message is not found [1].
The expected behavior is that the apicurio properties should be applied to each Kafka producer independently.
A full example is available here: https://github.com/shuawest/apicurio-sandbox.
String kdestA = KafkaUriBuilder.create(ProducerService.TOPIC_A) .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}") .appendProperty("clientId", "producerA") .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName()) .appendProperty("maxRequestSize", "5242880") .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}") //.appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName()) //.appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName()) .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox") .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "samplea") .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true") //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000") .value(); String kdestB = KafkaUriBuilder.create(ProducerService.TOPIC_B) .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}") .appendProperty("clientId", "producerB") .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName()) .appendProperty("maxRequestSize", "5242880") .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}") //.appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName()) //.appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName()) .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox") .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "sampleb") .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true") //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000") .value(); log.info("Kafka connection A: {}\n\n\n", kdestA); log.info("Kafka connection B: {}\n\n\n", kdestB); from("timer:producerTimerA?repeatCount=1000&delay=1s&period=1s") .bean(this, "genA") //.log("producer timer fired ${headers.genCount}:\n${body}"); .to(kdestA); from("timer:producerTimerB?repeatCount=1000&delay=1s&period=1s") .bean(this, "genB") //.log("producer timer fired ${headers.genCount}:\n${body}"); .to(kdestB);
The root cause seems to be that the Kafka configuration is cloned for each new connector, but the clone does not make a copy of the additionalProperties field. The result is that all Kafka configuration instances share the same Map of additionalProperties.
Tested in 3.8.0 but it looks like this problem exists in the latest current version as well.
Attachments
Issue Links
- links to