Kafka / KAFKA-9769

ReplicaManager Partition.makeFollower Increases LeaderEpoch when ZooKeeper disconnect occurs




      The ZooKeeper session expired and the broker was disconnected at the same moment that it received the first LeaderAndIsr request. While the broker was processing that first LeaderAndIsr request, the ZooKeeper session had not yet been reestablished.

      Within the makeFollowers method, partition.getOrCreateReplica is called before the fetcher is started. partition.getOrCreateReplica needs to fetch information from ZooKeeper, but the call to the ZooKeeper client throws an exception because the session is invalid, so the fetcher start is skipped.


      The Partition class's getOrCreateReplica method calls AdminZkClient's fetchEntityConfig(..), which throws an exception if the ZooKeeper session is invalid:


      val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
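The sequence described above can be reproduced in isolation. The following is a minimal sketch, not Kafka's actual code: `FakePartition`, `SessionExpiredException`, `fetchTopicConfig`, and `startedFetchers` are hypothetical stand-ins, and the sketch assumes (as this report implies) that the leader epoch is recorded before the ZooKeeper-backed call is made.

```scala
import scala.collection.mutable
import scala.util.Try

object EpochWithoutFetcherSketch {
  // Hypothetical stand-in for the exception raised on an invalid ZooKeeper session.
  final class SessionExpiredException(msg: String) extends RuntimeException(msg)

  // Hypothetical stand-in for a partition's local state.
  final class FakePartition(val name: String) {
    var leaderEpoch: Int = 0
  }

  // Names of partitions whose fetcher was started.
  val startedFetchers: mutable.Set[String] = mutable.Set.empty[String]

  // Stand-in for the ZooKeeper-backed topic config lookup.
  def fetchTopicConfig(sessionValid: Boolean): Map[String, String] =
    if (sessionValid) Map.empty[String, String]
    else throw new SessionExpiredException("ZooKeeper session is invalid")

  // Assumed order of operations: record epoch, look up config, start fetcher.
  def makeFollower(p: FakePartition, requestEpoch: Int, sessionValid: Boolean): Unit = {
    p.leaderEpoch = requestEpoch   // the epoch is updated first
    fetchTopicConfig(sessionValid) // throws when the session is invalid
    startedFetchers += p.name      // never reached if the lookup throws
  }

  def main(args: Array[String]): Unit = {
    val p = new FakePartition("topic-0")
    val result = Try(makeFollower(p, requestEpoch = 1, sessionValid = false))
    println(s"failed=${result.isFailure} epoch=${p.leaderEpoch} " +
      s"fetcherStarted=${startedFetchers.contains(p.name)}")
  }
}
```

In this sketch the partition ends up with the new epoch but no running fetcher, which is the inconsistent state this issue describes.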


      When this occurs, the leader epoch should not have been incremented, because the ZooKeeper session was invalid: once the second LeaderAndIsr request comes in, the leader epoch could be the same between the brokers.
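One possible reading of this concern, offered as an assumption rather than a confirmed diagnosis: if the broker only applies a LeaderAndIsr request when `requestLeaderEpoch > currentLeaderEpoch` (the comparison quoted in the code excerpt at the end of this report), then a second request carrying the epoch already recorded by the failed attempt would not be applied. The helper `shouldApply` and the epoch values below are hypothetical.

```scala
object EpochCheckSketch {
  // Hypothetical helper mirroring the strict comparison quoted in this report.
  def shouldApply(requestLeaderEpoch: Int, currentLeaderEpoch: Int): Boolean =
    requestLeaderEpoch > currentLeaderEpoch

  def main(args: Array[String]): Unit = {
    val epochBeforeFirstRequest = 4
    val requestEpoch = 5

    // First request: the epoch is newer, so it is accepted.
    println(shouldApply(requestEpoch, epochBeforeFirstRequest)) // prints "true"

    // The failed attempt already recorded the epoch locally.
    val epochAfterFailedAttempt = requestEpoch

    // Second request with the same epoch: the strict check rejects it.
    println(shouldApply(requestEpoch, epochAfterFailedAttempt)) // prints "false"
  }
}
```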

      A few options I can think of for a fix. I think the third route could be feasible:

      1 - Make the LeaderEpoch update and the fetch update atomic.

      2 - Wait until all individual partitions have been processed without problems, then process the fetch.

      3 - Catch the ZooKeeper exception in the caller code block (ReplicaManager.makeFollowers) and simply do not touch the remaining partitions, to ensure that the batch of partitions that succeeded up to that point is updated and processed (fetch).

      4 - Or make sure the LeaderAndIsr request never arrives at the broker in case of a ZooKeeper disconnect. That would be safe because it is already possible for some replicas to receive the LeaderAndIsr request later than others. However, in that case, the code needs to make sure the controller will retry.
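As an illustration of option 3 only, here is a minimal sketch under stated assumptions, not a patch against ReplicaManager. Partitions are modeled as plain strings, `makeFollower` is a hypothetical callback standing in for the per-partition work, and any non-fatal exception stands in for the ZooKeeper exception. The loop stops at the first failure and returns only the partitions that succeeded, which are the ones a caller would then start fetchers for.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object StopAtFirstFailureSketch {
  // Runs `makeFollower` on each partition in order and stops at the first
  // failure. Returns the partitions processed successfully before that point.
  def makeFollowers(partitions: Seq[String])(makeFollower: String => Unit): List[String] = {
    val succeeded = mutable.ArrayBuffer.empty[String]
    val it = partitions.iterator
    var failed = false
    while (!failed && it.hasNext) {
      val p = it.next()
      try {
        makeFollower(p)
        succeeded += p
      } catch {
        case NonFatal(_) =>
          // Stand-in for the ZooKeeper exception: leave the rest untouched.
          failed = true
      }
    }
    succeeded.toList
  }

  def main(args: Array[String]): Unit = {
    val toFetch = makeFollowers(Seq("t-0", "t-1", "t-2", "t-3")) { p =>
      if (p == "t-2") throw new IllegalStateException("ZooKeeper session is invalid")
    }
    println(toFetch) // prints "List(t-0, t-1)"
  }
}
```

Note that this sketch says nothing about the epoch of the partition that failed; whether that partition's epoch must also be rolled back is the question raised by option 1.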


      else if (requestLeaderEpoch > currentLeaderEpoch) {
        // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
        // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
        if (stateInfo.basePartitionState.replicas.contains(localBrokerId))
          partitionState.put(partition, stateInfo)

      def getOrCreateReplica(replicaId: Int, isNew: Boolean = false): Replica = {
        allReplicasMap.getAndMaybePut(replicaId, {
          if (isReplicaLocal(replicaId)) {
            val adminZkClient = new AdminZkClient(zkClient)
            val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)







              andrewchoi5 Andrew Choi