diff --git a/config/server.properties b/config/server.properties index c9e923a..121b637 100644 --- a/config/server.properties +++ b/config/server.properties @@ -115,3 +115,4 @@ zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=1000000 +default.replication.factor=1 diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala deleted file mode 100644 index 804b331..0000000 --- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.admin - -import joptsimple.OptionParser -import org.I0Itec.zkclient.ZkClient -import kafka.utils.{Utils, ZKStringSerializer, ZkUtils} - -object DeleteTopicCommand { - - def main(args: Array[String]): Unit = { - val parser = new OptionParser - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - - val options = parser.parse(args : _*) - - for(arg <- List(topicOpt, zkConnectOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - val topic = options.valueOf(topicOpt) - val zkConnect = options.valueOf(zkConnectOpt) - var zkClient: ZkClient = null - try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) - println("deletion succeeded!") - } - catch { - case e: Throwable => - println("delection failed because of " + e.getMessage) - println(Utils.stackTrace(e)) - } - finally { - if (zkClient != null) - zkClient.close() - } - } -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index bdc72ea..6788c2e 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -36,9 +36,9 @@ object TopicCommand { val opts = new TopicCommandOptions(args) // should have exactly one action - val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _) + val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) if(actions != 1) { - System.err.println("Command must include exactly one action: --list, --describe, --create or --alter") + System.err.println("Command must include exactly one action: --list, --describe, --create, --alter or --delete") opts.parser.printHelpOn(System.err) System.exit(1) } @@ -56,6 +56,8 @@ object TopicCommand { listTopics(zkClient, opts) else if(opts.options.has(opts.describeOpt)) describeTopic(zkClient, opts) + else if(opts.options.has(opts.deleteOpt)) + deleteTopic(zkClient, opts) } catch { case e: Throwable => println("Error while executing topic command " + e.getMessage) @@ -122,7 +124,14 @@ object TopicCommand { for(topic <- topics) println(topic) } - + + def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + val topics = getTopics(zkClient, opts) + topics.foreach { topic => + ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) + } + } + def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topics = getTopics(zkClient, opts) val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false @@ -210,6 +219,7 @@ object TopicCommand { .ofType(classOf[String]) val listOpt = parser.accepts("list", "List all available topics.") val createOpt = parser.accepts("create", "Create a new topic.") + val deleteOpt = parser.accepts("delete", "Delete a topic") val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.") val describeOpt = parser.accepts("describe", "List details for the given topics.") val helpOpt = parser.accepts("help", "Print usage information.") diff --git a/gradle.properties b/gradle.properties index 4827769..236e243 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,7 +15,7 @@ group=org.apache.kafka version=0.8.1 -scalaVersion=2.8.0 +scalaVersion=2.9.2 task=build mavenUrl=