diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala index 095469b..12dadec 100644 --- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala @@ -20,7 +20,6 @@ package kafka.admin import joptsimple.OptionParser import org.I0Itec.zkclient.ZkClient import kafka.utils.{Utils, ZKStringSerializer, ZkUtils} -import kafka.common.ErrorMapping object ListTopicCommand { @@ -36,6 +35,10 @@ object ListTopicCommand { .withRequiredArg .describedAs("urls") .ofType(classOf[String]) + val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions", + "if set, only show under replicated partiitons") + val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions", + "if set, only show partitions whose leader is not available") val options = parser.parse(args : _*) @@ -49,21 +52,24 @@ object ListTopicCommand { val topic = options.valueOf(topicOpt) val zkConnect = options.valueOf(zkConnectOpt) + val reportUnderReplicatedPartitions = if (options.has(reportUnderReplicatedPartitionsOpt)) true else false + val reportUnavailablePartitions = if (options.has(reportUnavailablePartitionsOpt)) true else false var zkClient: ZkClient = null try { var topicList: Seq[String] = Nil zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) if (topic == "") - topicList = ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath).sorted + topicList = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted else topicList = List(topic) if (topicList.size <= 0) println("no topics exist!") + val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet for (t <- topicList) - showTopic(t, zkClient) + showTopic(t, zkClient, reportUnderReplicatedPartitions, reportUnavailablePartitions, liveBrokers) } catch { case e => @@ -76,14 +82,28 @@ object ListTopicCommand { } } - def showTopic(topic: String, zkClient: ZkClient) { - val topicMetaData = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) - topicMetaData.errorCode match { - case ErrorMapping.UnknownTopicOrPartitionCode => + def showTopic(topic: String, zkClient: ZkClient, reportUnderReplicatedPartitions: Boolean, + reportUnavailablePartitions: Boolean, liveBrokers: Set[Int]) { + ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match { + case Some(topicPartitionAssignment) => + if (!reportUnderReplicatedPartitions && !reportUnavailablePartitions) + println("topic: " + topic) + val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) + for ((partitionId, assignedReplicas) <- sortedPartitions) { + val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId) + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId) + if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) || + (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) || + (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) { + print("\ttopic: " + topic) + print("\tpartition: " + partitionId) + print("\tleader: " + (if(leader.isDefined) leader.get else "none")) + print("\treplicas: " + assignedReplicas.mkString(",")) + println("\tisr: " + inSyncReplicas.mkString(",")) + } + } + case None => println("topic " + topic + " doesn't exist!") - case _ => - for (part <- topicMetaData.partitionsMetadata) - println("topic: " + topic + "\t" + part.toString) } } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index ce1904b..5673ae2 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -72,7 +72,7 @@ object ZkUtils extends Logging { } def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = { - val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted + val brokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).sorted brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) }