From 7c4a14f91533b0f48c68fb8a3781411bbbde3eff Mon Sep 17 00:00:00 2001 From: Matt Warhaftig Date: Sun, 22 Mar 2015 15:37:43 -0400 Subject: [PATCH] KAFKA-1293 # Housekeeping/standardization for multiple command arguments. --- .../PreferredReplicaLeaderElectionCommand.scala | 5 +++++ .../kafka/admin/ReassignPartitionsCommand.scala | 7 ++++++- core/src/main/scala/kafka/admin/TopicCommand.scala | 4 ++-- .../main/scala/kafka/tools/ConsoleConsumer.scala | 15 +++++++++----- .../main/scala/kafka/tools/ConsoleProducer.scala | 15 ++++++++++---- .../scala/kafka/tools/ConsumerOffsetChecker.scala | 12 ++++++----- .../scala/kafka/tools/ConsumerPerformance.scala | 8 ++++++-- .../main/scala/kafka/tools/DumpLogSegments.scala | 12 +++++------ .../main/scala/kafka/tools/ExportZkOffsets.scala | 5 +++-- .../main/scala/kafka/tools/GetOffsetShell.scala | 6 +++--- .../main/scala/kafka/tools/ImportZkOffsets.scala | 5 +++-- core/src/main/scala/kafka/tools/JmxTool.scala | 4 ++-- .../main/scala/kafka/tools/KafkaMigrationTool.java | 18 ++++++++-------- core/src/main/scala/kafka/tools/MirrorMaker.scala | 12 +++++------ core/src/main/scala/kafka/tools/PerfConfig.scala | 8 ++++---- .../scala/kafka/tools/ProducerPerformance.scala | 24 +++++++++++++--------- .../main/scala/kafka/tools/ReplayLogProducer.scala | 11 +++++++--- .../kafka/tools/ReplicaVerificationTool.scala | 10 +++++++-- .../kafka/tools/SimpleConsumerPerformance.scala | 6 +++++- .../scala/kafka/tools/SimpleConsumerShell.scala | 18 ++++++++++------ .../scala/kafka/tools/StateChangeLogMerger.scala | 18 ++++++++-------- .../main/scala/kafka/tools/TestLogCleaning.scala | 7 ++++--- .../kafka/tools/VerifyConsumerRebalance.scala | 5 +++-- .../scala/other/kafka/TestLinearWriteSpeed.scala | 7 ++++--- .../test/scala/other/kafka/TestOffsetManager.scala | 7 ++++--- 25 files changed, 154 insertions(+), 95 deletions(-) diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 
b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 79b5e0a..9191623 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -40,12 +40,17 @@ object PreferredReplicaLeaderElectionCommand extends Logging { .withRequiredArg .describedAs("urls") .ofType(classOf[String]) + val helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "This tool causes leadership for each partition to be transferred back to the 'preferred replica'," + " it can be used to balance leadership among the servers.") val options = parser.parse(args : _*) + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 979992b..d61a276 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -177,7 +177,7 @@ object ReassignPartitionsCommand extends Logging { " Note that this only generates a candidate assignment, it does not execute it.") val executeOpt = parser.accepts("execute", "Kick off the reassignment as specified by the --reassignment-json-file option.") val verifyOpt = parser.accepts("verify", "Verify if the reassignment completed as specified by the --reassignment-json-file option.") - val reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", "The JSON file with the partition reassignment configuration" + + val reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", "The JSON file with the partition reassignment configuration. 
" + "The format to use is - \n" + "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }],\n\"version\":1\n}") .withRequiredArg @@ -194,11 +194,16 @@ object ReassignPartitionsCommand extends Logging { .withRequiredArg .describedAs("brokerlist") .ofType(classOf[String]) + val helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.") val options = parser.parse(args : _*) + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } } } diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index f400b71..3f741c0 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -245,10 +245,10 @@ object TopicCommand { .ofType(classOf[String]) val listOpt = parser.accepts("list", "List all available topics.") val createOpt = parser.accepts("create", "Create a new topic.") - val deleteOpt = parser.accepts("delete", "Delete a topic") + val deleteOpt = parser.accepts("delete", "Delete a topic.") val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.") val describeOpt = parser.accepts("describe", "List details for the given topics.") - val helpOpt = parser.accepts("help", "Print usage information.") + val helpOpt = parser.accepts("help", "Print this message.") val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. 
Can also accept a regular " + "expression except for --create option") .withRequiredArg diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 910691e..befd101 100644 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -37,7 +37,7 @@ object ConsoleConsumer extends Logging { def main(args: Array[String]) { val parser = new OptionParser - val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") + val topicIdOpt = parser.accepts("topic", "The topic id to consume from.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) @@ -55,7 +55,7 @@ object ConsoleConsumer extends Logging { .describedAs("urls") .ofType(classOf[String]) - val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") + val consumerConfigOpt = parser.accepts("consumer-config", "Consumer config properties file.") .withRequiredArg .describedAs("config file") .ofType(classOf[String]) @@ -64,11 +64,11 @@ object ConsoleConsumer extends Logging { .describedAs("class") .ofType(classOf[String]) .defaultsTo(classOf[DefaultMessageFormatter].getName) - val messageFormatterArgOpt = parser.accepts("property") + val messageFormatterArgOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message formatter.") .withRequiredArg .describedAs("prop") .ofType(classOf[String]) - val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); + val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up."); val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + "start with the earliest message present in the log 
rather than the latest message.") val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.") @@ -77,18 +77,23 @@ object ConsoleConsumer extends Logging { .ofType(classOf[java.lang.Integer]) val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + "skip it instead of halt.") - val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") + val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled.") val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + "set, the csv metrics will be outputed here") .withRequiredArg .describedAs("metrics dictory") .ofType(classOf[java.lang.String]) + val helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") var groupIdPassed = true val options: OptionSet = tryParse(parser, args) + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) if (topicOrFilterOpt.size != 1) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 00265f9..0cfb8a5 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -131,7 +131,7 @@ object ConsoleProducer { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form 
HOST1:PORT1,HOST2:PORT2.") + val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") .withRequiredArg .describedAs("broker-list") .ofType(classOf[String]) @@ -166,17 +166,18 @@ object ConsoleProducer { .describedAs("queue_size") .ofType(classOf[java.lang.Integer]) .defaultsTo(10000) - val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue") + val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue.") .withRequiredArg .describedAs("queue enqueuetimeout ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(Int.MaxValue) - val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests") + val requestRequiredAcksOpt = parser.accepts("request-required-acks", "Number of acks required for producer request " + + "to complete") .withRequiredArg .describedAs("request required acks") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) - val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero") + val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. 
Value must be non-negative and non-zero.") .withRequiredArg .describedAs("request timeout ms") .ofType(classOf[java.lang.Integer]) @@ -237,8 +238,14 @@ object ConsoleProducer { .describedAs("producer_prop") .ofType(classOf[String]) val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") + val helpOpt = parser.accepts("help", "Print this message.") val options = parser.parse(args : _*) + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.") CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt) diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d1e7c43..53f5d1e 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -109,19 +109,21 @@ object ConsumerOffsetChecker extends Logging { def main(args: Array[String]) { val parser = new OptionParser() - val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string."). + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over."). withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]) val topicsOpt = parser.accepts("topic", "Comma-separated list of consumer topics (all topics if absent)."). withRequiredArg().ofType(classOf[String]) - val groupOpt = parser.accepts("group", "Consumer group."). + val groupOpt = parser.accepts("group", "REQUIRED: Consumer group."). withRequiredArg().ofType(classOf[String]) - val channelSocketTimeoutMsOpt = parser.accepts("socket.timeout.ms", "Socket timeout to use when querying for offsets."). 
+ val channelSocketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "Socket timeout to use when querying for offsets."). withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(6000) - val channelRetryBackoffMsOpt = parser.accepts("retry.backoff.ms", "Retry back-off to use for failed offset queries."). + val channelRetryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " + + "Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata."). withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(3000) - parser.accepts("broker-info", "Print broker info") + parser.accepts("broker-info", "Print broker info.") parser.accepts("help", "Print this message.") if(args.length == 0) diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index c39c067..7d9e1d4 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -145,12 +145,12 @@ object ConsumerPerformance { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val groupIdOpt = parser.accepts("group", "The group id to consume on.") + val groupIdOpt = parser.accepts("group", "Consumer group.") .withRequiredArg .describedAs("gid") .defaultsTo("perf-consumer-" + new Random().nextInt(100000)) .ofType(classOf[String]) - val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") + val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.") .withRequiredArg .describedAs("size") .ofType(classOf[java.lang.Integer]) @@ -175,6 +175,10 @@ object ConsumerPerformance { val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.") val options = parser.parse(args: _*) + if (options.has("help")) { + 
parser.printHelpOn(System.out) + System.exit(0) + } CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index fe2cc11..917ccb7 100644 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -30,9 +30,9 @@ object DumpLogSegments { def main(args: Array[String]) { val parser = new OptionParser - val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs") - val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content") - val filesOpt = parser.accepts("files", "REQUIRED: The comma separated list of data and index log files to be dumped") + val printOpt = parser.accepts("print-data-log", "If set, printing the messages content when dumping data logs.") + val verifyOpt = parser.accepts("verify-index-only", "If set, just verify the index log without printing its content.") + val filesOpt = parser.accepts("files", "REQUIRED: The comma separated list of data and index log files to be dumped.") .withRequiredArg .describedAs("file1, file2, ...") .ofType(classOf[String]) @@ -41,12 +41,12 @@ object DumpLogSegments { .describedAs("size") .ofType(classOf[java.lang.Integer]) .defaultsTo(5 * 1024 * 1024) - val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration") - val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") + val deepIterationOpt = parser.accepts("deep-iteration", "If set, uses deep instead of shallow iteration.") + val valueDecoderOpt = parser.accepts("value-decoder-class", "If set, used to deserialize the messages. 
This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") .withOptionalArg() .ofType(classOf[java.lang.String]) .defaultsTo("kafka.serializer.StringDecoder") - val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") + val keyDecoderOpt = parser.accepts("key-decoder-class", "If set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") .withOptionalArg() .ofType(classOf[java.lang.String]) .defaultsTo("kafka.serializer.StringDecoder") diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala index 4d051bc..ea0ce2f 100644 --- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala @@ -44,14 +44,15 @@ object ExportZkOffsets extends Logging { def main(args: Array[String]) { val parser = new OptionParser - val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") + val zkConnectOpt = parser.accepts("zookeeper", "The connection string for the zookeeper connection in the form host:port. 
" + + "Multiple URLS can be given to allow fail-over.") .withRequiredArg() .defaultsTo("localhost:2181") .ofType(classOf[String]) val groupOpt = parser.accepts("group", "Consumer group.") .withRequiredArg() .ofType(classOf[String]) - val outFileOpt = parser.accepts("output-file", "Output file") + val outFileOpt = parser.accepts("output-file", "REQUIRED: Output file.") .withRequiredArg() .ofType(classOf[String]) parser.accepts("help", "Print this message.") diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 3d9293e..11e7ec9 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -38,16 +38,16 @@ object GetOffsetShell { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions") + val partitionOpt = parser.accepts("partitions", "Comma separated list of partition ids. 
If not specified, it will find offsets for all partitions.") .withRequiredArg .describedAs("partition ids") .ofType(classOf[String]) .defaultsTo("") - val timeOpt = parser.accepts("time", "timestamp of the offsets before that") + val timeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.") .withRequiredArg .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) - val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned") + val nOffsetsOpt = parser.accepts("offsets", "Number of offsets returned.") .withRequiredArg .describedAs("count") .ofType(classOf[java.lang.Integer]) diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index abe0972..dcece09 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -44,11 +44,12 @@ object ImportZkOffsets extends Logging { def main(args: Array[String]) { val parser = new OptionParser - val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") + val zkConnectOpt = parser.accepts("zookeeper", "The connection string for the zookeeper connection in the form host:port. 
" + + "Multiple URLS can be given to allow fail-over.") .withRequiredArg() .defaultsTo("localhost:2181") .ofType(classOf[String]) - val inFileOpt = parser.accepts("input-file", "Input file") + val inFileOpt = parser.accepts("input-file", "REQUIRED: Input file.") .withRequiredArg() .ofType(classOf[String]) parser.accepts("help", "Print this message.") diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index 1d1a120..dd69bc3 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -46,12 +46,12 @@ object JmxTool extends Logging { .withRequiredArg .describedAs("name") .ofType(classOf[String]) - val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats.") + val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") .withRequiredArg .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(2000) - val helpOpt = parser.accepts("help", "Print usage information.") + val helpOpt = parser.accepts("help", "Print this message.") val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + "See java.text.SimpleDateFormat for options.") .withRequiredArg diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index 026d819..c381c06 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -86,57 +86,57 @@ public class KafkaMigrationTool public static void main(String[] args) throws InterruptedException, IOException { OptionParser parser = new OptionParser(); ArgumentAcceptingOptionSpec consumerConfigOpt - = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. 
" + "You man specify multiple of these.") + = parser.accepts("consumer-config", "REQUIRED: Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You may specify multiple of these.") .withRequiredArg() .describedAs("config file") .ofType(String.class); ArgumentAcceptingOptionSpec producerConfigOpt - = parser.accepts("producer.config", "Producer config.") + = parser.accepts("producer-config", "REQUIRED: Producer config.") .withRequiredArg() .describedAs("config file") .ofType(String.class); ArgumentAcceptingOptionSpec numProducersOpt - = parser.accepts("num.producers", "Number of producer instances") + = parser.accepts("num-producers", "Number of producer instances.") .withRequiredArg() .describedAs("Number of producers") .ofType(Integer.class) .defaultsTo(1); ArgumentAcceptingOptionSpec zkClient01JarOpt - = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file") + = parser.accepts("zkclient.01.jar", "REQUIRED: zkClient 0.1 jar file.") .withRequiredArg() .describedAs("zkClient 0.1 jar file required by Kafka 0.7") .ofType(String.class); ArgumentAcceptingOptionSpec kafka07JarOpt - = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file") + = parser.accepts("kafka.07.jar", "REQUIRED: Kafka 0.7 jar file.") .withRequiredArg() .describedAs("kafka 0.7 jar") .ofType(String.class); ArgumentAcceptingOptionSpec numStreamsOpt - = parser.accepts("num.streams", "Number of consumer streams") + = parser.accepts("num-streams", "Number of consumer streams.") .withRequiredArg() .describedAs("Number of consumer threads") .ofType(Integer.class) .defaultsTo(1); ArgumentAcceptingOptionSpec whitelistOpt - = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster") + = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster.") .withRequiredArg() .describedAs("Java regex (String)") .ofType(String.class); ArgumentAcceptingOptionSpec blacklistOpt - = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 
cluster") + = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster.") .withRequiredArg() .describedAs("Java regex (String)") .ofType(String.class); ArgumentAcceptingOptionSpec queueSizeOpt - = parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer") + = parser.accepts("queue-size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer.") .withRequiredArg() .describedAs("Queue size in terms of number of messages") .ofType(Integer.class) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 4f3c4c8..a78b2b3 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -78,20 +78,20 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { info("Starting mirror maker") val parser = new OptionParser - val consumerConfigOpt = parser.accepts("consumer.config", - "Embedded consumer config for consuming from the source cluster.") + val consumerConfigOpt = parser.accepts("consumer-config", + "REQUIRED: Embedded consumer config for consuming from the source cluster.") .withRequiredArg() .describedAs("config file") .ofType(classOf[String]) - val producerConfigOpt = parser.accepts("producer.config", - "Embedded producer config.") + val producerConfigOpt = parser.accepts("producer-config", + "REQUIRED: Embedded producer config.") .withRequiredArg() .describedAs("config file") .ofType(classOf[String]) - val numStreamsOpt = parser.accepts("num.streams", - "Number of consumption streams.") + val numStreamsOpt = parser.accepts("num-streams", + "Number of consumer streams.") .withRequiredArg() .describedAs("Number of threads") .ofType(classOf[java.lang.Integer]) diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala index d073acf..87fccff 100644 --- 
a/core/src/main/scala/kafka/tools/PerfConfig.scala +++ b/core/src/main/scala/kafka/tools/PerfConfig.scala @@ -22,7 +22,7 @@ import joptsimple.OptionParser class PerfConfig(args: Array[String]) { val parser = new OptionParser - val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume") + val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.") .withRequiredArg .describedAs("count") .ofType(classOf[java.lang.Long]) @@ -39,8 +39,8 @@ class PerfConfig(args: Array[String]) { .ofType(classOf[String]) .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS") val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval") - val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ") + "interval as configured by reporting-interval.") + val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats. 
") val messageSizeOpt = parser.accepts("message-size", "The size of each message.") .withRequiredArg .describedAs("size") @@ -51,7 +51,7 @@ class PerfConfig(args: Array[String]) { .describedAs("size") .ofType(classOf[java.lang.Integer]) .defaultsTo(200) - val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") + val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec.") .withRequiredArg .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3") .ofType(classOf[java.lang.Integer]) diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index bc25cd2..30c4aa7 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -70,28 +70,28 @@ object ProducerPerformance extends Logging { } class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.") + val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") .withRequiredArg .describedAs("hostname:port,..,hostname:port") .ofType(classOf[String]) - val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to") + val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to.") .withRequiredArg .describedAs("topic1,topic2..") .ofType(classOf[String]) - val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") + val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. 
Value must be non-negative and non-zero.") .withRequiredArg() .ofType(classOf[java.lang.Integer]) .defaultsTo(3000) - val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number") + val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number.") .withRequiredArg() .ofType(classOf[java.lang.Integer]) .defaultsTo(3) - val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds") + val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds.") .withRequiredArg() .ofType(classOf[java.lang.Integer]) .defaultsTo(100) - val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + - "to complete") + val producerRequestRequiredAcksOpt = parser.accepts("request-required-acks", "Number of acks required for producer request " + + "to complete.") .withRequiredArg() .ofType(classOf[java.lang.Integer]) .defaultsTo(-1) @@ -108,20 +108,24 @@ object ProducerPerformance extends Logging { .withRequiredArg() .describedAs("initial message id") .ofType(classOf[java.lang.Integer]) - val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends") + val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends.") .withRequiredArg() .describedAs("message send time gap") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) - val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") + val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled.") val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - 
"set, the csv metrics will be outputed here") + "set, the csv metrics will be outputed here.") .withRequiredArg .describedAs("metrics dictory") .ofType(classOf[java.lang.String]) val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") val options = parser.parse(args: _*) + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt) val topicsStr = options.valueOf(topicsOpt) diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 2b8537b..737e674 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -70,15 +70,15 @@ object ReplayLogProducer extends Logging { .describedAs("zookeeper url") .ofType(classOf[String]) .defaultsTo("127.0.0.1:2181") - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be specified.") + val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") .withRequiredArg .describedAs("hostname:port") .ofType(classOf[String]) - val inputTopicOpt = parser.accepts("inputtopic", "REQUIRED: The topic to consume from.") + val inputTopicOpt = parser.accepts("input-topic", "REQUIRED: The topic to consume from.") .withRequiredArg .describedAs("input-topic") .ofType(classOf[String]) - val outputTopicOpt = parser.accepts("outputtopic", "REQUIRED: The topic to produce to") + val outputTopicOpt = parser.accepts("output-topic", "REQUIRED: The topic to produce to.") .withRequiredArg .describedAs("output-topic") .ofType(classOf[String]) @@ -103,8 +103,13 @@ object ReplayLogProducer extends Logging { .describedAs("producer properties") .ofType(classOf[String]) val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a 
time as they arrive.") + val helpOpt = parser.accepts("help", "Print this message.") val options = parser.parse(args : _*) + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index ba6ddd7..d091979 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -77,7 +77,7 @@ object ReplicaVerificationTool extends Logging { .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) - val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.") + val topicWhiteListOpt = parser.accepts("whitelist", "White list of topics to verify replica consistency. Defaults to all topics.") .withRequiredArg .describedAs("Java regex (String)") .ofType(classOf[String]) @@ -87,16 +87,22 @@ object ReplicaVerificationTool extends Logging { .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) .defaultsTo(-1L) - val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.") + val reportIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") .withRequiredArg .describedAs("ms") .ofType(classOf[java.lang.Long]) .defaultsTo(30 * 1000L) + val helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") val options = parser.parse(args : _*) + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } + CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) val regex = options.valueOf(topicWhiteListOpt) diff --git 
a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala index 900f7df..52f0556 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala @@ -128,7 +128,7 @@ object SimpleConsumerPerformance { .describedAs("partition") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) - val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.") + val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size of each request.") .withRequiredArg .describedAs("bytes") .ofType(classOf[java.lang.Integer]) @@ -140,6 +140,10 @@ object SimpleConsumerPerformance { .defaultsTo("SimpleConsumerPerformanceClient") val options = parser.parse(args : _*) + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, urlOpt) diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index b4f903b..7580cfe 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -44,7 +44,7 @@ object SimpleConsumerShell extends Logging { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val partitionIdOpt = parser.accepts("partition", "The partition to consume from.") + val partitionIdOpt = parser.accepts("partition", "REQUIRED: The topic partition to consume from.") .withRequiredArg .describedAs("partition") .ofType(classOf[java.lang.Integer]) @@ -54,7 +54,7 @@ object SimpleConsumerShell extends Logging { .describedAs("replica id") .ofType(classOf[java.lang.Integer]) .defaultsTo(UseLeaderReplica) - val offsetOpt = parser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end") + val 
offsetOpt = parser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end.") .withRequiredArg .describedAs("consume offset") .ofType(classOf[java.lang.Long]) @@ -64,7 +64,7 @@ object SimpleConsumerShell extends Logging { .describedAs("clientId") .ofType(classOf[String]) .defaultsTo("SimpleConsumerShell") - val fetchSizeOpt = parser.accepts("fetchsize", "The fetch size of each request.") + val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.") .withRequiredArg .describedAs("fetchsize") .ofType(classOf[java.lang.Integer]) @@ -74,17 +74,17 @@ object SimpleConsumerShell extends Logging { .describedAs("class") .ofType(classOf[String]) .defaultsTo(classOf[DefaultMessageFormatter].getName) - val messageFormatterArgOpt = parser.accepts("property") + val messageFormatterArgOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message formatter.") .withRequiredArg .describedAs("prop") .ofType(classOf[String]) - val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator") + val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator.") val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") .withRequiredArg .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) - val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume") + val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume.") .withRequiredArg .describedAs("max-messages") .ofType(classOf[java.lang.Integer]) @@ -93,11 +93,17 @@ object SimpleConsumerShell extends Logging { "skip it instead of halt.") val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend", "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages") + val 
helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.") val options = parser.parse(args : _*) + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } + CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt) val topic = options.valueOf(topicOpt) diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index b34b8c7..1321e1c 100644 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -57,35 +57,35 @@ object StateChangeLogMerger extends Logging { // Parse input arguments. val parser = new OptionParser - val filesOpt = parser.accepts("logs", "Comma separated list of state change logs or a regex for the log file names") + val filesOpt = parser.accepts("logs", "Comma separated list of state change logs or a regex for the log file names.") .withRequiredArg .describedAs("file1,file2,...") .ofType(classOf[String]) - val regexOpt = parser.accepts("logs-regex", "Regex to match the state change log files to be merged") + val regexOpt = parser.accepts("logs-regex", "Regex to match the state change log files to be merged.") .withRequiredArg .describedAs("for example: /tmp/state-change.log*") .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "The topic whose state change logs should be merged") + val topicOpt = parser.accepts("topic", "The topic whose state change logs should be merged.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids whose state change logs should be merged") + val partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids whose state change logs should be merged.") 
.withRequiredArg .describedAs("0,1,2,...") .ofType(classOf[String]) - val startTimeOpt = parser.accepts("start-time", "The earliest timestamp of state change log entries to be merged") + val startTimeOpt = parser.accepts("start-time", "The earliest timestamp of state change log entries to be merged.") .withRequiredArg - .describedAs("start timestamp in the format " + dateFormat) + .describedAs("start timestamp in the format " + dateFormat.toPattern()) .ofType(classOf[String]) .defaultsTo("0000-00-00 00:00:00,000") - val endTimeOpt = parser.accepts("end-time", "The latest timestamp of state change log entries to be merged") + val endTimeOpt = parser.accepts("end-time", "The latest timestamp of state change log entries to be merged.") .withRequiredArg - .describedAs("end timestamp in the format " + dateFormat) + .describedAs("end timestamp in the format " + dateFormat.toPattern()) .ofType(classOf[String]) .defaultsTo("9999-12-31 23:59:59,999") if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log files from several brokers to reconnstruct a unified history of what happened.") + CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log files from several brokers to reconstruct a unified history of what happened.") val options = parser.parse(args : _*) diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala index af496f7..caf1588 100644 --- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala +++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala @@ -47,7 +47,7 @@ object TestLogCleaning { def main(args: Array[String]) { val parser = new OptionParser - val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.") + val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send or consume.") .withRequiredArg .describedAs("count") .ofType(classOf[java.lang.Long]) @@ -57,7 +57,7 @@ object 
TestLogCleaning { .describedAs("count") .ofType(classOf[java.lang.Integer]) .defaultsTo(5) - val brokerOpt = parser.accepts("broker", "Url to connect to.") + val brokerOpt = parser.accepts("broker", "REQUIRED: Url to connect to.") .withRequiredArg .describedAs("url") .ofType(classOf[String]) @@ -71,7 +71,8 @@ object TestLogCleaning { .describedAs("percent") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) - val zkConnectOpt = parser.accepts("zk", "Zk url.") + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLs can be given to allow fail-over.") .withRequiredArg .describedAs("url") .ofType(classOf[String]) diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala index aef8361..39f4355 100644 --- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala +++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala @@ -25,9 +25,10 @@ object VerifyConsumerRebalance extends Logging { def main(args: Array[String]) { val parser = new OptionParser() - val zkConnectOpt = parser.accepts("zookeeper.connect", "ZooKeeper connect string."). + val zkConnectOpt = parser.accepts("zookeeper", "The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLs can be given to allow fail-over."). withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]) - val groupOpt = parser.accepts("group", "Consumer group."). + val groupOpt = parser.accepts("group", "REQUIRED: Consumer group."). 
withRequiredArg().ofType(classOf[String]) parser.accepts("help", "Print this message.") diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 7211c25..a000b0e 100644 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -57,7 +57,7 @@ object TestLinearWriteSpeed { .describedAs("num_files") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.") + val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") .withRequiredArg .describedAs("ms") .ofType(classOf[java.lang.Long]) @@ -67,12 +67,13 @@ object TestLinearWriteSpeed { .describedAs("mb") .ofType(classOf[java.lang.Integer]) .defaultsTo(Integer.MAX_VALUE) - val flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes") + val flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes.") .withRequiredArg() .describedAs("message_count") .ofType(classOf[java.lang.Long]) .defaultsTo(Long.MaxValue) - val compressionCodecOpt = parser.accepts("compression", "The compression codec to use") + val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'." 
+ + "If specified without value, then it defaults to 'none'.") .withRequiredArg .describedAs("codec") .ofType(classOf[java.lang.String]) diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index a106379..ed6bfe5 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -179,7 +179,8 @@ object TestOffsetManager { def main(args: Array[String]) { val parser = new OptionParser - val zookeeperOpt = parser.accepts("zookeeper", "The ZooKeeper connection URL.") + val zookeeperOpt = parser.accepts("zookeeper", "The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLs can be given to allow fail-over.") .withRequiredArg .describedAs("ZooKeeper URL") .ofType(classOf[java.lang.String]) @@ -203,13 +204,13 @@ object TestOffsetManager { .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val numThreadsOpt = parser.accepts("thread-count", "Number of commit threads.") + val numThreadsOpt = parser.accepts("threads", "Number of commit threads.") .withRequiredArg .describedAs("threads") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val reportingIntervalOpt = parser.accepts("reporting-interval-ms", "Interval at which stats are reported.") + val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") .withRequiredArg .describedAs("interval (ms)") .ofType(classOf[java.lang.Integer]) -- 1.9.5 (Apple Git-50.3)