This patch looks very good. Here are a few questions -
1. I like the way the expired requests are handled by implementing the logic inside the FetchRequestPurgatory. Could we do the same for satisfied requests by adding an abstract satisfy() API to RequestPurgatory? That would get rid of the fetch request handling inside handleProducerRequest() in KafkaApis, which is a little awkward to read. Once we have the ProduceRequestPurgatory, the same satisfy() operation could send responses for produce requests once the fetch responses for the followers come in.
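To make the suggestion in point 1 concrete, here is a rough sketch of the shape I have in mind. Everything in it is a hypothetical stand-in and not code from the patch: the class names, the String request type, and the complete()/timeOut() entry points are assumptions for illustration only. The idea is just that the base class owns the bookkeeping and the subclass decides what "expired" and "satisfied" mean.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for RequestPurgatory; not the class from the patch.
abstract class PurgatorySketch<T> {
    /** Subclass hook: invoked when a delayed request times out. */
    protected abstract void expire(T delayed);

    /** Proposed subclass hook: invoked when a delayed request is satisfied. */
    protected abstract void satisfy(T delayed);

    /** Assumed entry point: the caller hands over the requests that just became satisfied. */
    public final void complete(List<T> satisfied) {
        for (T delayed : satisfied) {
            satisfy(delayed);
        }
    }

    /** Assumed entry point: the expiration thread hands over a timed-out request. */
    public final void timeOut(T delayed) {
        expire(delayed);
    }
}

// Hypothetical fetch purgatory: both kinds of response are produced in one place,
// so the produce-request handler no longer needs fetch-specific code.
class FetchPurgatorySketch extends PurgatorySketch<String> {
    final List<String> responses = new ArrayList<>();

    @Override
    protected void expire(String fetch) {
        responses.add("expired:" + fetch);
    }

    @Override
    protected void satisfy(String fetch) {
        responses.add("satisfied:" + fetch);
    }
}
```

With something like this, the produce path would only need to call complete(...) on the purgatory, and a future ProduceRequestPurgatory could override the same two hooks.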
2. I gave the RequestPurgatory data structure some thought. I am not sure whether the alternative below buys us anything over the current data structure, but how about the following for the RequestPurgatory -
2.1. The watchers would be a priority heap (PriorityQueue), with the head being the DelayedItem with the least delay value (earliest expiration time). So for each (topic, partition), we have a PQ of watchers.
2.2. The expiration data structure is another PQ of size n, where n is the number of keys in RequestPurgatory. This expiration PQ has the heads of each of the watcher lists above.
2.3. The expiration thread will await on a condition variable with a timeout equal to the delay of the head of the expiration PQ. The condition also gets signaled whenever the head of any of the n watcher lists changes.
2.4. When the expiration thread gets signaled, it removes its head element, expires it if it is ready, ignores it if it is satisfied, and adds the next element from the watcher list it came from. It keeps doing this until its head has an expiration time in the future. Then it goes back to awaiting on the condition variable.
2.5. The item to be expired gets removed from its watcher list as well as the expiration PQ in O(1).
2.6. The item that gets satisfied sets a flag and gets removed from its watcher list. If the satisfied item is the head of the watcher list, the expiration thread gets signaled to add the new head to its PQ.
2.7.1. The watcher list doesn't maintain expired items, so it doesn't need state-keeping for liveCount and maybePurge().
2.7.2. During a watch operation, items only enter the expiration PQ if they are the head of the watcher list.
2.7.3. The expiration thread does a more informed get operation, instead of polling the queue in a loop.
2.8.1. The watch operation is O(log n), where n is the number of DelayedItems for a key.
2.8.2. The forcePurge() operation on the expiration data structure still needs to happen in O
Did I miss something here? Thoughts?
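To make 2.1 through 2.6 easier to discuss, here is a minimal single-threaded sketch of the two-level structure. It is only an illustration under stated assumptions: all class and method names are hypothetical, the condition variable and locking are left out (the caller passes the current time in explicitly), and it uses java.util.PriorityQueue, whose remove(Object) is a linear scan, so it does not demonstrate the complexity bounds above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-in for a DelayedItem, ordered by expiration time.
final class DelayedItemSketch implements Comparable<DelayedItemSketch> {
    final String key;        // e.g. a (topic, partition) pair rendered as a string
    final long expiresAtMs;
    boolean satisfied = false;

    DelayedItemSketch(String key, long expiresAtMs) {
        this.key = key;
        this.expiresAtMs = expiresAtMs;
    }

    @Override
    public int compareTo(DelayedItemSketch other) {
        return Long.compare(expiresAtMs, other.expiresAtMs);
    }
}

final class TwoLevelPurgatorySketch {
    // 2.1: one PQ of watchers per key, earliest expiration at the head.
    private final Map<String, PriorityQueue<DelayedItemSketch>> watchers = new HashMap<>();
    // 2.2: holds only the current head of each watcher PQ.
    private final PriorityQueue<DelayedItemSketch> expiration = new PriorityQueue<>();

    void watch(DelayedItemSketch item) {
        PriorityQueue<DelayedItemSketch> pq =
            watchers.computeIfAbsent(item.key, k -> new PriorityQueue<>());
        DelayedItemSketch oldHead = pq.peek();
        pq.add(item);
        if (pq.peek() == item) {            // 2.7.2: only heads enter the expiration PQ
            if (oldHead != null) {
                expiration.remove(oldHead);
            }
            expiration.add(item);           // a real version would signal the thread here
        }
    }

    void satisfy(DelayedItemSketch item) {  // 2.6
        item.satisfied = true;
        PriorityQueue<DelayedItemSketch> pq = watchers.get(item.key);
        if (pq == null) {
            return;
        }
        boolean wasHead = pq.peek() == item;
        pq.remove(item);
        if (wasHead) {
            expiration.remove(item);
            promoteHead(pq);
        }
    }

    // 2.4: drain every head that is due at nowMs and return the items that expired.
    List<DelayedItemSketch> expireUpTo(long nowMs) {
        List<DelayedItemSketch> expired = new ArrayList<>();
        while (!expiration.isEmpty() && expiration.peek().expiresAtMs <= nowMs) {
            DelayedItemSketch head = expiration.poll();
            PriorityQueue<DelayedItemSketch> pq = watchers.get(head.key);
            pq.remove(head);
            if (!head.satisfied) {          // defensive: matters once threads are involved
                expired.add(head);
            }
            promoteHead(pq);
        }
        return expired;
    }

    // 2.3: how long the expiration thread would wait; -1 means nothing to wait for.
    long nextTimeoutMs(long nowMs) {
        DelayedItemSketch head = expiration.peek();
        return head == null ? -1 : Math.max(0, head.expiresAtMs - nowMs);
    }

    int expirationSize() {
        return expiration.size();
    }

    private void promoteHead(PriorityQueue<DelayedItemSketch> pq) {
        DelayedItemSketch next = pq.peek();
        if (next != null) {
            expiration.add(next);
        }
    }
}
```

For example, watching items that expire at 100 ms and 50 ms on one key and at 70 ms on another leaves two entries in the expiration PQ; satisfying the 50 ms item promotes the 100 ms item, and the 70 ms item is then the first to expire.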
3. On the other hand, this is a huge, non-trivial patch and you must be pretty tired of rebasing and working through unit tests. We could just discuss the above changes and maybe file another JIRA to track them, instead of delaying this patch further. But that is your call.