diff -Naur main//scala/kafka/controller/KafkaController.scala kafka-0.9.0.0-src/core/src/main//scala/kafka/controller/KafkaController.scala --- main//scala/kafka/controller/KafkaController.scala 2016-06-16 21:14:09.000000000 +0800 +++ kafka-0.9.0.0-src/core/src/main//scala/kafka/controller/KafkaController.scala 2016-02-06 10:28:43.000000000 +0800 @@ -161,7 +161,7 @@ val partitionStateMachine = new PartitionStateMachine(this) val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, - onControllerBeforeResignation, onControllerResignation, config.brokerId) + onControllerResignation, config.brokerId) // have a separate scheduler for the controller to be able to start and stop independently of the // kafka server private val autoRebalanceScheduler = new KafkaScheduler(1) @@ -349,9 +349,13 @@ else info("Controller has been shut down, aborting startup/failover") } - - def onControllerBeforeResignation() { - debug("Before Controller resigning, shuting down some threads, broker id %d".format(config.brokerId)) + + /** + * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is + * required to clean up internal controller data structures + */ + def onControllerResignation() { + debug("Controller resigning, broker id %d".format(config.brokerId)) // de-register listeners deregisterIsrChangeNotificationListener() deregisterReassignedPartitionsListener() @@ -364,14 +368,7 @@ // shutdown leader rebalance scheduler if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown() - } - /** - * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is - * required to clean up internal controller data structures - */ - def onControllerResignation() { - debug("Controller resigning, broker id %d".format(config.brokerId)) inLock(controllerContext.controllerLock) { // de-register partition ISR listener for on-going partition reassignment task deregisterReassignedPartitionsIsrChangeListeners() @@ -695,7 +692,6 @@ inLock(controllerContext.controllerLock) { isRunning = false } - onControllerBeforeResignation() onControllerResignation() } @@ -1171,7 +1167,6 @@ @throws(classOf[Exception]) def handleNewSession() { info("ZK expired; shut down all controller components and try to re-elect") - onControllerBeforeResignation() inLock(controllerContext.controllerLock) { onControllerResignation() controllerElector.elect diff -Naur main//scala/kafka/server/ZookeeperLeaderElector.scala kafka-0.9.0.0-src/core/src/main//scala/kafka/server/ZookeeperLeaderElector.scala --- main//scala/kafka/server/ZookeeperLeaderElector.scala 2016-06-16 21:14:14.000000000 +0800 +++ kafka-0.9.0.0-src/core/src/main//scala/kafka/server/ZookeeperLeaderElector.scala 2016-02-06 10:28:43.000000000 +0800 @@ -34,7 +34,6 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit, - onBeforeResigningAsLeader: () => Unit, onResigningAsLeader: () => Unit, brokerId: Int) extends LeaderElector with Logging { @@ -122,20 +121,13 @@ */ @throws(classOf[Exception]) def handleDataChange(dataPath: String, data: Object) { - var isLeaderChange = false inLock(controllerContext.controllerLock) { val amILeaderBeforeDataChange = amILeader leaderId = KafkaController.parseControllerId(data.toString) info("New leader is %d".format(leaderId)) - var isLeaderChange = amILeaderBeforeDataChange && !amILeader - } - - // The old leader needs to resign leadership if it is no longer the leader - if (isLeaderChange) { - onBeforeResigningAsLeader() - inLock(controllerContext.controllerLock) { + // The old leader needs to resign leadership if it is no longer the leader + if (amILeaderBeforeDataChange && !amILeader) onResigningAsLeader() - } } } @@ -146,18 +138,10 @@ */ @throws(classOf[Exception]) def handleDataDeleted(dataPath: String) { - debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader" - .format(brokerId, dataPath)) - var isLeader = false inLock(controllerContext.controllerLock) { - isLeader = amILeader - } - - if(isLeader) - onBeforeResigningAsLeader() - - inLock(controllerContext.controllerLock) { - if(isLeader) + debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader" + .format(brokerId, dataPath)) + if(amILeader) onResigningAsLeader() elect }