diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 842c110..ecce945 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -42,7 +42,8 @@ object TopicCommand { } CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) - if (!opts.options.has(opts.listOpt)) CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) + if (!opts.options.has(opts.listOpt) && !opts.options.has(opts.describeOpt)) + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) @@ -66,13 +67,23 @@ object TopicCommand { } } - private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = { + private def getSpecifiedTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = { val topicsSpec = opts.options.valueOf(opts.topicOpt) val topicsFilter = new Whitelist(topicsSpec) val allTopics = ZkUtils.getAllTopics(zkClient) allTopics.filter(topicsFilter.isTopicAllowed).sorted } + private def getSpecifiedOrAllTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = { + val allTopics = ZkUtils.getAllTopics(zkClient).sorted + if (opts.options.has(opts.topicOpt)) { + val topicsSpec = opts.options.valueOf(opts.topicOpt) + val topicsFilter = new Whitelist(topicsSpec) + allTopics.filter(topicsFilter.isTopicAllowed) + } else + allTopics + } + def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topic = opts.options.valueOf(opts.topicOpt) val configs = parseTopicConfigsToBeAdded(opts) @@ -89,7 +100,7 @@ object TopicCommand { } def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) { - val topics = getTopics(zkClient, opts) + val topics = getSpecifiedTopics(zkClient, opts) topics.foreach { topic => if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { val configsToBeAdded = parseTopicConfigsToBeAdded(opts) @@ -115,7 +126,7 @@ object TopicCommand { } def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) { - val topics = getTopics(zkClient, opts) + val topics = getSpecifiedTopics(zkClient, opts) topics.foreach { topic => AdminUtils.deleteTopic(zkClient, topic) println("Topic \"%s\" deleted.".format(topic)) @@ -123,49 +134,47 @@ object TopicCommand { } def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) { - if(opts.options.has(opts.topicsWithOverridesOpt)) { - ZkUtils.getAllTopics(zkClient).sorted.foreach { topic => - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) - if(configs.size() != 0) { - val replicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) - val numPartitions = replicaAssignment.size - val replicationFactor = replicaAssignment.head._2.size - println("\nTopic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s".format(topic, numPartitions, - replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) - } - } - } else { - for(topic <- ZkUtils.getAllTopics(zkClient).sorted) + val topics = getSpecifiedOrAllTopics(zkClient, opts) + for(topic <- topics) println(topic) - } } def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { - val topics = getTopics(zkClient, opts) + CommandLineUtils.checkExclusiveArgs(opts.parser, opts.options, + opts.reportUnderReplicatedPartitionsOpt, opts.reportUnavailablePartitionsOpt, opts.topicsWithOverridesOpt) + val topics = getSpecifiedOrAllTopics(zkClient, opts) val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false + val reportOverriddenConfigs = if (opts.options.has(opts.topicsWithOverridesOpt)) true else false val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet for (topic <- topics) { ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match { case Some(topicPartitionAssignment) => + var skipTopic = false val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) if (!reportUnavailablePartitions && !reportUnderReplicatedPartitions) { - println(topic) val config = AdminUtils.fetchTopicConfig(zkClient, topic) - println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", ")) - println("\tpartitions: " + sortedPartitions.size) + if (reportOverriddenConfigs && config.size() == 0) { + skipTopic = true + } else { + println(topic) + println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", ")) + println("\tpartitions: " + sortedPartitions.size) + } } - for ((partitionId, assignedReplicas) <- sortedPartitions) { - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId) - val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId) - if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) || - (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) || - (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) { - print("\t\ttopic: " + topic) - print("\tpartition: " + partitionId) - print("\tleader: " + (if(leader.isDefined) leader.get else "none")) - print("\treplicas: " + assignedReplicas.mkString(",")) - println("\tisr: " + inSyncReplicas.mkString(",")) + if (!skipTopic) { + for ((partitionId, assignedReplicas) <- sortedPartitions) { + val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId) + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId) + if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) || + (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) || + (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) { + print("\t\ttopic: " + topic) + print("\tpartition: " + partitionId) + print("\tleader: " + (if(leader.isDefined) leader.get else "none")) + print("\treplicas: " + assignedReplicas.mkString(",")) + println("\tisr: " + inSyncReplicas.mkString(",")) + } } } case None => @@ -255,7 +264,7 @@ object TopicCommand { val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions", "if set when describing topics, only show partitions whose leader is not available") val topicsWithOverridesOpt = parser.accepts("topics-with-overrides", - "if set when listing topics, only show topics that have overridden configs") + "if set when describing topics, only show topics that have overridden configs") val options = parser.parse(args : _*) diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala index 5f563ca..26083fe 100644 --- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala +++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala @@ -18,7 +18,7 @@ import joptsimple.{OptionSpec, OptionSet, OptionParser} -/** + /** * Helper functions for dealing with command line utilities */ object CommandLineUtils extends Logging { @@ -33,4 +33,12 @@ object CommandLineUtils extends Logging { } } + def checkExclusiveArgs(parser: OptionParser, options: OptionSet, exclusiveOptions: OptionSpec[_]*) { + val usedOptions = exclusiveOptions.filter(options.has(_)).toSet + if (usedOptions.size > 1) { + System.err.println("Arguments " + usedOptions.map(_.toString).mkString(",") + " are mutually exclusive") + parser.printHelpOn(System.err) + System.exit(1) + } + } } \ No newline at end of file