From 51fa654db2df4786e8b4bb4a00eb87798e0026b5 Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Thu, 17 Jul 2014 17:11:28 -0600 Subject: [PATCH 1/3] Update kafka.tools to show consumer groups for particular topic and all if no topic is provided KAFKA-1476# --- .../scala/kafka/tools/ConsumerGroupLister.scala | 68 ++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 core/src/main/scala/kafka/tools/ConsumerGroupLister.scala diff --git a/core/src/main/scala/kafka/tools/ConsumerGroupLister.scala b/core/src/main/scala/kafka/tools/ConsumerGroupLister.scala new file mode 100644 index 0000000..877c107 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsumerGroupLister.scala @@ -0,0 +1,68 @@ +package kafka.tools + +import kafka.utils.Logging +import joptsimple.OptionParser +import kafka.utils.CommandLineUtils +import kafka.utils.ZkUtils +import org.I0Itec.zkclient.ZkClient +import kafka.utils.ZKStringSerializer +import scala.collection.JavaConversions._ + +object ConsumerGroupLister extends Logging { + + def main(args: Array[String]) { + + val parser = new OptionParser() + + val topic = parser.accepts("topic", "topic for which consumer groups needs to be listed").withOptionalArg().ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string."). + withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]) + + if (args.length == 0) + CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.") + + val options = parser.parse(args: _*) + + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + + val list = getConsumerGroups(options.valueOf(topic), options.valueOf(zkConnectOpt)) + list.isEmpty match { + case false => { + println("Consumer Group") + println("------------------") + list.foreach(println(_)) + + } + + case _ => () + + } + + } + + def getConsumerGroups(topic: String, zkconnect: String) = { + var list = List[String]() + var map = scala.collection.mutable.Map[String, List[String]]() + var filteredList = List[String]() + + var zkClient: ZkClient = null + zkClient = new ZkClient(zkconnect, 30000, 30000, ZKStringSerializer) + list = zkClient.getChildren("/consumers").toList + if (topic != null) { + list.foreach { item => + try { + map = ZkUtils.getConsumersPerTopic(zkClient, item, true) + } catch { + case ex => {} + } + map.keySet.contains(topic) match { + case true => filteredList = filteredList.::(item) + case false => () + } + } + } + + filteredList + } + +} \ No newline at end of file -- 1.8.5.2 (Apple Git-48) From 74d62b5531b2e91c1d65d05e4aff7f03c118bb10 Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Fri, 18 Jul 2014 13:18:32 -0600 Subject: [PATCH 2/3] Renamed the class as per Jay.Kreps --- .../main/scala/kafka/tools/ConsumerCommand.scala | 77 ++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 core/src/main/scala/kafka/tools/ConsumerCommand.scala diff --git a/core/src/main/scala/kafka/tools/ConsumerCommand.scala b/core/src/main/scala/kafka/tools/ConsumerCommand.scala new file mode 100644 index 0000000..60d95f4 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsumerCommand.scala @@ -0,0 +1,77 @@ +package kafka.tools + +import kafka.utils.Logging +import joptsimple.OptionParser +import kafka.utils.CommandLineUtils +import kafka.utils.ZkUtils +import org.I0Itec.zkclient.ZkClient +import kafka.utils.ZKStringSerializer +import scala.collection.JavaConversions._ + +object ConsumerCommand extends Logging { + + def main(args: Array[String]) { + + val parser = new OptionParser() + val listAll = parser.accepts("listAll", "List all groups") + val topic = parser.accepts("topic", "topic for which consumer groups needs to be listed").withOptionalArg().ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string."). + withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]) + + if (args.length == 0) + CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.") + + val options = parser.parse(args: _*) + + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + + var topicString = options.valueOf(topic) + + if(options.has("listAll")) + topicString = "" + + val list = getConsumerGroups(topicString, options.valueOf(zkConnectOpt)) + list.isEmpty match { + case false => { + println("Consumer Group") + println("------------------") + list.foreach(println(_)) + + } + + case _ => () + + } + + } + + def getConsumerGroups(topic: String, zkconnect: String) = { + var list = List[String]() + var map = scala.collection.mutable.Map[String, List[String]]() + var filteredList = List[String]() + + var zkClient: ZkClient = null + zkClient = new ZkClient(zkconnect, 30000, 30000, ZKStringSerializer) + list = zkClient.getChildren("/consumers").toList + if (topic != "") { + list.foreach { item => + try { + map = ZkUtils.getConsumersPerTopic(zkClient, item, true) + } catch { + case ex => {} + } + map.keySet.contains(topic) match { + case true => filteredList = filteredList.::(item) + case false => () + } + } + } + else + { + filteredList = list + } + + filteredList + } + +} \ No newline at end of file -- 1.8.5.2 (Apple Git-48) From 838b6af1f14144841ffbf7af414a697b5341594a Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Fri, 18 Jul 2014 13:18:50 -0600 Subject: [PATCH 3/3] Remove this class --- .../scala/kafka/tools/ConsumerGroupLister.scala | 68 ---------------------- 1 file changed, 68 deletions(-) delete mode 100644 core/src/main/scala/kafka/tools/ConsumerGroupLister.scala diff --git a/core/src/main/scala/kafka/tools/ConsumerGroupLister.scala b/core/src/main/scala/kafka/tools/ConsumerGroupLister.scala deleted file mode 100644 index 877c107..0000000 --- a/core/src/main/scala/kafka/tools/ConsumerGroupLister.scala +++ /dev/null @@ -1,68 +0,0 @@ -package kafka.tools - -import kafka.utils.Logging -import joptsimple.OptionParser -import kafka.utils.CommandLineUtils -import kafka.utils.ZkUtils -import org.I0Itec.zkclient.ZkClient -import kafka.utils.ZKStringSerializer -import scala.collection.JavaConversions._ - -object ConsumerGroupLister extends Logging { - - def main(args: Array[String]) { - - val parser = new OptionParser() - - val topic = parser.accepts("topic", "topic for which consumer groups needs to be listed").withOptionalArg().ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string."). - withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]) - - if (args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.") - - val options = parser.parse(args: _*) - - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) - - val list = getConsumerGroups(options.valueOf(topic), options.valueOf(zkConnectOpt)) - list.isEmpty match { - case false => { - println("Consumer Group") - println("------------------") - list.foreach(println(_)) - - } - - case _ => () - - } - - } - - def getConsumerGroups(topic: String, zkconnect: String) = { - var list = List[String]() - var map = scala.collection.mutable.Map[String, List[String]]() - var filteredList = List[String]() - - var zkClient: ZkClient = null - zkClient = new ZkClient(zkconnect, 30000, 30000, ZKStringSerializer) - list = zkClient.getChildren("/consumers").toList - if (topic != null) { - list.foreach { item => - try { - map = ZkUtils.getConsumersPerTopic(zkClient, item, true) - } catch { - case ex => {} - } - map.keySet.contains(topic) match { - case true => filteredList = filteredList.::(item) - case false => () - } - } - } - - filteredList - } - -} \ No newline at end of file -- 1.8.5.2 (Apple Git-48)