Details
-
Bug
-
Status: Resolved
-
Minor
-
Resolution: Fixed
-
3.22.2
-
Confluent Kafka
Camel-3.22.x
Java 17
-
Unknown
Description
In Confluent Kafka, a topic can be registered for schema validation against the Schema Registry. When this is configured, the Confluent documentation says we should either define a POJO in the code for serialization/deserialization, or create a custom "ObjectNode" with their schema utils. The document is attached below.
In our case, we have different schemas registered and cannot define a POJO for each of them, because the schemas are registered at run time. So we use the code below to generate an object from the schema.
Map<String, Object> config = new HashMap<>();
config.put("basic.auth.credentials.source", "USER_INFO");
config.put("basic.auth.user.info", "<secret-key>");
config.put("auto.register.schemas", false);
config.put("use.latest.version", true);

CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient("<registry-url>", 10, config);
String schemaDoc = registryClient.getLatestSchemaMetadata("topicTest-value").getSchema();
JsonSchema schema = new JsonSchema(schemaDoc);

ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree("{\"myField1\":123,\"myField2\":123.32,\"myField3\":\"pqr\"}");
ObjectNode envelope = JsonSchemaUtils.envelope(schema, jsonNode);

from("timer://foo?fixedRate=true&period=60000")
    .setBody(ExpressionBuilder.constantExpression(envelope)) // Message to send
    .log("Sent new message")
    .to(kafkaEndpoint);
If the "ObjectNode" payload is written directly with the kafka-client library, it works. But when it is written through the Camel component, the "KafkaProducer" in the component performs an "isIterable" check and, if it is true, sends each value separately. This does not work for Confluent Kafka, because the custom "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer" expects the whole object.
The code in "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer" expects the whole object.
In simple terms, the "envelope" object is no longer sent as a single object: its values are iterated and sent independently, which results in a schema validation error.
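To illustrate the behaviour described above, here is a minimal, stdlib-only sketch. It is not the actual Camel "KafkaProducer" code: "Envelope", "IterableSplitDemo", "recordsWithIterableCheck" and "recordsAsWhole" are hypothetical names made up for this example. "Envelope" stands in for Jackson's "ObjectNode", which is Iterable over its child nodes, and the two methods only model "split the body if it is iterable" versus "send the body as one record".

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the producer-side decision; not Camel's real code.
final class IterableSplitDemo {

    // Models a producer that turns every element of an Iterable body
    // into its own record, and any other body into a single record.
    static List<Object> recordsWithIterableCheck(Object body) {
        List<Object> records = new ArrayList<>();
        if (body instanceof Iterable<?>) {
            for (Object element : (Iterable<?>) body) {
                records.add(element);
            }
        } else {
            records.add(body);
        }
        return records;
    }

    // Models what the schema-aware serializer needs: the body as one record.
    static List<Object> recordsAsWhole(Object body) {
        List<Object> records = new ArrayList<>();
        records.add(body);
        return records;
    }

    public static void main(String[] args) {
        Envelope envelope = new Envelope()
                .put("myField1", 123)
                .put("myField2", 123.32)
                .put("myField3", "pqr");

        // The envelope is split into its field values, one record each.
        System.out.println("split:  " + recordsWithIterableCheck(envelope).size() + " records");
        // The envelope is kept intact as one record.
        System.out.println("whole:  " + recordsAsWhole(envelope).size() + " record");
    }
}

// Hypothetical stand-in for ObjectNode: one logical payload that is also
// Iterable over its field values.
final class Envelope implements Iterable<Object> {
    private final Map<String, Object> fields = new LinkedHashMap<>();

    Envelope put(String name, Object value) {
        fields.put(name, value);
        return this;
    }

    @Override
    public Iterator<Object> iterator() {
        return fields.values().iterator();
    }
}
```

In this model the first path hands the serializer the bare values (123, 123.32, "pqr") instead of the envelope, which is the situation that fails schema validation in our case.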
Attachments