diff --git a/build.gradle b/build.gradle index 3db5f67..7531496 100644 --- a/build.gradle +++ b/build.gradle @@ -147,6 +147,7 @@ project(':core') { compile 'org.apache.zookeeper:zookeeper:3.3.4' compile 'com.101tec:zkclient:0.3' compile 'com.yammer.metrics:metrics-core:2.2.0' + compile 'com.yammer.metrics:metrics-annotation:2.2.0' compile 'net.sf.jopt-simple:jopt-simple:3.2' compile 'org.xerial.snappy:snappy-java:1.0.5' diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index f12ffc2..2f8190a 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -38,6 +38,11 @@ import org.apache.log4j.Logger import scala.Some import kafka.common.TopicAndPartition import java.util.concurrent.locks.ReentrantLock +import scala.Some +import kafka.common.TopicAndPartition +import kafka.controller.ReassignedPartitionsContext +import kafka.controller.PartitionAndReplica +import kafka.controller.LeaderIsrAndControllerEpoch class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int) { @@ -334,18 +339,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * required to clean up internal controller data structures */ def onControllerResignation() { - inLock(controllerContext.controllerLock) { - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() - deleteTopicManager.shutdown() - Utils.unregisterMBean(KafkaController.MBeanName) - partitionStateMachine.shutdown() - replicaStateMachine.shutdown() - if(controllerContext.controllerChannelManager != null) { - controllerContext.controllerChannelManager.shutdown() - controllerContext.controllerChannelManager = null - } - } + shutdown(false) } /** @@ -637,12 +631,17 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * it shuts down the partition and replica state machines. If not, those are a no-op. In addition to that, it also * shuts down the controller channel manager, if one exists (i.e. if it was the current controller) */ - def shutdown() = { + def shutdown(isBrokerShutdown : Boolean = true) = { inLock(controllerContext.controllerLock) { - isRunning = false + if(isBrokerShutdown) { + isRunning = false + } else { + Utils.unregisterMBean(KafkaController.MBeanName) + } + deleteTopicManager.shutdown() partitionStateMachine.shutdown() replicaStateMachine.shutdown() - if (config.autoLeaderRebalanceEnable) + if(config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown() if(controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 8262e10..096bcc7 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -22,6 +22,8 @@ import kafka.utils.Utils._ import collection.Set import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.api.{StopReplicaResponse, RequestOrResponse} +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock /** * This manages the state machine for topic deletion. @@ -70,10 +72,11 @@ class TopicDeletionManager(controller: KafkaController, val controllerContext = controller.controllerContext val partitionStateMachine = controller.partitionStateMachine val replicaStateMachine = controller.replicaStateMachine + val deleteLock: ReentrantLock = new ReentrantLock() var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted) - val deleteTopicsCond = controllerContext.controllerLock.newCondition() + val deleteTopicsCond = deleteLock.newCondition() var deleteTopicStateChanged: Boolean = false var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable @@ -351,6 +354,7 @@ class TopicDeletionManager(controller: KafkaController, class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") { val zkClient = controllerContext.zkClient + override def doWork() { inLock(controllerContext.controllerLock) { awaitTopicDeletionNotification()