diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index f12ffc2..f1feb7b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -38,6 +38,11 @@ import org.apache.log4j.Logger import scala.Some import kafka.common.TopicAndPartition import java.util.concurrent.locks.ReentrantLock +import scala.Some +import kafka.common.TopicAndPartition +import kafka.controller.ReassignedPartitionsContext +import kafka.controller.PartitionAndReplica +import kafka.controller.LeaderIsrAndControllerEpoch class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int) { @@ -334,16 +339,25 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * required to clean up internal controller data structures */ def onControllerResignation() { + shutdownComponents(false) + } + + def shutdownComponents(isBrokerShutdown: Boolean) { inLock(controllerContext.controllerLock) { - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() + if(isBrokerShutdown) { + isRunning = false + } else { + Utils.unregisterMBean(KafkaController.MBeanName) + } deleteTopicManager.shutdown() - Utils.unregisterMBean(KafkaController.MBeanName) partitionStateMachine.shutdown() replicaStateMachine.shutdown() + if(config.autoLeaderRebalanceEnable) + autoRebalanceScheduler.shutdown() if(controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null + info("Controller shutdown complete") } } } @@ -638,18 +652,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * shuts down the controller channel manager, if one exists (i.e. if it was the current controller) */ def shutdown() = { - inLock(controllerContext.controllerLock) { - isRunning = false - partitionStateMachine.shutdown() - replicaStateMachine.shutdown() - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() - if(controllerContext.controllerChannelManager != null) { - controllerContext.controllerChannelManager.shutdown() - controllerContext.controllerChannelManager = null - info("Controller shutdown complete") - } - } + shutdownComponents(true) } def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = { diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 8262e10..5485906 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -22,6 +22,7 @@ import kafka.utils.Utils._ import collection.Set import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.api.{StopReplicaResponse, RequestOrResponse} +import java.util.concurrent.locks.ReentrantLock /** * This manages the state machine for topic deletion. @@ -70,10 +71,11 @@ class TopicDeletionManager(controller: KafkaController, val controllerContext = controller.controllerContext val partitionStateMachine = controller.partitionStateMachine val replicaStateMachine = controller.replicaStateMachine + val deleteLock: ReentrantLock = new ReentrantLock() var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted) - val deleteTopicsCond = controllerContext.controllerLock.newCondition() + val deleteTopicsCond = deleteLock.newCondition() var deleteTopicStateChanged: Boolean = false var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable @@ -351,6 +353,7 @@ class TopicDeletionManager(controller: KafkaController, class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") { val zkClient = controllerContext.zkClient + override def doWork() { inLock(controllerContext.controllerLock) { awaitTopicDeletionNotification()