Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0, 0.8.1, 0.8.1.1
    • Fix Version/s: 0.8.3
    • Component/s: replication
    • Labels:

      Description

      Currently, there is no good way to tune the replica lag configs to automatically account for both high- and low-volume topics on the same cluster.
      Because the lag threshold is a fixed number of messages, a low-volume topic takes a very long time to detect a lagging
      replica, while a high-volume topic produces false positives.
      One approach to making this easier would be to make the configuration
      something like replica.lag.max.ms and translate it into a number
      of messages dynamically, based on the throughput of the partition.
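To make the problem concrete, the arithmetic below shows how the detection time of a fixed message-count threshold depends on throughput. The 4000-message threshold is borrowed from an example later in this thread and the two produce rates are illustrative assumptions; the class and method names are hypothetical.

```java
// Illustrative arithmetic only: how long a fixed message-count lag threshold
// takes to trip for partitions with very different produce rates.
final class FixedThresholdExample {
    private FixedThresholdExample() {}

    /**
     * Milliseconds until a follower that has stopped replicating falls
     * thresholdMessages behind, given the partition's produce rate.
     */
    static double msToExceed(long thresholdMessages, double messagesPerSec) {
        return thresholdMessages / messagesPerSec * 1000.0;
    }
}
```

For a 4000-message threshold, a topic receiving 1 msg/sec needs 4,000,000 ms (about 67 minutes) before a stalled follower is flagged, while at 100,000 msg/sec a healthy follower that is only 40 ms of traffic behind already exceeds it.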

      1. KAFKA-1546.patch
        34 kB
        Aditya A Auradkar
      2. KAFKA-1546_2015-03-11_18:48:09.patch
        36 kB
        Aditya A Auradkar
      3. KAFKA-1546_2015-03-12_13:42:01.patch
        37 kB
        Aditya A Auradkar
      4. KAFKA-1546_2015-03-16_11:31:39.patch
        54 kB
        Aditya A Auradkar
      5. KAFKA-1546_2015-03-17_14:46:10.patch
        63 kB
        Aditya A Auradkar
      6. KAFKA-1546_2015-03-25_13:27:40.patch
        63 kB
        Aditya A Auradkar
      7. KAFKA-1546_2015-03-26_17:44:08.patch
        79 kB
        Aditya A Auradkar
      8. KAFKA-1546_2015-03-27_11:57:56.patch
        81 kB
        Aditya A Auradkar
      9. documentation.diff
        3 kB
        Aditya Auradkar
      10. documentation.diff
        3 kB
        Aditya Auradkar

        Activity

        Jay Kreps added a comment -

        I think this is actually a really important thing to get right to make replication reliable. There are some subtleties. It would be good to work out the basics of how this could work on this JIRA.

        For example, the throughput on a partition might be 1 msg/sec, but only because the producer is writing just 1 msg/sec. If someone then writes a batch of 1000 messages, that doesn't necessarily mean we are 1000 seconds behind.

        We already track the time since the last fetch request. So if the fetcher stops entirely for too long it will be caught.

        I think the other condition we want to be able to catch is one where the fetcher is still fetching but is behind and likely won't catch up. One way to make "caught up" concrete is to say that the last fetch went to the end of the log. We could potentially reduce this to one config, replica.lag.time.ms, which would be both the maximum time since a fetch and the maximum amount of time without catching up to the leader. The implementation would be that every time a fetch didn't go to the logEndOffset we would set the lag clock, and it would only reset when a fetch request finally went all the way to the logEndOffset.

        Jun Rao added a comment -

        Jay, I think what you proposed is nice and simple. I think it works. One subtlety is dealing with max.wait.ms in the follower fetch request. Imagine that a follower has caught up and its fetch request is sitting in the purgatory. The last caught up time won't be updated for max.wait.ms if no new messages come in. When a message does come in, if max.wait.ms is larger than replica.lag.time.ms, the follower is now considered out of sync. Perhaps we should use the timestamp of when the first message is appended to the leader after the last caught up time. If the amount of time since then is more than replica.lag.time.ms, the replica is considered out of sync.
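A minimal sketch of the refinement Jun suggests, assuming the leader is notified of every append and of every follower fetch that reaches the log end offset. The class and method names are hypothetical, and times are passed in explicitly rather than read from the clock so the behavior is easy to check.

```java
/** Hypothetical tracker: the lag clock starts at the first append after the follower last caught up. */
final class FirstAppendLagTracker {
    private final long replicaLagTimeMs;
    // -1 means no append has happened since the follower last caught up.
    private long firstAppendAfterCaughtUpMs = -1;

    FirstAppendLagTracker(long replicaLagTimeMs) {
        this.replicaLagTimeMs = replicaLagTimeMs;
    }

    /** A follower fetch read all the way to the leader's log end offset. */
    void onFollowerCaughtUp() {
        firstAppendAfterCaughtUpMs = -1;
    }

    /** The leader appended a message; only the first append after catching up starts the clock. */
    void onLeaderAppend(long nowMs) {
        if (firstAppendAfterCaughtUpMs == -1) {
            firstAppendAfterCaughtUpMs = nowMs;
        }
    }

    /** Out of sync once the first append since the follower last caught up is older than the limit. */
    boolean isOutOfSync(long nowMs) {
        return firstAppendAfterCaughtUpMs != -1
                && nowMs - firstAppendAfterCaughtUpMs > replicaLagTimeMs;
    }
}
```

With replicaLagTimeMs = 10000, a follower whose fetch has been parked in purgatory on an idle partition for 30 seconds is not out of sync the moment a message arrives, because the clock starts at that append rather than at the last caught-up time.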

        Jay Kreps added a comment - edited

        I think I was being a little vague. What I was trying to say is this: when each fetch is serviced, we check

          if(!fetchedData.readToEndOfLog)
             this.lagBegin = System.currentTimeMillis()
          else
             this.lagBegin = -1
        

        Then the liveness criterion is

         partitionLagging = this.lagBegin > 0 && System.currentTimeMillis() - this.lagBegin > REPLICA_LAG_TIME_MS
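For reference, the snippet above rendered as compilable Java, with the clock passed in as a parameter and REPLICA_LAG_TIME_MS turned into a constructor argument (both assumptions made for testability; the class name is hypothetical). As written, lagBegin is overwritten on every fetch that stops short of the log end, so while a follower keeps fetching, the elapsed time never grows beyond the gap between two such fetches and the check fires only once fetches stop. Neha's refinement further down sets lagBegin only on the first such fetch.

```java
/** The check above, as written: lagBegin is reset by every lagging fetch. */
final class LagClockAsWritten {
    private final long replicaLagTimeMs; // REPLICA_LAG_TIME_MS in the snippet
    private long lagBegin = -1;

    LagClockAsWritten(long replicaLagTimeMs) {
        this.replicaLagTimeMs = replicaLagTimeMs;
    }

    /** Called as each fetch is serviced, exactly as in the snippet. */
    void onFetch(boolean readToEndOfLog, long nowMs) {
        if (!readToEndOfLog) {
            lagBegin = nowMs; // overwritten by every fetch that stops short of the log end
        } else {
            lagBegin = -1;
        }
    }

    boolean partitionLagging(long nowMs) {
        return lagBegin > 0 && nowMs - lagBegin > replicaLagTimeMs;
    }
}
```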
        
        Jun Rao added a comment -

        Yes, that works. So we will have to return enough info in Log.read to derive fetchedData.readToEndOfLog.

        Sriram Subramanian added a comment -

        The lagBegin does not persist across shutdowns or leader transitions. A safe assumption is that all fetchers are lagging when a node becomes the leader, until we receive their first fetch. This ensures we don't assume there is no lag when a fetcher is down and a new leader is elected.
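A minimal sketch of Sriram's suggestion, assuming the leader keeps an in-memory lagBegin per follower replica id. The class and method names are hypothetical. Because the map is rebuilt on every leader transition, a follower that is down when this broker becomes leader starts out with a running lag clock instead of looking caught up.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-partition leader state: lag clocks start running on leader election. */
final class LeaderLagState {
    private static final long NOT_LAGGING = -1L;
    private final long replicaLagTimeMs;
    private final Map<Integer, Long> lagBeginByReplica = new HashMap<>();

    LeaderLagState(long replicaLagTimeMs) {
        this.replicaLagTimeMs = replicaLagTimeMs;
    }

    /** lagBegin is not persisted, so a new leader assumes every follower is lagging as of now. */
    void onBecomeLeader(Iterable<Integer> followerIds, long nowMs) {
        lagBeginByReplica.clear();
        for (int id : followerIds) {
            lagBeginByReplica.put(id, nowMs);
        }
    }

    /** Start the clock only if it is not already running; clear it when a fetch reaches the log end. */
    void onFetch(int replicaId, boolean readToEndOfLog, long nowMs) {
        if (readToEndOfLog) {
            lagBeginByReplica.put(replicaId, NOT_LAGGING);
        } else {
            Long begin = lagBeginByReplica.get(replicaId);
            if (begin == null || begin == NOT_LAGGING) {
                lagBeginByReplica.put(replicaId, nowMs);
            }
        }
    }

    boolean isLagging(int replicaId, long nowMs) {
        Long begin = lagBeginByReplica.get(replicaId);
        return begin != null && begin != NOT_LAGGING && nowMs - begin > replicaLagTimeMs;
    }
}
```

With replicaLagTimeMs = 10000 and leadership taken at t = 1000, a follower that never fetches is reported as lagging from t = 11001, while one that fetches to the log end at t = 2000 is not.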

        Aditya Auradkar added a comment -

        nicu marasoiu are you actively working on this? I was planning on picking it up.

        Aditya Auradkar added a comment -

        nicu marasoiu I'm going to assign this to myself since I haven't heard back.

        Aditya Auradkar added a comment -

        I do have a concern about the heuristic. Jay Kreps, using your example:

          if(!fetchedData.readToEndOfLog)
             this.lagBegin = System.currentTimeMillis()
          else
             this.lagBegin = -1

        Then the liveness criterion is

          partitionLagging = this.lagBegin > 0 && System.currentTimeMillis() - this.lagBegin > REPLICA_LAG_TIME_MS

        The time counter starts when a read doesn't go to the end of the log and only stops when one does reach the end. In this case, the lag measures the absolute duration for which this replica has been lagging, but not how far behind it is in terms of applying commits. For example, a replica could be catching up quickly, but the "replica.lag.max.ms" counter would still increase until it fully catches up, and then it would abruptly drop to zero.

        Jay Kreps added a comment -

        Yeah, that is true. I think we are in agreement that we want to express this in terms of time, not number of messages.

        The criterion I was proposing is "not caught up for N ms", where "caught up" means the fetch read to the end of the log.

        I think what you are proposing is "will take more than N ms to catch up." I originally thought a little about this. However, this criterion is a lot harder to calculate. In order to predict the time to catch up you need to estimate the rate at which messages will be read in the future (e.g. if I am 1000 messages behind and reading at 500 msg/sec, then it will take 2 seconds). I was concerned that any estimate would be really fragile, since the whole point of a failure is that it changes this kind of rate in some way (because a replica is slow, or messages got bigger, or whatever), so predictions based on past rates may be wrong once the (possibly soft) failure happens.

        I think the motivation for the criterion I was proposing was that any caught-up reader should always be at the end of the log (that is the definition of caught up), and if you go for a period of time without being at the end, then you likely won't get to the end soon. You could imagine some situation in which the follower was somehow able to exactly keep up but was always one message behind the end of the log, in which case we would falsely failure-detect the follower. However, I think this would be unlikely, and failure-detecting it is probably okay anyway, since you are exactly on the verge of being overwhelmed (one byte per second more throughput and you will be dead).

        Let me know if you think that makes sense, I could definitely be convinced it would be better a different way.
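To illustrate why the "will take more than N ms to catch up" criterion is harder to compute, here is a hypothetical estimator (class and method names are illustrative). With no new writes it reproduces the example above: 1000 messages behind, reading 500 msg/sec, about 2 seconds. Subtracting the leader's append rate is an added assumption, not something proposed in the thread; it shows that the prediction depends on two rates measured before the failure, which is exactly what a soft failure tends to change.

```java
/** Hypothetical time-to-catch-up predictor based on observed rates. */
final class CatchUpEstimate {
    private CatchUpEstimate() {}

    static long estimateCatchUpMs(long messagesBehind, double readRatePerSec, double appendRatePerSec) {
        if (messagesBehind <= 0) {
            return 0;
        }
        // Assumption: new appends on the leader eat into the follower's read rate.
        double netRatePerSec = readRatePerSec - appendRatePerSec;
        if (netRatePerSec <= 0) {
            return Long.MAX_VALUE; // never catches up at these rates
        }
        return (long) Math.ceil(messagesBehind / netRatePerSec * 1000.0);
    }
}
```

If the follower's read rate drops from 500 to 250 msg/sec (say, because of a slow disk), the same 1000-message backlog predicts 4000 ms instead of 2000 ms, so a decision based on the earlier rate would be wrong.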

        Neha Narkhede added a comment -

        "For example, a replica could be catching up quickly but the replica.lag.max.ms counter would still increase until it fully catches up and then it will abruptly drop to zero."

        What we want to measure is not exactly how slow it is but express lag in terms of maximum time spent not catching up with the leader. The check that Jay mentions can be improved a little. Basically, if the replica is at the log end offset, then we don't want to check for lagBegin at all. The first time a replica starts lagging, you set the lagBegin to current time. From there on, you only reset it to -1 if it reaches log end offset.

        This will remove a replica that keeps fetching but is unable to catch up with the leader for "replica.lag.max.ms".

        So the check is more like:

        if(!fetchedData.readToEndOfLog) {
           if(lagBegin == -1) { 
             this.lagBegin = System.currentTimeMillis() 
           }
        } else {
          this.lagBegin = -1 
        }
        

        Then the liveness criterion is

          partitionLagging = this.lagBegin > 0 && System.currentTimeMillis() - this.lagBegin > REPLICA_LAG_TIME_MS

        In order to do this, LogReadResult might have to return the log end offset as well.
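A Java sketch of the refined check above. Unlike the first version, lagBegin is set only by the first fetch that stops short of the log end and is cleared only by a fetch that reaches it. The class name is hypothetical, and times are passed in explicitly (an assumption for testability); lagBegin != -1 replaces the snippet's lagBegin > 0 so that a start time of 0 still counts, which is equivalent when the clock is System.currentTimeMillis().

```java
/** Refined lag clock: starts once when the follower falls behind, stops only when it reaches the log end. */
final class ReplicaLagClock {
    private final long replicaLagTimeMs;
    private long lagBegin = -1;

    ReplicaLagClock(long replicaLagTimeMs) {
        this.replicaLagTimeMs = replicaLagTimeMs;
    }

    void onFetch(boolean readToEndOfLog, long nowMs) {
        if (!readToEndOfLog) {
            if (lagBegin == -1) {
                lagBegin = nowMs; // start the clock only on the first lagging fetch
            }
        } else {
            lagBegin = -1; // caught up: stop the clock
        }
    }

    boolean partitionLagging(long nowMs) {
        return lagBegin != -1 && nowMs - lagBegin > replicaLagTimeMs;
    }
}
```

With replicaLagTimeMs = 10000 and a fetch every 500 ms starting at t = 1000 that never quite reaches the log end, partitionLagging(11500) is true even if the backlog was shrinking the whole time, which is the behavior questioned above; it returns to false as soon as a fetch reads to the end.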

        Jiangjie Qin added a comment -

        I'm not sure if my concern is valid. If we have many producers producing messages to a partition, it's possible that after we fulfill a fetch request from the replica fetcher, but before we check whether the log is caught up to the log end, some new messages are appended. In this case, the follower will never really be able to catch up to the log end.
        Maybe I understood it wrong, but I think what Neha Narkhede proposed before seems to work, which is:
        1. Have a time criterion: a fetch request must be received from the follower within 10 secs.
        2. Instead of a fixed maximum message lag, say 4000, use (message-in-rate * maxLagMs) as the maximum message lag threshold.
        This way we can handle both busy topics and low-volume topics.
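A minimal sketch of the two-part check, assuming messageInRatePerSec is measured elsewhere (for example as a moving average, which is an assumption). All names are hypothetical; the 10-second fetch bound from the comment would be passed as maxFetchIntervalMs = 10000.

```java
/** Hypothetical two-part check: fetch time bound plus a rate-scaled message lag bound. */
final class RateScaledLagCheck {
    private RateScaledLagCheck() {}

    static boolean outOfSync(long nowMs, long lastFetchMs, long maxFetchIntervalMs,
                             long messagesBehind, double messageInRatePerSec, long maxLagMs) {
        // 1. Time criterion: the follower must have fetched recently.
        boolean stuck = nowMs - lastFetchMs > maxFetchIntervalMs;
        // 2. Message lag threshold scaled by the partition's inbound rate.
        double maxMessagesBehind = messageInRatePerSec * maxLagMs / 1000.0;
        boolean slow = messagesBehind > maxMessagesBehind;
        return stuck || slow;
    }
}
```

With maxLagMs = 10000, a busy topic at 100,000 msg/sec tolerates up to 1,000,000 messages of lag, while a topic at 1 msg/sec tolerates 10. Note that a topic with no recent traffic gets a threshold of zero, and a single burst on a quiet topic (like the 1000-message batch example earlier in the thread) can exceed the rate-scaled threshold even when the follower is not meaningfully behind in time.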

        Aditya Auradkar added a comment - edited

        I agree we should model this in terms of time and not in terms of messages. While I think it is a bit more natural to model replication lag in terms of "will take more than N ms to catch up.", I also agree it is tricky to implement correctly.

        One possible way to model it is to associate an offset with a commit timestamp at the source. For example, assume that a message with offset O is produced on the leader for partition X at timestamp T1. If the time now is T2 and a replica's log end offset is O (i.e. it has consumed up to O), then the lag can be (T2-T1). Is there any easy way to obtain the source timestamp given an offset?

        If this isn't feasible, then I do think that the heuristic proposed in Neha's comment is a good one, and I will submit a patch for it.

        Also, there are currently 2 checks for replica lag (in ISR).
        a. keepInSyncMessages - This tracks replica lag as a function of the number of messages it is trailing behind. I believe we will remove this entirely regardless of the approach we choose.
        b. keepInSyncTimeMs - This tracks the amount of time between fetch requests. I think we can remove this as well.
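A hypothetical sketch of the offset-to-timestamp model described above. It assumes the leader records an append time for every offset, which, as Joel Koshy notes below, Kafka does not currently do; the names are illustrative. The replica's log end offset is treated as the next offset it still needs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lag model: lag = now - append time of the next offset the replica needs. */
final class TimestampLagModel {
    // Assumed index of leader-side append times per offset (not present in Kafka today).
    private final Map<Long, Long> appendTimeByOffset = new HashMap<>();

    void onAppend(long offset, long appendTimeMs) {
        appendTimeByOffset.put(offset, appendTimeMs);
    }

    /**
     * Lag of a replica whose log end offset is replicaLogEndOffset: T2 - T1, where T1 is
     * when that offset was appended on the leader. If it has not been appended yet, the
     * replica is caught up.
     */
    long lagMs(long replicaLogEndOffset, long nowMs) {
        Long appendedAtMs = appendTimeByOffset.get(replicaLogEndOffset);
        return appendedAtMs == null ? 0L : nowMs - appendedAtMs;
    }
}
```

With offsets 0, 1 and 2 appended at t = 1000, 2000 and 3000 ms, a replica whose log end offset is 1 has a lag of 3000 ms at t = 5000, and a replica at offset 3 has none. Unlike the lagBegin clock, this lag shrinks as the replica catches up, at the cost of keeping a timestamp per offset.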

        Joel Koshy added a comment -

        Re: your concern - yes, that does seem to be valid. For it to work, the LogReadResult should probably return the log-end-offset from just before the read starts, i.e., a sort of before-image.
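A toy in-memory log illustrating the before-image idea and the race Jiangjie raised: the read result carries the log end offset captured before the read, so messages appended while or right after the read is served do not count against the follower. The class and field names are hypothetical and only loosely mirror LogReadResult.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy log whose read results carry a before-image of the log end offset. */
final class BeforeImageLog {
    static final class ReadResult {
        final List<String> messages;
        final long nextFetchOffset; // offset the follower will request next
        final long leoBeforeRead;   // log end offset captured before the read started

        ReadResult(List<String> messages, long nextFetchOffset, long leoBeforeRead) {
            this.messages = messages;
            this.nextFetchOffset = nextFetchOffset;
            this.leoBeforeRead = leoBeforeRead;
        }

        /** Caught up relative to the log as it was when the read began. */
        boolean readToEndOfLog() {
            return nextFetchOffset >= leoBeforeRead;
        }
    }

    private final List<String> log = new ArrayList<>();

    synchronized void append(String message) {
        log.add(message);
    }

    synchronized long logEndOffset() {
        return log.size();
    }

    synchronized ReadResult read(long fetchOffset, int maxMessages) {
        long leoBeforeRead = log.size();
        int from = (int) Math.min(fetchOffset, leoBeforeRead);
        int to = (int) Math.min(leoBeforeRead, (long) from + maxMessages);
        List<String> batch = new ArrayList<>(log.subList(from, to));
        return new ReadResult(batch, to, leoBeforeRead);
    }
}
```

If a producer appends right after a follower reads offsets 0 to 2 of a three-message log, comparing the follower's next fetch offset (3) against the current log end offset (4) says it is behind, whereas the before-image (3) says it read to the end.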

        Joel Koshy added a comment -

        No, we don't have any timestamp currently associated with messages/offsets.

        Neha Narkhede added a comment -

        "If this isn't feasible, then I do think that the heuristic proposed in Neha's comment is a good one, and I will submit a patch for it."

        Sounds good. Will help you review it.

        "a. keepInSyncMessages - This tracks replica lag as a function of the number of messages it is trailing behind. I believe we will remove this entirely regardless of the approach we choose."

        Correct.

        "b. keepInSyncTimeMs - This tracks the amount of time between fetch requests. I think we can remove this as well."

        Hmm, depends. There are 2 things we need to check - dead replicas and slow replicas. The dead replica check is to remove a replica that hasn't sent a fetch request to the leader for some time. Take the example of a replica that is in sync with the leader (lagBegin is -1), there aren't new messages coming in and it stops fetching entirely. We can remove the replica when there are new messages based on the lagBegin logic but really that replica should've been removed long before that, because it stopped fetching and was dead.

        The logic we have above works pretty well for slow replicas, but I think we still need to handle dead replicas for low-volume topics.
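A minimal sketch combining both checks: a follower leaves the ISR if it is dead (no fetch request for replicaLagTimeMs) or slow (has not read to the log end for replicaLagTimeMs). Using a single threshold for both follows the one-config idea earlier in the thread; the class name and explicit time parameters are hypothetical.

```java
/** Hypothetical follower health: dead if it stopped fetching, slow if it cannot reach the log end. */
final class FollowerHealth {
    private final long replicaLagTimeMs;
    private long lastFetchMs;
    private long lagBeginMs = -1;

    FollowerHealth(long replicaLagTimeMs, long nowMs) {
        this.replicaLagTimeMs = replicaLagTimeMs;
        this.lastFetchMs = nowMs;
    }

    void onFetch(boolean readToEndOfLog, long nowMs) {
        lastFetchMs = nowMs;
        if (!readToEndOfLog) {
            if (lagBeginMs == -1) {
                lagBeginMs = nowMs; // start the slow-replica clock once
            }
        } else {
            lagBeginMs = -1;
        }
    }

    /** Dead replica: no fetch request for too long, regardless of incoming traffic. */
    boolean isDead(long nowMs) {
        return nowMs - lastFetchMs > replicaLagTimeMs;
    }

    /** Slow replica: fetching, but has not reached the log end for too long. */
    boolean isSlow(long nowMs) {
        return lagBeginMs != -1 && nowMs - lagBeginMs > replicaLagTimeMs;
    }

    boolean shouldLeaveIsr(long nowMs) {
        return isDead(nowMs) || isSlow(nowMs);
    }
}
```

In Neha's example, a follower that last fetched to the log end at t = 1000 and then stopped is flagged as dead at t = 11001 even though no new messages arrived, a case in which isSlow alone would never fire.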

        Jay Kreps added a comment -

        Yes, this was exactly what I was thinking...

        Aditya A Auradkar added a comment -

        Created reviewboard https://reviews.apache.org/r/31967/diff/
        against branch origin/trunk

        Aditya A Auradkar added a comment -

        Updated reviewboard https://reviews.apache.org/r/31967/diff/
        against branch origin/trunk

        Joe Stein added a comment -

        Shouldn't we have a KIP for this? It seems like we are changing/adding public features that will affect folks.

        Jay Kreps added a comment -

        Personally I don't think it really needs a KIP, it subtly changes the meaning of one config, but it actually changes it to mean what everyone thinks it currently means. What do you think? I think this one is less about user expectations or our opinions and more about "does it actually work". Speaking of which...

        Aditya A Auradkar, what is the test plan for this? It is trivially easy to reproduce the problems with the old approach. Start a server with default settings and 1-2 replicas, use the perf test to generate a ton of load with itty bitty messages, and just watch the replicas drop in and out of sync. We should concoct the most brutal case of this and validate that, unless the follower actually falls behind, it is never failure-detected out of the ISR. But we also need to check the reverse condition: that both a soft death and a lag are still detected. You can cause a soft death by setting the zk session timeout to something massive and using Unix signals to pause the process. You can cause lag by running some commands on one of the followers to eat up all the CPU or I/O while a load test is running, until the follower falls behind. Both cases should get caught.

        Anyhow, awesome job getting this done. I think this is one of the biggest stability issues in Kafka right now. The patch lgtm, but it would be good for Jun Rao and Neha Narkhede to take a look.

        Joe Stein added a comment -

        If folks are going to read the KIPs to understand what features went into a release and why, and this isn't there, I think that would be odd. How will they know what to do with the setting? Granted, that is what JIRA is for too, if folks read the JIRA for a release, but that isn't how the KIPs have been discussed and worked so far, regardless of what I think here.

        Joe Stein added a comment -

        We could also mark the JIRA as a bug instead of an improvement.

        Jay Kreps added a comment -

        Well iiuc the wonderfulness of this feature is that it actually doesn't add any new configs, it removes an old one that was impossible to set correctly and slightly modifies the meaning of an existing one to do what it sounds like it does. So I do think that for 99.5% of the world this will be like, wow, Kafka replication is much more robust.

        I do think it is definitely a bug fix not a feature. But hey, I love me some KIPs, so I can't object to a nice write-up if you think it would be good to have.

        Aditya Auradkar added a comment -

        I'll write a short KIP on this and circulate it tomorrow. In the meantime, I guess Jun/Neha can also review it since the actual fix has been discussed in enough detail on this jira.

        Aditya A Auradkar added a comment -

        Updated reviewboard https://reviews.apache.org/r/31967/diff/
        against branch origin/trunk

        Aditya Auradkar added a comment -

        Jay Kreps, I'm going to test it like you suggested. In the meantime, I've published KIP-16 for everyone to read.

        Aditya Auradkar added a comment -

        I ran a bunch of tests on my patch for KAFKA-1546. I started a cluster and used the PerformanceTest class to throw a ton of load.

        1. Verify that the process stays in ISR for a large volume of messages. Generated lots of load with small messages and very high throughput. I noticed that the replica did not fall out of ISR. The previous solution would have fluctuated in and out of ISR.
        bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000000 100 -1 acks=1 bootstrap.servers=localhost:9092 buffer.memory=67108864 batch.size=8196

        2. Stuck follower - Generated some load and paused the follower process using SIGSTOP. I raised the zk session timeout so the process stayed registered with ZK but did not send a fetch request for 'n' seconds. This threw it out of ISR as expected.

        3. Lagging follower - I was able to do this by reducing the max fetch size on the follower instance. This made it impossible for the follower to catch up, causing it to be removed from ISR.

        4. I also simulated the case where the follower was down for a long time and the leader had accumulated a significant amount of data. On starting the follower, it stayed out of ISR until it caught up to the log end offset.

        Neha Narkhede added a comment -

        Aditya Auradkar, thanks for the patch. Overall, the changes look correct. I left a few review comments. Thanks also for sharing the test results. Looking forward to the updated patch.

        Aditya A Auradkar added a comment -

        Updated reviewboard https://reviews.apache.org/r/31967/diff/
        against branch origin/trunk

        Aditya A Auradkar added a comment -

        Updated reviewboard https://reviews.apache.org/r/31967/diff/
        against branch origin/trunk

        Aditya Auradkar added a comment -

        Neha Narkhede, Jun Rao: I've incorporated your comments on the patch. Can you take a look?

        Aditya A Auradkar added a comment -

        Updated reviewboard https://reviews.apache.org/r/31967/diff/
        against branch origin/trunk

        Aditya A Auradkar added a comment -

        Updated reviewboard https://reviews.apache.org/r/31967/diff/
        against branch origin/trunk

        Joel Koshy added a comment -

        Patch looks good. This is another ticket for which a doc (html) update will be required.

        Aditya A Auradkar added a comment -

        Updated reviewboard https://reviews.apache.org/r/31967/diff/
        against branch origin/trunk

        Jun Rao added a comment -

        Since the broker has moved to ConfigDef, we can generate all config docs automatically. So, we probably don't need to update individual broker config changes manually.
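To illustrate why a declarative config definition makes the docs automatic: each key is declared once, together with its type, default and description, and the HTML table is rendered from those declarations, so a removed key simply disappears from the generated docs. Kafka's real class for this is `ConfigDef` in `org.apache.kafka.common.config`; the stdlib-only sketch below is a hypothetical stand-in, not that API, and its class name, method names, default value and description text are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, stdlib-only sketch: config keys are declared once with their docs,
 * and an HTML table is generated from the declarations. NOT Kafka's ConfigDef API.
 */
public class MiniConfigDef {
    /** One declared config key. */
    static final class Key {
        final String name, type, defaultValue, doc;

        Key(String name, String type, String defaultValue, String doc) {
            this.name = name;
            this.type = type;
            this.defaultValue = defaultValue;
            this.doc = doc;
        }
    }

    private final List<Key> keys = new ArrayList<>();

    /** Declares a key and returns this registry so calls can be chained. */
    public MiniConfigDef define(String name, String type, String defaultValue, String doc) {
        keys.add(new Key(name, type, defaultValue, doc));
        return this;
    }

    /** Minimal HTML escaping so doc strings cannot break the table markup. */
    private static String esc(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Renders one table row per declared key, in declaration order. */
    public String toHtmlTable() {
        StringBuilder b = new StringBuilder("<table>\n");
        b.append("<tr><th>Name</th><th>Type</th><th>Default</th><th>Description</th></tr>\n");
        for (Key k : keys) {
            b.append("<tr>")
             .append("<td>").append(esc(k.name)).append("</td>")
             .append("<td>").append(esc(k.type)).append("</td>")
             .append("<td>").append(esc(k.defaultValue)).append("</td>")
             .append("<td>").append(esc(k.doc)).append("</td>")
             .append("</tr>\n");
        }
        return b.append("</table>").toString();
    }

    public static void main(String[] args) {
        // Default value and description are illustrative only.
        MiniConfigDef def = new MiniConfigDef().define(
            "replica.lag.time.max.ms", "long", "10000",
            "If a follower has not caught up to the leader's log end offset for this long, it is removed from the ISR.");
        System.out.println(def.toHtmlTable());
    }
}
```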

        Joel Koshy added a comment -

        Actually, I'm referring to the documentation html - there are direct references to the two existing replica lag configs. Those need to be amended/removed.

        Aditya Auradkar added a comment -

        Joel Koshy Where can I find the html to change?

        Joel Koshy added a comment -

        https://svn.apache.org/repos/asf/kafka/site/083/design.html
        https://svn.apache.org/repos/asf/kafka/site/083/ops.html

        Aditya Auradkar added a comment -

        Jun Rao I found references to this config in a few places. Do I have to make changes to "configuration.html" for 0.8.3?

        $ grep -r "replica.lag.max.messages" *
        configuration.html: <td>replica.lag.max.messages</td>
        design.html:We refer to nodes satisfying these two conditions as being "in sync" to avoid the vagueness of "alive" or "failed". The leader keeps track of the set of "in sync" nodes. If a follower dies, gets stuck, or falls behind, the leader will remove it from the list of in sync replicas. The definition of, how far behind is too far, is controlled by the replica.lag.max.messages configuration and the definition of a stuck replica is controlled by the replica.lag.time.max.ms configuration.
        ops.html:replica.lag.max.messages=4000
        ops.html: <td>&lt replica.lag.max.messages</td>
        ops.html: <td>&lt replica.lag.max.messages</td>
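Since this change drops replica.lag.max.messages and keeps only the time-based replica.lag.time.max.ms, broker configs copied from the old ops.html example (the `replica.lag.max.messages=4000` line above) would carry a key that no longer has any effect. A minimal sketch of a pre-upgrade check that flags such leftovers; `ObsoleteConfigCheck`, `findObsolete` and the `REMOVED_KEYS` list are hypothetical, not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper that reports config keys removed by the replica-lag change. */
public class ObsoleteConfigCheck {
    // Assumed list of keys with no effect after this change; extend as needed.
    static final List<String> REMOVED_KEYS = List.of("replica.lag.max.messages");

    /** Returns the removed keys that are still present in the given broker properties. */
    public static Set<String> findObsolete(Properties props) {
        Set<String> found = new TreeSet<>();
        for (String key : REMOVED_KEYS) {
            if (props.containsKey(key)) {
                found.add(key);
            }
        }
        return found;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Fragment containing the line found in the old ops.html example.
        props.load(new StringReader("broker.id=0\nreplica.lag.max.messages=4000\n"));
        for (String key : findObsolete(props)) {
            System.out.println("WARN: " + key + " is no longer used; ISR membership is time-based");
        }
    }
}
```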

        Aditya Auradkar added a comment -

        Added a patch for the HTML changes.

        Jun Rao added a comment -

        For the doc change, do we need to make the following changes? We didn't remove the MaxLag JMX metric, right?

        - <td>Max lag in messages btw follower and leader replicas</td>
        - <td>kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica</td>
        - <td>&lt replica.lag.max.messages</td>
        - </tr>
        - <tr>
        - <td>Lag in messages per follower replica</td>
        - <td>kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)</td>
        - <td>&lt replica.lag.max.messages</td>
        - </tr>
        - <tr>
          <td>Requests waiting in the producer purgatory</td>
        Aditya Auradkar added a comment -

        Good point, Jun. I've added those metrics back and changed the "Normal Value" section to remove references to "replica.lag.max.messages".
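The MaxLag and per-partition ConsumerLag gauges still report lag in messages; what changes is that no message threshold decides ISR membership any more, so a busy partition can show thousands of messages of lag while its follower stays in sync. A hypothetical sketch of the quantity such a gauge reports: `FetcherLagSketch`, `Offsets`, `lag` and `maxLag` are made-up names, and exactly which leader-side offset the broker compares against is an assumption here.

```java
import java.util.Map;

/**
 * Hypothetical sketch of a "max lag in messages" gauge. The leader-side reference
 * offset a real broker uses is an assumption; this is not Kafka's metrics code.
 */
public class FetcherLagSketch {
    /** Leader-side and follower-side offsets for one partition. */
    public static final class Offsets {
        final long leaderOffset;
        final long followerFetchOffset;

        public Offsets(long leaderOffset, long followerFetchOffset) {
            this.leaderOffset = leaderOffset;
            this.followerFetchOffset = followerFetchOffset;
        }
    }

    /** Messages the follower still has to fetch; never negative. */
    public static long lag(long leaderOffset, long followerFetchOffset) {
        return Math.max(0, leaderOffset - followerFetchOffset);
    }

    /** Largest per-partition lag, the kind of value a MaxLag-style gauge would expose. */
    public static long maxLag(Map<String, Offsets> byPartition) {
        long max = 0;
        for (Offsets o : byPartition.values()) {
            max = Math.max(max, lag(o.leaderOffset, o.followerFetchOffset));
        }
        return max;
    }

    public static void main(String[] args) {
        // Made-up offsets: a busy partition trails by thousands of messages,
        // a quiet one is fully caught up.
        Map<String, Offsets> offsets = Map.of(
            "busy-topic-0", new Offsets(1_000_000, 995_000),
            "quiet-topic-0", new Offsets(42, 42));
        System.out.println("MaxLag = " + maxLag(offsets)); // prints MaxLag = 5000
    }
}
```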

        Joel Koshy added a comment -

        +1 on the latest doc updates. I can check in the doc and code patch later today.

        Joel Koshy added a comment -

        Thanks for the patches.

        Committed the rb patch to trunk.
        Committed the doc update to svn as well.


          People

          • Assignee: Aditya Auradkar
            Reporter: Neha Narkhede
          • Votes: 0
            Watchers: 12
