diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 965d0e5..2fcc36d 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -110,8 +110,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) private val partitionStateMachine = new PartitionStateMachine(this) private val replicaStateMachine = new ReplicaStateMachine(this) - private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, - config.brokerId) + private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, + onControllerFailover, onControllerResignation, config.brokerId) val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) @@ -256,6 +256,22 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } /** + * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is + * required to clean up internal controller data structures + */ + def onControllerResignation() { + controllerContext.controllerLock synchronized { + Utils.unregisterMBean(KafkaController.MBeanName) + partitionStateMachine.shutdown() + replicaStateMachine.shutdown() + if(controllerContext.controllerChannelManager != null) { + controllerContext.controllerChannelManager.shutdown() + controllerContext.controllerChannelManager = null + } + } + } + + /** * Returns true if this broker is the current controller. */ def isActive(): Boolean = { @@ -894,16 +910,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg @throws(classOf[Exception]) def handleNewSession() { info("ZK expired; shut down all controller components and try to re-elect") - controllerContext.controllerLock synchronized { - Utils.unregisterMBean(KafkaController.MBeanName) - partitionStateMachine.shutdown() - replicaStateMachine.shutdown() - if(controllerContext.controllerChannelManager != null) { - controllerContext.controllerChannelManager.shutdown() - controllerContext.controllerChannelManager = null - } - controllerElector.elect - } + onControllerResignation() + controllerElector.elect } } } diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index cc6f1eb..b189619 100644 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -30,7 +30,10 @@ import kafka.common.KafkaException * leader is dead, this class will handle automatic re-election and if it succeeds, it invokes the leader state change * callback */ -class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit, +class ZookeeperLeaderElector(controllerContext: ControllerContext, + electionPath: String, + onBecomingLeader: () => Unit, + onResigningAsLeader: () => Unit, brokerId: Int) extends LeaderElector with Logging { var leaderId = -1 @@ -58,23 +61,22 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: info(brokerId + " successfully elected as leader") leaderId = brokerId onBecomingLeader() - } catch { - case e: ZkNodeExistsException => - // If someone else has written the path, then - leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match { - case Some(controller) => KafkaController.parseControllerId(controller) - case None => { - warn("A leader has been elected but just resigned, this will result in another round of election") - -1 - } + } catch { + case e: ZkNodeExistsException => + // If someone else has written the path, then + leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match { + case Some(controller) => KafkaController.parseControllerId(controller) + case None => { + warn("A leader has been elected but just resigned, this will result in another round of election") + -1 } - if (leaderId != -1) - debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) - case e2: Throwable => - error("Error while electing or becoming leader on broker %d".format(brokerId), e2) - leaderId = -1 + } + if (leaderId != -1) + debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) + case e2: Throwable => + error("Error while electing or becoming leader on broker %d".format(brokerId), e2) + resign() } - amILeader } @@ -116,6 +118,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: controllerContext.controllerLock synchronized { debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader" .format(brokerId, dataPath)) + if(amILeader) + onResigningAsLeader() elect } }