Thanks for bringing this up. I had considered a slight variant of your
suggestion (although I was thinking of issuing requests with upper fetch
size for incomplete partitions one partition at a time, instead of another
multi-fetch on all incomplete partitions as done in this patch). I didn't go
with it mainly due to the concern (1) that you raise - i.e., to avoid doing
sequential fetches, although I don't think that is too much of an issue.
Whether having an upper fetch size config is better than the current setup
depends on the use case: e.g., if there are only a few partitions that have
large messages, then the upper fetch size approach would work well. In the
worst case, with all partitions being incomplete, that would lead to a large
allocation - which is also why I felt the "pipelined fetches of incomplete
partitions" approach added no real value (since it is equivalent in terms of
the net memory the client is expected to handle).
Anyway, I think the above leads naturally to your suggestion of using a
single fetch size and dividing that across all partitions. I like that
approach - especially since there are no new configs to deal with. I would
think the memory allocation concerns are valid but tolerable from the
client's perspective - i.e., the heavy memory allocations only kick in when
there are incomplete partitions in which case I think most clients would
want to consume anyway (along with a log warning indicating a large
message). One minor drawback is that there isn't really a clear default
value for fetch size - right now, with a fetch size of 1MB, it is reasonable
to say that 1MB is also the (approximate) maximum message size. With the above
re-design we can no longer map the config to message size, since there is no
prior knowledge of the number of partitions consumed by each consumer thread -
but I don't think that's a big deal.
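To make the re-design concrete, a minimal sketch of the "net fetch size divided across partitions" idea (the names here - perPartitionFetchSize, netFetchSize - are illustrative, not actual Kafka APIs):

```java
import java.util.Arrays;
import java.util.List;

public class FetchBudget {
    // Treat fetch.size as a net budget and give each partition in a
    // multi-fetch request an equal share of it.
    public static int perPartitionFetchSize(int netFetchSize, int numPartitions) {
        if (numPartitions <= 0)
            throw new IllegalArgumentException("need at least one partition");
        return netFetchSize / numPartitions;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("topicA-0", "topicA-1", "topicB-0");
        int netFetchSize = 1024 * 1024; // 1MB net across all partitions
        // Each of the three partitions gets roughly a third of the budget.
        System.out.println(perPartitionFetchSize(netFetchSize, partitions.size()));
    }
}
```

This also illustrates the default-value drawback above: the per-partition share (and hence the largest message that fits in one multi-fetch) shrinks as the number of consumed partitions grows.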
So as I see it, the choices are:
1) Change the semantics of fetch size to be net, across all partitions.
If/when incomplete partitions are encountered, issue a fetch request of
size fetch.size for each incomplete partition. I think if we do this we
should also account for the total memory that would be used - i.e.,
including the queued chunks.
2) Introduce a new config called upper fetch size that kicks in whenever
there are incomplete partitions - for which, either:
a) issue a multi-fetch request of size upper fetch size for all
incomplete partitions; or
b) issue sequential fetch requests of upper fetch size, one incomplete
partition at a time.
3) If we had byte-addressability for fetches (which I really think we should
allow at least for these kinds of internal APIs) we could consider a
third option: keep fetch size as is, and issue pipelined fetch requests
to build up and complete each incomplete partition, one at a time.
What do people think?