/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.tools

import joptsimple.OptionParser
import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
import kafka.client.ClientUtils
import kafka.common.{BrokerNotAvailableException, ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNoNodeException

import scala.collection.{Map, Seq, immutable, mutable}
import scala.util.control.NonFatal

/**
 * Command line tool that either lists the consumer groups registered in
 * ZooKeeper (--list), or describes one group's consumption of a topic
 * (--describeGroup --topic): per-partition current offset, log end size,
 * lag and owner.
 */
object ConsumerCommand extends Logging {

  def main(args: Array[String]): Unit = {
    val parser = new OptionParser()

    val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string.")
      .withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
    val listGroupOpt = parser.accepts("list", " List Consumer groups.")
    val describeGroupOpt = parser.accepts("describeGroup", " Describe Consumer groups.")
      .withOptionalArg().ofType(classOf[java.lang.String])
    val topicOpt = parser.accepts("topic", " Topic to be analyzed.")
      .withOptionalArg().ofType(classOf[java.lang.String])
    val channelSocketTimeoutMsOpt = parser.accepts("socket.timeout.ms", "Socket timeout to use when querying for offsets.")
      .withOptionalArg().ofType(classOf[java.lang.Integer]).defaultsTo(600)
    val channelRetryBackoffMsOpt = parser.accepts("retry.backoff.ms", "Retry back-off to use for failed offset queries.")
      .withOptionalArg().ofType(classOf[java.lang.Integer]).defaultsTo(300)
    parser.accepts("help", "Print this message.")

    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "List consumer groups or describe group.")

    val options = parser.parse(args: _*)

    if (options.has("help")) {
      parser.printHelpOn(System.out)
      System.exit(0)
    }

    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)

    var zkClient: ZkClient = null
    try {
      zkClient = new ZkClient(options.valueOf(zkConnectOpt), 30000, 30000, ZKStringSerializer)
      if (options.has(listGroupOpt)) {
        printConsumerGroups(ZkUtils.getConsumerGroups(zkClient).toList)
      } else if (options.hasArgument(describeGroupOpt) && options.hasArgument(topicOpt)) {
        val groupName = options.valueOf(describeGroupOpt)
        val topic = options.valueOf(topicOpt)
        val (topicPidMap, offsets) = getOffsetsByTopic(topic, zkClient, groupName,
          options.valueOf(channelSocketTimeoutMsOpt).intValue(),
          options.valueOf(channelRetryBackoffMsOpt).intValue())
        processTopic(zkClient, groupName, topic, topicPidMap, offsets)
      } else {
        CommandLineUtils.printUsageAndDie(parser, "List consumer groups or describe group.")
      }
    } catch {
      // NonFatal (not Throwable): let OOM / interrupts propagate instead of
      // being silently swallowed with only a message and no stack trace.
      case NonFatal(ex) =>
        println("Exiting due to: %s.".format(ex.getMessage))
        debug("Command failed", ex)
    } finally {
      // The original never closed the ZooKeeper client; release the session.
      if (zkClient != null)
        zkClient.close()
    }
  }

  /** Prints one consumer group name per line. */
  def printConsumerGroups(list: List[String]) =
    list.foreach(println)

  /**
   * Fetches the committed offset of every partition of `topic` for `group`.
   *
   * Offsets are requested from the group's offset-manager broker first; a
   * partition reported as [[OffsetMetadataAndError.NoOffset]] falls back to
   * the ZooKeeper offset path, since the group may not have migrated off
   * ZooKeeper offsets storage (the lag may be off until all consumers in the
   * group use the same storage). A partition whose group dir exists but has
   * no offset node is recorded as -1 ("unknown" to the caller).
   *
   * @return (topic -> partition ids, partition -> committed offset)
   */
  def getOffsetsByTopic(topic: String,
                        zkClient: ZkClient,
                        group: String,
                        channelSocketTimeoutMs: Int,
                        channelRetryBackoffMs: Int): (immutable.Map[String, Seq[Int]], mutable.Map[TopicAndPartition, Long]) = {
    val topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, List(topic)).toSeq: _*)
    val topicPartitions = topicPidMap.flatMap { case (t, pids) => pids.map(TopicAndPartition(t, _)) }.toSeq
    val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
    // Plain Scala map instead of java.util.HashMap + JavaConversions.
    val offsets = mutable.Map[TopicAndPartition, Long]()
    try {
      debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
      channel.send(OffsetFetchRequest(group, topicPartitions))
      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
      debug("Received offset fetch response %s.".format(offsetFetchResponse))

      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
          try {
            val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
            offsets.put(topicAndPartition, offset)
          } catch {
            case z: ZkNoNodeException =>
              // Group dir exists but this partition has no committed offset.
              if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
                offsets.put(topicAndPartition, -1)
              else
                throw z
          }
        } else if (offsetAndMetadata.error == ErrorMapping.NoError) {
          offsets.put(topicAndPartition, offsetAndMetadata.offset)
        } else {
          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
        }
      }
    } finally {
      // The original leaked the channel whenever send/receive threw.
      channel.disconnect()
    }
    (topicPidMap, offsets)
  }

  /**
   * Prints the offset/log-size/lag/owner line for one partition of `topic`.
   * An offset of -1 (see getOffsetsByTopic) is rendered as "unknown".
   */
  private def processPartition(zkClient: ZkClient,
                               group: String,
                               topic: String,
                               pid: Int,
                               offsets: Map[TopicAndPartition, Long]): Unit = {
    val topicAndPartition = TopicAndPartition(topic, pid)
    val offsetOpt = offsets.get(topicAndPartition)
    val groupDirs = new ZKGroupTopicDirs(group, topic)
    val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/%s".format(pid))._1
    ZkUtils.getLeaderForPartition(zkClient, topic, pid) match {
      case Some(bid) =>
        // The original built a fresh single-entry consumerMap per call, so
        // getOrElseUpdate never cached anything; call getConsumer directly.
        getConsumer(zkClient, bid) match {
          case Some(consumer) =>
            try {
              val request =
                OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
              val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
              val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
              println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid,
                offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
                owner.getOrElse("none")))
            } finally {
              // The original leaked one SimpleConsumer socket per partition.
              consumer.close()
            }
          case None => // ignore: getConsumer already reported the failure
        }
      case None =>
        println("No broker for partition %s - %s".format(topic, pid))
    }
  }

  /** Prints the header row, then one line per partition in partition order. */
  private def processTopic(zkClient: ZkClient,
                           group: String,
                           topic: String,
                           topicPidMap: immutable.Map[String, Seq[Int]],
                           offsets: Map[TopicAndPartition, Long]): Unit = {
    println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("GROUP", "TOPIC", "PID", "CURRENT OFFSET", "LOG SIZE", "LAG", "OWNER"))
    topicPidMap.get(topic).foreach { pids =>
      pids.sorted.foreach(pid => processPartition(zkClient, group, topic, pid, offsets))
    }
  }

  /**
   * Builds a SimpleConsumer for broker `bid` from its registration JSON in
   * ZooKeeper, or None if the registration cannot be read or parsed.
   */
  private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
    try {
      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
        case Some(brokerInfoString) =>
          Json.parseFull(brokerInfoString) match {
            case Some(m) =>
              val brokerInfo = m.asInstanceOf[Map[String, Any]]
              val host = brokerInfo("host").asInstanceOf[String]
              val port = brokerInfo("port").asInstanceOf[Int]
              // clientId fixed: the original said "ConsumerOffsetChecker",
              // copy-pasted from that tool.
              Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerCommand"))
            case None =>
              throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
          }
        case None =>
          throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
      }
    } catch {
      // NonFatal, and print the throwable itself: t.getCause is often null.
      case NonFatal(t) =>
        println("Could not parse broker info due to " + t)
        None
    }
  }
}