diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 4c319ab..0db634a 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -232,24 +232,27 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg def onControllerFailover() { if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) - // increment the controller epoch - incrementControllerEpoch(zkClient) - // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks - registerReassignedPartitionsListener() - registerPreferredReplicaElectionListener() - partitionStateMachine.registerListeners() - replicaStateMachine.registerListeners() - initializeControllerContext() - replicaStateMachine.startup() - partitionStateMachine.startup() - // register the partition change listeners for all existing topics on failover - controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) - Utils.registerMBean(this, KafkaController.MBeanName) - info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) - initializeAndMaybeTriggerPartitionReassignment() - initializeAndMaybeTriggerPreferredReplicaElection() - /* send partition leadership info to all live brokers */ - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) + // This process also has to be synchronized since it triggers partition/replica state change + controllerContext.controllerLock synchronized { + // increment the controller epoch + incrementControllerEpoch(zkClient) + // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks + registerReassignedPartitionsListener() + registerPreferredReplicaElectionListener() + partitionStateMachine.registerListeners() + replicaStateMachine.registerListeners() + initializeControllerContext() + replicaStateMachine.startup() + partitionStateMachine.startup() + // register the partition change listeners for all existing topics on failover + controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) + Utils.registerMBean(this, KafkaController.MBeanName) + info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) + initializeAndMaybeTriggerPartitionReassignment() + initializeAndMaybeTriggerPreferredReplicaElection() + /* send partition leadership info to all live brokers */ + sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) + } } else info("Controller has been shut down, aborting startup/failover")