KAFKA-1510

Force offset commits when migrating consumer offsets from zookeeper to kafka

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.2.0
    • Fix Version/s: 0.8.2.0
    • Component/s: None
    • Labels:

      Description

      When migrating consumer offsets from ZooKeeper to kafka, we have to turn on dual-commit (i.e., the consumers will commit offsets to both zookeeper and kafka) in addition to setting offsets.storage to kafka. However, we only commit offsets if they have changed since the last commit. For low-volume topics, or for topics that receive data in bursts, offsets may not move for a long period of time. Therefore we may want to force the commit (even if offsets have not changed) when migrating (i.e., when dual-commit is enabled) - we can add a minimum interval threshold (say, force a commit after every 10 auto-commits) as well as forcing it on rebalance and shutdown.

      Also, I think it is safe to switch the default for offsets.storage from zookeeper to kafka and set the default to dual-commit (for people who have not migrated yet). We have deployed this to the largest consumers at linkedin and have not seen any issues so far (except for the migration caveat that this jira will resolve).
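The migration sequence the description implies can be captured in a consumer config fragment like the one below (a sketch: `offsets.storage` is named in this jira, while the exact dual-commit property name - shown here as `dual.commit.enabled` - may differ across 0.8.x versions):

```properties
# Step 1: point offset storage at Kafka but keep committing to both systems.
offsets.storage=kafka
dual.commit.enabled=true

# Step 2 (after a rolling bounce, once Kafka holds all offsets):
# dual.commit.enabled=false
```

After step 2, consumers read and write offsets in Kafka only.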

        Activity

        Nicolae Marasoiu added a comment -

        Hi, it sounds clear and simple enough, I am going to try this.
        I will probably come back with some questions on the low-level details.

        Why is offset management moving from zookeeper to kafka? To ease the consumer and favor a proliferation of consumer implementations in other languages? Is kafka managing them through zookeeper as well, behind the scenes, or is it using its own / another cluster / consensus mechanism to store the offsets in an HA manner?

        Guozhang Wang added a comment -

        Hi Nicolae,

        Thanks for taking this ticket. You can take a look at the offset management design proposal for the motivations of moving it away from ZK.

        https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

        Joel Koshy added a comment -

        I think it should be sufficient to force commit only on shutdown while dual-commit is enabled. i.e., no need to force commit at intervals.

        Joel Koshy added a comment -

        Nicolae Marasoiu do you think you will be able to take this on in the next couple days?

        Nicolae Marasoiu added a comment -

        Joel Koshy Yes, I am going to tackle this in the coming days, and will have a first patch proposal by the weekend or sooner.

        nicu marasoiu added a comment - - edited

        Hi,
        The isAutoCommit argument apparently works exactly the other way around: it is "false" from the scheduled auto-commit and "true" from zkConsConn.commitOffsets()?

        So the migration of offsets from zk to kafka is: set dual commit and kafka storage, restart the consumers, wait for kafka to receive the offset commits, and then take out dual commit.

        So currently kafka receives the offsets only when data flows, and for the purpose of this task we need to add one or two more cases in which it gets them: on shutdown, and perhaps periodically.

        So this task applies only when storage==kafka and dualCommit==true, right?

        I would first ask why the new offsets are written to zookeeper only if the write to kafka was ok. My assumption is: to make sure there is only one write to zookeeper, even though the write to kafka may involve retries.

        I would write in both directions at all times, and perhaps keep two checkpoint structures, one for kafka and one for zookeeper.

        I am creating a patch now with a forceCommit that ensures all offsets are committed to both kafka and zookeeper when shutting down in dual-commit mode.

        The usefulness of committing all offsets not only to kafka but to zookeeper as well comes from at least one reason: the one I mentioned above, that if the kafka offset write fails completely, zookeeper never receives those offsets.

        Forcing all offsets to zk on shutdown does indeed have the drawback that it will typically copy the same offsets again, and not only once but potentially several times (if kafka is retried).
        However, the alternative is to commit to both kafka and zookeeper unconditionally in the normal flow (right now, the commit to zk happens only after a successful commit to kafka, if any). That too poses the same risk of committing multiple times to one system (zk) if the other (kafka) needs retries. So a clean way here would be completely separate OffsetDAO implementations - one for kafka, one for zookeeper, and one for dual mode - with reads, as now, taking max(both), while writes go to both implementations, each of them doing retries without affecting the other!
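The separate-DAO idea above - one store per backend, each retrying on its own, with reads taking the max of both - can be sketched as follows (all class and function names here are hypothetical, not the actual consumer code):

```python
# Sketch of independent per-backend offset stores: a failing Kafka write
# does not force extra rewrites to ZooKeeper, and vice versa.

class OffsetStore:
    def __init__(self, name, max_retries=3, fail_times=0):
        self.name = name
        self.max_retries = max_retries
        self.committed = {}            # partition -> offset
        self._fail_times = fail_times  # simulate transient failures

    def commit(self, offsets):
        """Try to commit, retrying up to max_retries; True on success."""
        for _ in range(self.max_retries):
            if self._fail_times > 0:
                self._fail_times -= 1  # simulated transient error
                continue
            self.committed.update(offsets)
            return True
        return False

def dual_commit(offsets, kafka_store, zk_store):
    # Each store retries on its own; one failing does not block the other.
    return kafka_store.commit(dict(offsets)), zk_store.commit(dict(offsets))

def read_offset(partition, kafka_store, zk_store):
    # Read side during migration: take the max of the two storages.
    return max(kafka_store.committed.get(partition, -1),
               zk_store.committed.get(partition, -1))
```

With this shape, a Kafka outage during shutdown still leaves ZooKeeper current, which is exactly the risk the "|| forceCommit" discussion below is about.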

        nicu marasoiu added a comment -

        Attached a patch as per my interpretation and the tradeoffs detailed in my comments.

        nicu marasoiu added a comment -

        Joel Koshy Can you please take a look at my comments + code? It will probably take at least one more iteration to get it right.

        nicu marasoiu added a comment -

        Jay Kreps Hi, can you please help me with feedback on my comment + code, or who can I ask, so that I can go in the right direction?

        Joel Koshy added a comment -

        nicu marasoiu - sure thing. Will get back to you on this.

        Joel Koshy added a comment -

        Re: isAutoCommit:
        Yes you are right. Can you fix that?

        Re: migration steps:
        Yes that is correct. "Wait for kafka to be copied on commits" would essentially mean do one rolling bounce after your patch (since the shutdown guarantees that the offsets would have moved over).

        Re: applies only when storage==kafka and dual-commit==true.
        Yes that is correct.

        Re: why the write to zookeeper the new offsets, only if the write to kafka was ok?
        As you observed, avoid redundant writes to ZooKeeper, but also for consistency between ZK and Kafka (although this does not matter too much since while transitioning they are inconsistent to start with).

        Re: separate DAOs
        We probably don't need that since we plan to eventually remove support for ZooKeeper-based offsets from the server-side - i.e., a consumer can choose to store offsets elsewhere (including ZooKeeper) but support for doing that via the OffsetCommitRequest channel will be removed.

        Your patch looks good, but I'm unclear on why you need the "|| forceCommit" on line 318

        nicu marasoiu added a comment -

        Hi, the "|| forceCommit" on line 318 is meant to ensure write to zookeeper on shutdown even in the situation when kafka commits do not work.

        Joel Koshy added a comment -

        I see - my thinking was since we retry indefinitely until a successful commit it only needs to be done at the end (once) after a successful commit to Kafka.

        So to summarize - let me know if you have any comments/questions:

        • Can you fix the issue you caught with the isAutoCommit flag?
        • Probably unnecessary to have the "|| forceCommit"
        • Also, as mentioned in the summary I think it is reasonable to switch the default offsets.storage to Kafka and set dual.commit to true.
        • Can you also run the unit tests and verify? It will be useful to also run the system tests (at least the mirror maker test suite). See https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests#KafkaSystemTests-RunningSystemTest for more information on this - it should be sufficient to just run the mirror maker tests.
        Nicolae Marasoiu added a comment -

        Where is the indefinite retry you mention? I don't think it is indefinite.

        Nicolae Marasoiu added a comment -

        It is a limited retry, and I found no easy way to determine whether a
        given attempt is the last one, so that I could write to zookeeper as
        well on it. This could be taken out to a different jira task too, but I
        would keep the zookeeper ensure-commit here, and regard this task as
        "make all efforts to commit all offsets to both storages at shutdown"
        and rename it as such.

        nicu marasoiu added a comment -

        Hi,

        I have given it more consideration, and indeed the "|| forceCommit" on line 318 is a different concern, which can be taken to another task.
        The risk it would address is that when kafka is down for the whole limited retry count during shutdown, at least zookeeper would get the offsets and the consumer would not rewind. However, the probability that both systems are down is low, so zookeeper would likely be up to date when kafka is down, for instance. The probability that zookeeper gets flooded with all the offsets multiple times while kafka is retried is comparably low.

        So, for this task, I took that line-318 part out of the patch; the test went fine.

        I will create another task for the isAutoCommit issue and analyze whether the meaning is truly reversed, because I feel it is only partially reversed, perhaps used correctly under the reversed name, and mostly a different thing.

        I will do the config changes, no problem - switch the default offsets.storage to kafka and set dual.commit to true.

        nicu marasoiu added a comment -

        re-uploaded the patch

        Joel Koshy added a comment -

        nicu marasoiu I realized later there is actually a flaw in how we get rid of offset commits from old (non-existent) consumers.

        As of now, the offset manager does the following: it periodically goes through its entire cache (i.e., hashtable of offsets) and extracts those entries that have a timestamp earlier than some staleness threshold. It then proceeds to add tombstones for those entries in the offsets commit log.

        The problem with this approach as it stands is similar to the original issue that this jira intended to address. A live consumer may be consuming a low volume topic and its offset may change infrequently. i.e., its offset may not move within the staleness threshold. If we delete the offset and a consumer rebalance occurs and fetches that offset, then depending on the auto.offset.reset configuration, it will pick up the new latest offset of the topic (in which case the consumer could lose some messages) or the earliest offset (in which case the consumer will see duplicates).
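The cleanup pass described above can be sketched as follows (illustrative names only, not the actual offset manager code):

```python
# Sketch: scan the offset cache and tombstone entries whose last commit
# timestamp is older than a staleness threshold. A live but idle consumer
# (low-volume topic) can lose its offset this way - the flaw noted above.

def expire_stale_offsets(cache, now, staleness_threshold):
    """cache: {(group, topic, partition): (offset, last_commit_ts)}.
    Removes stale entries from the cache and returns their keys."""
    tombstones = [key for key, (_, ts) in cache.items()
                  if now - ts > staleness_threshold]
    for key in tombstones:
        # In the broker, a tombstone would also be appended to the
        # offsets commit log so log compaction drops the entry.
        del cache[key]
    return tombstones
```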

        I think the fix for this is the following and I'm backtracking to what I earlier wrote and later (incorrectly) thought was unnecessary:

        A consumer implementation can optionally choose to selectively commit only offsets that have changed since the last commit. HOWEVER, there should be a configurable interval at which the consumer should always commit ALL its offsets regardless of whether it has changed or not.
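The proposed fix - filtered commits normally, with a forced unfiltered commit at a configurable interval - could look roughly like this (hypothetical names, not the patch itself):

```python
# Sketch: commit only changed offsets on normal passes, but every
# `force_interval`-th auto-commit send ALL offsets, so idle partitions
# keep their broker-side timestamps fresh and escape expiration.

class CommitFilter:
    def __init__(self, force_interval=10):
        self.force_interval = force_interval
        self.checkpoint = {}   # partition -> last committed offset
        self.commits = 0

    def offsets_to_commit(self, current):
        """current: {partition: offset}. Returns the offsets to send."""
        self.commits += 1
        if self.commits % self.force_interval == 0:
            to_commit = dict(current)  # unfiltered: everything
        else:
            to_commit = {p: o for p, o in current.items()
                         if self.checkpoint.get(p) != o}  # only changed
        self.checkpoint.update(to_commit)
        return to_commit
```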

        Joel Koshy added a comment -

        Would you be able to update your patch to take into account the above?

        Jun Rao added a comment -

        Thinking about this a bit more, would it be more reliable to do the expiration of an offset based on the last connect time from the client, instead of the last time the offset is modified? In the new consumer, we will be tracking the set of consumers per consumer group on the broker. We can expire an offset if the time since the last time the partition was actively owned by a consumer exceeds the threshold. Handling consumer coordinator failover can be a bit tricky. We can probably just start doing the expiration countdown from the beginning during the failover. This means that the removal of some of the offsets may be delayed. This maybe ok since the consumer coordinator failover should be rare.

        nicu marasoiu added a comment -

        OK, so I have not fully understood everything, but my takeaway is that, for the moment, there is no clear decision on any modifications to the patch as it stands.

        Joel Koshy added a comment -

        Jun Rao yes that would make sense. However, we don't have this tracking implemented in the existing consumer and I would rather not add that feature now just to fix an existing bug. i.e., I think we should just fix the current issue by forcing commits (either periodically or always) when using Kafka-based offset storage.

        Joel Koshy added a comment -

        After some discussion with Guozhang Wang and Jun Rao here are some additional comments to help clarify my earlier reasoning:

        In order to migrate offsets from ZooKeeper to Kafka, at minimum we need to force an unfiltered commit (regardless of whether offsets have changed or not) at some point - e.g., shut down of the consumer.

        An orthogonal issue is that of a consumer that consumes a low-volume topic. i.e., if the offsets don't change within the offset retention threshold on the offset manager (defaults to one day) then those offsets will be deleted. If the consumer fails for any reason and does an offset fetch, it will reset to earliest or latest. We have a couple of options:

        • One possible approach to address this is to configure the broker-side offset retention period to a large value - i.e., larger than the maximum retention period of any topic. This is not ideal because: (a) if there are short-lived (say, console) consumers that come and go often, then those offsets can sit around for a long time; (b) in general, you cannot really come up with a retention period for compacted topics. So I would not want to do this, but I wrote it here for completeness.
        • Another approach is to do UN-filtered commits if offsets.storage is set to Kafka. i.e., commit everything always.
        • Yet another approach is to do unfiltered commits at a configurable interval.

        Thoughts?

        My preference after thinking about it is to go with the second approach.

        Jun Rao added a comment -

        Yes, I agree that always doing unfiltered commits when kafka offset storage is used is the simplest approach and is good enough.

        Joel Koshy added a comment -

        nicu marasoiu can you update your patch to do unfiltered commits if offsets.storage is kafka? Or if you have any other preference in approach, feel free to comment.

        Joel Koshy added a comment -

        nicu marasoiu do you think you will have time to wrap this up? If not, let me know or if you need any clarification.

        Nicolae Marasoiu added a comment -

        Hi, I will attach a patch with unfiltered commits today, or tomorrow at the latest.

        So to clarify the requirement: my understanding is that, when storage =
        kafka and dual-commit = enabled (or just one of those?), we are going to
        make each and every offset commit to kafka an unfiltered one, i.e. commit
        all offsets regardless of whether they have changed; do we also do this
        unfiltered commit on zookeeper?

        Thanks
        Nicu

        Joel Koshy added a comment -

        nicu marasoiu that's right - whenever storage=kafka, we should do unfiltered commits to kafka. In other words, ALL offset commits sent to kafka should be unfiltered.

        As for commits to zookeeper: if storage=zookeeper then we can do filtered commits. If storage=kafka and dual.commit=enabled then, if the code doesn't get too complicated, we should continue to do filtered commits to zookeeper (but unfiltered to kafka).
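The rule above can be sketched as a small routing function (illustrative only; the real consumer code differs):

```python
# Sketch: decide what goes to each backend on one commit pass.
# storage=kafka  -> unfiltered to Kafka; filtered to ZK only if dual-commit.
# storage=zookeeper -> filtered to ZK only.

def plan_commit(storage, dual_commit, current, zk_checkpoint):
    """current: {partition: offset}; zk_checkpoint: last offsets sent to ZK.
    Returns (offsets_for_kafka, offsets_for_zookeeper)."""
    changed = {p: o for p, o in current.items()
               if zk_checkpoint.get(p) != o}
    if storage == "zookeeper":
        return {}, changed             # filtered commits, ZK only
    # storage == "kafka": always unfiltered to Kafka
    to_zk = changed if dual_commit else {}
    return dict(current), to_zk
```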

        nicu marasoiu added a comment -

        Hi, I will do this tomorrow, Friday, so you should have a patch Friday morning your time.

        So you are saying that the only condition for unfiltered commits is kafka storage, regardless of dual-commit or single-commit mode, yes?

        Thanks,
        Nicu

        Nicolae Marasoiu added a comment -

        Patch to push unfiltered offsets to both Kafka and potentially Zookeeper when Kafka is configured to be the offset storage

        nicu marasoiu added a comment -

        Attached a patch - I am doing unfiltered commits to kafka and using the offsets checkpoint Map only for zookeeper incremental commits (in both zk-storage and dual-commit modes) - in the suggested approach its reads and mutations are now exclusively part of the commitToZk method.

        Right now, rearranging topicRegistry into offsetsToCommit, a separate materialized structure built with no further filtering, seems a bit futile; but because its .size is checked and the structure is used below, and to minimize the changes brought by this task (leaving them for an obviously needed larger-scale refactoring of this class), I left it like this.

        Another optimization I could do, but did not include in the patch, is to keep a commit timestamp per partition and use it to filter commits to kafka, based on a configurable maximum idleness of each partition's offset commit.

        A more primitive form of the same optimization, which would only protect against repeatedly committing to healthy brokers because of broken ones, would be to keep such state in a local structure for the duration of the method, just to make sure we keep retrying only the failed commits.

        nicu marasoiu added a comment -

        Joel Koshy Hi, can you check my patch + comments & provide feedback pls?

        Joel Koshy added a comment -

        nicu marasoiu Your patch looks good to me - however, it does not cleanly apply on the latest trunk. Would you mind rebasing? If you don't have time I can take care of it as well.

        nicu marasoiu added a comment -

        Attached rebased patch

        Joel Koshy added a comment -

        Thanks for the patch. +1 and committed to trunk.


          People

          • Assignee:
            Joel Koshy
          • Reporter:
            Joel Koshy
          • Reviewer:
            Joel Koshy
          • Votes:
            0
          • Watchers:
            5
