diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala index f91eca2..0883d2d 100644 --- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala @@ -21,12 +21,13 @@ import joptsimple.OptionParser import org.I0Itec.zkclient.ZkClient import kafka.utils.{Utils, ZKStringSerializer, ZkUtils} import kafka.common.ErrorMapping +import kafka.cluster.Broker object ListTopicCommand { def main(args: Array[String]): Unit = { val parser = new OptionParser - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.") + val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be listed. Defaults to all existing topics.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) @@ -55,7 +56,7 @@ object ListTopicCommand { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) if (topic == "") - topicList = ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath) + topicList = ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath).sorted else topicList = List(topic) @@ -83,8 +84,14 @@ object ListTopicCommand { println("topic " + topic + " doesn't exist!") case _ => println("topic: " + topic) - for (part <- topicMetaData.partitionsMetadata) - println(part.toString) + for (pd <- topicMetaData.partitionsMetadata) { + print("\t\tpartition " + pd.partitionId) + print("\t\tleader: " + (if(pd.leader.isDefined) formatBroker(pd.leader.get) else "none")) + print("\t\treplicas: " + pd.replicas.map(formatBroker).mkString(", ")) + println("\t\tisr: " + pd.isr.map(formatBroker).mkString(", ")) + } } } + + def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")" } \ No newline at end of file