From fef530784971f2ef9d66cf8166c153ea19c6df14 Mon Sep 17 00:00:00 2001 From: jqin Date: Fri, 13 Feb 2015 13:40:16 -0800 Subject: [PATCH 1/2] Fix for KAFKA-1951 --- .../scala/kafka/tools/ConsumerOffsetChecker.scala | 42 ++++++++++++++-------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d1e7c43..93176fd 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -63,7 +63,9 @@ object ConsumerOffsetChecker extends Logging { private def processPartition(zkClient: ZkClient, group: String, topic: String, pid: Int) { val topicPartition = TopicAndPartition(topic, pid) + val zkTopicPartitionKey = TopicAndPartition("(ZK)"+topic, pid) val offsetOpt = offsetMap.get(topicPartition) + val zkOffsetOpt = offsetMap.get(zkTopicPartitionKey) val groupDirs = new ZKGroupTopicDirs(group, topic) val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/%s".format(pid))._1 ZkUtils.getLeaderForPartition(zkClient, topic, pid) match { @@ -77,8 +79,11 @@ object ConsumerOffsetChecker extends Logging { val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) - println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"), - owner match {case Some(ownerStr) => ownerStr case None => "none"})) + val zkLagString = zkOffsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) + println("%-15s %-30s %-3s %-30s %-15s %-30s %s".format( + group, topic, pid, offsetOpt.getOrElse("unknown")+"("+zkOffsetOpt.getOrElse("unknown")+")", logSize, + lagString.getOrElse("unknown") + "(" + zkLagString.getOrElse("unknown") + ")", + owner match {case Some(ownerStr) => ownerStr case None => "none"})) case None => // ignore } case None => @@ -151,6 +156,12 @@ object ConsumerOffsetChecker extends Logging { try { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + val groupDirs = new ZKGroupDirs(group) + if (!ZkUtils.pathExists(zkClient, groupDirs.consumerGroupDir)) { + println("Consumer group %s does not exist.".format(group)) + throw new Throwable("consumer group does not exist") + } + val topicList = topics match { case Some(x) => x.split(",").view.toList case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList @@ -166,30 +177,31 @@ object ConsumerOffsetChecker extends Logging { debug("Received offset fetch response %s.".format(offsetFetchResponse)) offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => - if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { - val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) + if (offsetAndMetadata.error == ErrorMapping.NoError) + offsetMap.put(topicAndPartition, offsetAndMetadata.offset) + else + println("Could not fetch offset for %s from Kafka due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) + + val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) + // Only continue to check zookeeper if consumer group has offsets path in zookeeper + if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir)) { + val zkTopicAndPartitionKey = new TopicAndPartition("(ZK)" + topicAndPartition.topic, topicAndPartition.partition) // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) try { val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong - offsetMap.put(topicAndPartition, offset) + offsetMap.put(zkTopicAndPartitionKey, offset) } catch { case z: ZkNoNodeException => - if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir)) - offsetMap.put(topicAndPartition,-1) - else - throw z + offsetMap.put(zkTopicAndPartitionKey, -1) + case z: Throwable => + println("Could not fetch offset for %s from ZooKeeper due to %s.".format(topicAndPartition, z.getMessage())) } } - else if (offsetAndMetadata.error == ErrorMapping.NoError) - offsetMap.put(topicAndPartition, offsetAndMetadata.offset) - else { - println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) - } } channel.disconnect() - println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner")) + println("%-15s %-30s %-3s %-30s %-15s %-30s %s".format("Group", "Topic", "Pid", "Offset(ZK Offset)", "logSize", "Lag(ZK Lag)", "Owner")) topicList.sorted.foreach { topic => processTopic(zkClient, group, topic) } -- 1.8.3.4 (Apple Git-47) From 686aed91f35e1e448cafd2fb950cf4f00b722a2b Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 16 Feb 2015 16:34:34 -0800 Subject: [PATCH 2/2] Addressed Gwen's comment. --- core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 93176fd..84fa04d 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -81,8 +81,8 @@ object ConsumerOffsetChecker extends Logging { val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) val zkLagString = zkOffsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) println("%-15s %-30s %-3s %-30s %-15s %-30s %s".format( - group, topic, pid, offsetOpt.getOrElse("unknown")+"("+zkOffsetOpt.getOrElse("unknown")+")", logSize, - lagString.getOrElse("unknown") + "(" + zkLagString.getOrElse("unknown") + ")", + group, topic, pid, offsetOpt.getOrElse("unknown")+","+zkOffsetOpt.getOrElse("unknown"), logSize, + lagString.getOrElse("unknown") + "," + zkLagString.getOrElse("unknown"), owner match {case Some(ownerStr) => ownerStr case None => "none"})) case None => // ignore } @@ -201,7 +201,7 @@ object ConsumerOffsetChecker extends Logging { } channel.disconnect() - println("%-15s %-30s %-3s %-30s %-15s %-30s %s".format("Group", "Topic", "Pid", "Offset(ZK Offset)", "logSize", "Lag(ZK Lag)", "Owner")) + println("%-15s %-30s %-3s %-30s %-15s %-30s %s".format("Group", "Topic", "Pid", "Offset,ZK_Offset", "logSize", "Lag,ZK_Lag", "Owner")) topicList.sorted.foreach { topic => processTopic(zkClient, group, topic) } -- 1.8.3.4 (Apple Git-47)