diff --git a/bin/kafka-consumer-offset-checker.sh b/bin/kafka-consumer-offset-checker.sh
new file mode 100755
index 0000000..c275f7e
--- /dev/null
+++ b/bin/kafka-consumer-offset-checker.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker $@
diff --git a/bin/kafka-mirror-maker.sh b/bin/kafka-mirror-maker.sh
new file mode 100755
index 0000000..56e342c
--- /dev/null
+++ b/bin/kafka-mirror-maker.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker $@
diff --git a/bin/kafka-replica-verification.sh b/bin/kafka-replica-verification.sh
new file mode 100755
index 0000000..ee6d19e
--- /dev/null
+++ b/bin/kafka-replica-verification.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplicaVerificationTool $@
diff --git a/bin/kafka-simple-consumer-perf-test.sh b/bin/kafka-simple-consumer-perf-test.sh
deleted file mode 100755
index b1a5cfc..0000000
--- a/bin/kafka-simple-consumer-perf-test.sh
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
-    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
-fi
-
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance $@
diff --git a/bin/windows/kafka-consumer-offset-checker.bat b/bin/windows/kafka-consumer-offset-checker.bat
new file mode 100644
index 0000000..b6967c4
--- /dev/null
+++ b/bin/windows/kafka-consumer-offset-checker.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.tools.ConsumerOffsetChecker %*
diff --git a/bin/windows/kafka-consumer-perf-test.bat b/bin/windows/kafka-consumer-perf-test.bat
new file mode 100644
index 0000000..afc2259
--- /dev/null
+++ b/bin/windows/kafka-consumer-perf-test.bat
@@ -0,0 +1,20 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+SetLocal
+set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
+%~dp0kafka-run-class.bat kafka.tools.ConsumerPerformance %*
+EndLocal
diff --git a/bin/windows/kafka-mirror-maker.bat b/bin/windows/kafka-mirror-maker.bat
new file mode 100644
index 0000000..819e7d8
--- /dev/null
+++ b/bin/windows/kafka-mirror-maker.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.tools.MirrorMaker %*
diff --git a/bin/windows/kafka-preferred-replica-election.bat b/bin/windows/kafka-preferred-replica-election.bat
new file mode 100644
index 0000000..a9a5b7e
--- /dev/null
+++ b/bin/windows/kafka-preferred-replica-election.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.admin.PreferredReplicaLeaderElectionCommand %*
diff --git a/bin/windows/kafka-producer-perf-test.bat b/bin/windows/kafka-producer-perf-test.bat
new file mode 100644
index 0000000..a894752
--- /dev/null
+++ b/bin/windows/kafka-producer-perf-test.bat
@@ -0,0 +1,20 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+SetLocal
+set KAFKA_HEAP_OPTS=-Xmx512M
+%~dp0kafka-run-class.bat kafka.tools.ProducerPerformance %*
+EndLocal
diff --git a/bin/windows/kafka-reassign-partitions.bat b/bin/windows/kafka-reassign-partitions.bat
new file mode 100644
index 0000000..0c13ee3
--- /dev/null
+++ b/bin/windows/kafka-reassign-partitions.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.admin.ReassignPartitionsCommand %*
diff --git a/bin/windows/kafka-replay-log-producer.bat b/bin/windows/kafka-replay-log-producer.bat
new file mode 100644
index 0000000..2aec326
--- /dev/null
+++ b/bin/windows/kafka-replay-log-producer.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.tools.ReplayLogProducer %*
diff --git a/bin/windows/kafka-replica-verification.bat b/bin/windows/kafka-replica-verification.bat
new file mode 100644
index 0000000..481db57
--- /dev/null
+++ b/bin/windows/kafka-replica-verification.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.tools.ReplicaVerificationTool %*
diff --git a/bin/windows/kafka-simple-consumer-shell.bat b/bin/windows/kafka-simple-consumer-shell.bat
new file mode 100644
index 0000000..4e6ea0c
--- /dev/null
+++ b/bin/windows/kafka-simple-consumer-shell.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.tools.SimpleConsumerShell %*
diff --git a/bin/windows/zookeeper-shell.bat b/bin/windows/zookeeper-shell.bat
new file mode 100644
index 0000000..e98f069
--- /dev/null
+++ b/bin/windows/zookeeper-shell.bat
@@ -0,0 +1,22 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+IF [%1] EQU [] (
+	echo USAGE: %0 zookeeper_host:port[/path] [args...]
+	EXIT /B 1
+)
+
+%~dp0kafka-run-class.bat org.apache.zookeeper.ZooKeeperMain -server %*
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 9b3c6ae..c791848 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -40,6 +40,10 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       .withRequiredArg
       .describedAs("urls")
       .ofType(classOf[String])
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "This tool causes leadership for each partition to be transferred back to the 'preferred replica'," +
+        " it can be used to balance leadership among the servers.")
 
     val options = parser.parse(args : _*)
 
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2637586..691d69a 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -31,10 +31,8 @@ object ReassignPartitionsCommand extends Logging {
 
     // should have exactly one action
     val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _)
-    if(actions != 1) {
-      opts.parser.printHelpOn(System.err)
-      Utils.croak("Command must include exactly one action: --generate, --execute or --verify")
-    }
+    if(actions != 1)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --generate, --execute or --verify")
 
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
 
@@ -58,10 +56,8 @@
   }
 
   def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
-    if(!opts.options.has(opts.reassignmentJsonFileOpt)) {
-      opts.parser.printHelpOn(System.err)
-      Utils.croak("If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
-    }
+    if(!opts.options.has(opts.reassignmentJsonFileOpt))
+      CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
     val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val jsonString = Utils.readFileAsString(jsonFile)
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
@@ -81,10 +77,8 @@
   }
 
   def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
-    if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt))) {
-      opts.parser.printHelpOn(System.err)
-      Utils.croak("If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
-    }
+    if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt)))
+      CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
     val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
     val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
@@ -105,11 +99,8 @@
   }
 
   def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
-    if(!opts.options.has(opts.reassignmentJsonFileOpt)) {
-      opts.parser.printHelpOn(System.err)
-      Utils.croak("If --execute option is used, command must include --reassignment-json-file that was output " +
-        "during the --generate option")
-    }
+    if(!opts.options.has(opts.reassignmentJsonFileOpt))
+      CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
     val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString)
@@ -185,6 +176,9 @@
       .withRequiredArg
       .describedAs("brokerlist")
       .ofType(classOf[String])
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.")
 
     val options = parser.parse(args : _*)
   }
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 6788c2e..8d5c2e7 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -35,13 +35,13 @@ object TopicCommand {
 
     val opts = new TopicCommandOptions(args)
 
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")
+
     // should have exactly one action
    val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
-    if(actions != 1) {
-      System.err.println("Command must include exactly one action: --list, --describe, --create, --alter or --delete")
-      opts.parser.printHelpOn(System.err)
-      System.exit(1)
-    }
+    if(actions != 1)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
 
     opts.checkArgs()
 
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index f6bc2f1..323fc85 100644
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -84,15 +84,15 @@ object ConsoleConsumer extends Logging {
       .describedAs("metrics dictory")
       .ofType(classOf[java.lang.String])
 
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
+
     var groupIdPassed = true
     val options: OptionSet = tryParse(parser, args)
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
-    if (topicOrFilterOpt.size != 1) {
-      error("Exactly one of whitelist/blacklist/topic is required.")
-      parser.printHelpOn(System.err)
-      System.exit(1)
-    }
+    if (topicOrFilterOpt.size != 1)
+      CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
     val topicArg = options.valueOf(topicOrFilterOpt.head)
     val filterSpec = if (options.has(blacklistOpt))
       new Blacklist(topicArg)
@@ -144,7 +144,7 @@ object ConsoleConsumer extends Logging {
     val config = new ConsumerConfig(consumerProps)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
+    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
     val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
 
     val connector = Consumer.create(config)
@@ -217,20 +217,6 @@
   }
 }
 
-object MessageFormatter {
-  def tryParseFormatterArgs(args: Iterable[String]): Properties = {
-    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
-    if(!splits.forall(_.length == 2)) {
-      System.err.println("Invalid parser arguments: " + args.mkString(" "))
-      System.exit(1)
-    }
-    val props = new Properties
-    for(a <- splits)
-      props.put(a(0), a(1))
-    props
-  }
-}
-
 trait MessageFormatter {
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
   def init(props: Properties) {}
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index f4e07d4..da4dad4 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -212,13 +212,9 @@
     val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
     val options = parser.parse(args : _*)
-    for(arg <- List(topicOpt, brokerListOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
+    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)
 
     import scala.collection.JavaConversions._
 
     val useNewProducer = options.has(useNewProducerOpt)
@@ -243,7 +239,7 @@ object ConsoleProducer {
     val valueEncoderClass = options.valueOf(valueEncoderOpt)
     val readerClass = options.valueOf(messageReaderOpt)
     val socketBuffer = options.valueOf(socketBufferSizeOpt)
-    val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
+    val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
     /* new producer related configs */
     val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
     val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 19df757..d1e7c43 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -123,6 +123,9 @@ object ConsumerOffsetChecker extends Logging {
 
     parser.accepts("broker-info", "Print broker info")
     parser.accepts("help", "Print this message.")
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.")
 
     val options = parser.parse(args : _*)
 
@@ -131,12 +134,7 @@
       System.exit(0)
     }
 
-    for (opt <- List(groupOpt, zkConnectOpt))
-      if (!options.has(opt)) {
-        System.err.println("Missing required argument: %s".format(opt))
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
+    CommandLineUtils.checkRequiredArgs(parser, options, groupOpt, zkConnectOpt)
 
     val zkConnect = options.valueOf(zkConnectOpt)
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 4688349..093c800 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
 import org.apache.log4j.Logger
 import kafka.message.Message
-import kafka.utils.ZkUtils
+import kafka.utils.{ZkUtils, CommandLineUtils}
 import java.util.{ Random, Properties }
 import kafka.consumer._
 import java.text.SimpleDateFormat
@@ -120,13 +120,7 @@
 
     val options = parser.parse(args: _*)
 
-    for (arg <- List(topicOpt, zkConnectOpt)) {
-      if (!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, zkConnectOpt)
 
     val props = new Properties
     props.put("group.id", options.valueOf(groupIdOpt))
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f0ab02a..6daf87b 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -41,13 +41,13 @@ object DumpLogSegments {
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(5 * 1024 * 1024)
     val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration")
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
 
     val options = parser.parse(args : _*)
-    if(!options.has(filesOpt)) {
-      System.err.println("Missing required argument \"" + filesOpt + "\"")
-      parser.printHelpOn(System.err)
-      System.exit(1)
-    }
+
+    CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
 
     val print = if(options.has(printOpt)) true else false
     val verifyOnly = if(options.has(verifyOpt)) true else false
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 005231f..4d051bc 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import java.io.FileWriter
 import joptsimple._
-import kafka.utils.{Logging, ZkUtils, ZKStringSerializer,ZKGroupTopicDirs}
+import kafka.utils.{Logging, ZkUtils, ZKStringSerializer, ZKGroupTopicDirs, CommandLineUtils}
 import org.I0Itec.zkclient.ZkClient
 
 
@@ -55,6 +55,9 @@
       .withRequiredArg()
       .ofType(classOf[String])
     parser.accepts("help", "Print this message.")
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.")
 
     val options = parser.parse(args : _*)
 
@@ -63,13 +66,7 @@
       System.exit(0)
     }
 
-    for (opt <- List(zkConnectOpt, outFileOpt)) {
-      if (!options.has(opt)) {
-        System.err.println("Missing required argument: %s".format(opt))
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt)
 
     val zkConnect = options.valueOf(zkConnectOpt)
     val groups = options.valuesOf(groupOpt)
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index fba652e..9c6064e 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -57,6 +57,9 @@
       .describedAs("ms")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1000)
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")
 
     val options = parser.parse(args : _*)
 
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index c8023ee..abe0972 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import java.io.BufferedReader
 import java.io.FileReader
 import joptsimple._
-import kafka.utils.{Logging, ZkUtils,ZKStringSerializer}
+import kafka.utils.{Logging, ZkUtils,ZKStringSerializer, CommandLineUtils}
 import org.I0Itec.zkclient.ZkClient
 
 
@@ -52,6 +52,9 @@
       .withRequiredArg()
       .ofType(classOf[String])
     parser.accepts("help", "Print this message.")
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Import offsets to zookeeper from files.")
 
     val options = parser.parse(args : _*)
 
@@ -60,13 +63,7 @@
      System.exit(0)
    }
 
-    for (opt <- List(inFileOpt)) {
-      if (!options.has(opt)) {
-        System.err.println("Missing required argument: %s".format(opt))
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt)
 
     val zkConnect = options.valueOf(zkConnectOpt)
     val partitionOffsetFile = options.valueOf(inFileOpt)
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 747a675..1d1a120 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -26,7 +26,7 @@ import joptsimple.OptionParser
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.math._
-import kafka.utils.Logging
+import kafka.utils.{CommandLineUtils, Logging}
 
 
 object JmxTool extends Logging {
@@ -63,6 +63,9 @@
       .describedAs("service-url")
       .ofType(classOf[String])
       .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.")
 
     val options = parser.parse(args : _*)
 
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index e75c4f8..7638391 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -95,6 +95,9 @@
       .ofType(classOf[String])
 
     val helpOpt = parser.accepts("help", "Print this message.")
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
 
     val options = parser.parse(args : _*)
 
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 95cfbc1..fc3e724 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import kafka.metrics.KafkaMetricsReporter
 import kafka.producer.{OldProducer, NewShinyProducer}
-import kafka.utils.{VerifiableProperties, Logging}
+import kafka.utils.{VerifiableProperties, Logging, CommandLineUtils}
 import kafka.message.CompressionCodec
 import kafka.serializer._
 
@@ -123,13 +123,8 @@
     val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
 
     val options = parser.parse(args: _*)
-    for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
-      if (!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt)
+
     val topicsStr = options.valueOf(topicsOpt)
     val topics = topicsStr.split(",")
     val numMessages = options.valueOf(numMessagesOpt).longValue
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index eb71e49..69be31c 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -105,13 +105,9 @@
     val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
 
     val options = parser.parse(args : _*)
-    for(arg <- List(brokerListOpt, inputTopicOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt)
+
     val zkConnect = options.valueOf(zkConnectOpt)
     val brokerList = options.valueOf(brokerListOpt)
     val numMessages = options.valueOf(numMessagesOpt).intValue
@@ -121,7 +117,7 @@
     val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
     val isSync = options.has(syncOpt)
     import scala.collection.JavaConversions._
-    val producerProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
+    val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
   }
 
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 91f0728..c040f49 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -92,7 +92,9 @@
       .describedAs("ms")
       .ofType(classOf[java.lang.Long])
       .defaultsTo(30 * 1000L)
-
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")
 
     val options = parser.parse(args : _*)
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 8b8c472..7602b8d 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -141,13 +141,8 @@
 
     val options = parser.parse(args : _*)
 
-    for(arg <- List(topicOpt, urlOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, urlOpt)
+
     val url = new URI(options.valueOf(urlOpt))
     val fetchSize = options.valueOf(fetchSizeOpt).intValue
     val fromLatest = options.has(resetBeginningOffsetOpt)
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 747e072..36314f4 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -93,15 +93,12 @@
                            "skip it instead of halt.")
     val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
                            "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.")
 
     val options = parser.parse(args : _*)
-    for(arg <- List(brokerListOpt, topicOpt, partitionIdOpt)) {
-      if(!options.has(arg)) {
-        error("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt)
 
     val topic = options.valueOf(topicOpt)
     val partitionId = options.valueOf(partitionIdOpt).intValue()
@@ -117,7 +114,7 @@
     val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt)
 
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
+    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
 
     val fetchRequestBuilder = new FetchRequestBuilder()
       .clientId(clientId)
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index 97970fb..d298e7e 100644
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -22,7 +22,7 @@ import scala.util.matching.Regex
 import collection.mutable
 import java.util.Date
 import java.text.SimpleDateFormat
-import kafka.utils.Logging
+import kafka.utils.{Logging, CommandLineUtils}
 import kafka.common.Topic
 import java.io.{BufferedOutputStream, OutputStream}
 
@@ -83,6 +83,9 @@
       .describedAs("end timestamp in the format " + dateFormat)
       .ofType(classOf[String])
       .defaultsTo("9999-12-31 23:59:59,999")
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log files from several brokers to reconstruct a unified history of what happened.")
 
     val options = parser.parse(args : _*)
 
diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
index 595dc7c..1d4ea93 100644
--- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
@@ -87,15 +87,15 @@
 
     val options = parser.parse(args:_*)
 
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "An integration test for log cleaning.")
+
     if(options.has(dumpOpt)) {
       dumpLog(new File(options.valueOf(dumpOpt)))
       System.exit(0)
     }
 
-    if(!options.has(brokerOpt) || !options.has(zkConnectOpt) || !options.has(numMessagesOpt)) {
-      parser.printHelpOn(System.err)
-      System.exit(1)
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt)
 
     // parse options
     val messages = options.valueOf(numMessagesOpt).longValue
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index 92c0d1f..aef8361 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import joptsimple.OptionParser
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer}
+import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CommandLineUtils}
 
 object VerifyConsumerRebalance extends Logging {
   def main(args: Array[String]) {
@@ -30,6 +30,9 @@
     val groupOpt = parser.accepts("group", "Consumer group.").
                           withRequiredArg().ofType(classOf[String])
     parser.accepts("help", "Print this message.")
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Validate that all partitions have a consumer for a given consumer group.")
 
     val options = parser.parse(args : _*)
 
@@ -38,12 +41,7 @@
       System.exit(0)
     }
 
-    for (opt <- List(groupOpt))
-      if (!options.has(opt)) {
-        System.err.println("Missing required argument: %s".format(opt))
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
+    CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
 
     val zkConnect = options.valueOf(zkConnectOpt)
     val group = options.valueOf(groupOpt)
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index c1d8ba5..1ba605c 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -25,37 +25,49 @@ import scala.collection.Set
  */
 object CommandLineUtils extends Logging {
 
+  /**
+   * Check that all the listed options are present
+   */
  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
    for(arg <- required) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
+      if(!options.has(arg))
+        printUsageAndDie(parser, "Missing required argument \"" + arg + "\"")
    }
  }
 
+  /**
+   * Check that none of the listed options are present
+   */
  def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption: OptionSpec[_], invalidOptions: Set[OptionSpec[_]]) {
    if(options.has(usedOption)) {
      for(arg <- invalidOptions) {
-        if(options.has(arg)) {
-          System.err.println("Option \"" + usedOption + "\" can't be used with option\"" + arg + "\"")
-          parser.printHelpOn(System.err)
-          System.exit(1)
-        }
+        if(options.has(arg))
+          printUsageAndDie(parser, "Option \"" + usedOption + "\" can't be used with option \"" + arg + "\"")
      }
    }
  }
+
+  /**
+   * Print usage and exit
+   */
+  def printUsageAndDie(parser: OptionParser, message: String) {
+    System.err.println(message)
+    parser.printHelpOn(System.err)
+    System.exit(1)
+  }
 
-  def parseCommandLineArgs(args: Iterable[String]): Properties = {
-    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
-    if(!splits.forall(_.length == 2)) {
-      System.err.println("Invalid command line properties: " + args.mkString(" "))
-      System.exit(1)
-    }
-    val props = new Properties
-    for(a <- splits)
-      props.put(a(0), a(1))
-    props
-  }
-}
\ No newline at end of file
+  /**
+   * Parse key-value pairs in the form key=value
+   */
+  def parseKeyValueArgs(args: Iterable[String]): Properties = {
+    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
+    if(!splits.forall(_.length == 2)) {
+      System.err.println("Invalid command line properties: " + args.mkString(" "))
+      System.exit(1)
+    }
+    val props = new Properties
+    for(a <- splits)
+      props.put(a(0), a(1))
+    props
+  }
+}
\ No newline at end of file
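The CommandLineUtils diff above is the heart of the patch: printUsageAndDie covers both the empty-args and misuse paths, checkRequiredArgs covers mandatory options, and parseKeyValueArgs replaces the duplicated key=value parsers. A minimal sketch of the pattern every converted tool now follows (the tool name and options here are hypothetical, for illustration only; it assumes joptsimple and the kafka.utils.CommandLineUtils shown above):

    import joptsimple.OptionParser
    import kafka.utils.CommandLineUtils

    object ExampleTool {
      def main(args: Array[String]) {
        val parser = new OptionParser
        val topicOpt = parser.accepts("topic", "REQUIRED: the topic to operate on.")
          .withRequiredArg
          .describedAs("topic")
          .ofType(classOf[String])
        val propertyOpt = parser.accepts("property", "Properties in key=value form.")
          .withRequiredArg
          .describedAs("prop")
          .ofType(classOf[String])

        // No arguments at all: print a one-line description plus the option table, then exit 1.
        if(args.length == 0)
          CommandLineUtils.printUsageAndDie(parser, "An example tool following the shared validation pattern.")

        val options = parser.parse(args : _*)

        // A missing required option now produces the same usage output as every other tool.
        CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)

        // key=value pairs go through the shared parser (formerly parseCommandLineArgs
        // and MessageFormatter.tryParseFormatterArgs).
        import scala.collection.JavaConversions._
        val props = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
        println("topic=" + options.valueOf(topicOpt) + " props=" + props)
      }
    }

Note that the args.length check deliberately runs before parser.parse: joptsimple accepts an empty argument list without error, so without the early check a tool would only fail later on the missing required options.
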
System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } + CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, filesOpt) var bytesToWrite = options.valueOf(bytesOpt).longValue val bufferSize = options.valueOf(sizeOpt).intValue