From 18ebadb2200c8be0ac512e46646039474dd6a3e0 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 7 Aug 2014 16:16:27 -0700 Subject: [PATCH 1/2] v1 --- .../scala/kafka/controller/KafkaController.scala | 37 ++++++++++++++++++++-- .../kafka/controller/PartitionStateMachine.scala | 30 +++++++++++++++--- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a7a21df..8ab4a1b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -169,6 +169,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) private val brokerRequestBatch = new ControllerBrokerRequestBatch(this) + + private val partitionReassignedListener = new PartitionsReassignedListener(this) + private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this) + newGauge( "ActiveControllerCount", new Gauge[Int] { @@ -333,19 +337,30 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * required to clean up internal controller data structures */ def onControllerResignation() { + // de-register listeners + deregisterReassignedPartitionsListener() + deregisterPreferredReplicaElectionListener() + + // shutdown delete topic manager if (deleteTopicManager != null) deleteTopicManager.shutdown() inLock(controllerContext.controllerLock) { + // de-register partition ISR listener for on-going partition reassignment task + deregisterReassignedPartitionsIsrChangeListeners() + // shutdown leader rebalance scheduler if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown() - + // shutdown partition state machine partitionStateMachine.shutdown() + // shutdown replica state machine replicaStateMachine.shutdown() + // shutdown controller channel manager if(controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null } + // reset controller context controllerContext.epoch=0 controllerContext.epochZkVersion=0 brokerState.newState(RunningAsBroker) @@ -870,11 +885,27 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt } private def registerReassignedPartitionsListener() = { - zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this)) + zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) + } + + private def deregisterReassignedPartitionsListener() = { + zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } private def registerPreferredReplicaElectionListener() { - zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this)) + zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) + } + + private def deregisterPreferredReplicaElectionListener() { + zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) + } + + private def deregisterReassignedPartitionsIsrChangeListeners() { + controllerContext.partitionsBeingReassigned.foreach { + case (topicAndPartition, reassignedPartitionsContext) => + val zkPartitionPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition) + zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener) + } } private def readControllerEpochFromZookeeper() { diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 34c70b6..bfd58cb 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -51,9 +51,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext) this.logIdent = "[Partition state machine on Controller " + controllerId + "]: " private val stateChangeLogger = KafkaController.stateChangeLogger - private var topicChangeListener: TopicChangeListener = null - private var deleteTopicsListener: DeleteTopicsListener = null - private var addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty + private val topicChangeListener = new TopicChangeListener() + private val deleteTopicsListener = new DeleteTopicsListener() + private val addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty /** * Invoked on successful controller election. First registers a topic change listener since that triggers all @@ -76,11 +76,24 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { registerDeleteTopicListener() } + // de-register topic and partition change listeners + def deregisterListeners() { + deregisterTopicChangeListener() + addPartitionsListener.foreach { + case (topic, listener) => + zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener) + } + addPartitionsListener.clear() + if(controller.config.deleteTopicEnable) + deregisterDeleteTopicListener() + } + /** * Invoked on controller shutdown. */ def shutdown() { hasStarted.set(false) + deregisterListeners() partitionState.clear() } @@ -362,10 +375,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } private def registerTopicChangeListener() = { - topicChangeListener = new TopicChangeListener() zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener) } + private def deregisterTopicChangeListener() = { + zkClient.unsubscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener) + } + def registerPartitionChangeListener(topic: String) = { addPartitionsListener.put(topic, new AddPartitionsListener(topic)) zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic)) @@ -373,13 +389,17 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { def deregisterPartitionChangeListener(topic: String) = { zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic)) + addPartitionsListener.remove(topic) } private def registerDeleteTopicListener() = { - deleteTopicsListener = new DeleteTopicsListener() zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener) } + private def deregisterDeleteTopicListener() = { + zkClient.unsubscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener) + } + private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = { val topicAndPartition = TopicAndPartition(topic, partition) ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match { -- 1.7.12.4 From 159c093c7f16b45a082f3d03cd1fe112b25f7e15 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 8 Aug 2014 11:39:02 -0700 Subject: [PATCH 2/2] add de-register for topic change listener --- .../kafka/controller/PartitionStateMachine.scala | 18 ++++++++---- .../kafka/controller/ReplicaStateMachine.scala | 32 ++++++++++++++++++---- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index bfd58cb..e20b63a 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -45,15 +45,16 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private val controllerContext = controller.controllerContext private val controllerId = controller.config.brokerId private val zkClient = controllerContext.zkClient - var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty - val brokerRequestBatch = new ControllerBrokerRequestBatch(controller) + private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty + private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller) private val hasStarted = new AtomicBoolean(false) private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext) - this.logIdent = "[Partition state machine on Controller " + controllerId + "]: " - private val stateChangeLogger = KafkaController.stateChangeLogger private val topicChangeListener = new TopicChangeListener() private val deleteTopicsListener = new DeleteTopicsListener() private val addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty + private val stateChangeLogger = KafkaController.stateChangeLogger + + this.logIdent = "[Partition state machine on Controller " + controllerId + "]: " /** * Invoked on successful controller election. First registers a topic change listener since that triggers all @@ -63,9 +64,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { def startup() { // initialize partition state initializePartitionState() + // set started flag hasStarted.set(true) // try to move partitions to online state triggerOnlinePartitionStateChange() + info("Started partition state machine with initial state -> " + partitionState.toString()) } @@ -92,9 +95,14 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * Invoked on controller shutdown. */ def shutdown() { + // reset started flag hasStarted.set(false) - deregisterListeners() + // clear partition state partitionState.clear() + // de-register all ZK listeners + deregisterListeners() + + info("Stopped partition state machine") } /** diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index ad9c7c4..2592c53 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -48,12 +48,15 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { private val controllerContext = controller.controllerContext private val controllerId = controller.config.brokerId private val zkClient = controllerContext.zkClient - var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty - val brokerRequestBatch = new ControllerBrokerRequestBatch(controller) + private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty + private val brokerChangeListener = new BrokerChangeListener() + private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller) private val hasStarted = new AtomicBoolean(false) - this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: " private val stateChangeLogger = KafkaController.stateChangeLogger + this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: " + + /** * Invoked on successful controller election. First registers a broker change listener since that triggers all * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper. @@ -62,23 +65,38 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { def startup() { // initialize replica state initializeReplicaState() + // set started flag hasStarted.set(true) // move all Online replicas to Online handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica) + info("Started replica state machine with initial state -> " + replicaState.toString()) } - // register broker change listener + // register ZK listeners of the replica manager def registerListeners() { + // register broker change listener registerBrokerChangeListener() } + // de-register ZK listeners of the replica manager + def deregisterListeners() { + // de-register broker change listener + deregisterBrokerChangeListener() + } + /** * Invoked on controller shutdown. */ def shutdown() { + // reset started flag hasStarted.set(false) + // reset replica state replicaState.clear() + // de-register all ZK listeners + deregisterListeners() + + info("Stopped replica state machine") } /** @@ -295,7 +313,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } private def registerBrokerChangeListener() = { - zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener()) + zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) + } + + private def deregisterBrokerChangeListener() = { + zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) } /** -- 1.7.12.4