KAFKA-598: decouple fetch size from max message size

    Details

    • Type: Bug
    • Status: Open
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:

      Description

      Currently, a consumer has to set its fetch size larger than the max message size. This increases the memory footprint on the consumer, especially when a large number of topic/partitions are subscribed. By decoupling the fetch size from the max message size, we can use a smaller fetch size for normal consumption and, when a large message is encountered (hopefully rare), temporarily increase the fetch size to the max message size.
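
      A minimal Scala sketch of the idea (illustrative only; fetchWithFallback and fetchOnce are assumed names for this example, not the actual consumer API):

        // Illustrative sketch only, not the actual consumer code.
        // fetchOnce(partition, sizeBytes) stands in for issuing one fetch request
        // and returning the number of complete ("valid") bytes received.
        def fetchWithFallback(partition: Int, fetchSize: Int, maxMessageSize: Int)
                             (fetchOnce: (Int, Int) => Int): Int = {
          val validBytes = fetchOnce(partition, fetchSize)   // normal, small fetch
          if (validBytes > 0)
            validBytes                            // common case: fetch size sufficed
          else
            fetchOnce(partition, maxMessageSize)  // rare large message: temporarily
                                                  // escalate to the max message size
        }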

      1. KAFKA-598-v1.patch
        16 kB
        Joel Koshy
      2. KAFKA-598-v2.patch
        16 kB
        Joel Koshy
      3. KAFKA-598-v3.patch
        44 kB
        Joel Koshy

        Activity

        Neha Narkhede made changes -
        Labels: p4
        Joel Koshy added a comment -

        The full scope should probably move out of 0.8 - i.e., as described above, bounding the consumer's memory is
        basically a packing problem without knowledge of the message size on the broker. One possibility is for the broker
        to somehow communicate the size of the large message back to the client, but that would break our zero-copy
        property wrt fetches.

        So I would suggest we don't do the full patch (i.e., bounding consumer memory and handling large messages).
        Instead we can go with the simpler implementation that requires a new config (which is not ideal, but better IMO
        than trying to half-implement the above packing problem).

        I haven't had time to look at this lately, but if people are okay with the above, then I can revisit one of the
        earlier revisions of the patches.

        Neha Narkhede added a comment -

        Do we want to move this out of 0.8?

        Joel Koshy added a comment -

        33.2: not really, because that would violate the new config's semantics - i.e., each thread shouldn't exceed its allocated amount of memory.
        That said, I just realized that this implementation has a couple of flaws and may need to be refactored or have its scope reduced.
        The max mem config is not always respected in this implementation. When we do the serial fetches, the queue will have larger chunks
        (than the fair partition fetch size). However, the function that computes the fair partition fetch size assumes that the blocking queue only
        has chunks of the fair fetch size. I think we can take care of this, but will think about it a bit more. Another problem is that the aggregate fetch
        size is the fair size * num partitions assigned to the thread. So if, for example, partition assignment happens to be very skewed and a thread
        happens to have only one partition, a serial refetch will be pointless since it can't use a larger fetch size.

        Jun Rao added a comment -

        I took a look at the patch again. Yes, you are right. Your logic is correct. I have a couple of other minor comments.

        33. AbstractFetcherThread:
        33.1 Could we rename partitionFetchSize to something that indicates it is a function?
        33.2 Instead of computing aggregateFetchSize each time, could we pass in max memory?

        Joel Koshy added a comment -

        Thanks for taking the time to review.

        30. I think what I have is correct, but I may be missing something - can you check this again and confirm? The passed-in value is (supposed to be) the fetch size per partition. Only the fetcher manager can compute the fetch size per partition (since it has access to the partition count for each fetcher thread) and that function is "wired in" through the constructor of the abstract fetcher thread.
        31. Sure - will make that change.
        32. It is true that map size is O(1). However, here I need to take each value of the map (which is a fetcher thread) and get the number of partitions that it owns and then sum up the counts. (This is required to compute the partition fetch size). The iteration over the map values seemed expensive (and it is done for each fetch request) so I went with the volatile int.
        Anyway, I think a better approach would be to maintain a Set of partitions that the fetcher manager is currently managing and just return the size of the set.
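
        For illustration, a minimal Scala sketch of that Set-based alternative (the names here are assumptions, not the actual AbstractFetcherManager code):

          import scala.collection.mutable

          // Illustrative sketch: track the managed partitions in a set and derive the
          // count from it, instead of maintaining a separate volatile counter.
          class PartitionTracker {
            private val lock = new Object
            private val partitions = mutable.Set[(String, Int)]()  // (topic, partitionId)

            def add(topic: String, partition: Int): Unit = lock.synchronized {
              partitions += ((topic, partition))
            }

            def remove(topic: String, partition: Int): Unit = lock.synchronized {
              partitions -= ((topic, partition))
            }

            // No per-request traversal of the fetcher threads is needed.
            def partitionCount: Int = lock.synchronized { partitions.size }
          }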

        Jun Rao added a comment -

        Thanks for patch v3. Some comments.

        30. AbstractFetcherThread: The input parameter partitionFetchSize is misleading. The passed-in value is not really the fetch size per partition. Instead, this is the max memory size allocated to this fetcher thread (which handles multiple partitions). So, when calculating the partition fetch size, we need to divide this number by the # of partitions in this fetcher thread.

        31. ConsumerFetcherThread: The following logic seems to be handling overflow when converting from long to int. This is a bit hard to understand. Could we do the check explicitly (i.e., if longVal > Int.MaxValue, set requested to Int.MaxValue)?

        val requested =
          (config.maxMemory /
            (partitionCount * (1 /* current fetch */ + config.maxQueuedChunks)).max(1)).toInt
        if (requested < 0)
          Int.MaxValue
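
        One way to make that conversion explicit, for illustration (a sketch reusing the names from the snippet above, not actual patch code):

          // Illustrative: compute in Long space, then clamp explicitly, instead of
          // relying on Int overflow wrapping around to a negative value.
          def partitionFetchSize(maxMemory: Long, partitionCount: Int, maxQueuedChunks: Int): Int = {
            val requestedLong =
              maxMemory / (partitionCount.toLong * (1 /* current fetch */ + maxQueuedChunks)).max(1L)
            if (requestedLong > Int.MaxValue) Int.MaxValue else requestedLong.toInt
          }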

        32. AbstractFetcherManager: Do we really need to maintain partitionCount? The Scala doc doesn't say that HashMap.size() is linear in the # of entries. If this is just a limitation in Scala 2.8, I suggest that we just pay the overhead of traversing the map for now. Typically, the # of fetcher threads is small.

        Joel Koshy made changes -
        Attachment KAFKA-598-v3.patch [ 12561081 ]
        Joel Koshy added a comment -

        Quick overview of revised patch:

        1 - Addressed your comment about the previous behavior in ConsumerIterator
        (good catch on that!) and the config defaults.
        2 - Changed semantics of fetch size to max memory. Max mem is a long (as int
        would currently limit to 2G). The actual partition fetch size is checked
        for overflow (in which case it is set to Int.MaxValue).
        3 - Also introduced a DeprecatedProperties convenience class that will be
        checked upon config verification. I added this because I think max.memory
        is a more meaningful config than fetch.size and we can use this to
        deprecate other configs if needed.
        4 - The partition count is a volatile int - I chose that over a method only to
        avoid traversal (for each request) to determine the count.

        Jun Rao made changes -
        Priority: Major [ 3 ] → Blocker [ 1 ]
        Jun Rao added a comment -

        Thanks for patch v2. Looks good overall. A couple of comments.

        1. There is one subtle issue. In ConsumerIterator, we throw a MessageSizeTooLargeException if validBytes is 0. This is sort of our way to inform the consumer client that the consumption is stuck. With this patch, if we get a really large message, we will just keep issuing fetch requests in AbstractFetcherThread and never return the fetched data to the consumer queue. So the consumer client won't get the MessageSizeTooLargeException. Not sure what's the best way to solve this. One way is that on retry (with a larger fetch size), we will pass the data to processPartitionData even if validBytes is 0.

        2. It's possible that when validBytes is 0, we are only fetching 1 partition. So, retry won't help. But the current patch probably still works. So this may not be an issue.

        3. I would make the default FetchSize 100MB and the default MaxQueuedChunks 1.

        Joel Koshy made changes -
        Attachment KAFKA-598-v2.patch [ 12556060 ]
        Joel Koshy added a comment -

        Here is an updated patch. After discussing with Jay, here is a proposal that
        the patch implements:

        • fetch.size now means the maximum fetch size that will be used across all
          partitions in any given multi-fetch attempt.
        • If there are incomplete partitions, then log a warning and then serially
          attempt fetching each of those partitions with the fetch.size.

        The main drawbacks are as discussed earlier - i.e., there is a change in the
        semantics of fetch.size, so many clients may need to be aware of this and
        reconfigure their fetch size. If there are several incomplete partitions,
        the serial fetches could cause the consumer to start lagging. However, this
        is slightly better (and no worse) than the current behavior of getting
        wedged.
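
        For illustration, a rough Scala sketch of the proposed flow (the helper names here are assumptions, not the actual AbstractFetcherThread code):

          // Illustrative sketch of the proposal. multiFetch and singleFetch stand in for
          // issuing fetch requests; both return valid (complete-message) bytes per partition.
          def doFetch(partitions: Seq[Int],
                      fetchSize: Int,
                      multiFetch: (Seq[Int], Int) => Map[Int, Int],
                      singleFetch: (Int, Int) => Int): Unit = {
            // 1. One multi-fetch, dividing fetch.size across all partitions.
            val perPartition = math.max(fetchSize / math.max(partitions.size, 1), 1)
            val results = multiFetch(partitions, perPartition)

            // 2. Partitions that returned no complete message are incomplete: log a
            //    warning and re-fetch each of them serially with the full fetch.size.
            val incomplete = results.collect { case (p, validBytes) if validBytes == 0 => p }
            incomplete.foreach { p =>
              println("WARN: partition " + p + " has a message larger than " + perPartition +
                " bytes; re-fetching it serially with the full fetch.size")
              singleFetch(p, fetchSize)
            }
          }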

        A couple of other comments:

        • In the patch, partitionFetchSize (in AbstractFetcherThread) may be very
          small or even zero - but I think that's fine since there is no sensible
          "minMessageSize" and not increasing the partitionFetchSize when consuming
          a large number of partitions would be a client misconfiguration.
        • In this patch, the check on whether a partition is incomplete or not is
          accomplished by fetching with the configured fetch.size and then measuring
          validBytes. This is wasteful and can be addressed by preventing the broker
          from responding with an incomplete message, and setting a new error code
          (say, FetchSizeTooSmall) in the FetchResponse. I think we can defer this
          to trunk.
        • Another thing that Jay and I discussed a bit: the use of folds vs.
          mutables. I had refactored the code to use foldLeft to avoid using mutable
          sets. Folding and immutables force you to check that you have accounted
          for all possible paths to the end result - which is helpful especially
          when there are multiple such paths (e.g., cases in a match). Personally I
          think it is more readable as well, but that is a matter of preference. So
          if people think we're better off with mutables let me know. In this case,
          actually Set.reduce would be more suitable, but it is unavailable in scala
          2.8.x.
        • I would like to further divide the fetch size in ConsumerFetcherThread by
          (queuedchunks.max * numBrokersToFetchFrom) - so regular consumers can also
          bound memory usage. Let me know if there are any objections to that. For
          one, consumer.max.mem may be a better config name than repurposing the
          fetch.size config but at the same time I would prefer not changing any
          configs. I can post that diff separately.
        • The above also raises the separate question of whether we want to queue
          chunks for the replica fetchers. The main issue with this is that advance
          fetch requests from followers could (currently) cause the leader to move
          the HW prematurely - i.e., we would have to handle that, but it might be
          useful to implement this in the future. Right now, all partitions must be
          appended before the next fetch is issued - if we "fetch ahead" then the
          fetch request network I/O can proceed in parallel with disk I/O.

        So this is an 0.8 patch, that can be applied to trunk as well (unless we
        decide it is not a "blocker" that should go only into trunk). After this, we
        can add the additional error code in the FetchResponse if people are okay
        with the overall approach.

        Jay Kreps added a comment -

        So I guess the one hard requirement is that we have to be able to tell people how much memory our client will use. People have to set a heap size, and if we can't tell them how much memory we will use without crashing their app they will be unhappy.

        Let's consider a hard case: 5 consumer processes, 100 topics, with 5 partitions each, and queue size of 5. With fetch size of 1MB and upper fetch size of 50MB what is a safe heap size for this person to configure and be assured we won't crash their app?
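
        For illustration, the back-of-envelope arithmetic for that case (assuming, for the example, that the partitions are evenly assigned across the 5 processes):

          // Back-of-envelope worst case for the scenario above.
          val partitionsPerProcess = (100 * 5) / 5      // 100 topics x 5 partitions, 5 processes
          val upperFetchSizeBytes  = 50L * 1024 * 1024  // 50 MB upper fetch size

          // If every partition comes back incomplete, a single multi-fetch round may need:
          val perFetchWorstCase = partitionsPerProcess * upperFetchSizeBytes  // 100 * 50 MB ~ 5 GB
          // ...and the queue of 5 chunks multiplies that further, so there is no small,
          // predictable heap size one could safely recommend.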

        This is why I don't really see pipelined fetches helping. They have to stitch it together into a ByteBuffer in the end, so fetching that in pieces doesn't really help. Supporting non-memory-resident messages is possible but would be a massive re-architecture of almost everything.

        Another option I don't think you covered would be to change the fetch request so that it takes a single size rather than one per partition. That would solve one dilemma we currently have--many topics could have no new data, but we have to budget space for them as if they would (since they might); doing this on the server side, we could be a little bit smarter. However, we would need to ensure that one partition that has infinite data to read can't starve out other partitions.

        Joel Koshy added a comment -

        Thanks for bringing this up. I had considered a slight variant of your
        suggestion (although I was thinking of issuing requests with upper fetch
        size for incomplete partitions one partition at a time, instead of another
        multi-fetch on all incomplete partitions as done in this patch). I didn't go
        with it mainly due to the concern (1) that you raise - i.e., to avoid doing
        sequential fetches, although I don't think that is too much of an issue.

        Whether having an upper fetch size config is better than the current set up
        depends on the use case: e.g., if there are only a few partitions that have
        large messages, then the upper fetch size approach would work well. In the
        worst case of all partitions being incomplete that would lead to a large
        allocation - which is also why I felt the "pipelined fetches of incomplete
        partitions" approach added no real value (since it is equivalent in terms of
        the net memory the client is expected to handle).

        Anyway I think the above then leads naturally to your suggestion of using a
        single fetch size and dividing that across all partitions. I like that
        approach - especially since there are no new configs to deal with. I would
        think the memory allocation concerns are valid but tolerable from the
        client's perspective - i.e., the heavy memory allocations only kick in when
        there are incomplete partitions in which case I think most clients would
        want to consume anyway (along with a log warning indicating a large
        message). One minor drawback is that there isn't really a clear default
        value for fetch size - right now, it is reasonable to say that a fetch size
        of 1MB is also the (approximate) max size of a message. With the above
        re-design we can no longer map the config to messages since there is no
        prior knowledge of number of partitions consumed by each consumer thread,
        but I don't think that's a big deal.

        So as I see it, the choices are:

        1) Change the semantics of fetch size to be net, across all partitions.
        If/when incomplete partitions are encountered, issue a fetch request of
        size fetch.size for each incomplete partition. I think if we do this we
        should also include the total memory that would be used - i.e.,
        including the queued chunks.

        2) Introduce a new config called upper fetch size that kicks in whenever
        there are incomplete partitions - for which:
        a) issue a multi-fetch request with size upper fetch size for all
        incomplete partitions.
        OR
        b) issue sequential fetch requests of upper fetch size, one incomplete
        partition at a time.

        3) If we had byte-addressability for fetches (which I really think we should
        allow at least for these kinds of internal APIs) we could consider a
        third option: keep fetch size as is, and issue pipelined fetch requests
        to build up and complete each incomplete partition, one at a time.

        What do people think?

        Jun Rao added a comment -

        That's an interesting alternative. The main drawback that I see is the first problem that you raised. Any consumer that subscribes to a wildcard (e.g., mirror maker) could be consuming a growing # of topics over time. This means that one has to know the number of topic/partitions in order to set max.fetch.size properly and one has to keep tweaking it as the number of topic/partitions changes.

        Jay Kreps added a comment -

        I have some minor stylistic feedback, but first I think it would be good to discuss the model this implements and get consensus on that.

        My understanding of this patch is that it does the following:
        1. multifetch all partitions using a per-partition fetch size based on the configuration the user provides in the consumer config (fetch.size)
        2. check if there are any incomplete fetches and re-fetch these partitions using the consumer config (upper.fetch.size)

        I think this may not be the best approach, but I am not sure. Here is my argument for why this configuration isn't really better than the current setup. Let's say you want to configure your consumer to be reliable and not crash, you need to align your jvm heap settings with your kafka memory usage. How much memory will this configuration use? Well in the worst case all partitions will come back incomplete so you need enough memory for upper.fetch.size * num_partitions. Actually since we have a queue of chunks, it is a multiple of this, but I think we need to fix that as a separate issue, so ignore that for now. Two conclusions from this: (1) the only parameter that matters is upper.fetch.size and if I have sufficient memory for that why not fetch more? (2) the memory I need depends on the number of partitions I am assigned, but this is out of my control (if a consumer dies it will increase) so it is almost impossible to set this right.

        Here is an alternative. Have only one configuration: max.fetch.size, which bounds the per-request memory allocation. Instead of using this for every partition, use max.fetch.size/num_partitions so that increasing the number of partitions decreases the fetch size but does not increase memory usage. For incomplete fetches, follow up by doing a sequential fetch for each incomplete partition using the full max.fetch.size for just that partition. The reason I think this is better is that you get a hard bound on memory usage (which in practice you MUST have to run reliably) and this same bound also acts as the limit on the largest message you can handle. The two counter-arguments against this approach are (1) rather than crashing if you add partitions, this approach will get slower (due to smaller fetches and eventually sequential fetches) - you could argue that crashing is better than slow, (2) there could potentially be memory allocation downsides to doing large allocations in the common case (though there are definitely I/O benefits).
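
        For illustration, a minimal sketch of that division (names are assumptions, not a concrete patch):

          // Illustrative: with a single max.fetch.size bound, the per-partition fetch size
          // shrinks as partitions are added, so total per-request memory stays bounded.
          def perPartitionFetchSize(maxFetchSize: Int, numPartitions: Int): Int =
            math.max(maxFetchSize / math.max(numPartitions, 1), 1)

          // e.g. with a 100 MB bound: 10 partitions -> 10 MB each, 1000 partitions -> ~100 KB
          // each, but the aggregate per multi-fetch stays ~100 MB either way; incomplete
          // partitions then fall back to a serial fetch at the full max.fetch.size.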

        Let's figure this out and then I will do a more detailed review of the patch.

        Joel Koshy made changes -
        Attachment KAFKA-598-v1.patch [ 12555619 ]
        Joel Koshy added a comment -

        I spent the better part of a day rebasing - serves me right for letting this
        patch sit for so long.

        The basic idea is to keep track of incomplete partitions in a multi-fetch
        request and reissue a fetch request with a higher fetch size for those
        incomplete partitions.

        I had considered the possibility of "pipelining" fetch requests for
        incomplete partitions - i.e., without using an "upper" fetch size. That
        would entail issuing fetch requests at increasing offsets (with the same
        fetch size) until the message is complete - during which the (partial)
        message would be buffered. With this approach we would probably add an
        additional "maxFetchMem" config. However, with logical offsets we don't
        have byte-addressability anymore - so it is not possible right now.
        Furthermore, with a maxFetchMem param it becomes somewhat similar to the
        upperFetchSize approach (in the sense that the client has to be prepared to
        handle a certain amount of memory) - so we don't really gain much. The ideal
        case would be to support streaming over a single fetch request but this is
        obviously a much more complex feature to implement.

        Also fixed a bug in the use of partitionMapLock - i.e., one line synchronized
        on the reentrant lock instead of locking it.

        BTW, for the ReplicaFetchTest change to make sense I could have it expect to
        "fail" with a smaller upper fetch size, and then repeat with a higher upper
        fetch size, but that would add to the test duration - and it's not mocked
        out.

        Joel Koshy made changes -
        Assignee: Joel Koshy [ jjkoshy ]
        Jun Rao created issue -

          People

          • Assignee:
            Joel Koshy
          • Reporter:
            Jun Rao
          • Votes:
            0
          • Watchers:
            5
