From ce0665ffda6d9a36e0ea0c726a08f9eafd1e7499 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Wed, 8 Oct 2014 15:12:08 -0700 Subject: [PATCH] KAFKA-1680 Standardize command line argument parsing and usage messages. Handle command line parsing in a CommandLineUtils helper that properly catches OptionExceptions and reports them. Make each command line utility use a single description for all usage messages. Ensure all command line utilities have a help option and required options are marked as such. --- .../PreferredReplicaLeaderElectionCommand.scala | 13 +++---- .../kafka/admin/ReassignPartitionsCommand.scala | 19 +++++----- core/src/main/scala/kafka/admin/TopicCommand.scala | 33 +++++++++-------- .../main/scala/kafka/tools/ConsoleConsumer.scala | 23 ++++-------- .../main/scala/kafka/tools/ConsoleProducer.scala | 12 ++++--- .../scala/kafka/tools/ConsumerOffsetChecker.scala | 19 ++++------ .../scala/kafka/tools/ConsumerPerformance.scala | 7 ++-- .../main/scala/kafka/tools/DumpLogSegments.scala | 11 +++--- .../main/scala/kafka/tools/ExportZkOffsets.scala | 23 +++++------- .../main/scala/kafka/tools/GetOffsetShell.scala | 15 ++++---- .../main/scala/kafka/tools/ImportZkOffsets.scala | 23 +++++------- core/src/main/scala/kafka/tools/JmxTool.scala | 13 +++---- core/src/main/scala/kafka/tools/MirrorMaker.scala | 21 +++++------ .../scala/kafka/tools/ProducerPerformance.scala | 10 ++++-- .../main/scala/kafka/tools/ReplayLogProducer.scala | 13 ++++--- .../kafka/tools/ReplicaVerificationTool.scala | 14 ++++---- .../kafka/tools/SimpleConsumerPerformance.scala | 7 ++-- .../scala/kafka/tools/SimpleConsumerShell.scala | 14 ++++---- .../scala/kafka/tools/StateChangeLogMerger.scala | 9 ++--- .../main/scala/kafka/tools/TestLogCleaning.scala | 15 ++++---- .../kafka/tools/VerifyConsumerRebalance.scala | 17 ++++----- .../main/scala/kafka/utils/CommandLineUtils.scala | 41 ++++++++++++++++++---- core/src/main/scala/kafka/utils/ToolsUtils.scala | 4 +-- .../scala/other/kafka/TestLinearWriteSpeed.scala | 10 ++++-- 24 files changed, 202 insertions(+), 184 deletions(-) diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index c791848..db8017d 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -40,14 +40,15 @@ object PreferredReplicaLeaderElectionCommand extends Logging { .withRequiredArg .describedAs("urls") .ofType(classOf[String]) - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "This tool causes leadership for each partition to be transferred back to the 'preferred replica'," + - " it can be used to balance leadership among the servers.") + val helpOpt = parser.accepts("help", "Print usage information.") - val options = parser.parse(args : _*) + val usageDescription = "This tool causes leadership for each partition to be transferred back to the 'preferred replica'," + + " it can be used to balance leadership among the servers." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, zkConnectOpt) val zkConnect = options.valueOf(zkConnectOpt) var zkClient: ZkClient = null diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 691d69a..a9db586 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -32,9 +32,9 @@ object ReassignPartitionsCommand extends Logging { // should have exactly one action val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _) if(actions != 1) - CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --generate, --execute or --verify") + CommandLineUtils.printUsageErrorAndDie(opts.parser, opts.usageDescription, "Command must include exactly one action: --generate, --execute or --verify") - CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) + CommandLineUtils.checkRequiredArgs(opts.parser, opts.usageDescription, opts.options, opts.zkConnectOpt) val zkConnect = opts.options.valueOf(opts.zkConnectOpt) var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) @@ -57,7 +57,7 @@ object ReassignPartitionsCommand extends Logging { def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) { if(!opts.options.has(opts.reassignmentJsonFileOpt)) - CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option") + CommandLineUtils.printUsageErrorAndDie(opts.parser, opts.usageDescription, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option") val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) val jsonString = Utils.readFileAsString(jsonFile) val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) @@ -78,7 +78,7 @@ object ReassignPartitionsCommand extends Logging { def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) { if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt))) - CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options") + CommandLineUtils.printUsageErrorAndDie(opts.parser, opts.usageDescription, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options") val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt) val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt) val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) @@ -100,7 +100,7 @@ object ReassignPartitionsCommand extends Logging { def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) { if(!opts.options.has(opts.reassignmentJsonFileOpt)) - CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option") + CommandLineUtils.printUsageErrorAndDie(opts.parser, opts.usageDescription, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option") val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile) val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString) @@ -149,6 +149,7 @@ object ReassignPartitionsCommand extends Logging { class ReassignPartitionsCommandOptions(args: Array[String]) { val parser = new OptionParser + val usageDescription = "Moves topic partitions between replicas." val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + "form host:port. Multiple URLS can be given to allow fail-over.") @@ -176,11 +177,11 @@ object ReassignPartitionsCommand extends Logging { .withRequiredArg .describedAs("brokerlist") .ofType(classOf[String]) - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.") + val helpOpt = parser.accepts("help", "Print usage information.") - val options = parser.parse(args : _*) + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) } } diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 7672c5a..a174458 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -34,16 +34,12 @@ import org.apache.kafka.common.utils.Utils.formatAddress object TopicCommand { def main(args: Array[String]): Unit = { - val opts = new TopicCommandOptions(args) - if(args.length == 0) - CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.") - // should have exactly one action val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) if(actions != 1) - CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete") + CommandLineUtils.printUsageErrorAndDie(opts.parser, opts.usageDescription, "Command must include exactly one action: --list, --describe, --create, --alter or --delete") opts.checkArgs() @@ -86,7 +82,7 @@ object TopicCommand { val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs) } else { - CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) + CommandLineUtils.checkRequiredArgs(opts.parser, opts.usageDescription, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) val partitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs) @@ -234,6 +230,7 @@ object TopicCommand { class TopicCommandOptions(args: Array[String]) { val parser = new OptionParser + val usageDescription = "Create, delete, describe, or change a topic." val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over.") .withRequiredArg @@ -282,28 +279,30 @@ object TopicCommand { val topicsWithOverridesOpt = parser.accepts("topics-with-overrides", "if set when describing topics, only show topics that have overridden configs") - val options = parser.parse(args : _*) + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt) def checkArgs() { // check required args - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, zkConnectOpt) if (!options.has(listOpt) && !options.has(describeOpt)) - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, topicOpt) // check invalid args - CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, + CommandLineUtils.checkInvalidArgs(parser, usageDescription, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) + CommandLineUtils.checkInvalidArgs(parser, usageDescription, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt)) + CommandLineUtils.checkInvalidArgs(parser, usageDescription, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) + CommandLineUtils.checkInvalidArgs(parser, usageDescription, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt)) + CommandLineUtils.checkInvalidArgs(parser, usageDescription, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(alterOpt, createOpt) + partitionsOpt + replicationFactorOpt) - CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt, + CommandLineUtils.checkInvalidArgs(parser, usageDescription, options, reportUnderReplicatedPartitionsOpt, allTopicLevelOpts -- Set(describeOpt) + reportUnavailablePartitionsOpt + topicsWithOverridesOpt) - CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt, + CommandLineUtils.checkInvalidArgs(parser, usageDescription, options, reportUnavailablePartitionsOpt, allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt) - CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt, + CommandLineUtils.checkInvalidArgs(parser, usageDescription, options, topicsWithOverridesOpt, allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt) } } diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 323fc85..d4f7f21 100644 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -83,16 +83,18 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("metrics dictory") .ofType(classOf[java.lang.String]) + val helpOpt = parser.accepts("help", "Print usage information.") - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") + val usageDescription = "The console consumer is a tool that reads data from Kafka and outputs it to standard output." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) var groupIdPassed = true - val options: OptionSet = tryParse(parser, args) - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, zkConnectOpt) val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) if (topicOrFilterOpt.size != 1) - CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.") + CommandLineUtils.printUsageErrorAndDie(parser, usageDescription, "Exactly one of whitelist/blacklist/topic is required.") val topicArg = options.valueOf(topicOrFilterOpt.head) val filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) @@ -196,17 +198,6 @@ object ConsoleConsumer extends Logging { connector.shutdown() } - def tryParse(parser: OptionParser, args: Array[String]) = { - try { - parser.parse(args : _*) - } catch { - case e: OptionException => { - Utils.croak(e.getMessage) - null - } - } - } - def checkZkPathExists(zkUrl: String, path: String): Boolean = { try { val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 8e9ba0b..62866ce 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -104,6 +104,7 @@ object ConsoleProducer { class ProducerConfig(args: Array[String]) { val parser = new OptionParser + val usageDescription = "Read data from standard input and publish it to Kafka." val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.") .withRequiredArg .describedAs("topic") @@ -210,17 +211,18 @@ object ConsoleProducer { .describedAs("prop") .ofType(classOf[String]) val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") + val helpOpt = parser.accepts("help", "Print usage information.") - val options = parser.parse(args : _*) - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.") - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt) + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, topicOpt, brokerListOpt) import scala.collection.JavaConversions._ val useNewProducer = options.has(useNewProducerOpt) val topic = options.valueOf(topicOpt) val brokerList = options.valueOf(brokerListOpt) - ToolsUtils.validatePortOrDie(parser,brokerList) + ToolsUtils.validatePortOrDie(parser,brokerList, usageDescription) val sync = options.has(syncOpt) val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) val compressionCodec = if (options.has(compressionCodecOpt)) diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d1e7c43..5c90439 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -109,7 +109,7 @@ object ConsumerOffsetChecker extends Logging { def main(args: Array[String]) { val parser = new OptionParser() - val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string."). + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: ZooKeeper connect string."). withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]) val topicsOpt = parser.accepts("topic", "Comma-separated list of consumer topics (all topics if absent)."). @@ -122,19 +122,14 @@ object ConsumerOffsetChecker extends Logging { withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(3000) parser.accepts("broker-info", "Print broker info") - parser.accepts("help", "Print this message.") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.") + val helpOpt = parser.accepts("help", "Print usage information.") - val options = parser.parse(args : _*) + val usageDescription = "Check the offset of your consumers." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - if (options.has("help")) { - parser.printHelpOn(System.out) - System.exit(0) - } - - CommandLineUtils.checkRequiredArgs(parser, options, groupOpt, zkConnectOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, groupOpt, zkConnectOpt) val zkConnect = options.valueOf(zkConnectOpt) diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 093c800..b34f5f9 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -118,9 +118,12 @@ object ConsumerPerformance { .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val options = parser.parse(args: _*) + val usageDescription = "Performance test for the full zookeeper consumer." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, zkConnectOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, topicOpt, zkConnectOpt) val props = new Properties props.put("group.id", options.valueOf(groupIdOpt)) diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 8e9d47b..3849ce4 100644 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -50,13 +50,14 @@ object DumpLogSegments { .withOptionalArg() .ofType(classOf[java.lang.String]) .defaultsTo("kafka.serializer.StringDecoder") + val helpOpt = parser.accepts("help", "Print usage information.") - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.") - - val options = parser.parse(args : _*) + val usageDescription = "Performance test for the full zookeeper consumer." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - CommandLineUtils.checkRequiredArgs(parser, options, filesOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, filesOpt) val print = if(options.has(printOpt)) true else false val verifyOnly = if(options.has(verifyOpt)) true else false diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala index 4d051bc..3c90b7c 100644 --- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala @@ -51,23 +51,18 @@ object ExportZkOffsets extends Logging { val groupOpt = parser.accepts("group", "Consumer group.") .withRequiredArg() .ofType(classOf[String]) - val outFileOpt = parser.accepts("output-file", "Output file") + val outFileOpt = parser.accepts("output-file", "REQUIRED: Output file") .withRequiredArg() .ofType(classOf[String]) - parser.accepts("help", "Print this message.") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.") - - val options = parser.parse(args : _*) - - if (options.has("help")) { - parser.printHelpOn(System.out) - System.exit(0) - } - - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt) + val helpOpt = parser.accepts("help", "Print usage information.") + + val usageDescription = "Retrieve offsets of broker partitions." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, zkConnectOpt, outFileOpt) + val zkConnect = options.valueOf(zkConnectOpt) val groups = options.valuesOf(groupOpt) val outfile = options.valueOf(outFileOpt) diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 3d9293e..2e095e6 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -43,7 +43,7 @@ object GetOffsetShell { .describedAs("partition ids") .ofType(classOf[String]) .defaultsTo("") - val timeOpt = parser.accepts("time", "timestamp of the offsets before that") + val timeOpt = parser.accepts("time", "REQUIRED: timestamp of the offsets before that") .withRequiredArg .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) @@ -57,17 +57,18 @@ object GetOffsetShell { .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.") + val helpOpt = parser.accepts("help", "Print usage information.") - val options = parser.parse(args : _*) + val usageDescription = "Gets offset for a topic." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, brokerListOpt, topicOpt, timeOpt) val clientId = "GetOffsetShell" val brokerList = options.valueOf(brokerListOpt) - ToolsUtils.validatePortOrDie(parser, brokerList) + ToolsUtils.validatePortOrDie(parser, usageDescription, brokerList) val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList) val topic = options.valueOf(topicOpt) var partitionList = options.valueOf(partitionOpt) diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index abe0972..2bfceff 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -48,22 +48,17 @@ object ImportZkOffsets extends Logging { .withRequiredArg() .defaultsTo("localhost:2181") .ofType(classOf[String]) - val inFileOpt = parser.accepts("input-file", "Input file") + val inFileOpt = parser.accepts("input-file", "REQUIRED: Input file") .withRequiredArg() .ofType(classOf[String]) - parser.accepts("help", "Print this message.") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Import offsets to zookeeper from files.") - - val options = parser.parse(args : _*) - - if (options.has("help")) { - parser.printHelpOn(System.out) - System.exit(0) - } - - CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt) + val helpOpt = parser.accepts("help", "Print usage information.") + + val usageDescription = "Gets offset for a topic." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) + + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, inFileOpt) val zkConnect = options.valueOf(zkConnectOpt) val partitionOffsetFile = options.valueOf(inFileOpt) diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index 1d1a120..285d2ae 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -63,16 +63,11 @@ object JmxTool extends Logging { .describedAs("service-url") .ofType(classOf[String]) .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.") - val options = parser.parse(args : _*) - - if(options.has(helpOpt)) { - parser.printHelpOn(System.out) - System.exit(0) - } + val usageDescription = "Dump JMX values to standard output." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt)) val interval = options.valueOf(reportingIntervalOpt).intValue diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b8698ee..a506ba9 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -46,14 +46,14 @@ object MirrorMaker extends Logging { val parser = new OptionParser val consumerConfigOpt = parser.accepts("consumer.config", - "Consumer config to consume from a source cluster. " + + "REQUIRED: Consumer config to consume from a source cluster. " + "You may specify multiple of these.") .withRequiredArg() .describedAs("config file") .ofType(classOf[String]) val producerConfigOpt = parser.accepts("producer.config", - "Embedded producer config.") + "REQUIRED: Embedded producer config.") .withRequiredArg() .describedAs("config file") .ofType(classOf[String]) @@ -94,19 +94,14 @@ object MirrorMaker extends Logging { .describedAs("Java regex (String)") .ofType(classOf[String]) - val helpOpt = parser.accepts("help", "Print this message.") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.") - - val options = parser.parse(args : _*) + val helpOpt = parser.accepts("help", "Print usage information.") - if (options.has(helpOpt)) { - parser.printHelpOn(System.out) - System.exit(0) - } + val usageDescription = "Continuously copy data between two Kafka clusters." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, consumerConfigOpt, producerConfigOpt) if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) { println("Exactly one of whitelist or blacklist is required.") System.exit(1) diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index f61c7c7..1d07240 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -122,8 +122,12 @@ object ProducerPerformance extends Logging { .ofType(classOf[java.lang.String]) val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") - val options = parser.parse(args: _*) - CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt) + val usageDescription = "Load test for the producer." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) + + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, topicsOpt, brokerListOpt, numMessagesOpt) val topicsStr = options.valueOf(topicsOpt) val topics = topicsStr.split(",") @@ -132,7 +136,7 @@ object ProducerPerformance extends Logging { val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) val hideHeader = options.has(hideHeaderOpt) val brokerList = options.valueOf(brokerListOpt) - ToolsUtils.validatePortOrDie(parser,brokerList) + ToolsUtils.validatePortOrDie(parser, usageDescription, brokerList) val messageSize = options.valueOf(messageSizeOpt).intValue var isFixedSize = !options.has(varyMessageSizeOpt) var isSync = options.has(syncOpt) diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 3393a3d..db45134 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -64,6 +64,8 @@ object ReplayLogProducer extends Logging { class Config(args: Array[String]) { val parser = new OptionParser + val usageDescription = "Consumes from one topic and replays those messages as a producer to another topic" + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over.") .withRequiredArg @@ -103,14 +105,17 @@ object ReplayLogProducer extends Logging { .describedAs("producer properties") .ofType(classOf[String]) val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") + val helpOpt = parser.accepts("help", "Print usage information.") + + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - val options = parser.parse(args : _*) - - CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, brokerListOpt, inputTopicOpt, outputTopicOpt) val zkConnect = options.valueOf(zkConnectOpt) val brokerList = options.valueOf(brokerListOpt) - ToolsUtils.validatePortOrDie(parser,brokerList) + ToolsUtils.validatePortOrDie(parser, usageDescription, brokerList) val numMessages = options.valueOf(numMessagesOpt).intValue val numThreads = options.valueOf(numThreadsOpt).intValue val inputTopic = options.valueOf(inputTopicOpt) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index ba6ddd7..26854da 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -63,6 +63,7 @@ object ReplicaVerificationTool extends Logging { def main(args: Array[String]): Unit = { val parser = new OptionParser + val usageDescription = "Validate that all replicas for a set of topics have the same data." val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") .withRequiredArg .describedAs("hostname:port,...,hostname:port") @@ -92,12 +93,13 @@ object ReplicaVerificationTool extends Logging { .describedAs("ms") .ofType(classOf[java.lang.Long]) .defaultsTo(30 * 1000L) - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") + val helpOpt = parser.accepts("help", "Print usage information.") - val options = parser.parse(args : _*) - CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) + + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, brokerListOpt) val regex = options.valueOf(topicWhiteListOpt) val topicWhiteListFiler = new Whitelist(regex) @@ -117,7 +119,7 @@ object ReplicaVerificationTool extends Logging { // getting topic metadata info("Getting topic metatdata...") val brokerList = options.valueOf(brokerListOpt) - ToolsUtils.validatePortOrDie(parser,brokerList) + ToolsUtils.validatePortOrDie(parser, usageDescription, brokerList) val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList) val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala index 7602b8d..2a4627a 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala @@ -139,9 +139,12 @@ object SimpleConsumerPerformance { .ofType(classOf[String]) .defaultsTo("SimpleConsumerPerformanceClient") - val options = parser.parse(args : _*) + val usageDescription = "Performance test for the simple consumer" + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, urlOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, topicOpt, urlOpt) val url = new URI(options.valueOf(urlOpt)) val fetchSize = options.valueOf(fetchSizeOpt).intValue diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index b4f903b..44a3fa5 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -93,12 +93,14 @@ object SimpleConsumerShell extends Logging { "skip it instead of halt.") val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend", "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.") + val helpOpt = parser.accepts("help", "Print usage information.") - val options = parser.parse(args : _*) - CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt) + val usageDescription = "A low-level tool for fetching data directly from a particular replica." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) + + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, brokerListOpt, topicOpt, partitionIdOpt) val topic = options.valueOf(topicOpt) val partitionId = options.valueOf(partitionIdOpt).intValue() @@ -125,7 +127,7 @@ object SimpleConsumerShell extends Logging { // getting topic metadata info("Getting topic metatdata...") val brokerList = options.valueOf(brokerListOpt) - ToolsUtils.validatePortOrDie(parser,brokerList) + ToolsUtils.validatePortOrDie(parser, usageDescription, brokerList) val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList) val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index d298e7e..fa2db8b 100644 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -83,12 +83,13 @@ object StateChangeLogMerger extends Logging { .describedAs("end timestamp in the format " + dateFormat) .ofType(classOf[String]) .defaultsTo("9999-12-31 23:59:59,999") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log files from several brokers to reconnstruct a unified history of what happened.") + val helpOpt = parser.accepts("help", "Print usage information.") + val usageDescription = "A tool for merging the log files from several brokers to reconnstruct a unified history of what happened." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - val options = parser.parse(args : _*) if ((!options.has(filesOpt) && !options.has(regexOpt)) || (options.has(filesOpt) && options.has(regexOpt))) { System.err.println("Provide arguments to exactly one of the two options \"" + filesOpt + "\" or \"" + regexOpt + "\"") parser.printHelpOn(System.err) diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala index 1d4ea93..d25c5e4 100644 --- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala +++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala @@ -71,7 +71,7 @@ object TestLogCleaning { .describedAs("percent") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) - val zkConnectOpt = parser.accepts("zk", "Zk url.") + val zkConnectOpt = parser.accepts("zk", "REQUIRED: Zk url.") .withRequiredArg .describedAs("url") .ofType(classOf[String]) @@ -84,18 +84,19 @@ object TestLogCleaning { .withRequiredArg .describedAs("directory") .ofType(classOf[String]) - - val options = parser.parse(args:_*) - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "An integration test for log cleaning.") + val helpOpt = parser.accepts("help", "Print usage information.") + + val usageDescription = "An integration test for log cleaning." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) if(options.has(dumpOpt)) { dumpLog(new File(options.valueOf(dumpOpt))) System.exit(0) } - CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, brokerOpt, zkConnectOpt, numMessagesOpt) // parse options val messages = options.valueOf(numMessagesOpt).longValue diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala index aef8361..da2cf9d 100644 --- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala +++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala @@ -29,19 +29,14 @@ object VerifyConsumerRebalance extends Logging { withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]) val groupOpt = parser.accepts("group", "Consumer group."). withRequiredArg().ofType(classOf[String]) - parser.accepts("help", "Print this message.") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Validate that all partitions have a consumer for a given consumer group.") + val helpOpt = parser.accepts("help", "Print usage information.") - val options = parser.parse(args : _*) + val usageDescription = "Validate that all partitions have a consumer for a given consumer group." + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - if (options.has("help")) { - parser.printHelpOn(System.out) - System.exit(0) - } - - CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, groupOpt) val zkConnect = options.valueOf(zkConnectOpt) val group = options.valueOf(groupOpt) diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala index 086a624..efbed19 100644 --- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala +++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala @@ -16,7 +16,7 @@ */ package kafka.utils -import joptsimple.{OptionSpec, OptionSet, OptionParser} +import joptsimple.{OptionSpec, OptionSet, OptionParser, OptionException} import scala.collection.Set import java.util.Properties @@ -28,35 +28,62 @@ object CommandLineUtils extends Logging { /** * Check that all the listed options are present */ - def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { + def checkRequiredArgs(parser: OptionParser, description: String, options: OptionSet, required: OptionSpec[_]*) { for(arg <- required) { if(!options.has(arg)) - printUsageAndDie(parser, "Missing required argument \"" + arg + "\"") + printUsageErrorAndDie(parser, description, "Missing required argument \"" + arg + "\"") } } /** * Check that none of the listed options are present */ - def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption: OptionSpec[_], invalidOptions: Set[OptionSpec[_]]) { + def checkInvalidArgs(parser: OptionParser, description: String, options: OptionSet, usedOption: OptionSpec[_], invalidOptions: Set[OptionSpec[_]]) { if(options.has(usedOption)) { for(arg <- invalidOptions) { if(options.has(arg)) - printUsageAndDie(parser, "Option \"" + usedOption + "\" can't be used with option\"" + arg + "\"") + printUsageErrorAndDie(parser, description, "Option \"" + usedOption + "\" can't be used with option\"" + arg + "\"") } } } /** - * Print usage and exit + * Print usage on stderr and exit with an error code */ - def printUsageAndDie(parser: OptionParser, message: String) { + def printUsageErrorAndDie(parser: OptionParser, description: String, message: String) { System.err.println(message) + System.err.println() + System.err.println(description) parser.printHelpOn(System.err) System.exit(1) } /** + * Print usage on stdout and exit cleanly + */ + def printUsageAndExit(parser: OptionParser, description: String) { + System.out.println(description) + parser.printHelpOn(System.out) + System.exit(0) + } + + /** + * Parses command line arguments using the given parser, returning the resulting OptionSet. If a parsing exception + * occurs, the error is reported along with usage information to the user and the process exits + */ + def parseArgsOrDie(parser: OptionParser, description: String, args: Array[String]): OptionSet = { + try { + parser.parse(args : _*) + } + catch { + case e: OptionException => { + CommandLineUtils.printUsageErrorAndDie(parser, description, e.getMessage) + null + } + } + } + + /** * Parse key-value pairs in the form key=value */ def parseKeyValueArgs(args: Iterable[String]): Properties = { diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala index fef9392..6a7adc2 100644 --- a/core/src/main/scala/kafka/utils/ToolsUtils.scala +++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala @@ -20,7 +20,7 @@ import joptsimple.OptionParser object ToolsUtils { - def validatePortOrDie(parser: OptionParser, hostPort: String) = { + def validatePortOrDie(parser: OptionParser, description: String, hostPort: String) = { val hostPorts: Array[String] = if(hostPort.contains(',')) hostPort.split(",") else @@ -31,6 +31,6 @@ object ToolsUtils { } val isValid = !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length if(!isValid) - CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092\n ") + CommandLineUtils.printUsageErrorAndDie(parser, description, "Please provide valid host:port like host1:9091,host2:9092\n ") } } diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 7211c25..47ebe35 100644 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -80,10 +80,14 @@ object TestLinearWriteSpeed { val mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.") val channelOpt = parser.accepts("channel", "Do writes to file channesl.") val logOpt = parser.accepts("log", "Do writes to kafka logs.") - - val options = parser.parse(args : _*) + val helpOpt = parser.accepts("help", "Print usage information.") + + val usageDescription = "Tests linear write speed" + val options = CommandLineUtils.parseArgsOrDie(parser, usageDescription, args) + if(options.has(helpOpt)) + CommandLineUtils.printUsageAndExit(parser, usageDescription) - CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, filesOpt) + CommandLineUtils.checkRequiredArgs(parser, usageDescription, options, bytesOpt, sizeOpt, filesOpt) var bytesToWrite = options.valueOf(bytesOpt).longValue val bufferSize = options.valueOf(sizeOpt).intValue -- 2.1.2