Kafka › KAFKA-364 Consumer re-design › KAFKA-170

Support for non-blocking polling on multiple streams

    Details

    • Type: Sub-task
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:

      Description

      Currently we provide a blocking iterator in the consumer. This is a good mechanism for consuming data from a single topic, but is limited as a mechanism for polling multiple streams.

      For example, if one wants to implement a non-blocking union across multiple streams, this is hard to do because calls may block indefinitely. A similar situation arises when trying to implement a streaming join between two streams.

      I would propose two changes:
      1. Implement a next(timeout) interface on KafkaMessageStream. This is easy to implement and handles certain limited cases nicely with minimal change, but it doesn't actually cover the two cases above.
      2. Add an interface to poll streams.

      I don't know the best approach for the latter API, but it is important to get it right. One option would be to add a ConsumerConnector.drainTopics("topic1", "topic2", ...) which blocks until there is at least one message and then returns a list of triples (topic, partition, message).
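      As a rough illustration of the drainTopics semantics proposed above, here is a toy, self-contained Java sketch. DrainTopicsSketch, Triple, and deliver are hypothetical names invented for this example, not Kafka APIs; a real implementation would be fed by the consumer's fetcher threads rather than by deliver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy sketch (not Kafka code) of the proposed drainTopics semantics:
// block until at least one message is available on any subscribed topic,
// then return every buffered message as a (topic, partition, message) triple.
public class DrainTopicsSketch {
    public record Triple(String topic, int partition, String message) {}

    private final BlockingQueue<Triple> shared = new LinkedBlockingQueue<>();

    // In a real consumer, fetcher threads would fill this queue from the brokers.
    public void deliver(String topic, int partition, String message) {
        shared.add(new Triple(topic, partition, message));
    }

    // Blocks for the first message, then drains whatever else is already buffered.
    public List<Triple> drainTopics() {
        List<Triple> out = new ArrayList<>();
        try {
            out.add(shared.take());   // wait for at least one message
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return out;
        }
        shared.drainTo(out);          // grab the rest without blocking
        return out;
    }

    public static void main(String[] args) {
        DrainTopicsSketch c = new DrainTopicsSketch();
        c.deliver("topic1", 0, "a");
        c.deliver("topic2", 1, "b");
        List<Triple> batch = c.drainTopics();
        System.out.println(batch.size());         // 2
        System.out.println(batch.get(0).topic()); // topic1
    }
}
```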

        Activity

        Taylor Gautier added a comment -

        Hi - I solved this problem in the NodeJS client here: https://github.com/tagged/node-kafka

        You may or may not like the approach - but at least you can see which solution I went with. Node of course is an event based system so it's more natural to use callbacks which may not necessarily be appropriate for Java.

        Jun Rao added a comment -

        1. KafkaMessageStream currently does have a timeout, controlled by consumer.timeout.ms. If the next call on the iterator doesn't get any message within that time, an exception is thrown. This is probably not what you want, since the iterator may never time out if one topic has new messages constantly.

        2. Do we really need to return the triple? Do users care about the topic/partition that a message comes from, or do they just want to consume messages from all topics? If they don't, we can probably implement a special fetcher that puts messages from different topics into a shared in-memory queue for the end user to iterate. The interface for KafkaMessageStream may not need to be changed.

        Taylor,

        Could you elaborate your approach a bit more here?
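        Jun's shared-queue idea in point 2 above could be sketched roughly as follows, simulating the per-topic fetchers with threads (SharedQueueFetcher and consumeAll are illustrative names, not Kafka internals). Note that topic identity is lost in the merged queue unless each fetcher tags the messages it enqueues.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch (simulated, not Kafka internals): one fetcher thread per topic pushes
// into a single shared in-memory queue, so the end user iterates one merged
// stream without any change to the per-stream interface.
public class SharedQueueFetcher {
    public static List<String> consumeAll(Map<String, List<String>> topics) {
        BlockingQueue<String> shared = new LinkedBlockingQueue<>();
        ExecutorService fetchers = Executors.newFixedThreadPool(topics.size());
        for (Map.Entry<String, List<String>> e : topics.entrySet()) {
            // One fetcher per topic, all feeding the same queue.
            fetchers.submit(() -> e.getValue().forEach(shared::add));
        }
        fetchers.shutdown();
        try {
            fetchers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        List<String> consumed = new ArrayList<>();
        shared.drainTo(consumed);   // the end user iterates one merged stream
        return consumed;
    }

    public static void main(String[] args) {
        List<String> got = consumeAll(Map.of(
                "topic1", List.of("m1", "m2"),
                "topic2", List.of("m3")));
        System.out.println(got.size()); // 3
    }
}
```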

        Chris Burroughs added a comment -

        > Do users care about the topic/partition

        I think yes. If we allow users to provide an arbitrary partitioner, the partitioning may be meaningful through their pipeline.

        Taylor Gautier added a comment - edited

        Hi Jun.

        My approach works like this:

        1) There is a low-level API that was created by Marcus Westin. The low-level API contains a Client.js (the reader) and Producer.js (the writer). Client.js contains the following APIs:

        • connect(args)
        • fetchTopic(topicName, callback)
        • fetchOffsets(topicName, arg, callback)

        2) I created the Consumer.js implementation which extends the low-level Client.js and provides 2 additional APIs:

        • subscribeTopic(topicName, callback)
        • unsubscribeTopic(topicName, callback)

        It is simply the job of Consumer.js to keep track of the topics that the caller has subscribed to so far; whenever it polls for messages, it writes a request using the low-level API for every subscribed topic. So if, for example, the caller has subscribed to A, B, C, then the Consumer will call the low-level API and request A, B, C. Note that the order is not defined, but it is important for the retrieval.

        Next, Kafka will write responses for A, B, C, and the messages that arrive will each be individually returned to the caller. The callback's arguments include the topic name, so the same callback can be passed for all topics. Once that is done, there is a configurable timeout. If that timeout has already passed, the Consumer simply sends a poll request for all the subscribed topics again; if not, it waits for the timeout to pass and then polls all subscribed topics.
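        A compressed Java restatement of this subscribe-then-poll scheme, for readers less familiar with Node: SubscriptionPoller, produce, and fetch are illustrative stand-ins for the low-level client (not Kafka APIs), and the configurable re-poll timeout is omitted to keep the sketch short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.BiConsumer;

// Sketch of Taylor's scheme: track subscribed topics, issue one fetch per
// subscribed topic per poll round, and deliver every message through a single
// shared callback that also receives the topic name.
public class SubscriptionPoller {
    private final Set<String> subscribed = new LinkedHashSet<>();
    private final Map<String, Queue<String>> broker = new HashMap<>(); // simulated broker

    public void subscribeTopic(String topic)   { subscribed.add(topic); }
    public void unsubscribeTopic(String topic) { subscribed.remove(topic); }

    // Simulates messages arriving at the broker.
    public void produce(String topic, String msg) {
        broker.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(msg);
    }

    // Stand-in for the low-level per-topic fetch request: return whatever is buffered.
    private List<String> fetch(String topic) {
        List<String> out = new ArrayList<>();
        Queue<String> q = broker.get(topic);
        if (q != null) while (!q.isEmpty()) out.add(q.poll());
        return out;
    }

    // One poll round over every subscribed topic; the callback sees the topic name,
    // so the same callback can serve all topics.
    public void pollOnce(BiConsumer<String, String> callback) {
        for (String topic : subscribed)
            for (String msg : fetch(topic))
                callback.accept(topic, msg);
    }
}
```

        In the real client the poll round would be rescheduled after the configurable timeout rather than driven manually.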

        Jun Rao added a comment -

        Taylor,

        I see. Yes, if you use the low-level API, you can implement this. However, there is other work you have to do. It would be good if there is a simple way to extend the high-level API to do the same thing.

        Neha Narkhede added a comment -

        Since this is proposing an API change, it would be good to think about it as part of 0.8.

        Claude Mamo added a comment -

        I've been browsing for a non-blocking simple consumer library and I came across this issue. It so happens that I had modified the high-level consumer to use callbacks instead of blocking queues. I don't know if using non-blocking polling would be a better approach but here is a code example for those who are interested:

        // An EventHandler (callback) is registered per topic instead of
        // iterating a blocking stream.
        val partitionSize = 4
        val topicCountMap = new util.HashMap[EventHandler[String, String], Integer]()
        val consumer = Consumer.create(...)

        // Invoked for each arriving message; no blocking iterator involved.
        val cb = (messageHolder: MessageAndMetadata[String, String]) => {
          println(messageHolder.message)
        }

        topicCountMap.put(new EventHandler("MyTopic", cb), partitionSize)
        consumer.createMessageStreams(topicCountMap, new StringDecoder(), new StringDecoder())
        

        The code is part of the Kafka Web Console project and it can be found here: https://github.com/claudemamo/kafka-web-console/tree/master/app/kafka.

        Guozhang Wang added a comment -

        Claude, in 0.9 we are going to rewrite our consumer client, and it will use non-blocking poll APIs. Do you want to check its design and see if it satisfies your use case?

        https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design


          People

          • Assignee: Unassigned
          • Reporter: Jay Kreps
          • Votes: 2
          • Watchers: 2
