diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index f12ffc2..a8d0d52 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -38,6 +38,11 @@ import org.apache.log4j.Logger import scala.Some import kafka.common.TopicAndPartition import java.util.concurrent.locks.ReentrantLock +import scala.Some +import kafka.common.TopicAndPartition +import kafka.controller.ReassignedPartitionsContext +import kafka.controller.PartitionAndReplica +import kafka.controller.LeaderIsrAndControllerEpoch class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int) { @@ -335,15 +340,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg */ def onControllerResignation() { inLock(controllerContext.controllerLock) { - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() - deleteTopicManager.shutdown() Utils.unregisterMBean(KafkaController.MBeanName) + deleteTopicManager.shutdown() partitionStateMachine.shutdown() replicaStateMachine.shutdown() + if(config.autoLeaderRebalanceEnable) + autoRebalanceScheduler.shutdown() if(controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null + info("Controller shutdown complete") } } } @@ -640,15 +646,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg def shutdown() = { inLock(controllerContext.controllerLock) { isRunning = false - partitionStateMachine.shutdown() - replicaStateMachine.shutdown() - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() - if(controllerContext.controllerChannelManager != null) { - controllerContext.controllerChannelManager.shutdown() - controllerContext.controllerChannelManager = null - info("Controller shutdown complete") - } + onControllerResignation() } } diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 8262e10..721cbe7 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -22,6 +22,7 @@ import kafka.utils.Utils._ import collection.Set import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.api.{StopReplicaResponse, RequestOrResponse} +import java.util.concurrent.locks.ReentrantLock /** * This manages the state machine for topic deletion. @@ -71,9 +72,10 @@ class TopicDeletionManager(controller: KafkaController, val partitionStateMachine = controller.partitionStateMachine val replicaStateMachine = controller.replicaStateMachine var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted + val deleteLock = new ReentrantLock() var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted) - val deleteTopicsCond = controllerContext.controllerLock.newCondition() + val deleteTopicsCond = deleteLock.newCondition() var deleteTopicStateChanged: Boolean = false var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable @@ -207,7 +209,9 @@ class TopicDeletionManager(controller: KafkaController, */ private def resumeTopicDeletionThread() { deleteTopicStateChanged = true - deleteTopicsCond.signal() + inLock(deleteLock) { + deleteTopicsCond.signal() + } } /** @@ -352,8 +356,11 @@ class TopicDeletionManager(controller: KafkaController, class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") { val zkClient = controllerContext.zkClient override def doWork() { - inLock(controllerContext.controllerLock) { + inLock(deleteLock) { awaitTopicDeletionNotification() + } + + inLock(controllerContext.controllerLock) { val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted if(topicsQueuedForDeletion.size > 0) info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(",")) diff --git a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala index ab95ce1..9585ec0 100644 --- a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala +++ b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala @@ -25,14 +25,14 @@ import scala.annotation.StaticAnnotation * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation * must respect */ -class threadsafe extends StaticAnnotation +//class threadsafe extends StaticAnnotation /** * Indicates that the annotated class is not threadsafe */ -class nonthreadsafe extends StaticAnnotation +//class nonthreadsafe extends StaticAnnotation /** * Indicates that the annotated class is immutable */ -class immutable extends StaticAnnotation +//class immutable extends StaticAnnotation diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 20fe93e..c7e058f 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -96,5 +96,25 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() server.shutdown() Utils.rm(server.config.logDirs) + verifyNonDaemonThreadsStatus + } + + @Test + def testCleanShutdownWithDeleteTopicEnabled() { + val newProps = TestUtils.createBrokerConfig(0, port) + newProps.setProperty("delete.topic.enable", "true") + val newConfig = new KafkaConfig(newProps) + var server = new KafkaServer(newConfig) + server.startup() + server.shutdown() + server.awaitShutdown() + Utils.rm(server.config.logDirs) + verifyNonDaemonThreadsStatus + } + + def verifyNonDaemonThreadsStatus() { + assertEquals(0, Thread.getAllStackTraces.keySet().toArray + .map(_.asInstanceOf[Thread]) + .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka"))) } } diff --git a/gradle.properties b/gradle.properties index 4827769..236e243 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,7 +15,7 @@ group=org.apache.kafka version=0.8.1 -scalaVersion=2.8.0 +scalaVersion=2.9.2 task=build mavenUrl=