diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala index 095469b..4c560a5 100644 --- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala @@ -36,6 +36,8 @@ object ListTopicCommand { .withRequiredArg .describedAs("urls") .ofType(classOf[String]) + val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions", + "if set, only show under replicated partiitons") val options = parser.parse(args : _*) @@ -49,13 +51,14 @@ object ListTopicCommand { val topic = options.valueOf(topicOpt) val zkConnect = options.valueOf(zkConnectOpt) + val reportUnderReplicatedPartitions = if (options.has(reportUnderReplicatedPartitionsOpt)) true else false var zkClient: ZkClient = null try { var topicList: Seq[String] = Nil zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) if (topic == "") - topicList = ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath).sorted + topicList = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted else topicList = List(topic) @@ -63,7 +66,7 @@ object ListTopicCommand { println("no topics exist!") for (t <- topicList) - showTopic(t, zkClient) + showTopic(t, zkClient, reportUnderReplicatedPartitions) } catch { case e => @@ -76,14 +79,25 @@ object ListTopicCommand { } } - def showTopic(topic: String, zkClient: ZkClient) { - val topicMetaData = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) - topicMetaData.errorCode match { - case ErrorMapping.UnknownTopicOrPartitionCode => + def showTopic(topic: String, zkClient: ZkClient, reportUnderReplicatedPartitions: Boolean) { + ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match { + case Some(topicPartitionAssignment) => + if (!reportUnderReplicatedPartitions) + println("topic: " + topic) + val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) + for ((partitionId, assignedReplicas) <- sortedPartitions) { + val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId) + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId) + if (!reportUnderReplicatedPartitions || inSyncReplicas.size < assignedReplicas.size) { + print("\ttopic: " + topic) + print("\tpartition: " + partitionId) + print("\tleader: " + (if(leader.isDefined) leader.get else "none")) + print("\treplicas: " + assignedReplicas.mkString(",")) + println("\tisr: " + inSyncReplicas.mkString(",")) + } + } + case None => println("topic " + topic + " doesn't exist!") - case _ => - for (part <- topicMetaData.partitionsMetadata) - println("topic: " + topic + "\t" + part.toString) } } } \ No newline at end of file