diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala index 804b331..455b43e 100644 --- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala @@ -50,8 +50,8 @@ object DeleteTopicCommand { var zkClient: ZkClient = null try { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) - println("deletion succeeded!") + zkClient.createPersistent(ZkUtils.getDeleteTopicPath(topic), true) + println("topic [" + topic + "] deletion queued!") } catch { case e: Throwable => diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index bdc72ea..6788c2e 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -36,9 +36,9 @@ object TopicCommand { val opts = new TopicCommandOptions(args) // should have exactly one action - val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _) + val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) if(actions != 1) { - System.err.println("Command must include exactly one action: --list, --describe, --create or --alter") + System.err.println("Command must include exactly one action: --list, --describe, --create, --alter or --delete") opts.parser.printHelpOn(System.err) System.exit(1) } @@ -56,6 +56,8 @@ object TopicCommand { listTopics(zkClient, opts) else if(opts.options.has(opts.describeOpt)) describeTopic(zkClient, opts) + else if(opts.options.has(opts.deleteOpt)) + deleteTopic(zkClient, opts) } catch { case e: Throwable => println("Error while executing topic command " + e.getMessage) @@ -122,7 +124,14 @@ object TopicCommand { for(topic <- topics) println(topic) } - + + def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + val topics = getTopics(zkClient, opts) + topics.foreach { topic => + ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) + } + } + def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topics = getTopics(zkClient, opts) val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false @@ -210,6 +219,7 @@ object TopicCommand { .ofType(classOf[String]) val listOpt = parser.accepts("list", "List all available topics.") val createOpt = parser.accepts("create", "Create a new topic.") + val deleteOpt = parser.accepts("delete", "Delete a topic") val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.") val describeOpt = parser.accepts("describe", "List details for the given topics.") val helpOpt = parser.accepts("help", "Print usage information.") diff --git a/system_test/testcase_to_run.json b/system_test/testcase_to_run.json index 8252860..982625c 100644 --- a/system_test/testcase_to_run.json +++ b/system_test/testcase_to_run.json @@ -1,5 +1,5 @@ { - "ReplicaBasicTest" : [ - "testcase_0001" + "DeleteTopicTest" : [ + "testcase_7001" ] }