Kafka / KAFKA-3616

Make kafka producers/consumers injectable for KafkaStreams


    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s:
    • Fix Version/s:
    • Component/s: streams
    • Labels:


      While playing with Kafka Streams, I found that there are cases where we want to control how the Kafka producers and consumers are instantiated inside the StreamThread.

      The most significant case is that we have our own implementation of the Kafka Producer, built to provide more reliable message delivery, but there is currently no way to inject our own instance into StreamTask.
      Another example is that we want to observe the result of producer.send(), which is called inside the RecordCollector. We can write our own Callback instance, but again there is no way to inject that callback into RecordCollector.
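
      To illustrate the second case, here is a minimal sketch of how an injected producer could report every send result to an observer. It does not use the real Kafka client API: SendingClient, ObservingClient and InMemoryClient are hypothetical stand-ins, and the (ack, error) callback shape is an assumption made only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the producer used inside RecordCollector (not the Kafka API).
interface SendingClient {
    // callback receives (ack, error); exactly one of the two is expected to be non-null
    void send(String topic, String value, BiConsumer<String, Exception> callback);
}

// Wraps any client and forwards each send result to an observer
// before invoking the caller's own callback, if there is one.
class ObservingClient implements SendingClient {
    private final SendingClient delegate;
    private final BiConsumer<String, Exception> observer;

    ObservingClient(SendingClient delegate, BiConsumer<String, Exception> observer) {
        this.delegate = delegate;
        this.observer = observer;
    }

    @Override
    public void send(String topic, String value, BiConsumer<String, Exception> callback) {
        delegate.send(topic, value, (ack, error) -> {
            observer.accept(ack, error);
            if (callback != null) {
                callback.accept(ack, error);
            }
        });
    }
}

// In-memory client that records each message and acknowledges it immediately.
class InMemoryClient implements SendingClient {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(String topic, String value, BiConsumer<String, Exception> callback) {
        sent.add(topic + ":" + value);
        if (callback != null) {
            // The ack is "<topic>@<index of the message in this client>"
            callback.accept(topic + "@" + (sent.size() - 1), null);
        }
    }
}
```

      With something like this, the code that calls send() would not need to change; only the instance handed to it would differ.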

      I'd like to suggest that KafkaStreams provide an interface for injecting these clients. I considered various approaches, such as passing the clients through the constructor or making the instantiation methods overridable, but eventually settled on simply introducing another argument to the KafkaStreams constructor that is responsible for supplying client instances.
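
      The constructor-argument approach could look roughly like the sketch below. This is not the changeset from the PR: ClientSupplier, DefaultClientSupplier, SketchStreams and the SketchProducer/SketchConsumer types are hypothetical names standing in for KafkaStreams and the Kafka clients, and the method names are assumptions.

```java
import java.util.Map;

// Hypothetical stand-ins for the Kafka producer and consumer types.
interface SketchProducer { String describe(); }
interface SketchConsumer { String describe(); }

// Hypothetical supplier: the single place where client instances are created.
interface ClientSupplier {
    SketchProducer getProducer(Map<String, Object> config);
    SketchConsumer getConsumer(Map<String, Object> config);
}

// Default behaviour: build the stock clients from the config.
class DefaultClientSupplier implements ClientSupplier {
    @Override
    public SketchProducer getProducer(Map<String, Object> config) {
        return () -> "default-producer";
    }

    @Override
    public SketchConsumer getConsumer(Map<String, Object> config) {
        return () -> "default-consumer";
    }
}

// Stand-in for KafkaStreams: the extra constructor argument supplies the clients.
class SketchStreams {
    private final SketchProducer producer;
    private final SketchConsumer consumer;

    // Existing-style constructor keeps working and falls back to the default supplier.
    SketchStreams(Map<String, Object> config) {
        this(config, new DefaultClientSupplier());
    }

    // New constructor: callers may inject their own client instances.
    SketchStreams(Map<String, Object> config, ClientSupplier supplier) {
        this.producer = supplier.getProducer(config);
        this.consumer = supplier.getConsumer(config);
    }

    String producerDescription() { return producer.describe(); }
    String consumerDescription() { return consumer.describe(); }
}
```

      Keeping the original constructor and delegating to a default supplier means existing callers are unaffected, while users with a custom producer only have to implement the supplier.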

      I will open an incomplete PR to show the changeset I have in mind, so please give me feedback. I will follow up on the PR quickly if the feedback is positive.



            • Assignee:
              kawamuray Yuto Kawamura
              kawamuray Yuto Kawamura
              Guozhang Wang


              • Created: