diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 94bbd33..a7a21df 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -169,8 +169,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) private val brokerRequestBatch = new ControllerBrokerRequestBatch(this) - registerControllerChangedListener() - newGauge( "ActiveControllerCount", new Gauge[Int] { @@ -298,6 +296,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt def onControllerFailover() { if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) + //read controller epoch from zk + readControllerEpochFromZookeeper() // increment the controller epoch incrementControllerEpoch(zkClient) // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks @@ -346,6 +346,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null } + controllerContext.epoch=0 + controllerContext.epochZkVersion=0 brokerState.newState(RunningAsBroker) } } @@ -875,8 +877,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this)) } - private def registerControllerChangedListener() { - zkClient.subscribeDataChanges(ZkUtils.ControllerEpochPath, new ControllerEpochListener(this)) + private def readControllerEpochFromZookeeper() { + // initialize the controller epoch and zk version by reading from zookeeper + if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) { + val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath) + controllerContext.epoch = epochData._1.toInt + controllerContext.epochZkVersion = epochData._2.getVersion + info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion)) + } } def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) { @@ -1275,43 +1283,6 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD } } -class ControllerEpochListener(controller: KafkaController) extends IZkDataListener with Logging { - this.logIdent = "[ControllerEpochListener on " + controller.config.brokerId + "]: " - val controllerContext = controller.controllerContext - readControllerEpochFromZookeeper() - - /** - * Invoked when a controller updates the epoch value - * @throws Exception On any error. - */ - @throws(classOf[Exception]) - def handleDataChange(dataPath: String, data: Object) { - debug("Controller epoch listener fired with new epoch " + data.toString) - inLock(controllerContext.controllerLock) { - // read the epoch path to get the zk version - readControllerEpochFromZookeeper() - } - } - - /** - * @throws Exception - * On any error. - */ - @throws(classOf[Exception]) - def handleDataDeleted(dataPath: String) { - } - - private def readControllerEpochFromZookeeper() { - // initialize the controller epoch and zk version by reading from zookeeper - if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) { - val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath) - controllerContext.epoch = epochData._1.toInt - controllerContext.epochZkVersion = epochData._2.getVersion - info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion)) - } - } -} - case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty, var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)