Camel / CAMEL-19650

Camel Kafka doesn't honor 'workerPool' configuration

Details

    • Novice

    Description

      When the "workerPool" property is set in the Kafka configuration, it is not honored: "KafkaProducer" falls back to creating a new thread pool with the default 10 threads.

       

      The component options documented at https://camel.apache.org/components/3.14.x/kafka-component.html#_component_options include "workerPool (producer)", which is meant to supply a custom thread pool.

       

      However, "doStart()" in KafkaProducer.java never reads that option:

       

          @Override
          @SuppressWarnings("rawtypes")
          protected void doStart() throws Exception {
              Properties props = getProps();
              if (kafkaProducer == null) {
                  createProducer(props);
              }
      
              // if we are in asynchronous mode we need a worker pool
              if (!configuration.isSynchronous() && workerPool == null) {
                  workerPool = endpoint.createProducerExecutor();
                  // we create a thread pool so we should also shut it down
                  shutdownWorkerPool = true;
              }
          }
      
      

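      The condition

      " if (!configuration.isSynchronous() && workerPool == null) { "

      only checks whether the producer is asynchronous and whether the producer's own "workerPool" field is unset. Because that field is null when the producer is created, "doStart()" goes ahead and creates a new pool with the default threads by calling "createProducerExecutor()". The pool set on the configuration is never consulted.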

       

      Ideally, "doStart()" should check "configuration.getWorkerPool()" and, if it is not null, assign that pool to "workerPool" instead of creating a new one.

       

      Proposed fix:

       

          @Override
          protected void doStart() throws Exception {
              Properties props = getProps();
              if (kafkaProducer == null) {
                  createProducer(props);
              }

              // if we are in asynchronous mode we need a worker pool
              if (!configuration.isSynchronous() && workerPool == null) {
                  if (configuration.getWorkerPool() != null) {
                      // use the pool supplied through the configuration;
                      // we did not create it, so we must not shut it down
                      workerPool = configuration.getWorkerPool();
                      shutdownWorkerPool = false;
                  } else {
                      // no pool configured: create one and shut it down on stop
                      workerPool = endpoint.createProducerExecutor();
                      shutdownWorkerPool = true;
                  }
              }
          }
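
      The ownership rule behind the fix can be checked in isolation. The sketch below is not Camel code: "WorkerPoolSketch" and its nested "Producer" are hypothetical stand-ins, the constructor argument plays the role of "configuration.getWorkerPool()", a fixed pool of 10 threads stands in for "endpoint.createProducerExecutor()", and the synchronous-mode check is left out. It also assumes that "shutdownWorkerPool" is what decides whether the pool is shut down on stop, as the comment in the original code suggests.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkerPoolSketch {

    /** Hypothetical stand-in for the Kafka producer; not the real Camel class. */
    static final class Producer {
        // stands in for configuration.getWorkerPool(); may be null
        private final ExecutorService configuredPool;
        private ExecutorService workerPool;
        private boolean shutdownWorkerPool;

        Producer(ExecutorService configuredPool) {
            this.configuredPool = configuredPool;
        }

        void doStart() {
            if (workerPool == null) {
                if (configuredPool != null) {
                    // reuse the caller's pool; the caller keeps ownership
                    workerPool = configuredPool;
                    shutdownWorkerPool = false;
                } else {
                    // stand-in for endpoint.createProducerExecutor()
                    workerPool = Executors.newFixedThreadPool(10);
                    shutdownWorkerPool = true;
                }
            }
        }

        void doStop() {
            // only shut down a pool this producer created itself
            if (shutdownWorkerPool && workerPool != null) {
                workerPool.shutdown();
            }
            workerPool = null;
        }

        ExecutorService workerPool() {
            return workerPool;
        }
    }

    public static void main(String[] args) {
        ExecutorService userPool = Executors.newFixedThreadPool(2);
        Producer withUserPool = new Producer(userPool);
        withUserPool.doStart();
        System.out.println("uses configured pool: " + (withUserPool.workerPool() == userPool));
        withUserPool.doStop();
        System.out.println("configured pool left running: " + !userPool.isShutdown());
        userPool.shutdown();

        Producer withDefault = new Producer(null);
        withDefault.doStart();
        ExecutorService created = withDefault.workerPool();
        withDefault.doStop();
        System.out.println("self-created pool shut down: " + created.isShutdown());
    }
}
```

      All three lines print "true". Setting "shutdownWorkerPool = false" for a configured pool matters because that pool may be shared with other producers or routes, and shutting it down on stop would break them.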

          People

            Assignee: Unassigned
            Reporter: Kartik (kartikvk1996)