From 8fa0e8deeb149d18d404e346ad07f219cc2f1fac Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Thu, 25 Sep 2014 22:32:17 -0600 Subject: [PATCH] KAFKA-1618 Exception thrown when running console producer with no port number for the broker --- .../main/scala/kafka/tools/ConsoleProducer.scala | 11 +++++----- .../main/scala/kafka/tools/GetOffsetShell.scala | 6 ++++-- .../scala/kafka/tools/ProducerPerformance.scala | 3 ++- .../main/scala/kafka/tools/ReplayLogProducer.scala | 3 ++- .../kafka/tools/ReplicaVerificationTool.scala | 4 +++- .../scala/kafka/tools/SimpleConsumerShell.scala | 4 +++- core/src/main/scala/kafka/utils/ToolsUtils.scala | 25 ++++++++++++++++++++++ 7 files changed, 45 insertions(+), 11 deletions(-) create mode 100644 core/src/main/scala/kafka/utils/ToolsUtils.scala diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index da4dad4..8e9ba0b 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -20,7 +20,7 @@ package kafka.tools import kafka.common._ import kafka.message._ import kafka.serializer._ -import kafka.utils.CommandLineUtils +import kafka.utils.{ToolsUtils, CommandLineUtils} import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage} import java.util.Properties @@ -129,24 +129,24 @@ object ConsoleProducer { .defaultsTo(3) val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. 
Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.") .withRequiredArg - .ofType(classOf[java.lang.Long]) + .ofType(classOf[java.lang.Integer]) .defaultsTo(100) val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + " a message will queue awaiting suffient batch size. The value is given in ms.") .withRequiredArg .describedAs("timeout_ms") - .ofType(classOf[java.lang.Long]) + .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + " messages will queue awaiting suffient batch size.") .withRequiredArg .describedAs("queue_size") - .ofType(classOf[java.lang.Long]) + .ofType(classOf[java.lang.Integer]) .defaultsTo(10000) val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue") .withRequiredArg .describedAs("queue enqueuetimeout ms") - .ofType(classOf[java.lang.Long]) + .ofType(classOf[java.lang.Integer]) .defaultsTo(Int.MaxValue) val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests") .withRequiredArg @@ -220,6 +220,7 @@ object ConsoleProducer { val useNewProducer = options.has(useNewProducerOpt) val topic = options.valueOf(topicOpt) val brokerList = options.valueOf(brokerListOpt) + ToolsUtils.validatePortOrDie(parser,brokerList) val sync = options.has(syncOpt) val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) val compressionCodec = if (options.has(compressionCodecOpt)) diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 9c6064e..1bbdede 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ 
b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -23,7 +23,7 @@ import joptsimple._ import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} import kafka.common.TopicAndPartition import kafka.client.ClientUtils -import kafka.utils.CommandLineUtils +import kafka.utils.{ToolsUtils, CommandLineUtils} object GetOffsetShell { @@ -66,7 +66,9 @@ object GetOffsetShell { CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt) val clientId = "GetOffsetShell" - val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) + val brokerList = options.valueOf(brokerListOpt) + ToolsUtils.validatePortOrDie(parser,brokerList) + val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList) val topic = options.valueOf(topicOpt) var partitionList = options.valueOf(partitionOpt) var time = options.valueOf(timeOpt).longValue diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index fc3e724..f61c7c7 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -19,7 +19,7 @@ package kafka.tools import kafka.metrics.KafkaMetricsReporter import kafka.producer.{OldProducer, NewShinyProducer} -import kafka.utils.{VerifiableProperties, Logging, CommandLineUtils} +import kafka.utils.{ToolsUtils, VerifiableProperties, Logging, CommandLineUtils} import kafka.message.CompressionCodec import kafka.serializer._ @@ -132,6 +132,7 @@ object ProducerPerformance extends Logging { val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) val hideHeader = options.has(hideHeaderOpt) val brokerList = options.valueOf(brokerListOpt) + ToolsUtils.validatePortOrDie(parser,brokerList) val messageSize = options.valueOf(messageSizeOpt).intValue var isFixedSize = !options.has(varyMessageSizeOpt) var isSync = options.has(syncOpt) diff --git 
a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 69be31c..3393a3d 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -21,7 +21,7 @@ import joptsimple.OptionParser import java.util.concurrent.{Executors, CountDownLatch} import java.util.Properties import kafka.consumer._ -import kafka.utils.{CommandLineUtils, Logging, ZkUtils} +import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils} import kafka.api.OffsetRequest import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig} @@ -110,6 +110,7 @@ object ReplayLogProducer extends Logging { val zkConnect = options.valueOf(zkConnectOpt) val brokerList = options.valueOf(brokerListOpt) + ToolsUtils.validatePortOrDie(parser,brokerList) val numMessages = options.valueOf(numMessagesOpt).intValue val numThreads = options.valueOf(numThreadsOpt).intValue val inputTopic = options.valueOf(inputTopicOpt) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index af47836..ba6ddd7 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -116,7 +116,9 @@ object ReplicaVerificationTool extends Logging { val reportInterval = options.valueOf(reportIntervalOpt).longValue // getting topic metadata info("Getting topic metatdata...") - val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) + val brokerList = options.valueOf(brokerListOpt) + ToolsUtils.validatePortOrDie(parser,brokerList) + val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList) val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap 
val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 36314f4..b4f903b 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -124,7 +124,9 @@ object SimpleConsumerShell extends Logging { // getting topic metadata info("Getting topic metatdata...") - val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) + val brokerList = options.valueOf(brokerListOpt) + ToolsUtils.validatePortOrDie(parser,brokerList) + val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList) val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata)) diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala new file mode 100644 index 0000000..901bcdb --- /dev/null +++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala @@ -0,0 +1,25 @@ +package kafka.utils + +import joptsimple.OptionParser +import scala.util.matching.Regex + +/** + * Utility methods shared by the command-line tools, e.g. validating broker host:port lists. 
+ */ +object ToolsUtils { + + def validatePortOrDie(parser: OptionParser, hostPort: String) = { + val regex = new Regex(":[0-9]") + val hostPorts: Array[String] = if(hostPort.contains(',')) + hostPort.split(",") + else + Array(hostPort) + val validHostPort = hostPorts.filter { + hostPortData => + regex.findAllMatchIn(hostPortData).size > 0 + } + val isValid = !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length + if(!isValid) + CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ") + } +} -- 1.8.5.2 (Apple Git-48)