Kafka / KAFKA-3083

a soft failure in controller may leave a topic partition in an inconsistent state

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0.0
    • Fix Version/s: 1.1.0
    • Component/s: core
    • Labels:

      Description

      The following sequence can happen.

      1. Broker A is the controller and is in the middle of processing a broker change event. As part of this process, let's say it's about to shrink the isr of a partition.

      2. Then broker A's session expires and broker B takes over as the new controller. Broker B sends the initial leaderAndIsr request to all brokers.

      3. Broker A continues by shrinking the isr of the partition in ZK and sends the new leaderAndIsr request to the broker (say C) that leads the partition. Broker C will reject this leaderAndIsr since the request comes from a controller with an older epoch. Now we could be in a situation that Broker C thinks the isr has all replicas, but the isr stored in ZK is different.
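
      The sequence above can be replayed with a small in-memory model. This is only a sketch under stated assumptions: FakeZk and LeaderBroker are hypothetical stand-ins, not Kafka or ZooKeeper classes, and the only behavior modeled is that the leader rejects requests carrying an older controller epoch while ZK accepts the stale write.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of the race: broker C fences the stale controller, yet ZK already holds its write. */
final class StaleControllerDemo {

    /** Hypothetical stand-in for the partition state in ZK; it accepts any write. */
    static final class FakeZk {
        List<Integer> isr;
        FakeZk(List<Integer> isr) { this.isr = isr; }
    }

    /** Hypothetical stand-in for broker C, the partition leader. */
    static final class LeaderBroker {
        int highestControllerEpoch;
        List<Integer> isr;

        LeaderBroker(int epoch, List<Integer> isr) {
            this.highestControllerEpoch = epoch;
            this.isr = isr;
        }

        /** Applies the request unless it comes from a controller with an older epoch. */
        boolean onLeaderAndIsr(int controllerEpoch, List<Integer> newIsr) {
            if (controllerEpoch < highestControllerEpoch) {
                return false; // stale controller: reject
            }
            highestControllerEpoch = controllerEpoch;
            isr = newIsr;
            return true;
        }
    }

    public static void main(String[] args) {
        List<Integer> fullIsr = Arrays.asList(1, 2, 3);
        FakeZk zk = new FakeZk(fullIsr);
        LeaderBroker c = new LeaderBroker(1, fullIsr);

        // Step 2: B takes over with epoch 2 and sends its initial request.
        c.onLeaderAndIsr(2, zk.isr);

        // Step 3: A (epoch 1) still shrinks the ISR in ZK, then notifies C.
        zk.isr = Arrays.asList(1, 2);
        boolean accepted = c.onLeaderAndIsr(1, zk.isr);

        // C keeps the full ISR while ZK holds the shrunk one.
        System.out.println("accepted=" + accepted + " brokerIsr=" + c.isr + " zkIsr=" + zk.isr);
    }
}
```

      The epoch check protects broker C from the stale request, but nothing in this model protects ZK from the stale write, which is where the two views diverge.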

        Issue Links

          Activity

          junrao Jun Rao added a comment -

          This is now fixed in KAFKA-5642.

          guozhang Guozhang Wang added a comment -

          Dibyendu Bhattacharya KAFKA-5027 is being actively worked on and expected to be released in the next release (mid Oct.), which would resolve this issue.

          dibbhatt Dibyendu Bhattacharya added a comment -

          Hi Jun Rao, we also had this issue in Kafka 0.9.x. Any idea when this can be fixed?

          stephane.maarek@gmail.com Stephane Maarek added a comment - - edited

          Just hit this issue in prod with Kafka 0.10.2.0, and the only solution was to reboot the broker that got put in an inconsistent state

          mgharat Mayuresh Gharat added a comment -

          Flavio Junqueira, do we have an umbrella jira where this issue is being tracked, along with the required changes mentioned in this patch?

          junrao Jun Rao added a comment -

          Someone encountered another issue related to this. After a broker's ZK session expires and it resigns as the controller, there is the following error in the controller log.

          2016-08-13 17:34:23,721 ERROR org.I0Itec.zkclient.ZkEventThread:77 [ZkClient-EventThread-87- [run] Error handling event ZkEvent[Children of /isr_change_notification changed sent to kafka.controller.IsrChangeNotificationListener@3c60b0b1]
          java.lang.IllegalStateException: java.lang.NullPointerException
          at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:435)
          at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:1029)
          at kafka.controller.IsrChangeNotificationListener.kafka$controller$IsrChangeNotificationListener$$processUpdateNotifications(KafkaController.scala:1372)
          at kafka.controller.IsrChangeNotificationListener$$anonfun$handleChildChange$1.apply$mcV$sp(KafkaController.scala:1359)
          at kafka.controller.IsrChangeNotificationListener$$anonfun$handleChildChange$1.apply(KafkaController.scala:1352)
          at kafka.controller.IsrChangeNotificationListener$$anonfun$handleChildChange$1.apply(KafkaController.scala:1352)
          at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
          at kafka.controller.IsrChangeNotificationListener.handleChildChange(KafkaController.scala:1352)
          at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
          at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
          Caused by: java.lang.NullPointerException
          at kafka.controller.KafkaController.sendRequest(KafkaController.scala:699)
          at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:404)
          at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:370)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
          at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
          at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
          at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
          at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:370)
          ... 9 more

          The broker fails to send an UpdateMetadataRequest in reaction to an ISR change event since controllerChannelManager is null after the broker resigns as the controller. When this happens, the broker calls the logic to force a controller to resign. This could accidentally delete the controller path created by another broker.

          2016-08-13 17:34:23,639 ERROR kafka.utils.Logging$class:97 [ZkClient-EventThread-87-] [error] [Controller 43]: Forcing the controller to resign

          fpj Flavio Junqueira added a comment -

          Sure, we need to transform all operations to look like what we currently have in ZKCheckedEphemeral. That particular class is a bit special because it performs checks and such, but essentially we need to change the current calls in ZkUtils to use asynchronous calls using the ZK handle directly and have a callback class that pairs up with the call.

          Related to this present issue, we will also need to implement session management, but this time it can't try to be transparent like ZkClient does. It is good to have a central point to get the current zk handle from, but we need to give the broker the ability to signal when to create a new session. As part of this signaling, we will need to implement some kind of listener to propagate events. Another option is to let the broker implement directly a Watcher to process event notifications.

          One simple way to start is to gradually replace the calls in ZkUtils with asynchronous calls, still using the handle ZkUtils provides. The calls would block to maintain the current behavior outside ZkUtils. Once that's done, we can make the calls non-blocking and do the necessary changes across broker/controller. Finally, we can replace the session management with our own.

          If you guys want to do this, then we should probably create an umbrella jira.
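
          As an illustration of the first migration step described above (an asynchronous call paired with a callback, wrapped so that callers still block), here is a minimal sketch. FakeAsyncStore, DataCallback and getDataBlocking are hypothetical names invented for this example; the store is an in-memory stand-in, not the ZooKeeper client or ZkUtils.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: an async read with a callback, plus a wrapper that blocks like the old call. */
final class AsyncThenBlockDemo {

    /** Callback class that pairs up with the asynchronous call. */
    interface DataCallback {
        void onResult(String path, byte[] data, Exception error);
    }

    /** In-memory stand-in for the ZK handle; not a real client. */
    static final class FakeAsyncStore implements AutoCloseable {
        private final Map<String, byte[]> nodes = new ConcurrentHashMap<>();
        private final ExecutorService io = Executors.newSingleThreadExecutor();

        void put(String path, byte[] data) { nodes.put(path, data); }

        /** Non-blocking read: the result is delivered to the callback on the I/O thread. */
        void getDataAsync(String path, DataCallback cb) {
            io.submit(() -> {
                byte[] data = nodes.get(path);
                if (data == null) {
                    cb.onResult(path, null, new IllegalStateException("no node: " + path));
                } else {
                    cb.onResult(path, data, null);
                }
            });
        }

        @Override public void close() { io.shutdown(); }
    }

    /** Transitional wrapper: issues the async call, then blocks so callers keep the old behavior. */
    static byte[] getDataBlocking(FakeAsyncStore store, String path) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        store.getDataAsync(path, (p, data, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(data);
            }
        });
        return result.join(); // throws CompletionException if the callback reported an error
    }

    public static void main(String[] args) {
        try (FakeAsyncStore store = new FakeAsyncStore()) {
            store.put("/controller", "broker-B".getBytes());
            System.out.println(new String(getDataBlocking(store, "/controller")));
        }
    }
}
```

          Once every caller goes through a wrapper like this, the later step of making calls non-blocking amounts to handing the future (or the callback) to the caller instead of joining on it.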

          fpj Flavio Junqueira added a comment -

          1) We need to use a multi-op that combines the update to the ISR and a znode check. The znode check verifies that the version of the controller leadership znode is still the same and if it passes, then the ISR data is updated.

          I was really just thinking out loud, the multiop is just a hack to get around the fact that controller broker doesn't know if the underlying session has been recreated or not. The comment about using multiop was simply pointing that you can check and update atomically with this multiop recipe. If we do this the right way, then we don't need to use a multiop call.

          2) The race condition that Jun Rao mentioned still exists above in 1).

          It still exists but the multiop would fail to perform the update on ZK if you're checking a version.

          4) To do step 3), as Jun Rao suggested we have to detect the connection loss event.

          There are two parts. Detecting connection loss is one of them. If the controller isn't sure about its session when it receives connection loss, then it should stop. The second part is not to create a new session if the previous one expired. If the session of A has expired, which must happen by step 2) otherwise B can't be elected, then A isn't able to get requests completed on the expired session. Once B is elected, the session of A must have expired and no update coming from A will be executed. Of course, we want to bring broker A back up and to do it, we need to start a new session. However, before starting a new session, we need to make sure to stop any controller work in A.

          i) Broker A has connection loss and connects immediately in which case it gets a SyncConnected event. Now the session MIGHT NOT have expired since the connection happened immediately. Broker A is expected to continue since it is still the controller and the session has not expired.
          ii) Broker A has connection loss and connects back in which case it gets a SyncConnected event. Now the session MIGHT have expired. Broker A is expected to stop all the zk operations.

          The broker will only get SyncConnected if it connects and it is able to validate the session. If the session is invalid, then it gets an Expired notification. Note that if we are using SASL to authenticate, then we could be also getting an authenticated event.
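
          The event handling discussed above can be summarized as a small state machine. This is a hypothetical sketch, not Kafka code: the SessionEvent values mirror three of the states a ZooKeeper watcher can observe (Disconnected, SyncConnected, Expired), and ControllerMode and ControllerSession are names made up for the example.

```java
/** Hypothetical sketch of how a controller could react to ZooKeeper session events. */
final class SessionEventDemo {

    /** Mirrors three of the states a ZooKeeper watcher can observe. */
    enum SessionEvent { DISCONNECTED, SYNC_CONNECTED, EXPIRED }

    enum ControllerMode { ACTIVE, PAUSED, STOPPED }

    static final class ControllerSession {
        private ControllerMode mode = ControllerMode.ACTIVE;

        ControllerMode mode() { return mode; }

        void onEvent(SessionEvent event) {
            if (mode == ControllerMode.STOPPED) {
                return; // an expired session never comes back; a new one must be started explicitly
            }
            switch (event) {
                case DISCONNECTED:
                    mode = ControllerMode.PAUSED;  // session state unknown: stop issuing ZK writes
                    break;
                case SYNC_CONNECTED:
                    mode = ControllerMode.ACTIVE;  // the server validated the session: still the controller
                    break;
                case EXPIRED:
                    mode = ControllerMode.STOPPED; // drop all controller work before any new session
                    break;
            }
        }

        boolean mayWriteToZk() { return mode == ControllerMode.ACTIVE; }
    }

    public static void main(String[] args) {
        ControllerSession a = new ControllerSession();
        a.onEvent(SessionEvent.DISCONNECTED);
        System.out.println("after connection loss: " + a.mode());
        a.onEvent(SessionEvent.SYNC_CONNECTED);
        System.out.println("after reconnect on a valid session: " + a.mode());
        a.onEvent(SessionEvent.DISCONNECTED);
        a.onEvent(SessionEvent.EXPIRED);
        System.out.println("after expiration: " + a.mode());
    }
}
```

          The point of the STOPPED state is that a reconnect can only ever lead back to ACTIVE on the same, still-valid session; after expiration the only way forward is an explicit new session.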

          junrao Jun Rao added a comment -

          Mayuresh Gharat, in step 4) ii), if the session expires, you won't get SyncConnected event. Also, we probably should consider fixing this jira and KAFKA-3038 in a unified way.

          Flavio Junqueira, do you think that you can outline the approach of addressing both issues together by potentially moving to the raw ZK async api?

          mgharat Mayuresh Gharat added a comment -

          However, the replica would be in the cache of the controller B, so would it be elected in this case? Would it be an actual problem if the B is demoted and another controller comes up?

          ---> That seems right, it should be in the cache of controller B and should be elected as leader. If however B goes down before this and a new controller is elected, it will read the data from zookeeper and might not be able to elect a new leader if uncleanLeaderElection is turned OFF.

          mgharat Mayuresh Gharat added a comment -

          Hi Flavio Junqueira,

          Correct me if I am wrong:
          1) We need to use a multi-op that combines the update to the ISR and a znode check. The znode check verifies that the version of the controller leadership znode is still the same and if it passes, then the ISR data is updated.
          2) The race condition that Jun Rao mentioned still exists above in 1).
          3) To overcome this we somehow need to detect that broker A, which was the controller, got a session expiration and should immediately drop all the zk work it is doing.
          4) To do step 3), as Jun Rao suggested, we have to detect the connection loss event. Now 2 things might happen:
          i) Broker A has connection loss and connects immediately in which case it gets a SyncConnected event. Now the session MIGHT NOT have expired since the connection happened immediately. Broker A is expected to continue since it is still the controller and the session has not expired.
          ii) Broker A has connection loss and connects back in which case it gets a SyncConnected event. Now the session MIGHT have expired. Broker A is expected to stop all the zk operations.
          The only difference between i) and ii) is SessionExpiration check.

          fpj Flavio Junqueira added a comment -

          Another thing to mention is that if ZkClient didn't create a new session transparently, then the update of broker A in step 3 would fail because the session has expired and the ZK ensemble wouldn't take a request from an expired session.

          fpj Flavio Junqueira added a comment -

          Mayuresh Gharat

          I was just thinking if we can modify the controller code to always check if it is the controller before it makes such changes to zookeeper.

          In principle, there is the race that Jun Rao mentioned, but I was thinking that one possibility would be to use a multi-op that combines the update to the ISR and a znode check. The znode check verifies that the version of the controller leadership znode is still the same and if it passes, then the ISR data is updated. Using the scenario in the description to illustrate, when broker A tries to update the ISR state in ZK in step 3, the operation fails because the version of the controller leadership znode has changed.

          The solution of handling the connection loss event is typical, but we could consider adding a multi-op to be extra safe against these spurious writes.
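
          The check-and-update recipe can be sketched with an in-memory model. Everything here is an assumption made for illustration: FakeZk, its methods and the /controller and /isr paths are hypothetical, and a leadership change is simplified to a write that bumps the version of the leadership node. With the real client, the same shape would presumably be expressed through ZooKeeper.multi with Op.check and Op.setData.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a "check leadership version, then update ISR" multi-op. Not the real ZooKeeper API. */
final class CheckedIsrUpdateDemo {

    static final class FakeZk {
        private final Map<String, String> data = new HashMap<>();
        private final Map<String, Integer> versions = new HashMap<>();

        /** Creates or overwrites a node; every write bumps the node's version. */
        synchronized void write(String path, String value) {
            data.put(path, value);
            versions.merge(path, 1, Integer::sum);
        }

        synchronized int version(String path) { return versions.getOrDefault(path, 0); }

        synchronized String read(String path) { return data.get(path); }

        /** Atomic check-and-update: writes only if checkPath is still at expectedVersion. */
        synchronized boolean checkThenWrite(String checkPath, int expectedVersion,
                                            String writePath, String value) {
            if (version(checkPath) != expectedVersion) {
                return false; // leadership changed: reject the whole operation
            }
            write(writePath, value);
            return true;
        }
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        zk.write("/controller", "A");
        zk.write("/isr", "1,2,3");
        int versionSeenByA = zk.version("/controller");

        zk.write("/controller", "B"); // B is elected, so the leadership version changes

        // A's late ISR shrink carries the version it saw while it was controller.
        boolean applied = zk.checkThenWrite("/controller", versionSeenByA, "/isr", "1,2");
        System.out.println("applied=" + applied + " isr=" + zk.read("/isr"));
    }
}
```

          Because the check and the write happen under one lock in this model, there is no window between them, which is the property the multi-op is meant to provide.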

          fpj Flavio Junqueira added a comment -

          Jun Rao

          Even if it exists, it has the same problem--the session can expire after the check. So, you pretty much have to rely on the raw ZK client api that handles ConnectionLossException while reading/writing to ZK.

          Perhaps you meant to say this, but you can also learn about a connection loss via the watcher you pass when creating a zookeeper object.

          fpj Flavio Junqueira added a comment -

          Jun Rao

          does controller A just resume from where it's left off? Or does it ignore all outstanding events and re-read all subscribed ZK paths (since there could be missing events between the connection loss event and the SyncConnected event)?

          I don't see a reason for ignoring outstanding events and re-reading zk state. If the session hasn't expired, then the broker is still the controller and I'd say it is safe to assume that no other controller work happened in parallel.

          ZkClient actually hides the ZK ConnectionLoss event and only informs the application when the ZK session expires. To pursue this, we will have to access ZK directly.

          I think further down you noted that ZkClient actually exposes the connection loss event, but does put a thread in the middle.

          junrao Jun Rao added a comment -

          We will have to take a closer look at the ZKClient implementation, but handleStateChanged() doesn't seem to be enough. When receiving a state change event, ZKClient just puts the event into a queue. A separate thread takes each item off the queue and then calls handleStateChanged(). So, between the time that a connection is lost and the time handleStateChanged() is called, a write may already be done in ZK on a newly created session.
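
          The timing gap described above can be shown with a deterministic, single-threaded model. This is a sketch with hypothetical names (Controller, enqueueStateChange, drainEvents, tryWrite); it does not use ZkClient and only models the fact that a queued notification is handled some time after the state actually changed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model: state changes are queued and handled later, so a write can slip in between. */
final class QueuedNotificationDemo {

    static final class Controller {
        private final Queue<String> pendingEvents = new ArrayDeque<>();
        private final List<String> zkWrites = new ArrayList<>();
        private boolean paused = false;

        /** Called by the simulated client when the connection state changes; it only enqueues. */
        void enqueueStateChange(String state) { pendingEvents.add(state); }

        /** Called by the simulated event thread some time later. */
        void drainEvents() {
            String state;
            while ((state = pendingEvents.poll()) != null) {
                if (state.equals("Disconnected")) {
                    paused = true;
                }
            }
        }

        /** Controller logic writes unless it already knows that it must pause. */
        boolean tryWrite(String what) {
            if (paused) {
                return false;
            }
            zkWrites.add(what);
            return true;
        }

        List<String> writes() { return zkWrites; }
    }

    public static void main(String[] args) {
        Controller a = new Controller();
        a.enqueueStateChange("Disconnected");          // connection is lost, event sits in the queue
        boolean slipped = a.tryWrite("shrink ISR");    // handler has not run yet, so the write goes through
        a.drainEvents();                               // event thread finally delivers the notification
        boolean blocked = !a.tryWrite("another write");
        System.out.println("slipped=" + slipped + " blocked=" + blocked + " writes=" + a.writes());
    }
}
```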

          mgharat Mayuresh Gharat added a comment -

          Hi Jun,

          I was talking about the SessionExpirationListener in KafkaController, which implements IZkStateListener and its handleStateChanged() api. I was wondering if we can handle the state change to Disconnected in that callback to do the cleanup.

          junrao Jun Rao added a comment -

          That api doesn't exist in SessionExpirationListener. Even if it exists, it has the same problem--the session can expire after the check. So, you pretty much have to rely on the raw ZK client api that handles ConnectionLossException while reading/writing to ZK.

          mgharat Mayuresh Gharat added a comment -

          That's right.
          I am wondering if there is a way to check for ConnectionLoss using the handleStateChanged() api on the SessionExpirationListener, which could be used to drop all the current zk tasks that controller A was about to do.

          junrao Jun Rao added a comment -

          Mayuresh Gharat, in general, the approach of checking if still controller and then writing to ZK won't work since the controller can change immediately after the check.

          mgharat Mayuresh Gharat added a comment -

          On a side note, I noticed a new issue in controlled shutdown that is kind of similar. I will open a new Jira for that.

          mgharat Mayuresh Gharat added a comment -

          Jun Rao, Flavio Junqueira, that makes it clearer. I was just wondering if we can modify the controller code to always check if it is the controller before it makes such changes to zookeeper.
          Again there is a race condition, wherein Broker A's session times out at time T. Broker B becomes the controller at T+2. Broker A can still proceed with the changes to ZK between T and T+2.

          I had some questions on the zookeeper session expiry:

          1) Suppose a broker establishes a connection with zookeeper and has a ZookeeperSessionTimeout set to 10 min. If the broker goes down or is stuck, then comes back up and connects to zookeeper after 5 min, will it connect on the same session?

          2) The session Expiry is only invoked on SessionTimeout and nothing else. Am I right?

          junrao Jun Rao added a comment -

          Flavio Junqueira, thanks for the clarification. If broker A shrinks the partition ISR in ZK before step 2, then broker A's ZK session expires, and then broker A sends the shrunk ISR to broker C, two things can happen. (1) C has already received requests from the new controller B. In this case, A's request will be rejected. However, since the new controller B is elected after broker A shrinks the ISR in ZK and the new controller reads the latest ISR from ZK on initialization, B will send the latest ISR to broker C. (2) C hasn't received any request from the new controller B. In this case, A's request will be accepted. The new controller B will later send the same ISR to broker C, but that's fine. So, in either case, we are covered.

          The problem in the description is really caused by broker A changing ZK after its session expires. So, it seems the fix would be the following. If the controller (say A) hits a ZK ConnectionLoss event while reading from or writing to ZK, it will pause the operation. Two possibilities can follow. In the first case, controller A's ZK session expires, and it will just ignore all the outstanding ZK events. This guarantees that controller A can't touch ZK any more after a new controller is elected (which has to happen after controller A's SessionExpiration event). So, the new controller is guaranteed to read the latest ZK data, act on it, and send the latest info to the broker. This would avoid the issue in the description.

          In the second case, controller A will get a SyncConnected event. In this case, does controller A just resume from where it left off? Or does it ignore all outstanding events and re-read all subscribed ZK paths (since there could be missing events between the ConnectionLoss event and the SyncConnected event)?

          Finally, ZkClient actually hides the ZK ConnectionLoss event and only informs the application when the ZK session expires. To pursue this, we will have to access ZK directly.
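
A rough sketch of the pause/resume/resign behavior proposed above, written as a tiny state machine. Everything here is hypothetical: `ControllerSketch`, the `State` enum and the string event labels are illustrative names taken from the wording of this comment, not ZkClient or ZooKeeper API constants. On reconnect the sketch simply replays queued operations, which is only one of the two options raised in the open question above.

```python
from enum import Enum


class State(Enum):
    ACTIVE = "active"
    PAUSED = "paused"
    RESIGNED = "resigned"


class ControllerSketch:
    """Toy controller that stops writing to ZK while its session is in doubt."""

    def __init__(self):
        self.state = State.ACTIVE
        self.pending = []   # operations held back while paused
        self.applied = []   # operations that reached ZK

    def submit(self, op):
        if self.state is State.ACTIVE:
            self.applied.append(op)
        elif self.state is State.PAUSED:
            self.pending.append(op)
        # RESIGNED: the operation is dropped, ZK is never touched again.

    def on_event(self, event):
        if self.state is State.RESIGNED:
            return
        if event == "ConnectionLoss":
            self.state = State.PAUSED
        elif event == "SyncConnected":
            # Assumption: resume where we left off by replaying held operations.
            self.state = State.ACTIVE
            self.applied.extend(self.pending)
            self.pending.clear()
        elif event == "SessionExpired":
            # Ignore all outstanding work and give up the controller role.
            self.pending.clear()
            self.state = State.RESIGNED


if __name__ == "__main__":
    a = ControllerSketch()
    a.on_event("ConnectionLoss")
    a.submit("shrink-isr")          # held, not written
    a.on_event("SessionExpired")    # dropped for good
    print(a.state, a.applied, a.pending)
```

With this shape, the ISR shrink from step 3 of the description is never written, because it is still queued when the session expires.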

          mgharat Mayuresh Gharat added a comment -

          Regarding your question:
          If broker A shrinks the partition ISR in ZK before step 2 but notifies C after step 2 as in the description, does C eventually learn the ISR stored in ZK?

          C will not learn the ISR stored in ZK until it receives the next LeaderAndIsr request from the new controller.

          fpj Flavio Junqueira added a comment -

          Hey Mayuresh Gharat, best practice with ZK is to put the master (the controller in this case) on hold upon a connection loss event and wait until the next event, which either flags a reconnection or signals that the session has expired. It should call controllerResignation upon a session expiration, and resume if it reconnects.

          But we have to be careful, because we can't really control the speed of messages: even if A stops before B takes over in your example, we can't guarantee that no message from A will reach some broker late. The description of this jira says that C correctly discards an old message, and it should be like that, so this part looks fine so far. The issue is the change in ZK happening at the wrong time.

          mgharat Mayuresh Gharat added a comment - - edited

          That's a very good point. I will verify whether this can happen.
          Moreover, I think the behavior should be:
          1) Broker A was the controller.
          2) Broker A hits a session expiration, invokes controllerResignation, clears all its caches, and stops all ongoing controller work.
          3) Broker B becomes the controller and proceeds.

          What do you think?

          fpj Flavio Junqueira added a comment -

          One clarification, if broker A shrinks the partition ISR in ZK before step 2 but notifies C after step 2 as in the description, does C eventually learn the ISR stored in ZK? If it doesn't, then the observation about connection loss might not be sufficient to fix it or even matter at all in this case.

          fpj Flavio Junqueira added a comment -

          It sounds like this comment from Jun Rao is extending the description of the jira. It assumes that the replica that was removed from the ISR in step 1 eventually came back, but its return isn't reflected in the state of ZK. However, the replica would be in the cache of controller B, so would it be elected in this case? Would it be an actual problem if B is demoted and another controller comes up?

          guozhang Guozhang Wang added a comment -

          In this case, the truth should be that the other replica is no longer in ISR, so ZK's data is correct while C's cache is not, right?

          fpj Flavio Junqueira added a comment - - edited

          Mayuresh Gharat, the fact that A kept going with an expired session makes me think that A ignored the connection loss event and kept doing controller work. What we recommend for mastership with ZooKeeper is that the master stops doing master work upon receiving a connection loss event, and either resumes if it reconnects or drops mastership altogether if the session expires. Talking to Jun Rao about this, it sounds like the controller isn't processing the event that ZkClient is passing up.

          Let me give you some more context on session semantics. At 2/3 of the session timeout, if the client hasn't heard from the server it is currently connected to, it will start looking for another server and will notify the application via connection loss events. At that point, the recommendation is that the client (the broker in this case) stops doing any master work until it learns more about the session.

          I also need to add that I haven't verified this in the code, so it is possible that something else is causing the problem, but it sounds wrong that a controller with an expired session keeps going.
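
To put numbers on the 2/3 rule mentioned above, here is a small arithmetic sketch. The function names and sample timeouts are made up for illustration, and the window is approximate: it assumes the client and the server last heard from each other at about the same moment.

```python
def connection_loss_deadline_ms(session_timeout_ms: int) -> int:
    """Silence from the server after which the client reports a connection loss."""
    return session_timeout_ms * 2 // 3


def pause_window_ms(session_timeout_ms: int) -> int:
    """Rough time between the connection loss event and session expiry."""
    return session_timeout_ms - connection_loss_deadline_ms(session_timeout_ms)


if __name__ == "__main__":
    for timeout in (6_000, 30_000):
        print(timeout, connection_loss_deadline_ms(timeout), pause_window_ms(timeout))
```

For a 6 s session timeout this gives a connection loss event after about 4 s of silence, leaving roughly 2 s in which the controller should already be on hold before the session can expire.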

          junrao Jun Rao added a comment -

          This can potentially lead to more serious issues. For example, after step 3, if the current leader fails, we may not be able to elect a new leader, since the other replica (which is actually in sync) is not in the ISR in ZooKeeper (e.g., when unclean leader election is turned off).
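
The failure mode above can be illustrated with a simplified election function. This is a sketch under assumptions, not the controller's actual leader selector: `elect_leader`, its parameters and the replica names are hypothetical, and it assumes a new leader is the first live assigned replica found in the ISR stored in ZK, falling back to any live replica only when unclean election is allowed.

```python
from typing import Iterable, Optional, Set


def elect_leader(assigned: Iterable[str], zk_isr: Set[str], live: Set[str],
                 unclean_allowed: bool = False) -> Optional[str]:
    """Pick a leader from the ISR in ZK; return None if the partition stays offline."""
    assigned = list(assigned)
    for replica in assigned:
        if replica in live and replica in zk_isr:
            return replica
    if unclean_allowed:
        # Unclean election: accept a live replica outside the ISR.
        for replica in assigned:
            if replica in live:
                return replica
    return None


if __name__ == "__main__":
    assigned = ["C", "D"]
    live = {"D"}                       # leader C has failed
    stale_zk_isr = {"C"}               # ZK still holds the ISR shrunk by old controller A
    print(elect_leader(assigned, stale_zk_isr, live))
    print(elect_leader(assigned, stale_zk_isr, live, unclean_allowed=True))
    print(elect_leader(assigned, {"C", "D"}, live))
```

With the stale ISR in ZK and unclean election off, no leader is chosen even though D is actually in sync. With the ISR that C believes in, D would be elected.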

          mgharat Mayuresh Gharat added a comment -

          Flavio Junqueira can you shed some light on what you meant by misuse?
          Jun Rao if this is actually an issue, I would like to give a shot at it.

          junrao Jun Rao added a comment -

          Flavio Junqueira suggested that the reason this could happen is the misuse of ZK in the Kafka controller.


            People

            • Assignee:
              onurkaraman Onur Karaman
              Reporter:
              junrao Jun Rao
            • Votes:
              8
              Watchers:
              24
