Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-20864

camel-kafka - With confluent schema registry does not work properly.

    XMLWordPrintableJSON

Details

    • Unknown

    Description

      In confluent kafka, we can register the topic against schema validation from the schema registry. When configured the confluent document says either we should have a pojo object defined in the code that is used for serialization/deserialization OR a custom "ObjectNode" can be created from their schema utils. Attaching the document below

      https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-json.html#sending-a-jsonnode-payload 

       

      For our case, we have a different schema registered and can't have all the POJO defined as schema registered at run time, so we are using the below code to generate an object from the schema.

      Map<String, Object> config = new HashMap<>();
      config.put("basic.auth.credentials.source", "USER_INFO");
      config.put("basic.auth.user.info", "<secret-key>");
      config.put("auto.register.schemas", false);
      config.put("use.latest.version", true);
      
      CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient("<registry-url>", 10, config);
      String schemaDoc = registryClient.getLatestSchemaMetadata("topicTest-value").getSchema();
      JsonSchema schema = new JsonSchema(schemaDoc);
      
      ObjectMapper mapper = new ObjectMapper();
      JsonNode jsonNode = mapper.readTree("{\"myField1\":123,\"myField2\":123.32,\"myField3\":\"pqr\"}");
      ObjectNode envelope = JsonSchemaUtils.envelope(schema, jsonNode);
      
      from("timer://foo?fixedRate=true&period=60000")
              .setBody(ExpressionBuilder.constantExpression(envelope))
              .log("Sent new message")// Message to send
              .to(kafkaEndpoint); 

      If the "ObjectNode" payload is directly written using kafka-client library it works. But when written using camel component, The "KafkaProducer" in camel component does "isIterable" check and if true sends each value and this doesn't work for confluent kafka as the custom "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer" expects a whole object.

       

       The code in "

      io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer expects whole object.

       

      Basically, in simple words, The "envelope" object created is no longer the same object but is iterated and values are iterated and sent independently resulting in schema validation error.

      Attachments

        1. image-2024-06-12-12-51-58-966.png
          84 kB
          Kartik
        2. image-2024-06-12-11-06-33-915.png
          86 kB
          Kartik
        3. image-2024-06-12-11-05-12-616.png
          94 kB
          Kartik

        Issue Links

          Activity

            People

              davsclaus Claus Ibsen
              kartikvk1996 Kartik
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: