diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 3c08dee..bac6dba 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -51,7 +51,7 @@ object TopicCommand { else if(opts.options.has(opts.deleteOpt)) deleteTopic(zkClient, opts) else if(opts.options.has(opts.listOpt)) - listTopics(zkClient) + listTopics(zkClient, opts) else if(opts.options.has(opts.describeOpt)) describeTopic(zkClient, opts) @@ -109,9 +109,19 @@ object TopicCommand { } } - def listTopics(zkClient: ZkClient) { - for(topic <- ZkUtils.getAllTopics(zkClient).sorted) - println(topic) + def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) { + if(opts.options.has(opts.topicsWithOverridesOpt)) { + ZkUtils.getAllTopics(zkClient).sorted.foreach { topic => + val configs = AdminUtils.fetchTopicConfig(zkClient, topic) + if(configs.size() != 0) { + println("\nConfig overrides for topic %s\n".format(topic)) + configs.list(System.out) + } + } + } else { + for(topic <- ZkUtils.getAllTopics(zkClient).sorted) + println(topic) + } } def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { @@ -229,6 +239,8 @@ object TopicCommand { "if set when describing topics, only show under replicated partitions") val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions", "if set when describing topics, only show partitions whose leader is not available") + val topicsWithOverridesOpt = parser.accepts("topics-with-overrides", + "if set when listing topics, only show topics that have overridden configs") val options = parser.parse(args : _*)