diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 083fd63..187f4f6 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -42,6 +42,7 @@ object TopicCommand { } CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) + if (!opts.options.has(opts.listOpt)) CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) @@ -57,7 +58,9 @@ object TopicCommand { else if(opts.options.has(opts.describeOpt)) describeTopic(zkClient, opts) } catch { - case e => println("Error while executing topic command", e) + case e => + println("Error while executing topic command", e) + println(Utils.stackTrace(e)) } finally { zkClient.close() } @@ -71,7 +74,6 @@ object TopicCommand { } def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { - CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) val topic = opts.options.valueOf(opts.topicOpt) val configs = parseTopicConfigsToBeAdded(opts) if (opts.options.has(opts.replicaAssignmentOpt)) { @@ -87,7 +89,6 @@ object TopicCommand { } def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) { - CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) val topics = getTopics(zkClient, opts) topics.foreach { topic => if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { @@ -114,7 +115,6 @@ object TopicCommand { } def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) { - CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) val topics = getTopics(zkClient, opts) topics.foreach { topic => AdminUtils.deleteTopic(zkClient, topic) @@ -141,7 +141,7 @@ object TopicCommand { } def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { - var topics = getTopics(zkClient, opts) + val topics = getTopics(zkClient, opts) val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet