diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index d29e556..8ea533e 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -81,12 +81,14 @@ class TopicDeletionManager(controller: KafkaController, val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false) var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable + @volatile var isShuttingDown = false /** * Invoked at the end of new controller initiation */ def start() { if(isDeleteTopicEnabled) { + isShuttingDown = false deleteTopicsThread = new DeleteTopicsThread() deleteTopicStateChanged.set(true) deleteTopicsThread.start() @@ -98,10 +100,13 @@ class TopicDeletionManager(controller: KafkaController, */ def shutdown() { if(isDeleteTopicEnabled) { + isShuttingDown = true + resumeTopicDeletionThread() deleteTopicsThread.shutdown() topicsToBeDeleted.clear() partitionsToBeDeleted.clear() topicsIneligibleForDeletion.clear() + isShuttingDown = false } } @@ -201,12 +206,15 @@ class TopicDeletionManager(controller: KafkaController, * controllerLock should be acquired before invoking this API */ private def awaitTopicDeletionNotification() { + debug("Acquiring delete lock in await topic notification") inLock(deleteLock) { + debug("Acquired delete lock in await topic notification") while(!deleteTopicStateChanged.compareAndSet(true, false)) { info("Waiting for signal to start or continue topic deletion") deleteTopicsCond.await() } } + debug("Released delete lock in await topic notification") } /** @@ -360,15 +368,19 @@ class TopicDeletionManager(controller: KafkaController, } } - class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") { + class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread", isInterruptible = false) { val zkClient = controllerContext.zkClient override def doWork() { awaitTopicDeletionNotification() + val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted + if(!isRunning.get || topicsQueuedForDeletion.isEmpty) + return + + info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(",")) + debug("Acquiring controller lock in delete topics thread") inLock(controllerContext.controllerLock) { - val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted - if(topicsQueuedForDeletion.size > 0) - info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(",")) + debug("Acquired controller lock in delete topics thread") topicsQueuedForDeletion.foreach { topic => // if all replicas are marked as deleted successfully, then topic deletion is done if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) { @@ -404,6 +416,7 @@ class TopicDeletionManager(controller: KafkaController, } } } + debug("Released controller lock in delete topics thread") } } }