diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 40c4c57..cd34482 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -80,12 +80,14 @@ class TopicDeletionManager(controller: KafkaController, var deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false) var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable + @volatile var isShuttingDown = false /** * Invoked at the end of new controller initiation */ def start() { if(isDeleteTopicEnabled) { + isShuttingDown = false deleteTopicsThread = new DeleteTopicsThread() deleteTopicStateChanged.set(true) deleteTopicsThread.start() @@ -97,9 +99,12 @@ class TopicDeletionManager(controller: KafkaController, */ def shutdown() { if(isDeleteTopicEnabled) { + isShuttingDown = true + resumeTopicDeletionThread() deleteTopicsThread.shutdown() topicsToBeDeleted.clear() topicsIneligibleForDeletion.clear() + isShuttingDown = false } } @@ -199,7 +204,7 @@ class TopicDeletionManager(controller: KafkaController, */ private def awaitTopicDeletionNotification() { inLock(deleteLock) { - while(!deleteTopicStateChanged.compareAndSet(true, false)) { + while(!isShuttingDown && !deleteTopicStateChanged.compareAndSet(true, false)) { info("Waiting for signal to start or continue topic deletion") deleteTopicsCond.await() } @@ -355,15 +360,20 @@ class TopicDeletionManager(controller: KafkaController, } } - class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") { + class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread", isInterruptible = false) { val zkClient = controllerContext.zkClient override def doWork() { awaitTopicDeletionNotification() + if(!isRunning.get) + return + inLock(controllerContext.controllerLock) { val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted - if(topicsQueuedForDeletion.size > 0) + + if(!topicsQueuedForDeletion.isEmpty) info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(",")) + topicsQueuedForDeletion.foreach { topic => // if all replicas are marked as deleted successfully, then topic deletion is done if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {