diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 3c08dee..468bec9 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -51,7 +51,7 @@ object TopicCommand { else if(opts.options.has(opts.deleteOpt)) deleteTopic(zkClient, opts) else if(opts.options.has(opts.listOpt)) - listTopics(zkClient) + listTopics(zkClient, opts) else if(opts.options.has(opts.describeOpt)) describeTopic(zkClient, opts) @@ -109,9 +109,22 @@ object TopicCommand { } } - def listTopics(zkClient: ZkClient) { - for(topic <- ZkUtils.getAllTopics(zkClient).sorted) - println(topic) + def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) { + if(opts.options.has(opts.topicsWithOverridesOpt)) { + ZkUtils.getAllTopics(zkClient).sorted.foreach { topic => + val configs = AdminUtils.fetchTopicConfig(zkClient, topic) + if(configs.size() != 0) { + val replicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) + val numPartitions = replicaAssignment.groupBy(_._1.topic).size + val replicationFactor = replicaAssignment.head._2.size + println("\nTopic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s".format(topic, numPartitions, + replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) + } + } + } else { + for(topic <- ZkUtils.getAllTopics(zkClient).sorted) + println(topic) + } } def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { @@ -229,6 +242,8 @@ object TopicCommand { "if set when describing topics, only show under replicated partitions") val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions", "if set when describing topics, only show partitions whose leader is not available") + val topicsWithOverridesOpt = parser.accepts("topics-with-overrides", + "if set when listing topics, only show topics that have overridden configs") val options = parser.parse(args : _*)