From d14f6fd7b0e523bf979030344873242c38e055a1 Mon Sep 17 00:00:00 2001 From: asingh Date: Mon, 23 Feb 2015 18:11:26 -0800 Subject: [PATCH] KAFKA-313: Add JSON/CSV output and looping options to ConsumerGroupCommand --- .../scala/kafka/admin/ConsumerGroupCommand.scala | 125 +++++++++++++++++++-- 1 file changed, 116 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 89fa29a..b9918c9 100644 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -28,12 +28,21 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException import kafka.common.TopicAndPartition import joptsimple.{OptionSpec, OptionParser} import scala.collection.{Set, mutable} +import scala.util.control.Breaks._ import kafka.consumer.SimpleConsumer import collection.JavaConversions._ +private object OutputFormatType extends Enumeration { + type OutputFormatType = Value + val CSV, JSON, NONE = Value +} + +import OutputFormatType._ object ConsumerGroupCommand { + private var outputFormat: OutputFormatType = OutputFormatType.NONE + def main(args: Array[String]) { val opts = new ConsumerGroupCommandOptions(args) @@ -47,6 +56,8 @@ object ConsumerGroupCommand { opts.checkArgs() + setOutputFormat(opts) + val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) try { @@ -75,10 +86,22 @@ object ConsumerGroupCommand { val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt val group = opts.options.valueOf(opts.groupOpt) val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group) - if (topics.isEmpty) { - println("No topic available for consumer group provided") + + val loopInterval = getLoopInterval(opts) + + breakable { + do { + val loopStartTime = System.currentTimeMillis() + + if (topics.isEmpty) { + println("No topic available for consumer group provided") + } + topics.foreach(topic => describeTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) + + + Thread.sleep(getSleepTime(loopStartTime, loopInterval)) + } while (loopInterval > 0) } - topics.foreach(topic => describeTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) } def delete(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { @@ -150,8 +173,7 @@ object ConsumerGroupCommand { channelRetryBackoffMs: Int) { val topicPartitions = getTopicPartitions(zkClient, topic) val partitionOffsets = getPartitionOffsets(zkClient, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) - println("%s, %s, %s, %s, %s, %s, %s" - .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) + printHeader() topicPartitions .sortBy { case topicPartition => topicPartition.partition } .foreach { topicPartition => @@ -199,6 +221,46 @@ object ConsumerGroupCommand { offsetMap.toMap } + private def printHeader() { + val columns = Seq() + + outputFormat match { + case OutputFormatType.NONE => println("%s, %s, %s, %s, %s, %s, %s". + format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER") + ) + case OutputFormatType.CSV => println("%s,%s,%s,%s,%s,%s,%s". + format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER") + ) + case OutputFormatType.JSON => // Header not required + } + } + + private def printInfo(group: String, topic: String, partition: Int, currentOffset: Option[Long], + logEndOffset: Any, lag: Any, owner: String) { + outputFormat match { + case OutputFormatType.NONE => println("%s, %s, %s, %s, %s, %s, %s". + format(group, topic, partition, currentOffset.getOrElse("unknown"), logEndOffset, + lag, owner) + ) + case OutputFormatType.CSV => println("%s,%s,%s,%s,%s,%s,%s". + format(group, topic, partition, currentOffset.getOrElse("unknown"), logEndOffset, + lag, owner) + ) + case OutputFormatType.JSON => println( + (Json.encode( + Map( + "Group" -> group, + "Topic" -> topic, + "Partition" -> partition, + "Current Offset" -> currentOffset.getOrElse("unknown"), + "Log End Offset" -> logEndOffset, + "Lag" -> lag, + "Owner" -> owner) + )) + ) + } + } + private def describePartition(zkClient: ZkClient, group: String, topic: String, @@ -209,8 +271,8 @@ object ConsumerGroupCommand { val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/" + partition)._1 ZkUtils.getLeaderForPartition(zkClient, topic, partition) match { case Some(-1) => - println("%s, %s, %s, %s, %s, %s, %s" - .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none"))) + printInfo(group, topic, partition, offsetOpt, "unknown", "unknown", + owner.getOrElse("none")) case Some(brokerId) => val consumerOpt = getConsumer(zkClient, brokerId) consumerOpt match { @@ -221,8 +283,8 @@ object ConsumerGroupCommand { consumer.close() val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _) - println("%s, %s, %s, %s, %s, %s, %s" - .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none"))) + printInfo(group, topic, partition, offsetOpt, logEndOffset, + lag.getOrElse("unknown"), owner.getOrElse("none")) case None => // ignore } case None => @@ -253,6 +315,42 @@ object ConsumerGroupCommand { } } + private def getLoopInterval(opts: ConsumerGroupCommandOptions): Int = { + var loopInterval = if (opts.options.has(opts.loopIntervalOpt)) opts.options.valueOf(opts + .loopIntervalOpt).toInt else 0 + if (loopInterval < 0) { + CommandLineUtils.printUsageAndDie(opts.parser, "Loop value must be greater than 0: %s" + .format(opts.options.valueOf(opts.loopIntervalOpt))) + } else { + loopInterval *= 1000 + } + + return loopInterval + } + + private def getSleepTime(startTime: Long, loopInterval: Int): Long = { + val curTime = System.currentTimeMillis() + val sleepTime = loopInterval - (curTime - startTime) + if (sleepTime > 0) sleepTime else 0 + } + + private def setOutputFormat(opts: ConsumerGroupCommandOptions) { + if (opts.options.has(opts.outputFormatOpt)) { + val outputFormatOptVal = opts.options.valueOf(opts.outputFormatOpt).trim + try { + outputFormat = OutputFormatType.withName(outputFormatOptVal.toUpperCase()); + } catch { + case t: NoSuchElementException => + System.err.println("Output format can only be specified as one of the following") + def ofTypes(t: OutputFormatType) = t != OutputFormatType.NONE + OutputFormatType.values filter ofTypes foreach System.err.println + System.err.println() + opts.parser.printHelpOn(System.err) + System.exit(1) + } + } + } + class ConsumerGroupCommandOptions(args: Array[String]) { val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over." @@ -269,6 +367,9 @@ object ConsumerGroupCommand { "Pass in just a topic to delete the given topic's partition offsets and ownership information " + "for every consumer group. For instance --topic t1" + nl + "WARNING: Only does deletions on consumer groups that are not active." + val loopDoc = "Loop interval (in seconds, greater than 0)" + val outputFormatDoc= "Set Output format to csv or json" + val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) .withRequiredArg @@ -289,6 +390,10 @@ object ConsumerGroupCommand { val listOpt = parser.accepts("list", ListDoc) val describeOpt = parser.accepts("describe", DescribeDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) + val loopIntervalOpt = parser.accepts("loop", loopDoc). + withRequiredArg().ofType(classOf[java.lang.Integer]) + val outputFormatOpt = parser.accepts("output.format", outputFormatDoc). + withRequiredArg().ofType(classOf[String]) val options = parser.parse(args : _*) val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) @@ -300,6 +405,8 @@ object ConsumerGroupCommand { CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt)) CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt)) + if (options.has(loopIntervalOpt)) + CommandLineUtils.checkRequiredArgs(parser, options, describeOpt) // check invalid args CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt) -- 1.9.3 (Apple Git-50)