diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 8af48ab..4e56d35 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -169,8 +169,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) private val brokerRequestBatch = new ControllerBrokerRequestBatch(this) - registerControllerChangedListener() - + readControllerEpochFromZookeeper() newGauge( "ActiveControllerCount", new Gauge[Int] { @@ -875,8 +874,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this)) } - private def registerControllerChangedListener() { - zkClient.subscribeDataChanges(ZkUtils.ControllerEpochPath, new ControllerEpochListener(this)) + private def readControllerEpochFromZookeeper() { + // initialize the controller epoch and zk version by reading from zookeeper + if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) { + val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath) + controllerContext.epoch = epochData._1.toInt + controllerContext.epochZkVersion = epochData._2.getVersion + info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion)) + } } def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) { @@ -1280,43 +1285,6 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD } } -class ControllerEpochListener(controller: KafkaController) extends IZkDataListener with Logging { - this.logIdent = "[ControllerEpochListener on " + controller.config.brokerId + "]: " - val controllerContext = controller.controllerContext - readControllerEpochFromZookeeper() - - /** - * Invoked when a controller updates the epoch value - * @throws Exception On any error. - */ - @throws(classOf[Exception]) - def handleDataChange(dataPath: String, data: Object) { - debug("Controller epoch listener fired with new epoch " + data.toString) - inLock(controllerContext.controllerLock) { - // read the epoch path to get the zk version - readControllerEpochFromZookeeper() - } - } - - /** - * @throws Exception - * On any error. - */ - @throws(classOf[Exception]) - def handleDataDeleted(dataPath: String) { - } - - private def readControllerEpochFromZookeeper() { - // initialize the controller epoch and zk version by reading from zookeeper - if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) { - val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath) - controllerContext.epoch = epochData._1.toInt - controllerContext.epochZkVersion = epochData._2.getVersion - info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion)) - } - } -} - case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty, var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)