KAFKA-532: Multiple controllers can co-exist during soft failures

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: None
    • Labels:

      Description

      If the current controller experiences an intermittent soft failure (GC pause) in the middle of leader election or partition reassignment, a new controller might get elected and start communicating new state change decisions to the brokers. After recovering from the soft failure, the old controller might continue sending some stale state change decisions to the brokers, resulting in unexpected failures. We need to introduce a controller generation id that increments with controller election. The brokers should reject any state change requests by a controller with an older generation id.

      Attachments

      1. kafka-532-v5.patch
        87 kB
        Neha Narkhede
      2. kafka-532-v4.patch
        59 kB
        Neha Narkhede
      3. kafka-532-v3.patch
        51 kB
        Neha Narkhede
      4. kafka-532-v2.patch
        53 kB
        Neha Narkhede
      5. kafka-532-v1.patch
        49 kB
        Neha Narkhede

        Activity

        Neha Narkhede added a comment -

        Introduced a controller generation/epoch that increments after a successful controller election

        Changes include -
        1. Guard zookeeper writes by the controller with the controller epoch. This includes initializing a leader/isr path, electing leader for a partition and shrinking the isr for a partition
        2. Include controller epoch in the state change requests sent to the broker
        3. Include logic to discard state change requests with a stale controller epoch on the brokers

        Testing
        LeaderElectionTest: Added a unit test to send leader/isr request with a stale controller epoch and check if the broker discards the request and sends back the appropriate error code (StaleControllerEpochCode)
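
        For illustration, here is a minimal Scala sketch of the broker-side stale-epoch check described in change 3 and the test above. It is not the patch code: the class names and the error code value are placeholders, with StaleControllerEpochCode borrowed from the test description.

          // Minimal illustration of how a broker can reject requests from a stale controller.
          object ErrorCodes {
            val NoError: Short = 0
            val StaleControllerEpochCode: Short = 11 // placeholder value
          }

          case class LeaderAndIsrRequest(controllerEpoch: Int /* , per-partition state ... */)
          case class LeaderAndIsrResponse(errorCode: Short)

          class BrokerStateChangeHandler {
            // Highest controller epoch this broker has accepted so far.
            @volatile private var highestControllerEpoch: Int = -1

            def handleLeaderAndIsrRequest(req: LeaderAndIsrRequest): LeaderAndIsrResponse = {
              if (req.controllerEpoch < highestControllerEpoch) {
                // The sender has been superseded by a newer controller: discard the request.
                LeaderAndIsrResponse(ErrorCodes.StaleControllerEpochCode)
              } else {
                highestControllerEpoch = req.controllerEpoch
                // ... apply the leader/isr state change here ...
                LeaderAndIsrResponse(ErrorCodes.NoError)
              }
            }
          }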

        Jun Rao added a comment -

        Thanks for patch v1. Some comments:
        1. PartitionStateMachine.electLeaderForPartition(): We throw an exception if controllerEpoch != controller.epoch. It seems that we should only throw an exception if controllerEpoch > controller.epoch. The controllerEpoch in the leaderAndIsr path in ZK indicates the epoch of the controller when leader was changed last time. It's ok for a leader not to be changed during multiple generations of the controller. What we need to prevent is that if a newer generation of a controller has already changed the leader, the older generation of the controller can't touch the leaderAndIsr path any more. Also, it's not clear to me if we need the var retry. Ditto in ReplicaStateMachine.handleStateChange().

        2. ZkUtils.getLeaderAndIsrForPartition(): Instead of returning a tuple, could we return a LeaderAndIsrInfo object?

        3. ZookeeperLeaderElector: We keep the controller epoch in the value of the controller path. The problem is that the controller path is ephemeral. During a controller failover, the controller path will be gone and we won't be able to obtain the last controller epoch. We will need to store the controller epoch in a separate persistent path. That will probably complicate things a bit more.

        4. LeaderAndIsrRequest, StopReplicaRequest sizeInBytes(): When computing the size, could we add a comment for each number indicating which field it is for? (A sketch follows this list.)

        5. Partition.updateISR(): If we want to keep the semantics that the controllerEpoch in the leaderIsrPath is the epoch of the controller when the leader is changed, we need to use the controller epoch in the current leaderIsrPath when updating ISR, not the latest controller epoch this broker has seen.

        6. ControllerChannelManager.sendRequestToBrokers: For similar reasons as the above, the controller epoch in the LeaderAndIsrRequest may not always be the epoch of the current controller (e.g., when resending the leadership info stored in ZK during controller failover).

        7. ReplicaManager: We will need to maintain a controller epoch per partition. Do we still need to maintain a global controller epoch?
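
        Regarding items 2 and 4, a small Scala sketch of both suggestions: returning a dedicated case class instead of a tuple, and annotating each term of a size computation. The field layout and names are hypothetical, not the actual wire format.

          // Hypothetical field layout, for illustration only.
          case class LeaderAndIsrInfo(leader: Int, leaderEpoch: Int, isr: List[Int], controllerEpoch: Int)

          case class ExampleRequest(versionId: Short, controllerEpoch: Int, isr: List[Int]) {
            def sizeInBytes: Int =
              2 +            /* versionId (short) */
              4 +            /* controllerEpoch (int) */
              4 +            /* number of isr entries (int) */
              4 * isr.size   /* one int per isr entry */
          }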

        Neha Narkhede added a comment -

        Thanks for the review, Jun!

        1, 3. That was a bug; fixed it.
        2. Changed it to be a case class instead of a tuple.
        4. While adding comments, I realized that there was a bug in the way we computed the size of the leader and isr request. The size had an extra 1 byte in the beginning; not sure if it's required or not. This is probably a bug introduced in the very first version of the controller that we didn't catch during testing.

        6. I'm afraid that will not solve the problem. The whole point of the controller generation is to prevent the brokers from following requests sent by a stale controller. It doesn't matter whether the controller is re-publishing the old controller's decision or making its own; once it sends the decision to the brokers, it is effectively certifying that decision to be the right one. Hence, both the leader and isr request as well as the stop replica request need to contain the epoch of the controller sending the request.

        With the above semantics, the new controller should re-write the leader and isr path with its epoch after sending the leader and isr request to the brokers. However, re-writing the path during controller failover will have performance implications on the controller failover latency. An alternative is to do this in the leader and isr response callback. Currently, we rely on asynchronous leader election to work correctly. Ideally, we need to be able to act on the event that the leader and isr response is either negative or lost; when this happens, leader election needs to be triggered again. Since this is asynchronous, we can also update the leader and isr path with the new controller's epoch on receiving a successful leader and isr response. If this sounds good, I can either make the changes in patch v3 or file another JIRA. Let me know what you prefer. Until then, the broker will re-write the zk path with the latest controller epoch, which is theoretically correct, but not semantically.

        5, 7. With the semantics mentioned above, the brokers should just write the isr with the controller epoch that they know.

        Jun Rao added a comment -

        The purpose of the controller epoch is to prevent an older controller from overriding the data already updated by a newer controller. What we can do is that when a controller wants to update leaderAndIsr, it first checks and makes sure that the controller epoch stored in the path is less than or equal to the current controller epoch. Otherwise, the controller won't update the path. This way, the controller epoch associated with leaderAndIsr is only updated when it's truly needed, i.e., when the controller wants to update the leader or the isr. So we don't need to rewrite leaderAndIsr during controller failover. When sending leaderAndIsr requests, we just need to send the controller epoch stored in the leaderAndIsr path.

        Neha Narkhede added a comment -

        >> So we don't need to rewrite leaderAndIsr during controller failover. When sending leaderAndIsr requests, we just need to send the controller epoch stored in the leaderAndIsr path.

        That still doesn't solve the problem. Here is the problem with re-publishing the previous controller's decision with an older epoch:

        Let's say that the controller with epoch 1 elected leaders for partitions p1 and p2. Over time, the controller moves and the epoch increments to 2. Let's say this controller reassigns partition p2 and re-elects the leader for p2 as part of that. Now, the controller with epoch 2 goes into a GC pause and the controller moves to epoch 3. This new controller re-publishes the leader and isr decisions made by controller epoch 1 as well as epoch 2. Now, the same broker will receive leader and isr requests carrying 2 different epochs, so it will reject the requests carrying epoch 1 since it already received requests with a higher epoch (2). Ignoring state change requests is dangerous and can lead to a situation where some partitions are offline.

        So, the controller should use its own epoch while sending state change requests to the brokers. Precisely, here is how it will work -

        1. The leader and isr path has the epoch of the controller that made the new leader/isr decision. The leader and isr path will be conditionally updated. If the conditional update fails, the controller re-reads the controller epoch value. If the epoch has changed, it knows that another controller has taken over and it aborts the state change operation.
        2. Whenever a controller sends a state change request to a broker (leaderAndIsr/stopReplica), it tags the request with its own epoch. In other words, it certifies that decision to be current and correct.
        3. Each broker maintains the last known highest controller epoch. The broker will reject any state change request that is tagged with a controller epoch value lower than what it knows.

        Since having multiple controllers during leader election is tricky, let's dive into some details -

        When the controller changes leader/isr state, it first
        1. conditionally updates the zookeeper path for that partition
        2. sends the leader/isr request to the brokers

        If the controller goes into soft failure before step #1, a new controller will get elected and it will notice that partition is offline, elect the leader and send the leader/isr request to the broker with the new epoch. When the failed controller falls out of the soft failure, it will try to update the zk path, but will fail and abort the operation.

        If the controller goes into soft failure between steps 1 & 2, a new controller will get elected and will just resend the failed controller's leader/isr decision to the broker using its own controller epoch. When the failed controller wakes up, it might try to send the leader/isr decision to the broker, but the broker will reject that request since it already knows a higher controller epoch.

        These changes were covered in patch v2, uploading v3 after rebasing.
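
        The sequence above can be sketched in Scala as follows. This is not the patch code: ZooKeeper access is reduced to a tiny trait, the path name and exception are illustrative, and the real implementation goes through ZkUtils/zkclient.

          trait ZkStore {
            // Returns Some(newZkVersion) if expectedVersion matched the znode, else None.
            def conditionalUpdate(path: String, data: String, expectedVersion: Int): Option[Int]
            def read(path: String): (String, Int) // (data, zkVersion)
          }

          class ControllerStateChange(zk: ZkStore, myEpoch: Int) {
            def changeLeaderAndIsr(leaderIsrPath: String,
                                   newLeaderAndIsr: String,
                                   expectedZkVersion: Int,
                                   sendToBrokers: (String, Int) => Unit): Unit =
              // Step 1: conditionally update the per-partition leader/isr path.
              zk.conditionalUpdate(leaderIsrPath, newLeaderAndIsr, expectedZkVersion) match {
                case Some(_) =>
                  // Step 2: tag the request with this controller's own epoch; brokers
                  // reject anything carrying an older epoch (step 3).
                  sendToBrokers(newLeaderAndIsr, myEpoch)
                case None =>
                  // The conditional update failed: re-read the controller epoch and abort
                  // the state change if a newer controller has taken over.
                  val (latestEpoch, _) = zk.read("/controllerEpoch")
                  if (latestEpoch.toInt > myEpoch)
                    throw new IllegalStateException("controller moved, aborting state change")
              }
          }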

        Jun Rao added a comment -

        Thanks for patch v3. The overall approach seems to work. Some comments:

        30. PartitionStateInfo and LeaderAndIsrRequest: When calculating the size in sizeInBytes(), it's more readable if we put each number to be added on a separate line.

        31. Partition.updateIsr(): I am thinking about what controllerEpoch the leader should use when updating the leaderAndIsr path. There is probably nothing wrong with using the controllerEpoch in replicaManager. However, it seems to make more sense to use the controllerEpoch in the leaderAndIsr path itself, since this update is actually not made by the controller.

        32. ReplicaManager.controllerEpoch: Since this variable can be accessed from different threads, it needs to be a volatile. Also, we only need to update controllerEpoch if the one from the request is larger (but not equal). It probably should be initialized to 0 or -1?

        33. LeaderElectionTest.testLeaderElectionWithStaleControllerEpoch(): I wonder if we really need to start a new broker. Can we just send a stale controller epoch using the controllerChannelManager in the current controller?

        34. KafkaController: There seems to be a tricky issue with incrementing the controller epoch. We increment the epoch in onControllerFailover() after the broker becomes a controller. What could happen is that broker 1 becomes the controller and goes into GC before we increment the epoch. Broker 2 becomes the new controller and increments the epoch. Broker 1 comes back from GC and increments the epoch again. Now, broker 1's controller epoch is actually larger. Not sure what's the best way to address this. One thought is that immediately after the controller epoch is incremented in onControllerFailover(), we check if this broker is still the controller (by reading the controller path in ZK). If not, we throw an exception. Also, the epoch probably should be initialized to 0 if we want the first controller to have epoch 1.

        35. We use an int to represent both the controller and leader epoch. There is a potential issue if the number wraps. We probably don't need to worry about it now.

        Neha Narkhede added a comment -

        >> 34. KafkaController: There seems to be a tricky issue with incrementing the controller epoch. We increment the epoch in onControllerFailover() after the broker becomes a controller. What could happen is that broker 1 becomes the controller and goes into GC before we increment the epoch. Broker 2 becomes the new controller and increments the epoch. Broker 1 comes back from GC and increments the epoch again. Now, broker 1's controller epoch is actually larger. Not sure what's the best way to address this. One thought is that immediately after the controller epoch is incremented in onControllerFailover(), we check if this broker is still the controller (by reading the controller path in ZK). If not, we throw an exception. Also, the epoch probably should be initialized to 0 if we want the first controller to have epoch 1.

        Good point, I missed this earlier. But just raising an exception after writing to the zk path might not be the best solution, because it breaks the guarantee that the active controller has the largest epoch in the system. I can't think of an example that would lead to a bug, but it gives me a feeling that this could cause unforeseen, hard-to-debug issues in the future. On the surface, it doesn't seem to be incorrect. However, here is another solution that seems to cover the corner cases -

        Every broker registers a watch on the /controllerEpoch persistent path and caches the latest controller epoch and zk version. When a broker becomes controller, it uses this cached zk version to do the conditional write. Now, in the event that another controller takes over when the current controller goes into GC after election, the new controller will use the previous controller's zk version and successfully update the zk path. When the older controller comes back, it will try to use a stale zk version and its zookeeper write will fail. It will not be able to update the new zk version before the write since they are guarded by the same lock.

        If this sounds good, I will upload another patch that includes the fix.
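
        A sketch of that scheme, assuming a ZooKeeper-style conditional write (setData with an expected version). All names here are illustrative; the real change lives in KafkaController and goes through zkclient.

          trait EpochStore {
            // Returns Some(newZkVersion) if expectedZkVersion matched, else None.
            def conditionalUpdate(path: String, data: String, expectedZkVersion: Int): Option[Int]
          }

          class ControllerEpochManager(store: EpochStore, epochPath: String = "/controllerEpoch") {
            // Cached from the watch every broker keeps on the persistent epoch path.
            private var cachedEpoch = 0
            private var cachedZkVersion = 0
            private val lock = new Object

            // Fired by the watch on the epoch path; guarded by the same lock as the increment.
            def onEpochPathChange(epoch: Int, zkVersion: Int): Unit = lock.synchronized {
              cachedEpoch = epoch
              cachedZkVersion = zkVersion
            }

            // Called when this broker wins the controller election. A controller that was
            // paused across another election holds a stale zkVersion, so its write fails.
            def incrementControllerEpoch(): Option[Int] = lock.synchronized {
              val newEpoch = cachedEpoch + 1
              store.conditionalUpdate(epochPath, newEpoch.toString, cachedZkVersion) match {
                case Some(newVersion) =>
                  cachedEpoch = newEpoch
                  cachedZkVersion = newVersion
                  Some(newEpoch)
                case None =>
                  None // a newer controller already bumped the epoch; abort controller startup
              }
            }
          }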

        Neha Narkhede added a comment -

        >> 31. Partition.updateIsr(): I am thinking about what controllerEpoch the leader should use when updating the leaderAndIsr path. There is probably nothing wrong with using the controllerEpoch in replicaManager. However, it seems to make more sense to use the controllerEpoch in the leaderAndIsr path itself, since this update is actually not made by the controller.

        You make a good point; I agree that it probably makes more sense to keep the decision maker's controller epoch while changing the isr. Fixed it.

        >> 32. ReplicaManager.controllerEpoch: Since this variable can be accessed from different threads, it needs to be a volatile. Also, we only need to update controllerEpoch if the one from the request is larger (but not equal). It probably should be initialized to 0 or -1?

        Good catch, fixed it.

        >> 33. LeaderElectionTest.testLeaderElectionWithStaleControllerEpoch(): I wonder if we really need to start a new broker. Can we just send a stale controller epoch using the controllerChannelManager in the current controller?

        I just thought it makes it simpler to understand the logic if there is another broker that acts as the new controller, but you are right. I could've just hijacked the old controller's channel manager.

        >> 34. KafkaController: There seems to be a tricky issue with incrementing the controller epoch. We increment the epoch in onControllerFailover() after the broker becomes a controller. What could happen is that broker 1 becomes the controller and goes into GC before we increment the epoch. Broker 2 becomes the new controller and increments the epoch. Broker 1 comes back from GC and increments the epoch again. Now, broker 1's controller epoch is actually larger. Not sure what's the best way to address this. One thought is that immediately after the controller epoch is incremented in onControllerFailover(), we check if this broker is still the controller (by reading the controller path in ZK). If not, we throw an exception. Also, the epoch probably should be initialized to 0 if we want the first controller to have epoch 1.

        Implemented the fix I described earlier for this.

        Jun Rao added a comment -

        Thanks for patch v4. A few more comments:

        40. PartitionStateInfo: It seems that we need to send the controllerEpoch associated with this partition. Note that this epoch is different from the controllerEpoch in LeaderAndIsrRequest. The former is the epoch of the controller that last changed the leader or isr and will be used when the broker updates the isr. The latter is the epoch of the controller that sends the request and will be used in ReplicaManager to decide which controller's decision to follow. We will need to change the controllerEpoch passed to makeLeader and makeFollower in ReplicaManager accordingly. (The two epochs are sketched after this list.)

        41. ReplicaManager: In stopReplicas() and becomeLeaderOrFollower(), it would be better to only update controllerEpoch when it's truly necessary, i.e., the new controllerEpoch is larger than the cached one (not equal). This is because updating a volatile variable is a bit more expensive than updating a local variable, since the update has to be exposed to other threads.

        42. KafkaController: The approach in the new patch works. There are a few corner cases that we need to cover.
        42.1. incrementControllerEpoch(): If the controllerEpoch path doesn't exist, we create the path using the initial epoch value without using a conditional update. It is possible for 2 controllers to execute this logic simultaneously and both get the initial epoch value. One solution is to make sure the controller epoch path exists during context initialization. Then we can always use a conditional update here.
        42.2. ControllerContext: We need to initialize controllerEpoch by reading from ZK. We also need to make sure that we subscribe to the controllerEpoch path first and then read its value from ZK for initialization.
        42.3. ControllerEpochListener: It's safer to set both the epoch and the ZK version using the value from ZkUtils.readData.

        43. ControllerMovedException is missing from the patch.
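
        To make the two epochs in item 40 easier to see side by side, here is a hypothetical sketch of the request shape; it is not the actual wire format from the patch.

          // Hypothetical shapes only. Two distinct epochs are carried:
          //  - the per-partition controllerEpoch: epoch of the controller that last changed
          //    the leader/isr for that partition (used when the leader later updates the isr path);
          //  - the request-level controllerEpoch: epoch of the controller sending the request
          //    (used by ReplicaManager to reject stale controllers).
          case class LeaderIsrAndControllerEpoch(leader: Int, isr: List[Int], controllerEpoch: Int)

          case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                        replicationFactor: Int)

          case class LeaderAndIsrRequest(controllerEpoch: Int,
                                         partitionStates: Map[(String, Int), PartitionStateInfo])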

        Neha Narkhede added a comment -

        A few more changes in this patch -

        1. Changed leader and isr request to send the controller epoch that made the last change for leader/isr per partition. This is used by the broker to update the leader and isr path with the correct controller epoch. Each Partition object on a Kafka server will maintain the epoch of the controller that made the last leader/isr decision. If/when the broker changes the isr, it uses the correct value for the controller epoch, instead of using the currently active controller's epoch. Functionally, nothing bad will happen even if it uses the currently active controller's epoch (that is sent on every state change request), but semantically it will not quite be right to do so. This can happen when a previous controller has made the leader/isr decisions for partitions, while the newer controllers have merely re-published those decisions upon controller failover.
        2. Changed the become controller procedure to resign as the current controller if it runs into any unexpected error/exception while making the state change to become controller. This is to ensure that the currently elected controller is actually serving as the controller.
        3. LogRecoveryTest and LogTest occasionally fail, but I believe they fail on our nightly build as well. Didn't attempt to fix those tests in this patch.

        Regarding Jun's review -

        >> 40. PartitionStateInfo: It seems that we need to send the controllerEpoch associated with this partition. Note that this epoch is different from the controllerEpoch in LeaderAndIsrRequest. The former is the epoch of the controller that last changed the leader or isr and will be used when the broker updates the isr. The latter is the epoch of the controller that sends the request and will be used in ReplicaManager to decide which controller's decision to follow. We will need to change the controllerEpoch passed to makeLeader and makeFollower in ReplicaManager accordingly.

        You raise a good point here. What I missed is initializing the controller epoch for each partition. There are 2 ways to initialize it: 1. a zookeeper read on startup, or 2. the active controller sending the controller epoch of the controller that last made a leader/isr decision for that partition. I'm guessing 2 might be better from a performance perspective.

        >> 41. ReplicaManager: In stopReplicas() and becomeLeaderOrFollower(), it would be better to only update controllerEpoch when it's truly necessary, i.e., the new controllerEpoch is larger than the cached one (not equal). This is because updating a volatile variable is a bit more expensive than updating a local variable, since the update has to be exposed to other threads.

        Not sure if this is a performance win. Volatile variables are never cached in registers, so a read has to reload the value from memory and a write has to flush it back to memory. The if statement would need to read the same volatile variable, requiring a trip to memory anyway.
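
        For reference, the two update policies being debated look roughly like this in Scala. Illustrative only; in real code the check-then-write would also need a lock to be atomic.

          class ControllerEpochCache {
            @volatile private var controllerEpoch: Int = -1

            // Update whenever the request's epoch is at least as new.
            def updateInclusive(requestEpoch: Int): Unit =
              if (requestEpoch >= controllerEpoch) controllerEpoch = requestEpoch

            // Jun's suggestion: skip the volatile write when the epochs are equal.
            // The guard still performs a volatile read, which is the counterpoint above.
            def updateIfStrictlyNewer(requestEpoch: Int): Unit =
              if (requestEpoch > controllerEpoch) controllerEpoch = requestEpoch
          }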

        >> 42.1. incrementControllerEpoch(): If the controllerEpoch path doesn't exist, we create the path using the initial epoch value without using a conditional update. It is possible for 2 controllers to execute this logic simultaneously and both get the initial epoch value. One solution is to make sure the controller epoch path exists during context initialization. Then we can always use a conditional update here.

        It is not possible for 2 clients to create the same zookeeper path; this is the simplest guarantee zookeeper provides. One of the writes will fail and that controller will abort its controller startup procedure. The larger problem here is not so much that one of the writes should fail, but that we need to ensure that if the failed zk operation happens to be for the latest active controller, then it will abort its controller startup procedure and the old one will lose its zookeeper session anyway.

        >> 42.2. ControllerContext: We need to initialize controllerEpoch by reading from ZK. We also need to make sure that we subscribe to the controllerEpoch path first and then read its value from ZK for initialization.

        The controller constructor is modified to initialize the controller epoch and zk version by reading from zk, and then it subscribes to the controller epoch's zk path.

        >> 42.3. ControllerEpochListener: It's safer to set both the epoch and the ZK version using the value from ZkUtils.readData.

        You're right, and there is no perfect solution to this. Ideally, the zkclient API should change to expose the version, since the underlying zookeeper API exposes it. The problem is that there will always be a window after the listener has fired and before the read returns in which the controller's epoch could change. Another listener will fire in that case, though the same problem exists during each listener invocation. The right way is to rely on the data the listener returns to the controller. But with this change, at least the epoch and its version will correspond to the same epoch change, so it's still better.
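
        For what it's worth, the raw ZooKeeper client does return the data and its Stat (including the version) from a single read; a small illustrative sketch follows. The actual patch goes through zkclient/ZkUtils rather than the raw client, and the path name here is illustrative.

          import org.apache.zookeeper.ZooKeeper
          import org.apache.zookeeper.data.Stat

          object ControllerEpochRead {
            // Fetch the epoch value and its ZK version from the same read, so the cached
            // pair always reflects a single znode state.
            def readEpochAndVersion(zk: ZooKeeper, path: String = "/controllerEpoch"): (Int, Int) = {
              val stat = new Stat()
              val data = zk.getData(path, false, stat) // data and stat come from one call
              (new String(data, "UTF-8").toInt, stat.getVersion)
            }
          }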

        Jun Rao added a comment -

        Thanks for patch v5. Looks good. Just a couple of minor comments.

        50. LeaderAndIsrResponse and StopReplicaResponse: Currently, for all types of responses, we have moved to the model that there is no global error code at the response level. Instead, if a request can't be processed for any partition, we just set the same error code for each partition in the response. This achieves the same effect, but makes the handling of the response easier: one just has to deal with the error code per partition. (The two shapes are sketched after these comments.)

        51. Are the changes in test/resources/log4j.properties intended?
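
        The two response shapes under discussion in item 50, sketched as hypothetical Scala case classes (neither is the actual Kafka wire format):

          case class TopicAndPartition(topic: String, partition: Int)

          // Option A: keep a request-level error code alongside the per-partition codes.
          case class ResponseWithTopLevelError(errorCode: Short,
                                               partitionErrors: Map[TopicAndPartition, Short])

          // Option B: no request-level code; a request-level failure such as
          // StaleControllerEpochCode is copied into every partition's entry.
          case class ResponsePerPartitionOnly(partitionErrors: Map[TopicAndPartition, Short])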

        Neha Narkhede added a comment -

        Thanks for reviewing patch v5 quickly!

        50. True, I thought about it. But in both of these cases, the error code is a request-level error code: it is saying that this request is coming from an invalid controller. It is true that for other requests there are just partition-level error codes, but I'm guessing there are error codes that make sense at the request level as well. When you get request-level errors, the response is empty and the error code is set. The other option of dealing with this is, if you have a request-level error code, to set it as every partition's error code and skip the request-level error code. But this seemed a little hacky, since now the response map will not be empty and will contain each partition with the same error code. So it felt like we were trying to retrofit something to match the currently chosen response format. I can see both sides, and probably the user's convenience would help pick one option. If this discussion already happened sometime, I can make the change right away. If not, do you think it's worthwhile to quickly check on the user list? If so, I can commit this patch and make that change, whatever is decided, in a follow-up.

        51. Not intended, will remove on commit.

        Jun Rao added a comment -

        50. My feeling is that a request-level error code conveys the same meaning as every partition failing with the same error code, and my preference is to keep all response formats consistent. However, if you prefer, it's ok to check in the patch as it is and revisit it when we finalize the wire format. So, +1 from me on the patch.

        Neha Narkhede added a comment -

        50. Agree that the request/response formats must be consistent. Let's bring this up while finalizing the format; I will put in the change, if required, immediately.

        Thanks a lot for the review, committed v5!


          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 0
          • Watchers: 2


              Time Tracking

              • Original Estimate: 48h
              • Remaining Estimate: 48h
              • Time Spent: Not Specified
