From 255a2f23342600866d23c5076aab9cb67e1d1626 Mon Sep 17 00:00:00 2001 From: asingh Date: Mon, 23 Feb 2015 18:11:26 -0800 Subject: [PATCH] KAFKA-313: Add JSON/CSV output and looping options to ConsumerGroupCommand --- .../scala/kafka/admin/ConsumerGroupCommand.scala | 162 ++++++++++++++++++--- 1 file changed, 142 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index f23120e..f1a4513 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -18,23 +18,32 @@ package kafka.admin -import kafka.utils._ -import org.I0Itec.zkclient.ZkClient -import kafka.common._ import java.util.Properties + +import joptsimple.{OptionParser, OptionSpec} +import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo} import kafka.client.ClientUtils -import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest} -import org.I0Itec.zkclient.exception.ZkNoNodeException -import kafka.common.TopicAndPartition -import joptsimple.{OptionSpec, OptionParser} -import scala.collection.{Set, mutable} +import kafka.common.{TopicAndPartition, _} import kafka.consumer.SimpleConsumer -import collection.JavaConversions._ +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import org.I0Itec.zkclient.exception.ZkNoNodeException import org.apache.kafka.common.utils.Utils +import scala.collection.JavaConversions._ +import scala.collection.{Set, mutable} + object ConsumerGroupCommand { + private object OutputFormatType extends Enumeration { + type OutputFormatType = Value + val CSV, JSON, NONE = Value + } + + import OutputFormatType._ + private var outputFormat: OutputFormatType = OutputFormatType.NONE + def main(args: Array[String]) { val opts = new ConsumerGroupCommandOptions(args) @@ -47,7 +56,7 @@ object ConsumerGroupCommand { CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete") opts.checkArgs() - + setOutputFormat(opts) val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000) try { @@ -76,10 +85,35 @@ object ConsumerGroupCommand { val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt val group = opts.options.valueOf(opts.groupOpt) val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group) - if (topics.isEmpty) { - println("No topic available for consumer group provided") + + val loopInterval = getLoopInterval(opts) + + var currentIteration = 0 + do { + val loopStartTime = System.currentTimeMillis() + + if (topics.isEmpty) { + println("No topic available for consumer group provided") + } + topics.foreach(topic => describeTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) + + + Thread.sleep(getSleepTime(loopStartTime, loopInterval)) + currentIteration += 1 + } while (loopInterval > 0 && iterationsLeft(opts, currentIteration)) + } + + def iterationsLeft(opts: ConsumerGroupCommandOptions, currentIteration: Int): Boolean = { + if (opts.options.has(opts.numIterationsOpt)) { + val numIterations = opts.options.valueOf(opts.numIterationsOpt).toInt + + if (numIterations < 0) + CommandLineUtils.printUsageAndDie(opts.parser, s"Loop value must be greater than 0: ${opts.options.valueOf(opts.loopIntervalOpt)}") + + currentIteration < numIterations + } else { + true // keep iterating, if iterations not limited by numIterations } - topics.foreach(topic => describeTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) } def delete(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { @@ -151,8 +185,7 @@ object ConsumerGroupCommand { channelRetryBackoffMs: Int) { val topicPartitions = getTopicPartitions(zkClient, topic) val partitionOffsets = getPartitionOffsets(zkClient, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) - println("%s, %s, %s, %s, %s, %s, %s" - .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) + printHeader() topicPartitions .sortBy { case topicPartition => topicPartition.partition } .foreach { topicPartition => @@ -200,6 +233,46 @@ object ConsumerGroupCommand { offsetMap.toMap } + private def printHeader() { + outputFormat match { + case OutputFormatType.NONE => println( + Seq("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER") + .mkString(", ") + ) + case OutputFormatType.CSV => println( + Seq("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER") + .mkString(",") + ) + case OutputFormatType.JSON => // Header not required + } + } + + private def printInfo(group: String, topic: String, partition: Int, currentOffset: Option[Long], + logEndOffset: String, lag: String, owner: String) { + outputFormat match { + case OutputFormatType.NONE => println( + Seq(group, topic, partition, currentOffset.getOrElse("unknown"), logEndOffset, + lag, owner).mkString(", ") + ) + case OutputFormatType.CSV => println( + Seq(group, topic, partition, currentOffset.getOrElse("unknown"), logEndOffset, + lag, owner).mkString(",") + ) + case OutputFormatType.JSON => println( + (Json.encode( + Map( + "Group" -> group, + "Topic" -> topic, + "Partition" -> partition, + "Current Offset" -> currentOffset.getOrElse("unknown"), + "Log End Offset" -> logEndOffset, + "Lag" -> lag, + "Owner" -> owner) + )) + ) + } + } + private def describePartition(zkClient: ZkClient, group: String, topic: String, @@ -210,20 +283,21 @@ object ConsumerGroupCommand { val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/" + partition)._1 ZkUtils.getLeaderForPartition(zkClient, topic, partition) match { case Some(-1) => - println("%s, %s, %s, %s, %s, %s, %s" - .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none"))) + printInfo(group, topic, partition, offsetOpt, "unknown", "unknown", + owner.getOrElse("none")) case Some(brokerId) => val consumerOpt = getConsumer(zkClient, brokerId) consumerOpt match { case Some(consumer) => val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) - val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + val logEndOffset = + consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head consumer.close() val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _) - println("%s, %s, %s, %s, %s, %s, %s" - .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none"))) + printInfo(group, topic, partition, offsetOpt, logEndOffset.toString, + lag.getOrElse("unknown").toString, owner.getOrElse("none")) case None => // ignore } case None => @@ -254,6 +328,40 @@ object ConsumerGroupCommand { } } + private def getLoopInterval(opts: ConsumerGroupCommandOptions): Int = { + var loopInterval = if (opts.options.has(opts.loopIntervalOpt)) opts.options.valueOf(opts + .loopIntervalOpt).toInt else 0 + if (loopInterval < 0) + CommandLineUtils.printUsageAndDie(opts.parser, "Loop value must be greater than 0: %s" + .format(opts.options.valueOf(opts.loopIntervalOpt))) + else + loopInterval *= 1000 + + loopInterval + } + + private def getSleepTime(startTime: Long, loopInterval: Int): Long = { + val elapsedTime = System.currentTimeMillis() - startTime + val sleepTime = loopInterval - elapsedTime + if (sleepTime > 0) sleepTime else 0 + } + + private def setOutputFormat(opts: ConsumerGroupCommandOptions) { + if (opts.options.has(opts.outputFormatOpt)) { + val outputFormatOptVal = opts.options.valueOf(opts.outputFormatOpt).trim + try { + outputFormat = OutputFormatType.withName(outputFormatOptVal.toUpperCase()); + } catch { + case t: NoSuchElementException => + System.err.println("Output format can only be specified as one of the following") + OutputFormatType.values.filter(x => x != OutputFormatType.NONE).foreach(System.err.println) + System.err.println() + opts.parser.printHelpOn(System.err) + System.exit(1) + } + } + } + class ConsumerGroupCommandOptions(args: Array[String]) { val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over." @@ -270,6 +378,10 @@ object ConsumerGroupCommand { "Pass in just a topic to delete the given topic's partition offsets and ownership information " + "for every consumer group. For instance --topic t1" + nl + "WARNING: Only does deletions on consumer groups that are not active." + val loopDoc = "Loop interval (in seconds, greater than 0)" + val outputFormatDoc = "Set Output format to csv or json" + val numIterationsDoc = "Number of times the command should be looped" + val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) .withRequiredArg @@ -290,6 +402,12 @@ object ConsumerGroupCommand { val listOpt = parser.accepts("list", ListDoc) val describeOpt = parser.accepts("describe", DescribeDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) + val loopIntervalOpt = parser.accepts("loop", loopDoc). + withRequiredArg().ofType(classOf[java.lang.Integer]) + val outputFormatOpt = parser.accepts("output-format", outputFormatDoc). + withRequiredArg().ofType(classOf[String]) + val numIterationsOpt = parser.accepts("num-iterations", numIterationsDoc). + withRequiredArg().ofType(classOf[java.lang.Integer]) val options = parser.parse(args : _*) val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) @@ -301,6 +419,10 @@ object ConsumerGroupCommand { CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt)) CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt)) + if (options.has(loopIntervalOpt)) + CommandLineUtils.checkRequiredArgs(parser, options, describeOpt) + if (options.has(numIterationsOpt)) + CommandLineUtils.checkRequiredArgs(parser, options, loopIntervalOpt) // check invalid args CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt) -- 2.3.2 (Apple Git-55)