Description
In KafkaController, we access ZK in the following places. Those accesses are not necessary since we can read from the cache in the controller.
In onBrokerFailure(deadBrokers: Seq[Int]),
replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
}
In onBrokerStartup(newBrokers: Seq[Int])
replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
}
In shutdownBroker(),
getPartitionsAssignedToBroker(zkClient, controllerContext.allTopics.toSeq, id).map {
}