Camel / CAMEL-16807

camel-kafka - problem using two kafka connections in the same application


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.8.0
    • Fix Version/s: 3.7.6, 3.11.1, 3.12.0
    • Component/s: camel-kafka
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      When using camel-kafka there will typically be many routes that produce to multiple Kafka topics with different schemas associated.

      When using the camel-kafka client with Apicurio, the explicit schema artifact settings of different Kafka producers override each other. Every producer ends up with the same settings, even though each one was configured with a different schema artifact through the Camel additionalProperties settings.

      This results in Kafka client serialization exceptions stating that a matching protobuf message is not found [1].

      The expected behavior is that the Apicurio properties are applied to each Kafka producer independently.

      A full example is available here: https://github.com/shuawest/apicurio-sandbox.

       

      String kdestA = KafkaUriBuilder.create(ProducerService.TOPIC_A)
          .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
          .appendProperty("clientId", "producerA")
          .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
          .appendProperty("maxRequestSize", "5242880")
          .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
          //.appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
          //.appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
          .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
          .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "samplea")
          .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
          //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
          .value();

      String kdestB = KafkaUriBuilder.create(ProducerService.TOPIC_B)
          .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
          .appendProperty("clientId", "producerB")
          .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
          .appendProperty("maxRequestSize", "5242880")
          .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
          //.appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
          //.appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
          .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
          .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "sampleb")
          .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
          //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
          .value();

      log.info("Kafka connection A: {}\n\n\n", kdestA);
      log.info("Kafka connection B: {}\n\n\n", kdestB);

      from("timer:producerTimerA?repeatCount=1000&delay=1s&period=1s")
          .bean(this, "genA")
          //.log("producer timer fired ${headers.genCount}:\n${body}");
          .to(kdestA);

      from("timer:producerTimerB?repeatCount=1000&delay=1s&period=1s")
          .bean(this, "genB")
          //.log("producer timer fired ${headers.genCount}:\n${body}");
          .to(kdestB);
      

      The root cause seems to be that the Kafka configuration is cloned for each new connector, but the clone does not make a copy of the additionalProperties field. The result is that all Kafka configuration instances share the same Map of additionalProperties.
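
      The suspected mechanism can be reproduced in isolation. The sketch below is not the camel-kafka source: SketchConfig, shallowCopy, independentCopy and the "artifactId" key are hypothetical names used only for illustration. It shows how a plain Object.clone() leaves two configuration instances writing to one Map, and how copying the map during the clone keeps them independent.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a configuration class; NOT the actual camel-kafka code.
class SketchConfig implements Cloneable {
    private Map<String, Object> additionalProperties = new HashMap<>();

    Map<String, Object> getAdditionalProperties() {
        return additionalProperties;
    }

    // Shallow copy: Object.clone() copies the Map reference, not the Map itself,
    // so the original and every clone share a single Map instance.
    SketchConfig shallowCopy() {
        try {
            return (SketchConfig) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new IllegalStateException(e);
        }
    }

    // Copy that also duplicates the Map, so each instance owns its own entries.
    SketchConfig independentCopy() {
        SketchConfig copy = shallowCopy();
        copy.additionalProperties = new HashMap<>(additionalProperties);
        return copy;
    }
}

class SharedMapSketch {
    public static void main(String[] args) {
        // Two shallow copies: the second put overwrites the first.
        SketchConfig base = new SketchConfig();
        SketchConfig a = base.shallowCopy();
        SketchConfig b = base.shallowCopy();
        a.getAdditionalProperties().put("artifactId", "samplea"); // hypothetical key
        b.getAdditionalProperties().put("artifactId", "sampleb");
        System.out.println(a.getAdditionalProperties().get("artifactId")); // prints "sampleb"

        // Two independent copies: each keeps its own value.
        SketchConfig base2 = new SketchConfig();
        SketchConfig c = base2.independentCopy();
        SketchConfig d = base2.independentCopy();
        c.getAdditionalProperties().put("artifactId", "samplea");
        d.getAdditionalProperties().put("artifactId", "sampleb");
        System.out.println(c.getAdditionalProperties().get("artifactId")); // prints "samplea"
    }
}
```

      In the shallow case, producer A ends up with producer B's artifact id, which matches the symptom described above. Whether the actual fix copies the map in exactly this way is an assumption.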

      Tested in 3.8.0, but the problem appears to exist in the latest version as well.

            People

              Assignee: Andrea Cosentino (acosentino)
              Reporter: Eric Wittmann (EricWittmann)