From 4d1cb4dd12248d6a938e8b09e21bf687f0e91f0f Mon Sep 17 00:00:00 2001
From: Balaji Seshadri
Date: Fri, 5 Dec 2014 11:57:14 -0700
Subject: [PATCH] KAFKA-1476 Implemented review comments from Neha and Ashish

---
 .../main/scala/kafka/tools/ConsumerCommand.scala | 225 +++++++++++++++++++++
 core/src/main/scala/kafka/utils/ZkUtils.scala    |   8 +
 2 files changed, 233 insertions(+)
 create mode 100644 core/src/main/scala/kafka/tools/ConsumerCommand.scala

diff --git a/core/src/main/scala/kafka/tools/ConsumerCommand.scala b/core/src/main/scala/kafka/tools/ConsumerCommand.scala
new file mode 100644
index 0000000..c4cb26c
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ConsumerCommand.scala
@@ -0,0 +1,225 @@
+package kafka.tools
+
+import java.util
+import java.util.Properties
+
+import joptsimple.{OptionSet, ArgumentAcceptingOptionSpec, OptionParser}
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, OffsetFetchResponse, OffsetFetchRequest}
+import kafka.client.ClientUtils
+import kafka.common.{BrokerNotAvailableException, ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.SimpleConsumer
+import kafka.log.LogConfig
+import kafka.tools.ConsumerOffsetChecker._
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNoNodeException
+import collection.JavaConversions._
+import scala.collection.{Map, mutable, Seq, immutable}
+
+object ConsumerCommand {
+
+  private var offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
+
+  def main(args: Array[String]) {
+
+    val parser = new OptionParser()
+
+    val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string.").
+      withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
+    val listGroupOpt = parser.accepts("list-group", "List all consumer groups.")
+    val describeGroupOpt = parser.accepts("describe-group", "Describe the consumer group and list the offset lag for the given group.").
+      withOptionalArg().ofType(classOf[String])
+
+    val configOpt: ArgumentAcceptingOptionSpec[String] = parser.accepts("config", "Configuration overrides for channel timeouts, for instance " +
+      "--config channelSocketTimeoutMs=600")
+      .withOptionalArg().describedAs("name=value").ofType(classOf[String])
+
+    parser.accepts("help", "Print this message.")
+
+    val (options: OptionSet, zkConnect: String) = validateArgs(args, parser, zkConnectOpt)
+
+    var zkClient: ZkClient = null
+
+    try {
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      if(options.has(listGroupOpt)) {
+        listConsumerGroups(zkClient)
+      }
+      else if(options.hasArgument(describeGroupOpt)) {
+        val (channelSocketTimeoutMs: String, channelRetryBackoffMs: String) = getTimeouts(configOpt, options)
+        describeGroup(parser, describeGroupOpt, options, zkClient, channelSocketTimeoutMs.toInt, channelRetryBackoffMs.toInt)
+      }
+      else {
+        CommandLineUtils.printUsageAndDie(parser, "List consumer groups or describe a group.")
+      }
+    }
+    catch {
+      case ex: Throwable =>
+        println("Exiting due to: %s.".format(ex.getStackTraceString))
+    }
+    finally {
+      if(zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  def getTimeouts(configOpt: ArgumentAcceptingOptionSpec[String], options: OptionSet): (String, String) = {
+    val configs = getConfig(options, configOpt)
+    val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600")
+    val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMs", "300")
+    (channelSocketTimeoutMs, channelRetryBackoffMs)
+  }
+
+  def validateArgs(args: Array[String], parser: OptionParser, zkConnectOpt: ArgumentAcceptingOptionSpec[String]): (OptionSet, String) = {
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "List consumer groups or describe a group.")
+
+    val options: OptionSet = parser.parse(args: _*)
+
+    if(options.has("help")) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+
+    val zkConnect = options.valueOf(zkConnectOpt)
+
+    if(!zkConnect.contains(":")) {
+      CommandLineUtils.printUsageAndDie(parser, "The zookeeper connect string must include a port, for example zookeeper101:2181.")
+    }
+    (options, zkConnect)
+  }
+
+  def describeGroup(parser: OptionParser, describeGroupOpt: ArgumentAcceptingOptionSpec[String], options: OptionSet, zkClient: ZkClient, channelSocketTimeoutMs: Integer, channelRetryBackoffMs: Integer) {
+    val listOfTopics = ZkUtils.getTopicsByConsumerGroup(zkClient, options.valueOf(describeGroupOpt))
+
+    if(listOfTopics.isEmpty) {
+      CommandLineUtils.printUsageAndDie(parser, "No topics found for the given consumer group.")
+    }
+
+    listOfTopics.foreach(topic => describeGroupByTopic(options.valueOf(describeGroupOpt), topic, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs))
+  }
+
+  def printConsumerGroups(list: Seq[String]) = {
+    list.foreach(println)
+  }
+
+  def getOffsetsByTopic(topic: String, zkClient: ZkClient, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int) = {
+    val topicList = List[String](topic)
+    val topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq: _*)
+    val topicPartitions = topicPidMap.flatMap { case (topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
+    val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
+    val offsetMap = new util.HashMap[TopicAndPartition, Long]()
+    debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
+    channel.send(OffsetFetchRequest(group, topicPartitions))
+    val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
+    debug("Received offset fetch response %s.".format(offsetFetchResponse))
+
+    offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
+      if(offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
+        val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
+        // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool),
+        // so the lag may be off until all the consumers in the group have the same setting for offsets storage
+        try {
+          val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
+          offsetMap.put(topicAndPartition, offset)
+        } catch {
+          case z: ZkNoNodeException =>
+            if(ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
+              offsetMap.put(topicAndPartition, -1)
+            else
+              throw z
+        }
+      }
+      else if(offsetAndMetadata.error == ErrorMapping.NoError)
+        offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
+      else {
+        println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
+      }
+    }
+    channel.disconnect()
+    (topicPidMap, offsetMap)
+  }
+
+  private def processPartition(zkClient: ZkClient, group: String, topic: String, pid: Int) {
+    val topicPartition = TopicAndPartition(topic, pid)
+    val offsetOpt = offsetMap.get(topicPartition)
+    val groupDirs = new ZKGroupTopicDirs(group, topic)
+    val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/%s".format(pid))._1
+    val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
+    ZkUtils.getLeaderForPartition(zkClient, topic, pid) match {
+      case Some(bid) =>
+        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid))
+        consumerOpt match {
+          case Some(consumer) =>
+            val topicAndPartition = TopicAndPartition(topic, pid)
+            val request =
+              OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+            val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+
+            val lagString = offsetOpt.map(o => if(o == -1) "unknown" else (logSize - o).toString)
+            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
+              owner match { case Some(ownerStr) => ownerStr case None => "none" }))
+          case None => // ignore
+        }
+      case None =>
+        println("No broker for partition %s - %s".format(topic, pid))
+    }
+  }
+
+  private def processTopic(zkClient: ZkClient, group: String, topic: String, topicPidMap: immutable.Map[String, Seq[Int]]) = {
+    println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("GROUP", "TOPIC", "PID", "CURRENT OFFSET", "LOG SIZE", "LAG", "OWNER"))
+    topicPidMap.get(topic) match {
+      case Some(pids) =>
+        pids.sorted.foreach {
+          pid => processPartition(zkClient, group, topic, pid)
+        }
+      case None => // ignore
+    }
+  }
+
+  private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
+    try {
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
+        case Some(brokerInfoString) =>
+          Json.parseFull(brokerInfoString) match {
+            case Some(m) =>
+              val brokerInfo = m.asInstanceOf[Map[String, Any]]
+              val host = brokerInfo.get("host").get.asInstanceOf[String]
+              val port = brokerInfo.get("port").get.asInstanceOf[Int]
brokerInfo.get("port").get.asInstanceOf[Int] + Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker")) + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) + } + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) + } + } catch { + case t: Throwable => + println("Could not parse broker info due to " + t.getStackTraceString) + None + } + } + + private def listConsumerGroups(zkClient: ZkClient) = { + val list = ZkUtils.getConsumerGroups(zkClient) + printConsumerGroups(list) + } + + + private def describeGroupByTopic(groupName: String, topic: String, zkClient: ZkClient, channelSocketTimeout: Int, channelRetryBackoffMs: Int) = { + val (topicPidMap, offsetMap1) = getOffsetsByTopic(topic, zkClient, groupName, channelSocketTimeout, channelRetryBackoffMs) + offsetMap = offsetMap1 + processTopic(zkClient, groupName, topic, topicPidMap) + } + + private def getConfig(options: OptionSet, configOpt: ArgumentAcceptingOptionSpec[String]) = { + val configs = options.valuesOf(configOpt).map(_.split( """\s*=\s*""")) + require(configs.forall(config => config.length == 2), + "Invalid topic config: all configs to be added must be in the format \"key=val\".") + val props = new Properties + if(!configs.isEmpty) + configs.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) + props + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 56e3e88..bfd9376 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -713,6 +713,14 @@ object ZkUtils extends Logging { }.flatten.toSet } } + + def getConsumerGroups(zkClient: ZkClient) = { + zkClient.getChildren(ConsumersPath) + } + + def getTopicsByConsumerGroup(zkClient: ZkClient,consumerGroup:String) = { + zkClient.getChildren(ConsumersPath+"/"+consumerGroup+"/owners") + } } object ZKStringSerializer extends ZkSerializer { -- 1.8.5.2 (Apple Git-48)