Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0, 0.8.1, 0.8.1.1
    • Fix Version/s: 0.9.0.0
    • Component/s: replication

    Description

      Currently, there is no good way to tune the replica lag configs to automatically account for high- and low-volume topics on the same cluster.
      For a low-volume topic it will take a very long time to detect a lagging replica, and for a high-volume topic it will produce false positives.
      One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically, based on the throughput of the partition.

      Attachments

        1. documentation.diff
          3 kB
          Aditya Auradkar
        2. documentation.diff
          3 kB
          Aditya Auradkar
        3. KAFKA-1546_2015-03-11_18:48:09.patch
          36 kB
          Aditya A Auradkar
        4. KAFKA-1546_2015-03-12_13:42:01.patch
          37 kB
          Aditya A Auradkar
        5. KAFKA-1546_2015-03-16_11:31:39.patch
          54 kB
          Aditya A Auradkar
        6. KAFKA-1546_2015-03-17_14:46:10.patch
          63 kB
          Aditya A Auradkar
        7. KAFKA-1546_2015-03-25_13:27:40.patch
          63 kB
          Aditya A Auradkar
        8. KAFKA-1546_2015-03-26_17:44:08.patch
          79 kB
          Aditya A Auradkar
        9. KAFKA-1546_2015-03-27_11:57:56.patch
          81 kB
          Aditya A Auradkar
        10. KAFKA-1546.patch
          34 kB
          Aditya A Auradkar

        Activity

          jkreps Jay Kreps added a comment -

          I think this is actually a really important thing to get right to make replication reliable. There are some subtleties. It would be good to work out the basics of how this could work on this JIRA.

          For example the throughput on a partition might be 1 msg/sec. But that is because only 1 msg/sec is being written by the producer. However if someone writes a batch of 1000 messages, that doesn't mean we are necessarily 1000 seconds behind.

          We already track the time since the last fetch request. So if the fetcher stops entirely for too long it will be caught.

          I think the other condition we want to be able to catch is one where the fetcher is still fetching but it is behind and likely won't catch up. One way to make "caught-up" concrete is to say that the last fetch went to the end of the log. We could potentially reduce this to one config and just have replica.lag.time.ms, which would be both the maximum time since a fetch and the maximum amount of time without catching up to the leader. The implementation would be that every time a fetch didn't go to the logEndOffset we would set the lag clock, and it would only reset when a fetch request finally went all the way to the logEndOffset.

          junrao Jun Rao added a comment -

          Jay, I think what you proposed is nice and simple. I think it works. One subtlety is dealing with max.wait.ms in the follower fetch request. Imagine that a follower has caught up and its fetch request is sitting in the purgatory. The last caught up time won't be updated for max.wait.ms if no new messages come in. When a message does come in, if max.wait.ms is larger than replica.lag.time.ms, the follower is now considered out of sync. Perhaps we should use the timestamp of when the first message is appended to the leader after the last caught up time. If the amount of time since then is more than replica.lag.time.ms, the replica is considered out of sync.

          jkreps Jay Kreps added a comment - edited

          I think I was being a little vague. What I was trying to say is this. When each fetch is serviced we check

            if(!fetchedData.readToEndOfLog)
               this.lagBegin = System.currentTimeMillis()
            else
               this.lagBegin = -1
          

          Then the liveness criteria is

           partitionLagging = this.lagBegin > 0 && System.currentTimeMillis() - this.lagBegin > REPLICA_LAG_TIME_MS
          
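          A self-contained sketch of the lag clock described in the comment above, purely for illustration; the class and method names are assumptions, not the actual broker code.

            // Illustrative sketch of the lag clock described above; names are assumptions, not broker code.
            public final class ReplicaLagClock {
                private final long replicaLagTimeMaxMs;
                private volatile long lagBeginMs = -1L;   // -1 means "currently caught up"

                public ReplicaLagClock(long replicaLagTimeMaxMs) {
                    this.replicaLagTimeMaxMs = replicaLagTimeMaxMs;
                }

                /** Called each time a fetch from this follower is serviced. */
                public void onFetch(boolean readToEndOfLog) {
                    lagBeginMs = readToEndOfLog ? -1L : System.currentTimeMillis();
                }

                /** True if the follower has gone too long without reading to the end of the log. */
                public boolean isLagging() {
                    long begin = lagBeginMs;
                    return begin > 0 && System.currentTimeMillis() - begin > replicaLagTimeMaxMs;
                }
            }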
          junrao Jun Rao added a comment -

          Yes, that works. So we will have to return enough info in Log.read to derive fetchedData.readToEndOfLog.

          sriramsub Sriram added a comment -

          The lagBegin value does not persist across shutdowns or leader transitions. A safe assumption to make is that all fetchers are lagging when a node becomes leader, until we get the first fetch. This would ensure we don't assume there is no lag when a fetcher is down and a new leader is elected.

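          A minimal sketch of how that safe assumption might be initialized on leader election (illustrative Java; the class, method names, and per-replica map are assumptions):

            // Illustrative only: on leader election, mark every follower as lagging from the
            // election time, so a follower that never fetches is eventually detected.
            import java.util.HashMap;
            import java.util.Map;

            public final class LeaderLagState {
                private final Map<Integer, Long> lagBeginMsByReplica = new HashMap<>();

                public void onBecomeLeader(Iterable<Integer> followerIds) {
                    long now = System.currentTimeMillis();
                    for (int id : followerIds) {
                        lagBeginMsByReplica.put(id, now);     // assume lagging until the first fetch proves otherwise
                    }
                }

                public void onCaughtUpFetch(int replicaId) {
                    lagBeginMsByReplica.put(replicaId, -1L);  // fetch reached the log end: no longer lagging
                }
            }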

          aauradkar Aditya Auradkar added a comment -

          nmarasoi are you actively working on this? I was planning on picking it up.

          aauradkar Aditya Auradkar added a comment -

          nmarasoi I'm going to assign this to myself since I haven't heard back.

          aauradkar Aditya Auradkar added a comment -

          I do have a concern about the heuristic. jkreps Using your example:

          "if(!fetchedData.readToEndOfLog)
             this.lagBegin = System.currentTimeMillis()
          else
             this.lagBegin = -1

          Then the liveness criteria is
          partitionLagging = this.lagBegin > 0 && System.currentTimeMillis() - this.lagBegin > REPLICA_LAG_TIME_MS"

          The time counter starts when the read doesn't go to the end of the log and only stops when it does reach the end. In this case, the lag measures the absolute duration of time for which this replica is lagging, but not how far behind it is in terms of applying commits. For example, a replica could be catching up quickly but the "replica.lag.max.ms" counter would still increase until it fully catches up and then abruptly drop to zero.
          jkreps Jay Kreps added a comment -

          Yeah that is true. I think we are in agreement that we want to express this in terms of time not # messages.

          The criteria I was proposing is "not caught up for N ms", where the definition of caught up is "reads to the end of the log".

          I think what you are proposing is "will take more than N ms to catch up." Originally I had thought a little about this. However this criteria is a lot harder to calculate. In order to predict the time to catch up you need to estimate the rate at which messages will be read in the future (e.g. if I am 1000 messages behind and reading at 500 msg/sec then it will take 2 seconds). I was concerned that any estimate would be really fragile since the whole point of a failure is that it changes this kind of rate in some way (because a replica is slow, or messages got bigger, or whatever) so predictions off past rates may be wrong once the (possibly) soft failure happens.

          I think the motivation for the criteria I was proposing was that any caught-up reader should always be at the end of the log (that is the definition of caught up), and if you go for a period of time without being at the end then you likely won't get to the end soon. You could imagine some situation in which the follower was somehow able to exactly keep up but was always one message behind the end of the log, in which case we would falsely failure-detect the follower. However, I think this would be unlikely, and failure detection is actually probably okay there, since you are exactly on the verge of being overwhelmed (one byte per second more throughput and you will be dead).

          Let me know if you think that makes sense; I could definitely be convinced that a different way would be better.

          nehanarkhede Neha Narkhede added a comment -

          For example a replica could be catching up quickly but the "replica.lag.max.ms" counter would still increase until it fully catches up and then it will abruptly drop to zero.

          What we want to measure is not exactly how slow it is but express lag in terms of maximum time spent not catching up with the leader. The check that Jay mentions can be improved a little. Basically, if the replica is at the log end offset, then we don't want to check for lagBegin at all. The first time a replica starts lagging, you set the lagBegin to current time. From there on, you only reset it to -1 if it reaches log end offset.

          This will remove a replica that keeps fetching but is unable to catch up with the leader for "replica.lag.max.ms".

          So the check is more like:

          if(!fetchedData.readToEndOfLog) {
            if(lagBegin == -1) {
              this.lagBegin = System.currentTimeMillis()
            }
          } else {
            this.lagBegin = -1
          }
          

          Then the liveness criteria is
          partitionLagging = this.lagBegin > 0 && System.currentTimeMillis() - this.lagBegin > REPLICA_LAG_TIME_MS

          In order to do this, LogReadResult might have to return the log end offset as well.

          becket_qin Jiangjie Qin added a comment -

          I'm not sure if my concern is valid. If we have many producers producing messages to a partition, it's possible that after we fulfill a fetch request from the replica fetcher, but before we check whether the log has caught up to the log end or not, some new messages are appended. In this case, we will never be able to really catch up to the log end.
          Maybe I understood it wrong, but I think what nehanarkhede proposed before seems to work, which is:
          1. Have a time criterion: a fetch request must be received from the follower within 10 secs.
          2. Instead of a fixed max message lag, say 4000, use (message-in-rate * maxLagMs) as the max message lag threshold.
          This way we can handle both busy topics and low-volume topics.

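          A minimal sketch of the throughput-scaled threshold in point 2 above (illustrative; the class name and the source of the message-in rate are assumptions, and this is not the approach the ticket ultimately settled on):

            // Illustrative only: derive a per-partition max message lag from the recent
            // message-in rate and a time budget, instead of using a fixed message count.
            public final class DynamicMessageLag {
                private final long maxLagMs;   // time budget, e.g. a replica.lag time-style setting

                public DynamicMessageLag(long maxLagMs) {
                    this.maxLagMs = maxLagMs;
                }

                /** messagesInPerSec would come from the partition's message-in rate metric (assumption). */
                public long maxLagMessages(double messagesInPerSec) {
                    // A partition doing 10k msg/sec tolerates far more trailing messages than one
                    // doing 1 msg/sec for the same time budget; point 1 (a recent fetch) is checked separately.
                    return Math.max(1L, Math.round(messagesInPerSec * maxLagMs / 1000.0));
                }
            }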
          aauradkar Aditya Auradkar added a comment - edited

          I agree we should model this in terms of time and not in terms of messages. While I think it is a bit more natural to model replication lag in terms of "will take more than N ms to catch up.", I also agree it is tricky to implement correctly.

          One possible way to model it is to associate an offset with a commit timestamp at the source. For example, assume that a message with offset O is produced on the leader for partition X at timestamp T1. If the time now is T2 and a replica's log end offset is O (i.e., it has consumed up to O), then the lag can be (T2 - T1). Is there any easy way to obtain the source timestamp given an offset?

          If this isn't feasible, then I do think that the heuristic proposed in Neha's comment is a good one, and I will submit a patch for it.

          Also, there are currently 2 checks for replica lag (in ISR).
          a. keepInSyncMessages - This tracks replica lag as a function of the number of messages it is trailing behind. I believe we will remove this entirely regardless of the approach we choose.
          b. keepInSyncTimeMs - This tracks the amount of time between fetch requests. I think we can remove this as well.

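          Purely to illustrate the T2 - T1 idea above: the leader-side timestamp lookup below is hypothetical (as a later comment notes, no per-offset timestamp existed in the broker at the time).

            // Hypothetical: if the leader could map an offset to its append time, follower
            // lag could be expressed directly as T2 - T1. No such lookup existed at the time.
            public final class TimestampLag {
                /** Hypothetical leader-side lookup from offset to append time (milliseconds). */
                public interface LeaderTimeline {
                    long appendTimeMsOf(long offset);
                }

                public static long lagMs(LeaderTimeline timeline, long followerLogEndOffset) {
                    long t1 = timeline.appendTimeMsOf(followerLogEndOffset); // T1: when offset O was produced on the leader
                    long t2 = System.currentTimeMillis();                    // T2: now
                    return t2 - t1;
                }
            }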

          jjkoshy Joel Jacob Koshy added a comment -

          Re: your concern - yes, that does seem to be valid. For it to work, the LogReadResult should probably return the log-end-offset just before the read starts, i.e., a sort of before-image.
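          A sketch of the before-image idea, assuming hypothetical field names for what a read result might carry:

            // Illustrative only: snapshot the leader's log end offset before serving the read,
            // so "read to the end of the log" stays well defined even if producers append
            // concurrently while the fetch is being handled.
            public final class LogReadResultSketch {
                private final long leaderLogEndOffsetBeforeRead; // next offset to be written, captured before the read
                private final long lastOffsetRead;               // last offset actually returned to the follower

                public LogReadResultSketch(long leaderLogEndOffsetBeforeRead, long lastOffsetRead) {
                    this.leaderLogEndOffsetBeforeRead = leaderLogEndOffsetBeforeRead;
                    this.lastOffsetRead = lastOffsetRead;
                }

                public boolean readToEndOfLog() {
                    // Compare against the pre-read snapshot, not the live log end offset.
                    return lastOffsetRead + 1 >= leaderLogEndOffsetBeforeRead;
                }
            }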

          jjkoshy Joel Jacob Koshy added a comment -

          No, we don't have any timestamp currently associated with messages/offsets.
          nehanarkhede Neha Narkhede added a comment -

          If this isn't feasible, then I do think that the heuristic proposed in Neha's comment is a good one.. and I will submit a patch for it.

          Sounds good. Will help you review it.

          a. keepInSyncMessages - This tracks replica lag as a function of the number of messages it is trailing behind. I believe we will remove this entirely regardless of the approach we choose.

          Correct.

          b. keepInSyncTimeMs - This tracks the amount of time between fetch requests. I think we can remove this as well.

          Hmm, depends. There are 2 things we need to check - dead replicas and slow replicas. The dead-replica check is to remove a replica that hasn't sent a fetch request to the leader for some time. Take the example of a replica that is in sync with the leader (lagBegin is -1): there are no new messages coming in and it stops fetching entirely. We can remove the replica when there are new messages, based on the lagBegin logic, but really that replica should've been removed long before that, because it stopped fetching and was dead.

          The logic we have above works pretty well for slow replicas, but I think we still need to handle dead replicas for low-volume topics.

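          A hedged sketch combining the two checks above (dead and slow replicas) into a single time-based criterion, extending the earlier lag-clock sketch; names are illustrative and this is not the committed implementation.

            // Illustrative only: a replica falls out of sync if it either has not fetched at all
            // for too long (dead) or keeps fetching without reaching the log end (slow).
            public final class IsrCheck {
                private final long replicaLagTimeMaxMs;
                private volatile long lastFetchTimeMs;     // updated on every fetch request
                private volatile long lagBeginMs = -1L;    // set when a fetch first falls short of the log end

                public IsrCheck(long replicaLagTimeMaxMs, long nowMs) {
                    this.replicaLagTimeMaxMs = replicaLagTimeMaxMs;
                    this.lastFetchTimeMs = nowMs;          // start the dead-replica clock immediately
                }

                public void onFetch(boolean readToEndOfLog, long nowMs) {
                    lastFetchTimeMs = nowMs;
                    if (readToEndOfLog) {
                        lagBeginMs = -1L;
                    } else if (lagBeginMs == -1L) {
                        lagBeginMs = nowMs;                // only set on the first lagging fetch, per the refinement above
                    }
                }

                public boolean outOfSync(long nowMs) {
                    boolean dead = nowMs - lastFetchTimeMs > replicaLagTimeMaxMs;
                    boolean slow = lagBeginMs > 0 && nowMs - lagBeginMs > replicaLagTimeMaxMs;
                    return dead || slow;
                }
            }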
          jkreps Jay Kreps added a comment -

          Yes, this was exactly what I was thinking...


          auradkar Aditya A Auradkar added a comment -

          Created reviewboard https://reviews.apache.org/r/31967/diff/
          against branch origin/trunk

          auradkar Aditya A Auradkar added a comment -

          Updated reviewboard https://reviews.apache.org/r/31967/diff/
          against branch origin/trunk
          joestein Joe Stein added a comment -

          Shouldn't we have a KIP for this? It seems like we are changing/adding public features that will affect folks.

          jkreps Jay Kreps added a comment -

          Personally I don't think it really needs a KIP; it subtly changes the meaning of one config, but it actually changes it to mean what everyone thinks it currently means. What do you think? I think this one is less about user expectations or our opinions and more about "does it actually work". Speaking of which...

          auradkar What is the test plan for this? It is trivially easy to reproduce the problems with the old approach. Start a server with default settings and 1-2 replicas, use the perf test to generate a ton of load with itty-bitty messages, and just watch the replicas drop in and out of sync. We should concoct the most brutal case of this and validate that, unless the follower actually falls behind, it is never failure-detected out of the ISR. But we also need to check the reverse condition, that both a soft death and a lag are still detected. You can cause a soft death by setting the zk session timeout to something massive and just using unix signals to pause the process. You can cause lag by running some commands on one of the followers to eat up all the cpu or I/O while a load test is running, until the follower falls behind. Both cases should get caught.

          Anyhow, awesome job getting this done. I think this is one of the biggest stability issues in Kafka right now. The patch lgtm, but it would be good for junrao and nehanarkhede to take a look.

          joestein Joe Stein added a comment -

          If folks are going to read the KIPs to understand what features went into a release and why, and this isn't there, I think that would be odd. How will they know what to do with the setting? Granted, that is what JIRA is for too, if folks read the JIRAs for a release, but that isn't how the KIPs have been discussed and working so far, regardless of what I think here for this.

          joestein Joe Stein added a comment -

          We could also mark the JIRA as a bug instead of an improvement.

          jkreps Jay Kreps added a comment -

          Well iiuc the wonderfulness of this feature is that it actually doesn't add any new configs, it removes an old one that was impossible to set correctly and slightly modifies the meaning of an existing one to do what it sounds like it does. So I do think that for 99.5% of the world this will be like, wow, Kafka replication is much more robust.

          I do think it is definitely a bug fix not a feature. But hey, I love me some KIPs, so I can't object to a nice write-up if you think it would be good to have.


          aauradkar Aditya Auradkar added a comment -

          I'll write a short KIP on this and circulate it tomorrow. In the meantime, I guess Jun/Neha can also review it since the actual fix has been discussed in enough detail on this jira.

          auradkar Aditya A Auradkar added a comment -

          Updated reviewboard https://reviews.apache.org/r/31967/diff/
          against branch origin/trunk

          aauradkar Aditya Auradkar added a comment -

          jkreps I'm going to test it like you suggested. In the meantime, I've published KIP-16 for everyone to read.

          aauradkar Aditya Auradkar added a comment -

          I ran a bunch of tests on my patch for KAFKA-1546. I started a cluster and used the PerformanceTest class to throw a ton of load.

          1. Verify that the replica stays in ISR under a large volume of messages. Generated lots of load with small messages and very high throughput. I noticed that the replica did not fall out of ISR. The previous solution would have fluctuated in and out of ISR.
          bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000000 100 -1 acks=1 bootstrap.servers=localhost:9092 buffer.memory=67108864 batch.size=8196

          2. Stuck follower - Generated some load and paused the follower process using SIGSTOP. I raised the zk session timeout so the process stayed registered with ZK but did not send a fetch request for 'n' seconds. This threw it out of ISR as expected.

          3. Lagging follower - I was able to do this by reducing the max fetch size on the follower instance. This made it impossible for the follower to catch up, causing it to be removed from ISR.

          4. I also simulated the case where the follower was down for a long time and the leader had accumulated a significant amount of data. On starting the follower, it stayed out of ISR until it caught up to the log end offset.
          nehanarkhede Neha Narkhede added a comment -

          aauradkar Thanks for the patch. Overall, the changes look correct. I left a few review comments. And thanks for sharing the test results. Look forward to the updated patch.


          auradkar Aditya A Auradkar added a comment -

          Updated reviewboard https://reviews.apache.org/r/31967/diff/
          against branch origin/trunk

          auradkar Aditya A Auradkar added a comment -

          Updated reviewboard https://reviews.apache.org/r/31967/diff/
          against branch origin/trunk

          aauradkar Aditya Auradkar added a comment -

          nehanarkhede junrao I've incorporated your comments on the patch. Can you take a look?

          auradkar Aditya A Auradkar added a comment -

          Updated reviewboard https://reviews.apache.org/r/31967/diff/
          against branch origin/trunk

          auradkar Aditya A Auradkar added a comment -

          Updated reviewboard https://reviews.apache.org/r/31967/diff/
          against branch origin/trunk

          jjkoshy Joel Jacob Koshy added a comment -

          Patch looks good. This is another ticket for which a doc (html) update will be required.

          auradkar Aditya A Auradkar added a comment -

          Updated reviewboard https://reviews.apache.org/r/31967/diff/
          against branch origin/trunk
          junrao Jun Rao added a comment -

          Since the broker has moved to ConfigDef, we can generate all config docs automatically. So, we probably don't need to update individual broker config changes manually.


          jjkoshy Joel Jacob Koshy added a comment -

          Actually, I'm referring to the documentation html - there are direct references to the two existing replica lag configs. Those need to be amended/removed.

          aauradkar Aditya Auradkar added a comment -

          jjkoshy Where can I find the html to change?
          jjkoshy Joel Jacob Koshy added a comment -

          https://svn.apache.org/repos/asf/kafka/site/083/design.html
          https://svn.apache.org/repos/asf/kafka/site/083/ops.html

          aauradkar Aditya Auradkar added a comment -

          junrao I found references to this config in a few places. Do I have to make changes to "configuration.html" for 0.8.3?

          $ grep -r "replica.lag.max.messages" *
          configuration.html: <td>replica.lag.max.messages</td>
          design.html:We refer to nodes satisfying these two conditions as being "in sync" to avoid the vagueness of "alive" or "failed". The leader keeps track of the set of "in sync" nodes. If a follower dies, gets stuck, or falls behind, the leader will remove it from the list of in sync replicas. The definition of, how far behind is too far, is controlled by the replica.lag.max.messages configuration and the definition of a stuck replica is controlled by the replica.lag.time.max.ms configuration.
          ops.html:replica.lag.max.messages=4000
          ops.html: <td>&lt replica.lag.max.messages</td>
          ops.html: <td>&lt replica.lag.max.messages</td>

          aauradkar Aditya Auradkar added a comment -

          Added a patch for the HTML changes.
          junrao Jun Rao added a comment -

          For the doc change, do we need to make the following changes? We didn't remove the MaxLag jmx, right?

          • <td>Max lag in messages btw follower and leader replicas</td>
          • <td>kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica</td>
          • <td>&lt replica.lag.max.messages</td>
          • </tr>
          • <tr>
          • <td>Lag in messages per follower replica</td>
          • <td>kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]),topic=([-.\w]),partition=([0-9]+)</td>
          • <td>&lt replica.lag.max.messages</td>
          • </tr>
          • <tr>
            <td>Requests waiting in the producer purgatory</td>

          aauradkar Aditya Auradkar added a comment -

          Good point Jun. I've added those metrics back and have changed the "Normal Value" section to remove references to "replica.lag.max.messages".

          jjkoshy Joel Jacob Koshy added a comment -

          +1 on the latest doc updates. I can check in the doc and code patch later today.

          jjkoshy Joel Jacob Koshy added a comment -

          Thanks for the patches.

          Committed the rb patch to trunk.
          Committed the doc update to svn as well.

          People

            Assignee: aauradkar Aditya Auradkar
            Reporter: nehanarkhede Neha Narkhede
            Votes: 0
            Watchers: 12

            Dates

              Created:
              Updated:
              Resolved: