Kafka / KAFKA-48

Implement optional "long poll" support in fetch request

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      Currently, the fetch request is non-blocking. If there is nothing on the broker for the consumer to retrieve, the broker simply returns an empty set to the consumer. This can be inefficient if you want to ensure low latency, because you have to keep polling over and over. We should make a blocking version of the fetch request so that the fetch request does not return until the broker has at least one message for the fetcher or some timeout passes.

      Attachments

      1. KAFKA-48.patch
        30 kB
        Jay Kreps
      2. KAFKA-48-v2.patch
        46 kB
        Jay Kreps
      3. KAFKA-48-v3.patch
        61 kB
        Jay Kreps
      4. kafka-48-v3-to-v4-changes.diff
        8 kB
        Jay Kreps
      5. KAFKA-48-v4.patch
        62 kB
        Jay Kreps

          Activity

          Jay Kreps added a comment -

          This latency issue is important for replication because the latency for the producer will now depend on the replication (fetching) on the followers. This means our current polling mechanism is not going to be good, because we either have to back off for a period of time or busy-wait on the server. In addition, with replication we need a similar ability to process requests asynchronously--we do not want to block any threads while waiting for acks from followers. This also breaks our simple request/response model.

          This is also important for the streaming use cases as they involve a large number of stacked topics, and hence the end-to-end latency is a multiple of the single-hop consumer latency.

          Fixing this is slightly tricky.

          The first thing I think we would need to do is move the execution of request handling out of the socket server threads. This would generally be a good thing to do anyway, as I/O currently blocks request handling for all sockets sharing a thread. This can add unnecessary latency.

          The design for this could be a request BlockingQueue that the SocketServer submits all requests to, and N response BlockingQueues, one for each socket thread. The request processing would happen in a separate threadpool that would feed off the request queue and send responses back to the response queue. For asynchronous requests, no response need be enqueued.

          The request handling would now be an ExecutorService with a fixed number of threads. Each thread would poll the request queue, process any request it gets, and send back responses.
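
          Schematically, the queue layout might look like the following. This is a minimal sketch with invented names and sizes, not the actual patch:

            import java.util.concurrent.{ArrayBlockingQueue, Executors}

            object RequestChannelSketch {
              case class Request(processorId: Int, payload: Array[Byte])
              case class Response(payload: Array[Byte])

              val numProcessors = 4   // one response queue per socket processor thread
              val numHandlers = 8     // fixed-size handler pool

              val requestQueue = new ArrayBlockingQueue[Request](500)
              val responseQueues = Array.fill(numProcessors)(new ArrayBlockingQueue[Response](500))

              val handlerPool = Executors.newFixedThreadPool(numHandlers)
              for (_ <- 0 until numHandlers)
                handlerPool.submit(new Runnable {
                  def run(): Unit = while (true) {
                    val req = requestQueue.take()              // block until a request arrives
                    val resp = Response(req.payload)           // stand-in for real request handling
                    responseQueues(req.processorId).put(resp)  // route back to the owning socket thread
                  }
                })
            }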

          Long poll requests from fetchers would either be handled immediately, or, if there is no available data, would add themselves to a list of watchers on the topic. When a produce request comes in on that topic, it would trigger responses for all watchers. The request would specify a max timeout after which the request would return empty; this could be implemented with a DelayQueue that was checked periodically for expired requests. A generalization of this would be to have the fetch request provide not only a max_wait_time but also a min_data_size, which would make the request block until the given number of bytes of data have accumulated. This would actually enable the opposite of simple long poll--instead of trying to minimize latency, the fetcher would be able to ensure it gets a good-sized chunk of data on each request, ensuring good throughput and avoiding lots of little requests that each fetch only a few small messages.
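
          For illustration, the timeout side maps naturally onto java.util.concurrent.DelayQueue. This is a hedged sketch with invented names (DelayedFetch, satisfied), not the eventual implementation:

            import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

            object LongPollSketch {
              // one watched fetch; expires max_wait_time ms after creation
              class DelayedFetch(val topic: String, val minBytes: Int, maxWaitMs: Long) extends Delayed {
                private val dueMs = System.currentTimeMillis + maxWaitMs
                @volatile var satisfied = false
                def getDelay(unit: TimeUnit): Long =
                  unit.convert(dueMs - System.currentTimeMillis, TimeUnit.MILLISECONDS)
                def compareTo(other: Delayed): Int =
                  java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
              }

              // take() blocks until the head request has expired; a background thread
              // would loop on this and send back an empty response for expired requests
              val expirationQueue = new DelayQueue[DelayedFetch]()
            }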

          A similar mechanism would be possible for acknowledgements coming from followers. When a produce request occurs with a min_ack_count > 1, the request would go into a list of waiting requests for that topic/partition. When the ack request comes in from followers, we would check the waiting producers and add responses to the response queue for any newly unblocked request.

          I would like to do a round of refactoring on the SocketServer anyway, so let me know if anyone has comments on this before I go do anything too crazy. For example, I want someone else to validate the interaction with the replication design.

          Jun Rao added a comment -

          Jay, thanks for carefully thinking ahead. I agree that we will need to decouple the socket processor thread from the handler thread, to make it easy for producers to wait for acks from followers, and for the consumers to block until new data is produced. We probably need 1 request queue and 1 response queue per socket processor thread. That way, we can ensure that the response is always handled by the socket thread that has registered the needed socket key for the response.

          Jay Kreps added a comment -

          This is a draft patch that refactors the socket server to make requests and responses asynchronous. No need for a detailed review, it still needs a lot of cleanup, but I wanted to show people the idea in more detail.

          Taylor Gautier added a comment -

          Hi - please keep in mind the use case where a consumer is interested in more than one topic.

          This feature, if implemented only for one topic, will not be useful for this use case--assuming it's infeasible to open multiple TCP connections.

          My first proposal is to allow the request to contain a list of topics. However, upon consideration, this would require the response to also be adjusted so that it contains the name of the topic; otherwise it would be next to impossible to ascertain which topic the response corresponds to. It could be done such that responses are returned in the same order as the topics in the request, with an empty response given for topics with no messages, but this seems pretty bad from a network bandwidth standpoint.

          So my final proposal would be to introduce an epoll-like request/response. The consumer would submit a request with a list of topics it is interested in, and the response would be a topic and the # of messages available on that topic once the topic(s) have messages.

          The advantage to this solution is that it would be entirely backward compatible, since you would simply introduce a new request/response pair and it would also allow the consumer to decide which topics to poll (or pull) from first, so that it could prioritize, if it wanted.

          Finally, I like the idea of allowing the consumer to specify a min # of messages required to trigger the poll; you might want to copy the pattern you already set up for log flushing, e.g. max time and/or min # of messages. So the request might look like:

          list-of : topic-name:min msgs:max time

          and the response might be:

          list-of : topic-name:# msgs available
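
          Sketched as data types, with field names invented here purely to make the shape concrete, the proposal could be:

            case class TopicPollRequest(topic: String, minMsgs: Int, maxTimeMs: Long)
            case class TopicPollResponse(topic: String, msgsAvailable: Int)

            // the consumer sends a List[TopicPollRequest]; the broker replies with a
            // List[TopicPollResponse] once any watched topic has enough messages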

          Jay Kreps added a comment -

          Yes, these are all good points. The work I have done so far just splits request processing into a separate thread pool and enables asynchronous handling. This is a fairly general thing we need for a few different use cases. Perhaps this should be broken into a separate JIRA.

          I have thought a little bit about how to do long poll, though. Logically what I want to do is make it possible to give a minimum byte size for the response and a maximum delay in ms; then have the server delay the response until we have at least min_bytes of messages in the response OR we hit the maximum delay time. The goal is to improve latency (by avoiding waiting in between poll requests), to reduce load on the server (by not polling), and to make it possible to improve throughput. If you set min_bytes = 0 or max_delay_ms = 0 you effectively get the current behavior. The throughput improvement comes if you set min_bytes > 1; this would give a way to artificially increase the response size for requests to the topic (i.e. avoid fetching only a few messages at a time) while still giving hard latency guarantees. We have seen that request size is one of the important factors for network throughput.
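
          The decision rule being described reduces to a single predicate. A hedged sketch, with invented parameter names:

            // respond as soon as enough bytes have accumulated OR the max delay elapses
            def shouldRespond(availableBytes: Int, minBytes: Int,
                              elapsedMs: Long, maxDelayMs: Long): Boolean =
              availableBytes >= minBytes || elapsedMs >= maxDelayMs

            // min_bytes = 0 or max_delay_ms = 0 degenerates to today's non-blocking fetch:
            // shouldRespond(0, 0, 0, 10000) == true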

          As you say, the only case to really consider is the multi-fetch case. The single topic fetch can just be seen as a special case of this. I think your first proposal is closer to what I had in mind. Having the response contain an empty message set for the topics that have no data has very little overhead since it is just positionally indexed, so it is like 4 bytes or something. A poll() style interface that just returns ready topics doesn't seem very useful to me, because the only logical thing you can do is then initiate a fetch on those topics, right? So we might as well just send back the data and have a single request type to worry about.

          One of the tricky questions for multifetch is what does the minimum byte size pertain to? A straightforward implementation in the current system would be to add the min_bytes and timeout to the fetch request, which would effectively bundle it up N times in the multi-fetch (currently multi-fetch is just N fetches glued together). This doesn't really make sense, though. Which of these minimum sizes would cause the single response to be sent? Would it be when all conditions were satisfied or when one was satisfied? I think the only thing that makes sense is to set these things at the request level. Ideally what I would like to do is remove the fetch request entirely because it is redundant and fix multi-fetch to have the following:
          [(topic1, partitions1), (topic2, partitions2),...], max_total_size, max_wait_ms
          This also fixes the weird thing in multifetch now where you have to specify the topic with each partition, so a request for 10 partitions on the same topic repeats the topic name 10 times. This is an invasive change, though, since it means request format changes.
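
          As a type, the proposed unified request might look like the following sketch, which follows the shape above but is not a committed wire format:

            case class MultiFetchRequest(
              topics: Seq[(String, Seq[Int])], // [(topic1, partitions1), (topic2, partitions2), ...]
              maxTotalSize: Int,
              maxWaitMs: Long
            )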

          I am also not 100% sure how to implement the min_bytes parameter efficiently for multi-fetch. For the single fetch case it is pretty easy: the implementation would be to keep a list of watchers sorted by the offset at which each request unblocks. When a fetch request came in we would try to service it immediately, and if we could meet its requirements we would immediately send a response. If we can't meet its min_bytes requirement then we would calculate the offset for that topic/partition at which the request would be unblocked (e.g. if the current offset is X and the min_bytes is M then the target offset is X+M). We would insert new requests into this watchers list, maintaining a sort by increasing target offset. Each time a produce request is handled we would respond to all the watching requests whose target offset is less than the new offset; this just requires walking the list until we see a request with a target offset greater than the current offset. All the newly unblocked requests would be added to the response queue. So the only work added to a produce request is the work of transferring newly unblocked requests to the response queue, and at most we need to examine one still-blocked request.
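
          A sketch of that unblocking scheme, with types and names invented for illustration:

            import scala.collection.mutable

            case class BlockedFetch(targetOffset: Long, respond: () => Unit) // target = X + M

            class FetchWatchers {
              // min-heap on target offset (PriorityQueue is a max-heap, hence .reverse)
              private val blocked = mutable.PriorityQueue.empty[BlockedFetch](
                Ordering.by[BlockedFetch, Long](_.targetOffset).reverse)

              def block(w: BlockedFetch): Unit = synchronized { blocked.enqueue(w) }

              // on each produce, pop only the requests whose target offset is now covered
              def onProduce(newOffset: Long): Unit = synchronized {
                while (blocked.nonEmpty && blocked.head.targetOffset <= newOffset)
                  blocked.dequeue().respond()
              }
            }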

          The timeout could be implemented by keeping a priority queue of requests based on the unix timestamp of the latest allowable response (i.e. the timestamp the request came in, plus the max_wait_ms). We could add a background thread to remove items from this as their timeouts occur, and add them to the response queue with an empty response.

          For the multifetch case, things are harder to do efficiently. The timeouts can still work the same way. However the min_bytes is now over all the topics the request covers. The only way I can see to implement this is to keep a counter associated with each watcher, and have the watcher watch all the requested topics. But now on each produce request we need to increment ALL the watchers on the topic produced to.

          Dunno, maybe for practical numbers of blocked requests (a few hundred? a thousand?) this doesn't matter. Or maybe there is a more clever approach. Ideas welcome.

          Taylor Gautier added a comment - edited

          I can see how it would be reasonable to do the first approach. It does limit one use case I was considering, which is to allow the consumer to decide in which order to fetch the topics after the poll is triggered, however, this can be done at request time when the topics are requested.

          As you say, the response is 100% compatible, it's just the request that changes. Therefore it would make sense I think to go ahead and make a new request type that doesn't yet exist and then the current fetch request remains the same on the wire and the behavior of it is just a degenerate case of this new use case with delay and bytes set to 0.

          I think you might consider how useful it is to worry about user-specified time/bytes. It will add a lot of complexity to your implementation, and frankly, if I just have the ability to do a multi-fetch that will wait until something has arrived and send me whatever it has at the current moment, that will be good enough. A minimal implementation should also probably provide a simple timeout that will respond with nothing if the timeout expires.

          I think the simple implementation is by itself a huge win, and you might consider – is that good enough?

          For me it is - I would prefer to get the simple thing in the short term and wait for the harder thing in the long-term.

          Jay Kreps added a comment -

          Hi Taylor,

          Could you give a little more detail on your use case for ordering the fetches? I think you have a use case I haven't thought of, but I don't know if I understand it. Is your motivation some kind of quality of service over the topics?

          As you say, this would definitely be a new request type for compatibility, and we would probably try to deprecate the old format over the next few releases as we can get clients updated.

          Your point about complexity is valid. I think for our usage, since we use Kafka very heavily, the pain of grandfathering in new APIs is the hardest part, and the socket server refactoring is next, so I was thinking the difficulty of implementing a few internal data structures is not too bad. I suppose it depends on whether I work out a concrete plan there or not. If the best we can do is iterate over the full set of watchers it may not be worth it.

          Taylor Gautier added a comment -

          Actually, I don't have a valid use case for priority fetches, I was just thinking ahead.

          I agree that it's painful to have message format upgrades. On the flip side of course we probably also agree it's bad to have parameters in the message header that don't correspond to real features.

          Can you make a trade-off and reserve some bytes for these two int (or long) parameters and/or a few others but just call the space reserved?

          Jun Rao added a comment -

          Just had a chance to look at the patch. Agree in principle this would work. It's probably better to create a separate JIRA for moving the request handler out of the socket server. The long poll JIRA will depend on that one.

          Jay Kreps added a comment -

          Cool, moved it.

          Taylor Gautier added a comment -

          I've been staring at the code for a while - and I'm not sure I understand why you need KAFKA-202 to implement this feature.

          What I am thinking to do is:
          1) Every thread has to open a local socket for read/write
          2) Each thread puts the socket into the poll set for reading
          3) If a read request fails to read any messages, when it comes back to the handler, the handler adds a callback method to the appropriate log and puts the read request into a special queue. When that log gets new messages written, it calls the callback. The callback writes a byte into the special thread socket.
          4) The byte wakes up the thread, which sees that the special socket had a byte written to it, and so it goes and re-handles the read requests in the special queue as if they had just come in from the network. Thus if there are any messages available in the log for a given request, they are read just like normal and transferred out onto the channel. If not, they're re-queued as per step 3. (See the sketch after this list.)
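
          For concreteness, steps 1-4 resemble the classic self-pipe trick, which java.nio exposes directly as Pipe. A hedged sketch, with invented names:

            import java.nio.ByteBuffer
            import java.nio.channels.{Pipe, SelectionKey, Selector}

            object SelfPipeSketch {
              val selector = Selector.open()
              val pipe = Pipe.open()                               // step 1: the thread-local channel
              pipe.source.configureBlocking(false)
              pipe.source.register(selector, SelectionKey.OP_READ) // step 2: add it to the poll set

              // step 3: the log-append callback wakes the selector thread with one byte
              def notifyNewMessages(): Unit =
                pipe.sink.write(ByteBuffer.wrap(Array[Byte](1)))

              // step 4: on wakeup, the thread drains the byte and re-handles the queued reads
            }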

          I think there are some pieces I haven't quite got right - in particular, I think there can only be one active response at a time. Thus there will have to be some sort of response queue built up as each request generates a response, but I think that's simple - the handler just writes responses with non-zero messages into a response queue, and the write logic of the socketserver is updated to drain this queue on write events (at the moment it only deals with one response at a time, but now it may have many queued up to send out).

          Some other work that is probably going to be more difficult is that the binary protocol has to change to include the topic name or else there is no way to disambiguate the responses coming back.

          Jun Rao added a comment -

          Taylor,

          Sorry for the late response. I am not sure that I understand your proposal.

          a. Why do we need a local socket? It seems that the same thing can be achieved by just turning on the write interest bit in the selection key corresponding to a client request.

          b. It's not clear to me how you correlate a queued client request with the corresponding client socket.

          Jay Kreps added a comment -

          This is a very rough draft of long poll support. It appears to work. Here are some remaining issues:
          1. I need the updated request objects to properly get the new fields (min_bytes, max_wait). Currently I am just hard-coding some made-up values.
          2. This patch is very specific to long poll support for fetch requests, it will require more generalization to support our other async case, namely delaying produce requests until a certain number of slaves are caught up.
          3. There are still some unit test problems.
          4. Code is a little rough still.

          Take a look if interested, I will discuss with a few people and clean up a little more before asking for a real review.

          Taylor Gautier added a comment -

          Jay - that's great to hear!! Would you mind summarizing the way that the long-poll works? I know that several different implementations were suggested here on the thread and I wanted to know which one you ultimately decided to go with.

          Jay Kreps added a comment -

          Hey Taylor, here are the nitty gritty details:

          • When a fetch request comes in, we immediately check if we have sufficient data to satisfy it
          • If so, we respond immediately
          • If not, we add a "watch" on the topics that the fetch is for, and add it to a delay queue to expire it after the given timeout
          • There is a background thread that checks the delay queue for expired requests and responds to them with whatever data is available
          • When a produce request comes in, we update the watchers for all the topics it produces to and increment their byte counts. Any requests that have been satisfied by this produce are then executed and responses are sent.

          So one of the earlier questions was how to support a consumer that polls on a very large number of topics AND wants very low latency. I think, as you described, it would be possible to implement this by simply multiplexing the requests on the single socket and letting the server respond to them as possible.
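
          Put together, the produce-side path from the bullets above might look roughly like this (illustrative names, not the patch's actual classes):

            import scala.collection.mutable

            class WatchersSketch {
              class Watcher(val minBytes: Int, val respond: () => Unit) {
                var accumulatedBytes = 0
                var satisfied = false
              }

              private val byTopic = mutable.Map.empty[String, mutable.ListBuffer[Watcher]]

              def watch(topic: String, w: Watcher): Unit = synchronized {
                byTopic.getOrElseUpdate(topic, mutable.ListBuffer.empty) += w
              }

              // on produce: bump every watcher on the topic, execute any now-satisfied fetch
              def onProduce(topic: String, bytesAppended: Int): Unit = synchronized {
                for (ws <- byTopic.get(topic)) {
                  ws.foreach { w =>
                    w.accumulatedBytes += bytesAppended
                    if (!w.satisfied && w.accumulatedBytes >= w.minBytes) {
                      w.satisfied = true
                      w.respond()
                    }
                  }
                  byTopic(topic) = ws.filterNot(_.satisfied)
                }
              }
            }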

          Jay Kreps added a comment -

          Two other issues with this patch, I forgot to mention:

          • There is a race condition between checking the available bytes, and adding the watchers for the topics. I think this is okay since the min_bytes is a minimum not a maximum, so in the rare case that a produce comes in before the watchers are added we will just wait slightly longer than we should have. I think this is probably better than properly synchronizing and locking out all produces on that partition.
          • The other issue is that the delay queue is only emptied right now when the delay expires. If the request is fulfilled before the delay expires, the request is marked completed, but it remains in the delay queue until it expires. This is a problem and needs to be fixed. The problem is that if the client sets a low min_bytes and a high max_wait these requests may accumulate. Currently we would have to do an O(N) walk of the waiting requests to fix this. I am going to try to come up with an improved set of data structures to fix this without requiring that.
          Jun Rao added a comment -

          Overall, the patch looks good. Some comments:

          1. DelayedItem.compareTo: yourEnd should be delayed.createdMs + delayed.delayMs (see the sketch after this list)
          2. Suppose that a client issues MultiFetch requests on a hot topic and a cold topic. What can happen is that the watcher list for the cold topic won't be cleaned up for a long time. One solution is to have a cleaner thread that periodically wakes up to remove satisfied items. The cleaner thread can be used to clean up the DelayQueue too.
          3. MessageSetSend.empty is not used.
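
          Point 1 in code, as I read it - a reconstruction for clarity, not the actual patch:

            class DelayedItem(val createdMs: Long, val delayMs: Long) extends Comparable[DelayedItem] {
              def compareTo(delayed: DelayedItem): Int = {
                val myEnd = createdMs + delayMs
                val yourEnd = delayed.createdMs + delayed.delayMs // the fix: use the other item's fields
                java.lang.Long.compare(myEnd, yourEnd)
              }
            }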

          Jay Kreps added a comment -

          This version of the patch updates the code to work with the new request objects and correctly respect the min_bytes and max_fetch_wait settings.

          Please review the new configs and make sure we are happy with the naming.

          Jay Kreps added a comment -

          Oops, missing about a bazillion files on that last patch.

          Jun Rao added a comment -

          Thanks for patch v3. Some comments:

          31. DelayedFetch is keyed off topic. It should be keyed off (topic, partition) since a consumer may be interested in only a subset of partitions within a topic.

          32. KafkaApis: The following 3 lines are duplicated in 2 places.
            val topicData = readMessageSets(delayed.fetch.offsetInfo)
            val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
            requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
          Should we put them in a private method and share the code?

          33. ExpiredRequestReaper.purgeExpired(): We need to decrement unsatisfied count here.

          34. FetchRequest: Can we have the default constants for correlationId, clientid, etc. defined and shared between the constructor and the request builder?

          35. MessageSetSend.empty is unused. Should we remove it?

          Jay Kreps added a comment -

          Jun, thanks for the feedback. This patch hopefully addresses your comments:
          1. I removed the empty flag from MessageSetSend, as you suggested.
          2. I would like to leave the ugly duplicate code for now. Making a separate method for this doesn't really make sense, as it isn't really a standalone piece of code. I think the root problem is that the action you take when the request is satisfied can be done either synchronously (if possible), asynchronously when the criteria are satisfied, or asynchronously when the request expires. I think the right way to do this is to refactor RequestPurgatory a bit and somehow always use the same callback for all three cases. I would like to address this as a separate patch because this idea is not fully baked yet.
          3. The default values are now shared between the builder and constructor.
          4. I changed the key to be (topic, partition) for FetchRequestPurgatory. That was a major oversight.
          5. The purgeExpired method is actually misnamed; it is really purgeSatisfied, so it doesn't need to decrement the unsatisfied count. However, there is a major bug in that count: it wasn't getting decremented by the processing thread. I added a new method to cover this.

          Jay Kreps added a comment -

          I attached a diff that just shows the changes between v3 and v4 for folks who already looked at v3.

          Jun Rao added a comment -

          Patch v4 looks good. Just one more comment.

          41. RequestPurgatory.update(): if(w == null), could we return a singleton empty array, instead of creating a new one every time?

          Jay Kreps added a comment -

          Good point Jun, now it is:

            if(w == null)
              Seq.empty
            else
              w.collectSatisfiedRequests(request)

          I will wait for more feedback before making a new patch since this is a pretty trivial change.

          Neha Narkhede added a comment -

          This patch looks very good. Here are a few questions -

          1. I like the way the expired requests are handled by implementing the logic inside the FetchRequestPurgatory. However, can we not do the same for satisfied requests by providing a satisfy() abstract API in RequestPurgatory? That gets rid of the handling of fetch requests inside handleProducerRequest() in KafkaApis, which is a little awkward to read. When we have the ProduceRequestPurgatory, the same satisfy() operation can send responses for produce requests once the fetch responses for the followers come in.

          2. I gave the RequestPurgatory data structure some thought. Not sure if this buys us anything over the current data structure. How about the following data structure for the RequestPurgatory (see the sketch after this list) -

          2.1. The watchers would be a priority heap (PriorityQueue), with the head being the DelayedItem with the least delay value (earliest expiration time). So for each (topic, partition), we have a PQ of watchers.

          2.2. The expiration data structure is another PQ of size n, where n is the number of keys in RequestPurgatory. This expiration PQ has the heads of each of the watcher lists above.

          2.3. The expiration thread will await on a condition variable with a timeout = delay of the head of the expiration PQ. The condition also gets signaled whenever the head of any of the n watcher list changes.

          2.4. When the expiration thread gets signaled, it removes its head element, expires it if it's ready, ignores it if it's satisfied, and adds an element from the watch list it came from. It keeps doing this until its head has an expiration time in the future. Then it goes back to awaiting on the condition variable.

          2.5. The item to be expired gets removed from its watch list as well as expiration PQ in O(1).

          2.6. The item that gets satisfied sets a flag and gets removed from its watcher list. If the satisfied item is the head of the watcher list, the expiration thread gets signaled to add the new head to its PQ.

          2.7 Pros
          2.7.1. The watcher list doesn't maintain expired items, so doesn't need state-keeping for liveCount and maybePurge()
          2.7.2. During a watch operation, items only enter the expiration PQ if they are the head of the watcher list
          2.7.3. The expiration thread does a more informed get operation, instead of polling the queue in a loop.

          2.8. Cons
          2.8.1. watch operation is O(logn) where n is the number of DelayedItems for a key
          2.8.2 The forcePurge() operation on the expiration data structure still needs to happen in O(n)

          Did I miss something here? Thoughts?
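
          A structural sketch of 2.1-2.2, with illustrative types (the signaling in 2.3-2.6 is elided):

            import scala.collection.mutable

            class DualPQSketch[Key] {
              class Item(val expiresAtMs: Long) { var satisfied = false }
              private val earliestFirst = Ordering.by[Item, Long](_.expiresAtMs).reverse

              // 2.1: one heap of watchers per key, earliest expiration at the head
              private val watchers = mutable.Map.empty[Key, mutable.PriorityQueue[Item]]

              // 2.2: a second heap holding only the heads of the per-key heaps; the
              // expiration thread awaits with timeout = delay of this heap's head (2.3)
              private val heads = mutable.PriorityQueue.empty[Item](earliestFirst)

              def watch(key: Key, item: Item): Unit = synchronized {
                val pq = watchers.getOrElseUpdate(key, mutable.PriorityQueue.empty[Item](earliestFirst))
                val newHead = pq.isEmpty || item.expiresAtMs < pq.head.expiresAtMs
                pq.enqueue(item)                 // 2.8.1: O(log n) per watch
                if (newHead) heads.enqueue(item) // 2.7.2: only list heads enter the second PQ
              }
            }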

          3. On the other hand, this is a huge non-trivial patch and you must be pretty tired of rebasing and working through unit tests. We could just discuss the above changes, and maybe file another JIRA to track it, instead of delaying this patch further. But that is your call.

          Jay Kreps added a comment -

          Hey Neha, yes, my hope is to get the patch evaluated as is, and then take another pass at cleaning up the way we handle the satisfaction action as Jun and you requested and try out other approaches to the purgatory data structure asynchronously. That should take these cleanup/polishing items out of the critical path.

          I like your idea of the dual priority queues, but I need to work through it more to fully understand it.

          Joel Koshy added a comment -

          +1 on the patch. I have a few minor comments:

          KafkaRequestHandlers :

          • requestLogger unused.

          ConsumerConfig:

          • maxFetchWait -> rename the prop to max.fetch.wait.ms and the val to
            maxFetchWaitMs
          • Can we get rid of fetcherBackoffMs? It says it is deprecated, but had a
            reference in FetcherRunnable which you removed.
          • May want to have an explicit constraint that consumerTimeoutMs <=
            maxFetchWait

          RequestPurgatory:

          • Unused import.
          • The parameterized types and overall tricky nature of this component make
            it somewhat difficult to follow. I (think) I understood it only after
            looking at its usage in KafkaApis, so the comments and javadocs (including
            the class summary on top) can only go so far. Even so, I think the comments
            seem slightly out of sync with the code and can be improved a bit. E.g.,
            what is "given size" in the update method's comment? Do "current keys" in
            the comment for watch mean the given request's keys? And so on.
          • Also, it may be easier to follow if we do some renaming, but it's a matter
            of taste and I may have misunderstood the code to begin with:
          • I find it confusing that there's a map called watchers which is a map
            from keys to Watcher objects, and the Watcher class itself has a
            linked-list of delayed requests called watchers. May be unwieldy, but
            how about renaming:
          • RequestPurgatory.watchers to watchedRequestsForKey
          • Watchers to WatchedRequests
          • Watchers.watchers to requests
          • Rename DelayedRequest.satisfied to satisfiedOrExpired (I find it weird
            that the reaper marks expired requests as satisfied.)
          • update -> maybeNotify?
          • In collectSatisfiedRequests, the comment on "another thread has satisfied
            this request". That can only be the ExpiredRequestReaper thread right?
          • It is slightly odd that we have to call the reaper's satisfyRequest method
            from Watcher. Would it work to move the unsatisfied counter up to
            RequestPurgatory?
          Jay Kreps added a comment -

          Joel, this is great feedback. I will address these issues in the commit since most are naming/documentation related.

          Jay Kreps added a comment -

          Included most of Joel's comments, and fixed a few lagging unit tests (in particular refactored AutoOffsetResetTest).

          Comments on the general structure of request purgatory I am going to put off until we have our second use case ready to implement--the producer acks. When we have that I am going to look at refactoring so that the "satisfaction action" is a function included with the DelayedRequest which is executed regardless of whether the request is satisfied or times out. But I want to put this off until we can check it against the specifics of the second use case.
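
          The shape of that refactoring might be as simple as a run-once callback carried by the request. A sketch under that assumption, with invented names:

            import java.util.concurrent.atomic.AtomicBoolean

            class DelayedRequestSketch(onComplete: () => Unit) {
              private val completed = new AtomicBoolean(false)
              // runs the satisfaction action exactly once, whether the request is
              // satisfied immediately, satisfied later by a produce, or expires
              def complete(): Unit =
                if (completed.compareAndSet(false, true)) onComplete()
            }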


            People

            • Assignee: Jay Kreps
            • Reporter: Jun Rao
            • Votes: 0
            • Watchers: 0
