diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 88f824f..ae83e4d 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -29,7 +29,7 @@ import kafka.client.ClientUtils import kafka.network.BlockingChannel import kafka.api.PartitionOffsetRequestInfo import scala.Some - +import org.I0Itec.zkclient.exception.ZkNoNodeException object ConsumerOffsetChecker extends Logging { @@ -160,21 +160,28 @@ object ConsumerOffsetChecker extends Logging { topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*) val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq - val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) - + debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) channel.send(OffsetFetchRequest(group, topicPartitions)) val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) debug("Received offset fetch response %s.".format(offsetFetchResponse)) - + offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) - val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong - offsetMap.put(topicAndPartition, offset) + try { + val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong + offsetMap.put(topicAndPartition, offset) + } catch { + case z: ZkNoNodeException => + if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir)) + offsetMap.put(topicAndPartition,0) + else + throw z + } } else if (offsetAndMetadata.error == ErrorMapping.NoError) offsetMap.put(topicAndPartition, offsetAndMetadata.offset) @@ -217,4 +224,3 @@ object ConsumerOffsetChecker extends Logging { } } } -