diff --git core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d1e7c43..e782fde 100644 --- core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -25,17 +25,25 @@ import kafka.consumer.SimpleConsumer import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest} import kafka.common.{OffsetMetadataAndError, ErrorMapping, BrokerNotAvailableException, TopicAndPartition} import scala.collection._ +import scala.util.control.Breaks._ import kafka.client.ClientUtils import kafka.network.BlockingChannel import kafka.api.PartitionOffsetRequestInfo -import scala.Some import org.I0Itec.zkclient.exception.ZkNoNodeException +private object OutputFormatType extends Enumeration { + type OutputFormatType = Value + val CSV, JSON, NONE = Value +} + +import OutputFormatType._ + object ConsumerOffsetChecker extends Logging { private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map() private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map() private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map() + private var outputFormat: OutputFormatType = OutputFormatType.NONE private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = { try { @@ -77,8 +85,30 @@ object ConsumerOffsetChecker extends Logging { val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) - println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"), - owner match {case Some(ownerStr) => ownerStr case None => "none"})) + val printableOwner = owner match { + case Some(ownerStr) => ownerStr + case None => "none" + } + + outputFormat match { + case OutputFormatType.NONE => println("%-15s %-30s %-3s %-15s %-15s %-15s %s". + format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, + lagString.getOrElse("unknown"), printableOwner + ) + ) + case OutputFormatType.CSV => println("%s,%s,%s,%s,%s,%s,%s". + format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, + lagString.getOrElse("unknown"), printableOwner + ) + ) + case OutputFormatType.JSON => println( + ("{\"Group\":\"%s\", \"Topic\":\"%s\", \"Pid\":\"%s\", \"Offset\":\"%s\", " + + "\"logSize\":\"%s\", \"Lag\":\"%s\", \"Owner\":\"%s\"}"). + format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, + lagString.getOrElse("unknown"), printableOwner + ) + ) + } case None => // ignore } case None => @@ -97,15 +127,31 @@ object ConsumerOffsetChecker extends Logging { } private def printBrokerInfo() { - println("BROKER INFO") + if (outputFormat != OutputFormatType.JSON) { + println("BROKER INFO") + } for ((bid, consumerOpt) <- consumerMap) consumerOpt match { case Some(consumer) => - println("%s -> %s:%d".format(bid, consumer.host, consumer.port)) + outputFormat match { + case OutputFormatType.NONE => println("%s -> %s:%d". + format(bid, consumer.host, consumer.port)) + case OutputFormatType.CSV => println("%s,%s:%d". + format(bid, consumer.host, consumer.port)) + case OutputFormatType.JSON => println(("{\"Broker Info\": {\"ID\": \"%s\", " + + "\"Consumer\": {\"Host\":\"%s\",\"Port\":\"%d\"}}}"). + format(bid, consumer.host, consumer.port)) + } case None => // ignore } } + private def getRequiredSleepTime(startTime: Long, loopInterval: Int): Long = { + val curTime = System.currentTimeMillis() + val sleepTime = loopInterval - (curTime - startTime) + if (sleepTime > 0) sleepTime else 0 + } + def main(args: Array[String]) { val parser = new OptionParser() @@ -120,6 +166,10 @@ object ConsumerOffsetChecker extends Logging { withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(6000) val channelRetryBackoffMsOpt = parser.accepts("retry.backoff.ms", "Retry back-off to use for failed offset queries."). withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(3000) + val loopIntervalOpt = parser.accepts("loop", "Loop interval (in seconds, greater than 0)"). + withRequiredArg().ofType(classOf[java.lang.Integer]) + val outputFormatOpt = parser.accepts("output.format", "Set Output format to csv or json"). + withRequiredArg().ofType(classOf[String]) parser.accepts("broker-info", "Print broker info") parser.accepts("help", "Print this message.") @@ -146,62 +196,104 @@ object ConsumerOffsetChecker extends Logging { val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt)) else None + var loopInterval = if (options.has(loopIntervalOpt)) options.valueOf(loopIntervalOpt).toInt + else 0 + if (loopInterval < 0) { + System.err.println("Loop value must be greater than 0: %s".format(options.valueOf + (loopIntervalOpt))) + parser.printHelpOn(System.err) + System.exit(1) + } else { + loopInterval *= 1000 + } + + if (options.hasArgument("output.format")) { + val outputFormatOptVal = options.valueOf(outputFormatOpt).trim + outputFormatOptVal.toLowerCase() match { + case "csv" => outputFormat = OutputFormatType.CSV + case "json" => outputFormat = OutputFormatType.JSON + case _ => { + System.err.println("Output format can only be specified as one of the following") + def ofTypes(t: OutputFormatType) = t != OutputFormatType.NONE + OutputFormatType.values filter ofTypes foreach System.err.println + System.err.println() + parser.printHelpOn(System.err) + System.exit(1) + } + } + } + var zkClient: ZkClient = null var channel: BlockingChannel = null try { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - val topicList = topics match { - case Some(x) => x.split(",").view.toList - case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList - } + breakable { + do { + val loopStartTime = System.currentTimeMillis() - topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*) - val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq - val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) - - debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) - channel.send(OffsetFetchRequest(group, topicPartitions)) - val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) - debug("Received offset fetch response %s.".format(offsetFetchResponse)) - - offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => - if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { - val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) - // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool - // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) - try { - val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong - offsetMap.put(topicAndPartition, offset) - } catch { - case z: ZkNoNodeException => - if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir)) - offsetMap.put(topicAndPartition,-1) - else - throw z + val topicList = topics match { + case Some(x) => x.split(",").view.toList + case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList } - } - else if (offsetAndMetadata.error == ErrorMapping.NoError) - offsetMap.put(topicAndPartition, offsetAndMetadata.offset) - else { - println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) - } - } - channel.disconnect() - println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner")) - topicList.sorted.foreach { - topic => processTopic(zkClient, group, topic) - } + topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq: _*) + val topicPartitions = topicPidMap.flatMap { case (topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _))}.toSeq + val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) - if (options.has("broker-info")) - printBrokerInfo() + debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) + channel.send(OffsetFetchRequest(group, topicPartitions)) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + debug("Received offset fetch response %s.".format(offsetFetchResponse)) - for ((_, consumerOpt) <- consumerMap) - consumerOpt match { - case Some(consumer) => consumer.close() - case None => // ignore - } + offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => + if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { + val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) + // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool + // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) + try { + val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong + offsetMap.put(topicAndPartition, offset) + } catch { + case z: ZkNoNodeException => + if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir)) + offsetMap.put(topicAndPartition, -1) + else + throw z + } + } + else if (offsetAndMetadata.error == ErrorMapping.NoError) + offsetMap.put(topicAndPartition, offsetAndMetadata.offset) + else { + println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) + } + } + channel.disconnect() + + if (outputFormat != OutputFormatType.JSON) { + outputFormat match { + case OutputFormatType.NONE => println("%-15s %-30s %-3s %-15s %-15s %-15s %s". + format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner")) + case OutputFormatType.CSV => println("%s,%s,%s,%s,%s,%s,%s". + format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner")) + } + } + topicList.sorted.foreach { + topic => processTopic(zkClient, group, topic) + } + + if (options.has("broker-info")) + printBrokerInfo() + + for ((_, consumerOpt) <- consumerMap) + consumerOpt match { + case Some(consumer) => consumer.close() + case None => // ignore + } + + Thread.sleep(getRequiredSleepTime(loopStartTime, loopInterval)) + } while (loopInterval > 0) + } } catch { case t: Throwable =>