Kafka / KAFKA-1028

per topic configuration of preference for consistency over availability

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.2
    • Component/s: None
    • Labels: None

      Description

      As discussed with Neha on the ML.

      It should be possible to configure a topic to disallow unclean leader election, thus preventing the situation where committed messages can be discarded when a failed leader that was the only replica in the ISR comes back online.

      This would open Kafka to additional use cases where the possibility of committed messages being discarded is unacceptable.
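
      For illustration, here is a minimal Scala sketch (not taken from the patch; all names are hypothetical) of the decision this configuration enables during offline-partition leader election: if no ISR replica is alive and unclean election is disallowed for the topic, the partition stays offline rather than electing an out-of-sync replica.

      {code:scala}
      // Hypothetical sketch of the consistency-over-availability choice; not the actual controller API.
      case class Replica(brokerId: Int)

      def electLeader(assignedReplicas: Seq[Replica],
                      isr: Seq[Replica],
                      liveBrokerIds: Set[Int],
                      uncleanLeaderElectionEnabled: Boolean): Option[Replica] = {
        val liveIsr = isr.filter(r => liveBrokerIds.contains(r.brokerId))
        liveIsr.headOption.orElse {
          if (uncleanLeaderElectionEnabled)
            // Last resort: elect any live replica, accepting possible loss of committed messages.
            assignedReplicas.find(r => liveBrokerIds.contains(r.brokerId))
          else
            // Consistency preferred: leave the partition offline until an ISR member returns.
            None
        }
      }
      {code}

      With uncleanLeaderElectionEnabled = false and no live ISR replica, electLeader returns None, i.e. the partition remains unavailable until a former ISR member comes back.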

      1. KAFKA-1028.patch
        28 kB
        Andrew Olson
      2. KAFKA-1028_2014-03-17_09:39:05.patch
        35 kB
        Andrew Olson
      3. KAFKA-1028_2014-03-03_18:48:43.patch
        34 kB
        Andrew Olson
      4. KAFKA-1028_2014-01-30_13:45:30.patch
        30 kB
        Andrew Olson

        Activity

        Andrew Olson added a comment -

        Thanks for the info. Hopefully I'll have some time in the next week or two to tackle this; it should be a fun project.

        Jay Kreps added a comment -

        I added KAFKA-1347 for this.

        Jay Kreps added a comment -

        There is some information on writing system tests for kafka here:
        https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests

        I don't know that jepsen itself is actually needed. I think what Jepsen does to perturb the network is just use iptables and tc (traffic control).

        If you can read lisp you can see it here:
        https://github.com/aphyr/jepsen/blob/master/src/jepsen/control/net.clj
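
        As a rough illustration of the kind of perturbation described here (not Jepsen's actual code; the peer address and device name are placeholders, and the commands require root), the same effect can be scripted from Scala by shelling out to iptables and tc:

        {code:scala}
        import scala.sys.process._

        // Hypothetical Jepsen-style network perturbation helpers; IP and interface are placeholders.
        object PartitionSim {
          val peer = "10.0.0.2"   // another broker in the cluster (placeholder)
          val dev  = "eth0"       // network interface on this host (placeholder)

          // Drop all traffic from the peer, simulating a network partition. Returns the exit code.
          def isolate(): Int = Seq("iptables", "-A", "INPUT", "-s", peer, "-j", "DROP").!

          // Remove the rule, healing the partition.
          def heal(): Int = Seq("iptables", "-D", "INPUT", "-s", peer, "-j", "DROP").!

          // Alternatively, add latency with tc/netem instead of a full partition.
          def slow(): Int = Seq("tc", "qdisc", "add", "dev", dev, "root", "netem", "delay", "200ms").!
          def restore(): Int = Seq("tc", "qdisc", "del", "dev", dev, "root", "netem").!
        }
        {code}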

        Neha Narkhede added a comment -

        Thanks Andrew Olson, appreciate your contributions!

        Andrew Olson added a comment -

        The tests included with the patch do verify consistency and non-availability, but they just shut down and restart brokers rather than simulating a temporary network partition. A few months ago I tried to get Jepsen+Kafka up and running, without much success. I agree that a system test would be good to have. I'll take a look at that framework to see how feasible it would be to add some tests for this.

        Jay Kreps added a comment -

        Andrew Olson This should theoretically make Kafka consistent and non-available in the presence of network partitions. I wonder if it actually does in practice? That is, if we run a cluster locally and subject it to network partitions does it retain consistency? We could try this out using Kyle's Jepsen stuff or (perhaps better) by adding coverage in the system test framework so we run such a test regularly. Any chance you'd be up for trying this out? It would make a good blog post...

        Andrew Olson added a comment -

        Sounds good, thanks.

        Jay Kreps added a comment -

        Yes, we will do another release in the next few days (0.8.1.1) with bugfixes for 0.8.1.0 and then in a month or so we will do 0.8.2 with everything in trunk.

        Andrew Olson added a comment -

        Jay Kreps/Neha Narkhede Would it be possible for this to be included in the 0.8.2 release?

        Neha Narkhede added a comment -

        Thanks for the patch! Committed to trunk

        Scott Clasen added a comment -

        Yes! Thanks Andrew!

        Jay Kreps added a comment -

        No, I'm +1 too. Thanks Andrew!

        Neha Narkhede added a comment -

        Andrew Olson Your latest patch looks good to me. +1
        Jay Kreps Do you have any more comments on the latest patch before I check it in?

        Andrew Olson added a comment -

        Updated reviewboard https://reviews.apache.org/r/17537/
        against branch origin/trunk

        Jay Kreps added a comment - edited

        Andrew Olson I made a few very minor comments.

        Neha Narkhede I had a question for you in the rb about the perf impact of adding these zk queries in critical sections.

        Neha Narkhede added a comment -

        Andrew Olson The changes you made in your latest patch make a lot of sense. Overall, the patch looks good. Just one concern -

        The patch leaks a per topic config relevant only to replication into the LogConfig module. This is a side effect of the way we've designed per topic configs, since we only allow log configs to be overridden per topic. Since Jay Kreps is the owner of the log module, I'd wait for his review as well.

        Meanwhile, I think, besides the above concern and the need to rebase, the patch is ready for checkin. Thanks for your contribution!

        Andrew Olson added a comment -

        Patch has been updated with the suggested changes.

        Note that I made one additional change which was found to be necessary after rebasing against the latest trunk code (detected by integration test failures). It appears that the recent KAFKA-1235 changes result in a Leader of -1 (none) and empty ISR being saved for all partitions after all brokers in the cluster have been shutdown using a controlled shutdown. This essentially forces an unclean leader election to occur when a broker is subsequently restarted. If we have the configuration set to disallow unclean elections, then we have permanently blocked our ability to restore a leader.

        To address this, I've added logic that prevents the ISR from being updated to an empty set for any topics that do not allow unclean election. The last surviving member of the ISR will be preserved in Zookeeper in the event that a final, lone leader broker goes offline – allowing this previous leader to resume its leader role when it comes back online. The partition leader is still updated to -1, which is not problematic.

        Please review lines 975-986 of KafkaController and let me know if this change makes sense.
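
        A minimal sketch of the ISR-shrinking guard described above (not the actual KafkaController code; names are illustrative): when removing a failed broker would leave the ISR empty and the topic disallows unclean election, the last surviving member is kept so that a clean election remains possible once it returns.

        {code:scala}
        // Illustrative only; the real logic is in KafkaController (see the lines referenced above).
        def shrinkIsrOnBrokerFailure(currentIsr: List[Int],
                                     failedBrokerId: Int,
                                     uncleanLeaderElectionEnabled: Boolean): List[Int] = {
          val newIsr = currentIsr.filterNot(_ == failedBrokerId)
          if (newIsr.isEmpty && !uncleanLeaderElectionEnabled)
            // Preserve the last surviving ISR member in ZooKeeper so it can resume leadership later.
            currentIsr
          else
            newIsr
        }
        {code}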

        Andrew Olson added a comment -

        Updated reviewboard https://reviews.apache.org/r/17537/
        against branch origin/trunk

        Guozhang Wang added a comment -

        Sorry for the late reply, Andrew; I added some more comments on the RB.

        Andrew Olson added a comment -

        Neha Narkhede, can you respond to my latest comment on the reviewboard? Thanks!

        Andrew Olson added a comment -

        Updated reviewboard https://reviews.apache.org/r/17537/
        against branch origin/trunk

        Andrew Olson added a comment -

        Reviewboard has been created. I'll make the suggested changes and then update the review.

        Andrew Olson added a comment -

        Created reviewboard https://reviews.apache.org/r/17537/
        against branch origin/trunk

        Neha Narkhede added a comment -

        It will be easier to do the review if you can submit a reviewboard request (Look at https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review#Patchsubmissionandreview-Kafkapatchreviewtool)

        For now, here is my initial review -

        1. KafkaConfig.scala

        • Per topic config overrides are handled differently in 0.8.1, through the command line topic tool which writes the configs to ZooKeeper. So let's get rid of the topic overrides here.
        • The hope is to pick a config name that will convey the impact of turning it on. As such, I'm not sure if 'unclean leader election' is well understood. Nevertheless, can we rename it from "replica.unclean.election.enable" to "unclean.leader.election.enable"?
        • isUncleanElectionEnabled() will need to be removed from here since topic overrides are not handled through the config object.

        2. PartitionLeaderSelector

        • We need a way for the controller to load the per topic config overrides for "unclean.leader.election.enable" from ZooKeeper. It will use this to mark the partition offline if it notices that none of the replicas in the ISR are alive anymore.
        Andrew Olson added a comment -

        Attaching patch for consideration.

        Here's a summary of the new broker configuration properties included in this patch.

        replica.unclean.election.enable
        Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.
        default: true

        replica.unclean.election.topics
        Regular expression specifying override topics for which unclean leader election is enabled; only applicable if replica.unclean.election.enable=false.
        default value: ""

        replica.clean.election.topics
        Regular expression specifying override topics for which unclean leader election is disabled; only applicable if replica.unclean.election.enable=true.
        default value: ""
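
        A minimal sketch (hypothetical; not the patch's actual KafkaConfig code) of how these three properties might combine into a per-topic decision:

        {code:scala}
        import java.util.regex.Pattern

        // Hypothetical helper combining the three properties described above.
        class UncleanElectionConfig(defaultEnable: Boolean,       // replica.unclean.election.enable
                                    uncleanTopicsRegex: String,   // replica.unclean.election.topics
                                    cleanTopicsRegex: String) {   // replica.clean.election.topics
          private val uncleanPattern = if (uncleanTopicsRegex.isEmpty) None else Some(Pattern.compile(uncleanTopicsRegex))
          private val cleanPattern   = if (cleanTopicsRegex.isEmpty)   None else Some(Pattern.compile(cleanTopicsRegex))

          def isUncleanElectionEnabled(topic: String): Boolean =
            if (defaultEnable)
              // Enabled by default; the "clean" regex opts individual topics out.
              !cleanPattern.exists(_.matcher(topic).matches())
            else
              // Disabled by default; the "unclean" regex opts individual topics back in.
              uncleanPattern.exists(_.matcher(topic).matches())
        }
        {code}

        For example (hypothetical values), with defaultEnable = false and uncleanTopicsRegex = "scratch-.*", unclean election stays disabled for every topic except those matching "scratch-.*".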

        Andrew Olson added a comment -

        Right, it's a broker config property (default behavior plus topic regex override as you suggested), and request.required.acks can still be set to whatever value is deemed appropriate.

        With this new safeguard in place, when recovering from a situation in which all brokers are down, the topic is available again once the last broker to die has been restarted. If that's not possible due to catastrophic hardware failure, the config could be temporarily toggled (e.g. replica.unclean.election.enable=true) to allow a different leader broker to be elected and restore topic availability. Note that in order to preserve the maximum number of messages, the broker that died last should still be started up first if this last-resort option of unclean election has to be used.

        Jason Rosenberg added a comment -

        Thanks for the explanation Andrew Olson. So, this will need to be a property for the broker, and not the producer config, I see (so setting request.required.acks on the producer won't help with deciding to do unclean leader election).

        So, my next question: with this option, will we still be able to use whatever setting for request.required.acks from the producer? It seems the answer is yes (and it doesn't change the acks=0 case: if the topic is currently not available, you wouldn't actually know whether the message got published, which is the case anyway).

        I suppose another trade-off is what happens if A and B not only go down (in your example above), but end up in an unrecoverable state? E.g. they caught on fire, the disks are toast. Will the topic ever be able to recover in this case?

        Andrew Olson added a comment -

        Yes, it is a more robust solution than simply requiring 2 acks or a majority quorum. If the ISR ever becomes empty, there is a significant risk that the next leader will not be the same as the previous one, likely resulting in data inconsistency.

        Consider the following example sequence of events, given a cluster with 4 brokers, a replication factor of 4, and 2 required acks.

        1. Brokers A and B die simultaneously and are removed from the initially complete ISR set of (A,B,C,D). If A and B were leaders for any partitions, leadership is transferred to C or D. Messages can still be received and consumed by C and D, since ISR = (C,D).
        2. The other two surviving brokers C and D are both then shutdown. The entire cluster is offline, so ISR = ().
        3. Next, brokers A and B are restarted. As the only available brokers, they are the only candidates to be elected leader for all partitions, and ISR now = (A,B). This is called an "unclean" election since A and B were previously evicted from the ISR. All messages replicated to A and B before their sudden deaths are available for consumption, and new messages can also be received.
        4. Finally, brokers C and D are restarted. Since A and B are now the leaders for everything, C and D must assume the role of follower replicas. When synchronizing the leader and follower logs, the messages which were received by C and D while they were still alive when A and B were dead can be lost by log file truncation if A or B now has a lower latest offset value. There can also potentially be different message data in the logs for the same offset values.

        In this scenario, this enhancement would introduce the configurable option to prevent the unclean leader election of A and B on step 3 above, so that the data remains consistent. However, the tradeoff to ensuring consistency is that the topics in this cluster would be unavailable to produce to or consume from until either C or D is revived.

        Jason Rosenberg added a comment -

        Can you clarify, is this different than using request.required.acks = 2, always? (e.g. if there's only one broker in the ISR, no messages can be acknowledged).

        Andrew Olson added a comment -

        I am currently working on implementing this enhancement, and should be able to submit a patch for consideration by the end of next week.

        Jason Rosenberg added a comment -

        Agreed on a default setting. Also, it would be nice if this could be configurable for topics identified with a regex, so that all topics that match ".reliable." will have this feature applied.

        Also, it would be good for it be applicable to existing topics, not only newly created ones, so existing topics in a running system can have the benefit of this feature.

        (In general too, default settings for topics identified via regex, and retroactive applicability, are desirable for all per topic config options).

        Andrew Olson added a comment -

        Can a configurable global default be added as well? Our Kafka topics are programmatically created.


          People

          • Assignee: Andrew Olson
          • Reporter: Scott Clasen
          • Votes: 2
          • Watchers: 10
