diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index f12ffc2..f1feb7b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -38,6 +38,11 @@ import org.apache.log4j.Logger import scala.Some import kafka.common.TopicAndPartition import java.util.concurrent.locks.ReentrantLock +import scala.Some +import kafka.common.TopicAndPartition +import kafka.controller.ReassignedPartitionsContext +import kafka.controller.PartitionAndReplica +import kafka.controller.LeaderIsrAndControllerEpoch class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int) { @@ -334,16 +339,25 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * required to clean up internal controller data structures */ def onControllerResignation() { + shutdownComponents(false) + } + + def shutdownComponents(isBrokerShutdown: Boolean) { inLock(controllerContext.controllerLock) { - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() + if(isBrokerShutdown) { + isRunning = false + } else { + Utils.unregisterMBean(KafkaController.MBeanName) + } deleteTopicManager.shutdown() - Utils.unregisterMBean(KafkaController.MBeanName) partitionStateMachine.shutdown() replicaStateMachine.shutdown() + if(config.autoLeaderRebalanceEnable) + autoRebalanceScheduler.shutdown() if(controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null + info("Controller shutdown complete") } } } @@ -638,18 +652,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * shuts down the controller channel manager, if one exists (i.e. if it was the current controller) */ def shutdown() = { - inLock(controllerContext.controllerLock) { - isRunning = false - partitionStateMachine.shutdown() - replicaStateMachine.shutdown() - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() - if(controllerContext.controllerChannelManager != null) { - controllerContext.controllerChannelManager.shutdown() - controllerContext.controllerChannelManager = null - info("Controller shutdown complete") - } - } + shutdownComponents(true) } def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = { diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 8262e10..b54dc39 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -22,6 +22,7 @@ import kafka.utils.Utils._ import collection.Set import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.api.{StopReplicaResponse, RequestOrResponse} +import java.util.concurrent.locks.ReentrantLock /** * This manages the state machine for topic deletion. @@ -70,10 +71,11 @@ class TopicDeletionManager(controller: KafkaController, val controllerContext = controller.controllerContext val partitionStateMachine = controller.partitionStateMachine val replicaStateMachine = controller.replicaStateMachine + val deleteLock: ReentrantLock = new ReentrantLock() var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted) - val deleteTopicsCond = controllerContext.controllerLock.newCondition() + val deleteTopicsCond = deleteLock.newCondition() var deleteTopicStateChanged: Boolean = false var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable @@ -207,7 +209,9 @@ class TopicDeletionManager(controller: KafkaController, */ private def resumeTopicDeletionThread() { deleteTopicStateChanged = true - deleteTopicsCond.signal() + inLock(deleteLock) { + deleteTopicsCond.signal() + } } /** @@ -351,6 +355,7 @@ class TopicDeletionManager(controller: KafkaController, class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") { val zkClient = controllerContext.zkClient + override def doWork() { inLock(controllerContext.controllerLock) { awaitTopicDeletionNotification() diff --git a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala index ab95ce1..9585ec0 100644 --- a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala +++ b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala @@ -25,14 +25,14 @@ import scala.annotation.StaticAnnotation * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation * must respect */ -class threadsafe extends StaticAnnotation +//class threadsafe extends StaticAnnotation /** * Indicates that the annotated class is not threadsafe */ -class nonthreadsafe extends StaticAnnotation +//class nonthreadsafe extends StaticAnnotation /** * Indicates that the annotated class is immutable */ -class immutable extends StaticAnnotation +//class immutable extends StaticAnnotation