diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 6fef9df..686a0df 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -36,9 +36,9 @@ object TopicCommand {
     val opts = new TopicCommandOptions(args)
 
     // should have exactly one action
-    val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
+    val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
     if(actions != 1) {
-      System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter")
+      System.err.println("Command must include exactly one action: --list, --describe, --create or --alter")
       opts.parser.printHelpOn(System.err)
       System.exit(1)
     }
@@ -52,8 +52,6 @@ object TopicCommand {
        createTopic(zkClient, opts)
      else if(opts.options.has(opts.alterOpt))
        alterTopic(zkClient, opts)
-     else if(opts.options.has(opts.deleteOpt))
-       deleteTopic(zkClient, opts)
      else if(opts.options.has(opts.listOpt))
        listTopics(zkClient, opts)
      else if(opts.options.has(opts.describeOpt))
@@ -119,14 +117,6 @@ object TopicCommand {
     }
   }
 
-  def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
-    val topics = getTopics(zkClient, opts)
-    topics.foreach { topic =>
-      AdminUtils.deleteTopic(zkClient, topic)
-      println("Topic \"%s\" queued for deletion.".format(topic))
-    }
-  }
-
   def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     for(topic <- topics)
@@ -221,10 +211,9 @@ object TopicCommand {
     val listOpt = parser.accepts("list", "List all available topics.")
     val createOpt = parser.accepts("create", "Create a new topic.")
     val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
-    val deleteOpt = parser.accepts("delete", "Delete the topic.")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
     val helpOpt = parser.accepts("help", "Print usage information.")
-    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe. Can also accept a regular " +
+    val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
                                            "expression except for --create option")
                          .withRequiredArg
                          .describedAs("topic")
@@ -263,7 +252,7 @@ object TopicCommand {
 
     val options = parser.parse(args : _*)
 
-    val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, deleteOpt, describeOpt, listOpt)
+    val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt)
 
     def checkArgs() {
       // check required args
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index c69077e..c3e8d05 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -72,7 +72,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   // register topic and partition change listeners
   def registerListeners() {
     registerTopicChangeListener()
-    registerDeleteTopicListener()
+    if(controller.config.deleteTopicEnable)
+      registerDeleteTopicListener()
   }
 
   /**
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 58f1c42..6a407ad 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -76,23 +76,28 @@ class TopicDeletionManager(controller: KafkaController,
   val deleteTopicsCond = controllerContext.controllerLock.newCondition()
   var deleteTopicStateChanged: Boolean = false
   var deleteTopicsThread: DeleteTopicsThread = null
+  val isDeleteTopicEnabled = controller.config.deleteTopicEnable
 
   /**
    * Invoked at the end of new controller initiation
    */
   def start() {
-    deleteTopicsThread = new DeleteTopicsThread()
-    deleteTopicStateChanged = true
-    deleteTopicsThread.start()
+    if(isDeleteTopicEnabled) {
+      deleteTopicsThread = new DeleteTopicsThread()
+      deleteTopicStateChanged = true
+      deleteTopicsThread.start()
+    }
   }
 
   /**
    * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
    */
   def shutdown() {
-    deleteTopicsThread.shutdown()
-    topicsToBeDeleted.clear()
-    topicsIneligibleForDeletion.clear()
+    if(isDeleteTopicEnabled) {
+      deleteTopicsThread.shutdown()
+      topicsToBeDeleted.clear()
+      topicsIneligibleForDeletion.clear()
+    }
   }
 
   /**
@@ -102,8 +107,10 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics that should be deleted
    */
   def enqueueTopicsForDeletion(topics: Set[String]) {
-    topicsToBeDeleted ++= topics
-    resumeTopicDeletionThread()
+    if(isDeleteTopicEnabled) {
+      topicsToBeDeleted ++= topics
+      resumeTopicDeletionThread()
+    }
   }
 
   /**
@@ -115,11 +122,13 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics for which deletion can be resumed
    */
   def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
+    if(isDeleteTopicEnabled) {
     val topicsToResumeDeletion = topics & topicsToBeDeleted
     if(topicsToResumeDeletion.size > 0) {
       topicsIneligibleForDeletion --= topicsToResumeDeletion
       resumeTopicDeletionThread()
     }
+    }
   }
 
   /**
@@ -131,14 +140,16 @@ class TopicDeletionManager(controller: KafkaController,
    * @param replicas Replicas for which deletion has failed
    */
   def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
-    val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
-    if(replicasThatFailedToDelete.size > 0) {
-      val topics = replicasThatFailedToDelete.map(_.topic)
-      debug("Deletion failed for replicas %s. Halting deletion for topics %s"
-        .format(replicasThatFailedToDelete.mkString(","), topics))
-      controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
-      markTopicIneligibleForDeletion(topics)
-      resumeTopicDeletionThread()
+    if(isDeleteTopicEnabled) {
+      val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
+      if(replicasThatFailedToDelete.size > 0) {
+        val topics = replicasThatFailedToDelete.map(_.topic)
+        debug("Deletion failed for replicas %s. Halting deletion for topics %s"
+          .format(replicasThatFailedToDelete.mkString(","), topics))
+        controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
+        markTopicIneligibleForDeletion(topics)
+        resumeTopicDeletionThread()
+      }
     }
   }
 
@@ -150,22 +161,33 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics that should be marked ineligible for deletion. No op if the topic is was not previously queued up for deletion
    */
   def markTopicIneligibleForDeletion(topics: Set[String]) {
-    val newTopicsToHaltDeletion = topicsToBeDeleted & topics
-    topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
-    if(newTopicsToHaltDeletion.size > 0)
-      info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
+    if(isDeleteTopicEnabled) {
+      val newTopicsToHaltDeletion = topicsToBeDeleted & topics
+      topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
+      if(newTopicsToHaltDeletion.size > 0)
+        info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
+    }
   }
 
   def isTopicIneligibleForDeletion(topic: String): Boolean = {
-    topicsIneligibleForDeletion.contains(topic)
+    if(isDeleteTopicEnabled) {
+      topicsIneligibleForDeletion.contains(topic)
+    } else
+      true
   }
 
   def isTopicDeletionInProgress(topic: String): Boolean = {
-    controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
+    if(isDeleteTopicEnabled) {
+      controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
+    } else
+      false
   }
 
   def isTopicQueuedUpForDeletion(topic: String): Boolean = {
-    topicsToBeDeleted.contains(topic)
+    if(isDeleteTopicEnabled) {
+      topicsToBeDeleted.contains(topic)
+    } else
+      false
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 08de0ef..b0506d4 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -287,4 +287,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /** The required acks before the commit can be accepted. In general, the default (-1) should not be overridden. */
   val offsetCommitRequiredAcks = props.getShortInRange("offsets.commit.required.acks", OffsetManagerConfig.DefaultOffsetCommitRequiredAcks,
     (-1, offsetsTopicReplicationFactor))
+
+  /* Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off */
+  val deleteTopicEnable = props.getBoolean("delete.topic.enable", false)
+
 }
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 6db76a5..e704290 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -212,8 +212,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
     val topicAndPartition = TopicAndPartition(topic, 0)
+    val brokerConfigs = TestUtils.createBrokerConfigs(4)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
-    val allServers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
@@ -252,8 +254,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
     val topicAndPartition = TopicAndPartition(topic, 0)
+    val brokerConfigs = TestUtils.createBrokerConfigs(4)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
-    val allServers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
@@ -421,8 +425,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(3).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
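Usage note (not part of the patch): a minimal sketch of how the new flag would be exercised end to end, assuming a local ZooKeeper at localhost:2181 and the AdminUtils/ZkClient APIs on this branch. Each broker must be started with delete.topic.enable=true in its server.properties; the admin side then only needs to write the deletion marker, which the controller's TopicDeletionManager acts on, or, when the flag is left at its default of false, ignores entirely.

// Hypothetical example, not shipped with this patch.
// Broker-side server.properties on every broker in the (assumed) cluster:
//   delete.topic.enable=true
import org.I0Itec.zkclient.ZkClient
import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer

object DeleteTopicExample {
  def main(args: Array[String]) {
    // Connect to the (assumed) ZooKeeper ensemble backing the cluster.
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    try {
      // Same call the removed TopicCommand --delete action used: marks the topic
      // for deletion under /admin/delete_topics. With delete.topic.enable=false
      // the controller never picks the request up.
      AdminUtils.deleteTopic(zkClient, "test")
      println("Topic \"test\" queued for deletion.")
    } finally {
      zkClient.close()
    }
  }
}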