diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 2b9438a..91c4d28 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -23,25 +23,27 @@ import joptsimple._ import java.net.URI import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} import kafka.common.TopicAndPartition +import kafka.client.ClientUtils +import kafka.utils.CommandLineUtils object GetOffsetShell { def main(args: Array[String]): Unit = { val parser = new OptionParser - val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.") + val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") .withRequiredArg - .describedAs("kafka://hostname:port") + .describedAs("hostname:port,...,hostname:port") .ofType(classOf[String]) val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val partitionOpt = parser.accepts("partition", "partition id") + val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions") .withRequiredArg - .describedAs("partition id") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) + .describedAs("partition ids") + .ofType(classOf[String]) + .defaultsTo("") val timeOpt = parser.accepts("time", "timestamp of the offsets before that") .withRequiredArg .describedAs("timestamp/-1(latest)/-2(earliest)") @@ -51,28 +53,52 @@ object GetOffsetShell { .describedAs("count") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) + val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1000) val options = parser.parse(args : _*) - for(arg <- List(urlOpt, topicOpt, timeOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } + CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt) - val url = new URI(options.valueOf(urlOpt)) + val clientId = "GetOffsetShell" + val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) val topic = options.valueOf(topicOpt) - val partition = options.valueOf(partitionOpt).intValue + var partitionList = options.valueOf(partitionOpt) var time = options.valueOf(timeOpt).longValue val nOffsets = options.valueOf(nOffsetsOpt).intValue - val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell") - val topicAndPartition = TopicAndPartition(topic, partition) - val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets))) - val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets - println("get " + offsets.length + " results") - for (offset <- offsets) - println(offset) + val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() + + val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata + if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { + System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) + + "kafka-list-topic.sh to verify") + System.exit(1) + } + val partitions = + if(partitionList == "") { + topicsMetadata.head.partitionsMetadata.map(_.partitionId) + } else { + partitionList.split(",").map(_.toInt).toSeq + } + partitions.foreach { partitionId => + val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId) + partitionMetadataOpt match { + case Some(metadata) => + metadata.leader match { + case Some(leader) => + val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId) + val topicAndPartition = TopicAndPartition(topic, partitionId) + val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets))) + val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets + + println("%s:%d:%s".format(topic, partitionId, offsets.mkString(","))) + case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId)) + } + case None => System.err.println("Error: partition %d does not exist".format(partitionId)) + } + } } } diff --git a/kafka-patch-review.py b/kafka-patch-review.py index f1d5192..2653465 100644 --- a/kafka-patch-review.py +++ b/kafka-patch-review.py @@ -90,7 +90,7 @@ def main(): comment="Created reviewboard " if not opt.reviewboard: - print 'Created a new reviewboard ',rb_url + print 'Created a new reviewboard ',rb_url,' against branch: ',opt.branch else: print 'Updated reviewboard',opt.reviewboard comment="Updated reviewboard "