From 18ebadb2200c8be0ac512e46646039474dd6a3e0 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 7 Aug 2014 16:16:27 -0700 Subject: [PATCH] v1 --- .../scala/kafka/controller/KafkaController.scala | 37 ++++++++++++++++++++-- .../kafka/controller/PartitionStateMachine.scala | 30 +++++++++++++++--- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a7a21df..8ab4a1b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -169,6 +169,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) private val brokerRequestBatch = new ControllerBrokerRequestBatch(this) + + private val partitionReassignedListener = new PartitionsReassignedListener(this) + private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this) + newGauge( "ActiveControllerCount", new Gauge[Int] { @@ -333,19 +337,30 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * required to clean up internal controller data structures */ def onControllerResignation() { + // de-register listeners + deregisterReassignedPartitionsListener() + deregisterPreferredReplicaElectionListener() + + // shutdown delete topic manager if (deleteTopicManager != null) deleteTopicManager.shutdown() inLock(controllerContext.controllerLock) { + // de-register partition ISR listener for on-going partition reassignment task + deregisterReassignedPartitionsIsrChangeListeners() + // shutdown leader rebalance scheduler if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown() - + // shutdown partition state machine partitionStateMachine.shutdown() + // shutdown replica state machine replicaStateMachine.shutdown() + // shutdown controller channel manager if(controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null } + // reset controller context controllerContext.epoch=0 controllerContext.epochZkVersion=0 brokerState.newState(RunningAsBroker) @@ -870,11 +885,27 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt } private def registerReassignedPartitionsListener() = { - zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this)) + zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) + } + + private def deregisterReassignedPartitionsListener() = { + zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } private def registerPreferredReplicaElectionListener() { - zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this)) + zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) + } + + private def deregisterPreferredReplicaElectionListener() { + zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) + } + + private def deregisterReassignedPartitionsIsrChangeListeners() { + controllerContext.partitionsBeingReassigned.foreach { + case (topicAndPartition, reassignedPartitionsContext) => + val zkPartitionPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition) + zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener) + } } private def readControllerEpochFromZookeeper() { diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 34c70b6..bfd58cb 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -51,9 +51,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext) this.logIdent = "[Partition state machine on Controller " + controllerId + "]: " private val stateChangeLogger = KafkaController.stateChangeLogger - private var topicChangeListener: TopicChangeListener = null - private var deleteTopicsListener: DeleteTopicsListener = null - private var addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty + private val topicChangeListener = new TopicChangeListener() + private val deleteTopicsListener = new DeleteTopicsListener() + private val addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty /** * Invoked on successful controller election. First registers a topic change listener since that triggers all @@ -76,11 +76,24 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { registerDeleteTopicListener() } + // de-register topic and partition change listeners + def deregisterListeners() { + deregisterTopicChangeListener() + addPartitionsListener.foreach { + case (topic, listener) => + zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener) + } + addPartitionsListener.clear() + if(controller.config.deleteTopicEnable) + deregisterDeleteTopicListener() + } + /** * Invoked on controller shutdown. */ def shutdown() { hasStarted.set(false) + deregisterListeners() partitionState.clear() } @@ -362,10 +375,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } private def registerTopicChangeListener() = { - topicChangeListener = new TopicChangeListener() zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener) } + private def deregisterTopicChangeListener() = { + zkClient.unsubscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener) + } + def registerPartitionChangeListener(topic: String) = { addPartitionsListener.put(topic, new AddPartitionsListener(topic)) zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic)) @@ -373,13 +389,17 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { def deregisterPartitionChangeListener(topic: String) = { zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic)) + addPartitionsListener.remove(topic) } private def registerDeleteTopicListener() = { - deleteTopicsListener = new DeleteTopicsListener() zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener) } + private def deregisterDeleteTopicListener() = { + zkClient.unsubscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener) + } + private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = { val topicAndPartition = TopicAndPartition(topic, partition) ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match { -- 1.7.12.4