diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d1e7c434e77859d746b8dc68dd5d5a3740425e79..161490ceb22cfadbce9d74f4e665e00de48a02c8 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -25,17 +25,25 @@ import kafka.consumer.SimpleConsumer import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest} import kafka.common.{OffsetMetadataAndError, ErrorMapping, BrokerNotAvailableException, TopicAndPartition} import scala.collection._ +import scala.util.control.Breaks._ import kafka.client.ClientUtils import kafka.network.BlockingChannel import kafka.api.PartitionOffsetRequestInfo -import scala.Some import org.I0Itec.zkclient.exception.ZkNoNodeException +private object OutputFormatType extends Enumeration { + type OutputFormatType = Value + val CSV, JSON, NONE = Value +} + +import OutputFormatType._ + object ConsumerOffsetChecker extends Logging { private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map() private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map() private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map() + private var outputFormat: OutputFormatType = OutputFormatType.NONE private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = { try { @@ -77,8 +85,12 @@ object ConsumerOffsetChecker extends Logging { val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) - println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"), - owner match {case Some(ownerStr) => ownerStr case None => "none"})) + val printableOwner = owner match { + case Some(ownerStr) => ownerStr + case None => "none" + } + + printPartitionInfo(group, topic, pid, offsetOpt, logSize, lagString, printableOwner) case None => // ignore } case None => @@ -86,6 +98,34 @@ object ConsumerOffsetChecker extends Logging { } } + def printPartitionInfo(group: String, topic: String, pid: Int, offsetOpt: Option[Long], + logSize: Long, lagString: Option[String], printableOwner: String) { + outputFormat match { + case OutputFormatType.NONE => println("%-15s %-30s %-3s %-15s %-15s %-15s %s". + format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, + lagString.getOrElse("unknown"), printableOwner + ) + ) + case OutputFormatType.CSV => println("%s,%s,%s,%s,%s,%s,%s". + format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, + lagString.getOrElse("unknown"), printableOwner + ) + ) + case OutputFormatType.JSON => println( + (Json.encode( + Map( + "Group" -> group, + "Topic" -> topic, + "Pid" -> pid, + "Offset" -> offsetOpt.getOrElse("unknown"), + "logSize" -> logSize, + "Lag" -> lagString.getOrElse("unknown"), + "Owner" -> printableOwner) + )) + ) + } + } + private def processTopic(zkClient: ZkClient, group: String, topic: String) { topicPidMap.get(topic) match { case Some(pids) => @@ -98,14 +138,35 @@ object ConsumerOffsetChecker extends Logging { private def printBrokerInfo() { println("BROKER INFO") + for ((bid, consumerOpt) <- consumerMap) consumerOpt match { case Some(consumer) => - println("%s -> %s:%d".format(bid, consumer.host, consumer.port)) + outputFormat match { + case OutputFormatType.NONE => println("%s -> %s:%d". + format(bid, consumer.host, consumer.port)) + case OutputFormatType.CSV => println("%s,%s:%d". + format(bid, consumer.host, consumer.port)) + case OutputFormatType.JSON => println(Json.encode( + Map( + "ID" -> bid, + "Consumer" -> Json.encode( + Map( + "Host" -> consumer.host, + "Port" -> consumer.port + ) + )))) + } case None => // ignore } } + private def getRequiredSleepTime(startTime: Long, loopInterval: Int): Long = { + val curTime = System.currentTimeMillis() + val sleepTime = loopInterval - (curTime - startTime) + if (sleepTime > 0) sleepTime else 0 + } + def main(args: Array[String]) { val parser = new OptionParser() @@ -120,6 +181,10 @@ object ConsumerOffsetChecker extends Logging { withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(6000) val channelRetryBackoffMsOpt = parser.accepts("retry.backoff.ms", "Retry back-off to use for failed offset queries."). withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(3000) + val loopIntervalOpt = parser.accepts("loop", "Loop interval (in seconds, greater than 0)"). + withRequiredArg().ofType(classOf[java.lang.Integer]) + val outputFormatOpt = parser.accepts("output.format", "Set Output format to csv or json"). + withRequiredArg().ofType(classOf[String]) parser.accepts("broker-info", "Print broker info") parser.accepts("help", "Print this message.") @@ -146,62 +211,103 @@ object ConsumerOffsetChecker extends Logging { val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt)) else None + var loopInterval = if (options.has(loopIntervalOpt)) options.valueOf(loopIntervalOpt).toInt + else 0 + if (loopInterval < 0) { + System.err.println("Loop value must be greater than 0: %s".format(options.valueOf + (loopIntervalOpt))) + parser.printHelpOn(System.err) + System.exit(1) + } else { + loopInterval *= 1000 + } + + if (options.hasArgument("output.format")) { + val outputFormatOptVal = options.valueOf(outputFormatOpt).trim + try { + outputFormat = OutputFormatType.withName(outputFormatOptVal.toUpperCase()); + } catch { + case t: NoSuchElementException => + System.err.println("Output format can only be specified as one of the following") + def ofTypes(t: OutputFormatType) = t != OutputFormatType.NONE + OutputFormatType.values filter ofTypes foreach System.err.println + System.err.println() + parser.printHelpOn(System.err) + System.exit(1) + } + } + var zkClient: ZkClient = null var channel: BlockingChannel = null try { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - val topicList = topics match { - case Some(x) => x.split(",").view.toList - case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList - } + breakable { + do { + val loopStartTime = System.currentTimeMillis() - topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*) - val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq - val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) - - debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) - channel.send(OffsetFetchRequest(group, topicPartitions)) - val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) - debug("Received offset fetch response %s.".format(offsetFetchResponse)) - - offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => - if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { - val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) - // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool - // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) - try { - val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong - offsetMap.put(topicAndPartition, offset) - } catch { - case z: ZkNoNodeException => - if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir)) - offsetMap.put(topicAndPartition,-1) - else - throw z + val topicList = topics match { + case Some(x) => x.split(",").view.toList + case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList } - } - else if (offsetAndMetadata.error == ErrorMapping.NoError) - offsetMap.put(topicAndPartition, offsetAndMetadata.offset) - else { - println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) - } - } - channel.disconnect() - println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner")) - topicList.sorted.foreach { - topic => processTopic(zkClient, group, topic) - } + topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq: _*) + val topicPartitions = topicPidMap.flatMap { case (topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _))}.toSeq + val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) - if (options.has("broker-info")) - printBrokerInfo() + debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) + channel.send(OffsetFetchRequest(group, topicPartitions)) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + debug("Received offset fetch response %s.".format(offsetFetchResponse)) - for ((_, consumerOpt) <- consumerMap) - consumerOpt match { - case Some(consumer) => consumer.close() - case None => // ignore - } + offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => + if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { + val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) + // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool + // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) + try { + val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong + offsetMap.put(topicAndPartition, offset) + } catch { + case z: ZkNoNodeException => + if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir)) + offsetMap.put(topicAndPartition, -1) + else + throw z + } + } + else if (offsetAndMetadata.error == ErrorMapping.NoError) + offsetMap.put(topicAndPartition, offsetAndMetadata.offset) + else { + println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) + } + } + channel.disconnect() + + if (outputFormat != OutputFormatType.JSON) { + outputFormat match { + case OutputFormatType.NONE => println("%-15s %-30s %-3s %-15s %-15s %-15s %s". + format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner")) + case OutputFormatType.CSV => println("%s,%s,%s,%s,%s,%s,%s". + format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner")) + } + } + topicList.sorted.foreach { + topic => processTopic(zkClient, group, topic) + } + + if (options.has("broker-info")) + printBrokerInfo() + + for ((_, consumerOpt) <- consumerMap) + consumerOpt match { + case Some(consumer) => consumer.close() + case None => // ignore + } + + Thread.sleep(getRequiredSleepTime(loopStartTime, loopInterval)) + } while (loopInterval > 0) + } } catch { case t: Throwable =>