From 51fa654db2df4786e8b4bb4a00eb87798e0026b5 Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Thu, 17 Jul 2014 17:11:28 -0600 Subject: [PATCH] Update kafka.tools to show consumer groups for particular topic and all if no topic is provided KAFKA-1476# --- .../scala/kafka/tools/ConsumerGroupLister.scala | 68 ++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 core/src/main/scala/kafka/tools/ConsumerGroupLister.scala diff --git a/core/src/main/scala/kafka/tools/ConsumerGroupLister.scala b/core/src/main/scala/kafka/tools/ConsumerGroupLister.scala new file mode 100644 index 0000000..877c107 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsumerGroupLister.scala @@ -0,0 +1,68 @@ +package kafka.tools + +import kafka.utils.Logging +import joptsimple.OptionParser +import kafka.utils.CommandLineUtils +import kafka.utils.ZkUtils +import org.I0Itec.zkclient.ZkClient +import kafka.utils.ZKStringSerializer +import scala.collection.JavaConversions._ + +object ConsumerGroupLister extends Logging { + + def main(args: Array[String]) { + + val parser = new OptionParser() + + val topic = parser.accepts("topic", "topic for which consumer groups needs to be listed").withOptionalArg().ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string."). + withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]) + + if (args.length == 0) + CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.") + + val options = parser.parse(args: _*) + + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + + val list = getConsumerGroups(options.valueOf(topic), options.valueOf(zkConnectOpt)) + list.isEmpty match { + case false => { + println("Consumer Group") + println("------------------") + list.foreach(println(_)) + + } + + case _ => () + + } + + } + + def getConsumerGroups(topic: String, zkconnect: String) = { + var list = List[String]() + var map = scala.collection.mutable.Map[String, List[String]]() + var filteredList = List[String]() + + var zkClient: ZkClient = null + zkClient = new ZkClient(zkconnect, 30000, 30000, ZKStringSerializer) + list = zkClient.getChildren("/consumers").toList + if (topic != null) { + list.foreach { item => + try { + map = ZkUtils.getConsumersPerTopic(zkClient, item, true) + } catch { + case ex => {} + } + map.keySet.contains(topic) match { + case true => filteredList = filteredList.::(item) + case false => () + } + } + } + + filteredList + } + +} \ No newline at end of file -- 1.8.5.2 (Apple Git-48)