The camel-kafka component instantiates Kafka clients (e.g., `KafkaConsumer` and `KafkaProducer`) directly, which prevents users from supplying their own custom implementations. The Kafka Streams library solved the same problem a long time ago (see https://issues.apache.org/jira/browse/KAFKA-3616) by adding the ability to pass a factory of Kafka clients to the Streams processor.
Steps for implementation:
- Add a new `KafkaClientFactory` interface with methods to get instances of Kafka's `KafkaConsumer` and `KafkaProducer`.
- Add a default implementation, `DefaultKafkaClientFactory`, that instantiates both clients the same way Camel's `KafkaConsumer` and `KafkaProducer` do today (simply calling `new` with a `Properties` object).
- Add a new `KafkaClientFactory` configuration option that defaults to `DefaultKafkaClientFactory`.
- Use this factory inside Camel's `KafkaConsumer` and `KafkaProducer` (instead of doing a `new` directly).
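
The steps above could be sketched roughly as follows. This is only an illustration of the proposed shape, not the final API: the method names `getConsumer` and `getProducer`, the `TracingKafkaClientFactory` wrapper, and the `KafkaConfigurationSketch` holder are all hypothetical. To keep the sketch compilable without the kafka-clients dependency, `ClientConsumer` and `ClientProducer` are stand-ins for Kafka's `org.apache.kafka.clients.consumer.Consumer` and `org.apache.kafka.clients.producer.Producer`; a real implementation would return those types and call `new KafkaConsumer<>(props)` / `new KafkaProducer<>(props)` in the default factory.

```java
// Stand-ins for Kafka's Consumer and Producer interfaces (assumption: the real
// factory would return the kafka-clients types instead of these).
interface ClientConsumer { String describe(); }
interface ClientProducer { String describe(); }

// Step 1: hypothetical factory interface for obtaining Kafka clients.
interface KafkaClientFactory {
    ClientConsumer getConsumer(java.util.Properties config);
    ClientProducer getProducer(java.util.Properties config);
}

// Step 2: default implementation. In camel-kafka this is where the existing
// direct instantiation with a Properties object would move to.
class DefaultKafkaClientFactory implements KafkaClientFactory {
    @Override
    public ClientConsumer getConsumer(java.util.Properties config) {
        String id = config.getProperty("client.id", "unnamed");
        return () -> "default-consumer:" + id;
    }

    @Override
    public ClientProducer getProducer(java.util.Properties config) {
        String id = config.getProperty("client.id", "unnamed");
        return () -> "default-producer:" + id;
    }
}

// A user-supplied factory (hypothetical) that decorates the clients created
// by another factory, e.g. to add tracing or metrics.
class TracingKafkaClientFactory implements KafkaClientFactory {
    private final KafkaClientFactory delegate;

    TracingKafkaClientFactory(KafkaClientFactory delegate) {
        this.delegate = delegate;
    }

    @Override
    public ClientConsumer getConsumer(java.util.Properties config) {
        ClientConsumer inner = delegate.getConsumer(config);
        return () -> "traced(" + inner.describe() + ")";
    }

    @Override
    public ClientProducer getProducer(java.util.Properties config) {
        ClientProducer inner = delegate.getProducer(config);
        return () -> "traced(" + inner.describe() + ")";
    }
}

// Step 3: stand-in for the component configuration, with the new option
// defaulting to DefaultKafkaClientFactory.
class KafkaConfigurationSketch {
    private KafkaClientFactory kafkaClientFactory = new DefaultKafkaClientFactory();

    KafkaClientFactory getKafkaClientFactory() {
        return kafkaClientFactory;
    }

    void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory) {
        this.kafkaClientFactory = kafkaClientFactory;
    }
}
```

With something like this in place, step 4 amounts to Camel's `KafkaConsumer` and `KafkaProducer` asking the configured factory for a client (for example `configuration.getKafkaClientFactory().getConsumer(props)`) instead of calling `new` themselves, so a user who sets a custom factory gets their own client implementation without patching the component.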