Details
-
Bug
-
Status: Resolved
-
Minor
-
Resolution: Fixed
-
3.14.9
-
Java - 8
Camel 3.14.9
Kafka 3.x broker
-
Novice
Description
When we set the "workerPool" property in the Kafka configuration it's not honored and "KafkaProducer" falls back to creating a new thread pool with the default 10 threads.
As mentioned in https://camel.apache.org/components/3.14.x/kafka-component.html#_component_options There is a component option to provide custom thread pool "workerPool (producer)"
So in KafkaProducer.java
// code placeholder @Override @SuppressWarnings("rawtypes") protected void doStart() throws Exception { Properties props = getProps(); if (kafkaProducer == null) { createProducer(props); } // if we are in asynchronous mode we need a worker pool if (!configuration.isSynchronous() && workerPool == null) { workerPool = endpoint.createProducerExecutor(); // we create a thread pool so we should also shut it down shutdownWorkerPool = true; } }
" if (!configuration.isSynchronous() && workerPool == null) { "
we only check for synchronization, since the worker pool is "null" when created, it goes ahead and creates a new pool with default threads by calling "createProducerExecutor()"
Ideally, we should check if "configuration.getWorkerPool()" and if it's not null, we should assign this to the worker pool instead of creating the new one.
Fix.
// code placeholder protected void doStart() throws Exception { Properties props = this.getProps(); if (this.kafkaProducer == null) { this.createProducer(props); } if (!this.configuration.isSynchronous() && this.workerPool == null) { if (this.configuration.getWorkerPool() != null) { this.workerPool = this.configuration.getWorkerPool(); this.shutdownWorkerPool = false; } else { this.workerPool = this.endpoint.createProducerExecutor(); this.shutdownWorkerPool = true; } } }