My latest patch has an unprocessedMessages map, which it uses as a buffer. When the SystemConsumers class needs more messages for a set of SystemStreamPartitions, it asks the underlying SystemConsumer for all messages for these SystemStreamPartitions. The messages are then buffered in the unprocessedMessages map in SystemConsumers, and doled out to the MessageChooser over time.
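To make the buffering concrete, here is a rough sketch of the idea, not the code from the patch. BulkConsumer and BufferedConsumers are hypothetical stand-ins for SystemConsumer and SystemConsumers, and plain Strings stand in for SystemStreamPartition and IncomingMessageEnvelope. Timeouts and the MessageChooser are left out.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for SystemConsumer: returns ALL available
// messages for the requested partitions in one call.
interface BulkConsumer {
    Map<String, List<String>> poll(Set<String> partitions);
}

// Hypothetical stand-in for the buffering side of SystemConsumers.
class BufferedConsumers {
    private final BulkConsumer consumer;
    // Per-partition buffer of messages that have been fetched but not yet handed out.
    private final Map<String, Queue<String>> unprocessedMessages = new HashMap<>();
    // Counts calls to the underlying consumer (illustration only).
    int pollCount = 0;

    BufferedConsumers(BulkConsumer consumer) {
        this.consumer = consumer;
    }

    // Returns the next buffered message for the partition, refilling the
    // buffer from the underlying consumer only when it is empty.
    // Returns null if nothing is available.
    String next(String partition) {
        Queue<String> queue =
            unprocessedMessages.computeIfAbsent(partition, p -> new ArrayDeque<>());
        if (queue.isEmpty()) {
            pollCount++;
            Map<String, List<String>> fetched =
                consumer.poll(Collections.singleton(partition));
            queue.addAll(fetched.getOrDefault(partition, Collections.emptyList()));
        }
        return queue.poll();
    }
}
```

In this sketch, three messages sitting in the underlying system cost one poll; the next two requests are served from the buffer.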
An alternative implementation would be to shove all of the buffering logic into the SystemConsumer implementations. In this case, you'd have the SystemConsumer.poll API look like this:
List<IncomingMessageEnvelope> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException;
Rather than returning all available messages for the supplied SystemStreamPartitions, the poll method would return at most one IncomingMessageEnvelope per SystemStreamPartition.
This tweak simplifies the SystemConsumers implementation slightly (LoC goes from 315 to 280) because we no longer need an unprocessedMessages buffer in it. We simply call SystemConsumer.poll() whenever the MessageChooser needs a new message, rather than first taking from the unprocessedMessages buffer. It also allows us to eliminate the "update" method.
The trade-off for this slightly simpler SystemConsumers implementation is that it pushes the buffering complexity into individual SystemConsumer implementations. This is a double-edged sword. On the one hand, it allows us to potentially write different buffering strategies for different systems. I can't think of any use cases for this off the top of my head, but there might be some. On the other hand, it means that each individual system has its own way of configuring its buffer (and buffer refresh rates), which seems like it would be confusing to end-users. It also seems a bit scary, since poll() might be high overhead for some system implementations. If we poll aggressively every time we need a message, this could lead to bad performance. To get around this, again, the underlying system would have to buffer messages to avoid the cost of polling frequently.
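As a rough illustration of what each system would end up carrying under this alternative, here is a sketch of a consumer that buffers internally and hands back at most one message per partition per poll. Everything here is hypothetical: BatchFetcher stands in for whatever expensive call the underlying system makes, Strings stand in for SystemStreamPartition and IncomingMessageEnvelope, and the return type is a Map rather than the List in the signature above, purely to keep the example short. The timeout and InterruptedException are omitted.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical expensive backend call: returns a batch of messages for one partition.
interface BatchFetcher {
    List<String> fetch(String partition);
}

// Hypothetical consumer that owns its own buffer, as the alternative design would require.
class SelfBufferingConsumer {
    private final BatchFetcher fetcher;
    private final Map<String, Queue<String>> buffer = new HashMap<>();
    // Counts expensive backend fetches (illustration only).
    int fetchCount = 0;

    SelfBufferingConsumer(BatchFetcher fetcher) {
        this.fetcher = fetcher;
    }

    // Returns at most one message per requested partition. The backend is
    // only hit when a partition's internal buffer has run dry.
    Map<String, String> poll(Set<String> partitions) {
        Map<String, String> result = new HashMap<>();
        for (String partition : partitions) {
            Queue<String> queue =
                buffer.computeIfAbsent(partition, p -> new ArrayDeque<>());
            if (queue.isEmpty()) {
                fetchCount++;
                queue.addAll(fetcher.fetch(partition));
            }
            String message = queue.poll();
            if (message != null) {
                result.put(partition, message);
            }
        }
        return result;
    }
}
```

The logic is essentially the same unprocessedMessages bookkeeping, just relocated, and every system that can't afford a backend call per message would need its own copy of it.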
Right now, the trade-off doesn't matter much, since all known implementations use BlockingEnvelopeMap, which already contains the buffering logic. When we convert to the new Kafka consumer in 0.9, this won't be the case anymore. In that case, we'll have to re-implement the unprocessedMessages logic inside the Kafka consumer, since the consumer will fetch N messages, but only be able to return one at a time via the poll() method.
After implementing a bit of this alternative approach, I don't think the trade-offs are worth it, so I'm sticking with the existing patch. I didn't get as far as running any performance tests on the tweaked patch, because I'm not comfortable with the trade-offs.