From 8913ecbfdcd4c95951c4f3055a5abb2b91d68824 Mon Sep 17 00:00:00 2001
From: Balaji Seshadri
Date: Thu, 4 Sep 2014 15:37:44 -0600
Subject: [PATCH 1/2] Patch for KAFKA-1618

---
 core/src/main/scala/kafka/tools/ConsoleProducer.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index da4dad4..ac0bbdc 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -22,11 +22,10 @@ import kafka.message._
 import kafka.serializer._
 import kafka.utils.CommandLineUtils
 import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage}
-
 import java.util.Properties
 import java.io._
-
 import joptsimple._
+import scala.util.matching.Regex
 
 object ConsoleProducer {
 
@@ -112,6 +111,7 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("broker-list")
       .ofType(classOf[String])
+
     val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
     val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." +
                                              "If specified without value, than it defaults to 'gzip'")
@@ -219,7 +219,12 @@ object ConsoleProducer {
     import scala.collection.JavaConversions._
     val useNewProducer = options.has(useNewProducerOpt)
     val topic = options.valueOf(topicOpt)
-    val brokerList = options.valueOf(brokerListOpt)
+    val regex = new Regex(":[0-9].*")
+    val brokerList = if(regex.findAllMatchIn(options.valueOf(brokerListOpt)).size==0)
+      options.valueOf(brokerListOpt)+":9091"
+    else
+      options.valueOf(brokerListOpt)
+
     val sync = options.has(syncOpt)
     val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
     val compressionCodec = if (options.has(compressionCodecOpt))
--
1.8.5.2 (Apple Git-48)

From e88510fefd9fa777cefd6a5e52eaf7bd0d1a17dd Mon Sep 17 00:00:00 2001
From: Balaji Seshadri
Date: Mon, 15 Sep 2014 12:27:59 -0600
Subject: [PATCH 2/2] KAFKA-1618 Exception thrown when running console
 producer with no port number for the broker

---
 .../main/scala/kafka/tools/ConsoleProducer.scala   | 172 ++++++++++++---------
 .../main/scala/kafka/tools/GetOffsetShell.scala    |  58 +++----
 .../scala/kafka/tools/ProducerPerformance.scala    |   9 +-
 .../main/scala/kafka/tools/ReplayLogProducer.scala |  27 ++--
 .../kafka/tools/ReplicaVerificationTool.scala      |   4 +
 .../scala/kafka/tools/SimpleConsumerShell.scala    | 151 +++++++++---------
 6 files changed, 228 insertions(+), 193 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index ac0bbdc..19b7cfc 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -21,7 +21,7 @@ import kafka.common._
 import kafka.message._
 import kafka.serializer._
 import kafka.utils.CommandLineUtils
-import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage}
+import kafka.producer.{ NewShinyProducer, OldProducer, KeyedMessage }
 import java.util.Properties
 import java.io._
 import joptsimple._
@@ -39,60 +39,60 @@ object ConsoleProducer {
     reader.init(System.in, props)
 
     try {
-        val producer =
-          if(config.useNewProducer) {
-            import org.apache.kafka.clients.producer.ProducerConfig
+      val producer =
+        if (config.useNewProducer) {
+          import org.apache.kafka.clients.producer.ProducerConfig
 
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
-            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
-            props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
-            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
-            props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
-            props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
-            props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
-            props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
-            props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
-            props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
-            if(config.queueEnqueueTimeoutMs != -1)
-              props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
-            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
-            props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
+          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
+          props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
+          props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
+          props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
+          props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
+          props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
+          props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
+          props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
+          props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
+          props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
+          if (config.queueEnqueueTimeoutMs != -1)
+            props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+          props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
+          props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
+          props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
 
-            new NewShinyProducer(props)
-          } else {
-            props.put("metadata.broker.list", config.brokerList)
-            props.put("compression.codec", config.compressionCodec)
-            props.put("producer.type", if(config.sync) "sync" else "async")
-            props.put("batch.num.messages", config.batchSize.toString)
-            props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
-            props.put("retry.backoff.ms", config.retryBackoffMs.toString)
-            props.put("queue.buffering.max.ms", config.sendTimeout.toString)
-            props.put("queue.buffering.max.messages", config.queueSize.toString)
-            props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
-            props.put("request.required.acks", config.requestRequiredAcks.toString)
-            props.put("request.timeout.ms", config.requestTimeoutMs.toString)
-            props.put("key.serializer.class", config.keyEncoderClass)
-            props.put("serializer.class", config.valueEncoderClass)
-            props.put("send.buffer.bytes", config.socketBuffer.toString)
-            props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
-            props.put("client.id", "console-producer")
+          new NewShinyProducer(props)
+        } else {
+          props.put("metadata.broker.list", config.brokerList)
+          props.put("compression.codec", config.compressionCodec)
+          props.put("producer.type", if (config.sync) "sync" else "async")
+          props.put("batch.num.messages", config.batchSize.toString)
+          props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
+          props.put("retry.backoff.ms", config.retryBackoffMs.toString)
+          props.put("queue.buffering.max.ms", config.sendTimeout.toString)
+          props.put("queue.buffering.max.messages", config.queueSize.toString)
+          props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
+          props.put("request.required.acks", config.requestRequiredAcks.toString)
+          props.put("request.timeout.ms", config.requestTimeoutMs.toString)
+          props.put("key.serializer.class", config.keyEncoderClass)
+          props.put("serializer.class", config.valueEncoderClass)
+          props.put("send.buffer.bytes", config.socketBuffer.toString)
+          props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
+          props.put("client.id", "console-producer")
 
-            new OldProducer(props)
-          }
+          new OldProducer(props)
+        }
 
-        Runtime.getRuntime.addShutdownHook(new Thread() {
-          override def run() {
-            producer.close()
-          }
-        })
+      Runtime.getRuntime.addShutdownHook(new Thread() {
+        override def run() {
+          producer.close()
+        }
+      })
 
-        var message: KeyedMessage[Array[Byte], Array[Byte]] = null
-        do {
-          message = reader.readMessage()
-          if(message != null)
-            producer.send(message.topic, message.key, message.message)
-        } while(message != null)
+      var message: KeyedMessage[Array[Byte], Array[Byte]] = null
+      do {
+        message = reader.readMessage()
+        if (message != null)
+          producer.send(message.topic, message.key, message.message)
+      } while (message != null)
     } catch {
       case e: Exception => e.printStackTrace
@@ -111,13 +111,13 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("broker-list")
       .ofType(classOf[String])
-
+
     val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
     val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." +
-                                             "If specified without value, than it defaults to 'gzip'")
-                                    .withOptionalArg()
-                                    .describedAs("compression-codec")
-                                    .ofType(classOf[String])
+      "If specified without value, than it defaults to 'gzip'")
+      .withOptionalArg()
+      .describedAs("compression-codec")
+      .ofType(classOf[String])
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
       .withRequiredArg
       .describedAs("size")
@@ -163,13 +163,13 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("metadata expiration interval")
       .ofType(classOf[java.lang.Long])
-      .defaultsTo(5*60*1000L)
+      .defaultsTo(5 * 60 * 1000L)
 
     val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms", "The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.")
       .withRequiredArg
       .describedAs("metadata fetch timeout")
       .ofType(classOf[java.lang.Long])
-      .defaultsTo(60*1000L)
+      .defaultsTo(60 * 1000L)
 
     val maxMemoryBytesOpt = parser.accepts("max-memory-bytes", "The total memory used by the producer to buffer records waiting to be sent to the server.")
       .withRequiredArg
@@ -203,7 +203,7 @@
       .withRequiredArg
       .describedAs("size")
       .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1024*100)
+      .defaultsTo(1024 * 100)
     val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
       "This allows custom configuration for a user-defined message reader.")
       .withRequiredArg
@@ -211,8 +211,8 @@
       .describedAs("prop")
       .ofType(classOf[String])
     val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
 
-    val options = parser.parse(args : _*)
-    if(args.length == 0)
+    val options = parser.parse(args: _*)
+    if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)
@@ -220,18 +220,18 @@ import scala.collection.JavaConversions._
     val useNewProducer = options.has(useNewProducerOpt)
     val topic = options.valueOf(topicOpt)
     val regex = new Regex(":[0-9].*")
-    val brokerList = if(regex.findAllMatchIn(options.valueOf(brokerListOpt)).size==0)
-      options.valueOf(brokerListOpt)+":9091"
-    else
-      options.valueOf(brokerListOpt)
-
+
+    PortValidator.validateAndDie(parser,options.valueOf(brokerListOpt))
+
+    val brokerList = options.valueOf(brokerListOpt)
+
     val sync = options.has(syncOpt)
     val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
     val compressionCodec = if (options.has(compressionCodecOpt))
-      if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty)
-        DefaultCompressionCodec.name
-      else compressionCodecOptionValue
-    else NoCompressionCodec.name
+        if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty)
+          DefaultCompressionCodec.name
+        else compressionCodecOptionValue
+      else NoCompressionCodec.name
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
     val queueSize = options.valueOf(queueSizeOpt)
@@ -268,11 +268,11 @@ object ConsoleProducer {
     override def init(inputStream: InputStream, props: Properties) {
       topic = props.getProperty("topic")
-      if(props.containsKey("parse.key"))
+      if (props.containsKey("parse.key"))
         parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
-      if(props.containsKey("key.separator"))
+      if (props.containsKey("key.separator"))
         keySeparator = props.getProperty("key.separator")
-      if(props.containsKey("ignore.error"))
+      if (props.containsKey("ignore.error"))
         ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
       reader = new BufferedReader(new InputStreamReader(inputStream))
     }
@@ -284,18 +284,40 @@ object ConsoleProducer {
       case (line, true) =>
         line.indexOf(keySeparator) match {
           case -1 =>
-            if(ignoreError)
+            if (ignoreError)
              new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
            else
              throw new KafkaException("No key found on line " + lineNumber + ": " + line)
          case n =>
            new KeyedMessage[Array[Byte], Array[Byte]](topic,
-             line.substring(0, n).getBytes,
-             (if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes())
+              line.substring(0, n).getBytes,
+              (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes())
        }
      case (line, false) =>
        new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
     }
   }
+
+  object PortValidator {
+    def validate(hostPort: String) = {
+      val regex = new Regex(":[0-9]")
+
+      val hostPorts: Array[String] = if (hostPort.contains(','))
+        hostPort.split(",")
+      else
+        Array(hostPort)
+      val validHostPort = hostPorts.filter {
+        hostPortData =>
+          regex.findAllMatchIn(hostPortData).size > 0
+      }
+
+      !(validHostPort.isEmpty) && validHostPort.size==hostPorts.length
+    }
+
+    def validateAndDie(parser:OptionParser,hostPort:String) {
+      if (!validate(hostPort))
+        CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ")
+    }
+  }
 }
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 9c6064e..1d4358f 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -20,52 +20,54 @@ package kafka.tools
 import kafka.consumer._
 import joptsimple._
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.api.{ PartitionOffsetRequestInfo, OffsetRequest }
 import kafka.common.TopicAndPartition
 import kafka.client.ClientUtils
 import kafka.utils.CommandLineUtils
-
 object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser
     val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
-                           .withRequiredArg
-                           .describedAs("hostname:port,...,hostname:port")
-                           .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("hostname:port,...,hostname:port")
+      .ofType(classOf[String])
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
-                           .withRequiredArg
-                           .describedAs("topic")
-                           .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
     val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
-                           .withRequiredArg
-                           .describedAs("partition ids")
-                           .ofType(classOf[String])
-                           .defaultsTo("")
+      .withRequiredArg
+      .describedAs("partition ids")
+      .ofType(classOf[String])
+      .defaultsTo("")
     val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
-                           .withRequiredArg
-                           .describedAs("timestamp/-1(latest)/-2(earliest)")
-                           .ofType(classOf[java.lang.Long])
+      .withRequiredArg
+      .describedAs("timestamp/-1(latest)/-2(earliest)")
+      .ofType(classOf[java.lang.Long])
     val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
-                           .withRequiredArg
-                           .describedAs("count")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1)
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
     val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
-                           .withRequiredArg
-                           .describedAs("ms")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1000)
-
-    if(args.length == 0)
+      .withRequiredArg
+      .describedAs("ms")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1000)
+
+    if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")
 
-    val options = parser.parse(args : _*)
+    val options = parser.parse(args: _*)
 
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)
 
+    ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt))
+
     val clientId = "GetOffsetShell"
+
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
     val topic = options.valueOf(topicOpt)
     var partitionList = options.valueOf(partitionOpt)
@@ -74,13 +76,13 @@ object GetOffsetShell {
     val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
 
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+    if (topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
         "kafka-list-topic.sh to verify")
       System.exit(1)
     }
     val partitions =
-      if(partitionList == "") {
+      if (partitionList == "") {
         topicsMetadata.head.partitionsMetadata.map(_.partitionId)
       } else {
         partitionList.split(",").map(_.toInt).toSeq
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index fc3e724..ea814a7 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -18,12 +18,12 @@
 package kafka.tools
 
 import kafka.metrics.KafkaMetricsReporter
-import kafka.producer.{OldProducer, NewShinyProducer}
-import kafka.utils.{VerifiableProperties, Logging, CommandLineUtils}
+import kafka.producer.{ OldProducer, NewShinyProducer }
+import kafka.utils.{ VerifiableProperties, Logging, CommandLineUtils }
 import kafka.message.CompressionCodec
 import kafka.serializer._
 
-import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.{ CountDownLatch, Executors }
 import java.util.concurrent.atomic.AtomicLong
 import java.util._
 import java.text.SimpleDateFormat
@@ -125,6 +125,8 @@ object ProducerPerformance extends Logging {
     val options = parser.parse(args: _*)
     CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt)
 
+    ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt))
+
     val topicsStr = options.valueOf(topicsOpt)
     val topics = topicsStr.split(",")
     val numMessages = options.valueOf(numMessagesOpt).longValue
@@ -132,6 +134,7 @@ object ProducerPerformance extends Logging {
     val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
     val hideHeader = options.has(hideHeaderOpt)
     val brokerList = options.valueOf(brokerListOpt)
+
     val messageSize = options.valueOf(messageSizeOpt).intValue
     var isFixedSize = !options.has(varyMessageSizeOpt)
     var isSync = options.has(syncOpt)
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 69be31c..13334bd 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -18,12 +18,12 @@
 package kafka.tools
 
 import joptsimple.OptionParser
-import java.util.concurrent.{Executors, CountDownLatch}
+import java.util.concurrent.{ Executors, CountDownLatch }
 import java.util.Properties
 import kafka.consumer._
-import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
+import kafka.utils.{ CommandLineUtils, Logging, ZkUtils }
 import kafka.api.OffsetRequest
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
+import org.apache.kafka.clients.producer.{ ProducerRecord, KafkaProducer, ProducerConfig }
 
 object ReplayLogProducer extends Logging {
@@ -45,7 +45,7 @@ object ReplayLogProducer extends Logging {
     consumerProps.put("zookeeper.connect", config.zkConnect)
     consumerProps.put("consumer.timeout.ms", "10000")
     consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString)
-    consumerProps.put("fetch.message.max.bytes", (1024*1024).toString)
+    consumerProps.put("fetch.message.max.bytes", (1024 * 1024).toString)
     consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString)
     val consumerConfig = new ConsumerConfig(consumerProps)
     val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
@@ -104,12 +104,15 @@ object ReplayLogProducer extends Logging {
       .ofType(classOf[String])
     val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
 
-    val options = parser.parse(args : _*)
-
+    val options = parser.parse(args: _*)
+
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt)
+    ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt))
+
     val zkConnect = options.valueOf(zkConnectOpt)
     val brokerList = options.valueOf(brokerListOpt)
+
     val numMessages = options.valueOf(numMessagesOpt).intValue
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val inputTopic = options.valueOf(inputTopicOpt)
@@ -130,28 +133,28 @@ object ReplayLogProducer extends Logging {
       var messageCount: Int = 0
       try {
         val iter =
-          if(config.numMessages >= 0)
+          if (config.numMessages >= 0)
             stream.slice(0, config.numMessages)
           else
             stream
         for (messageAndMetadata <- iter) {
           try {
             val response = producer.send(new ProducerRecord(config.outputTopic,
-                                            messageAndMetadata.key(), messageAndMetadata.message()))
-            if(config.isSync) {
+              messageAndMetadata.key(), messageAndMetadata.message()))
+            if (config.isSync) {
               response.get()
             }
             messageCount += 1
-          }catch {
+          } catch {
             case ie: Exception => error("Skipping this message", ie)
           }
         }
-      }catch {
+      } catch {
        case e: ConsumerTimeoutException => error("consumer thread timing out", e)
       }
       info("Sent " + messageCount + " messages")
       shutdownLatch.countDown
-      info("thread finished execution !" )
+      info("thread finished execution !")
     }
 
     def shutdown() {
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index af47836..bf5aba1 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -98,6 +98,8 @@ object ReplicaVerificationTool extends Logging {
     val options = parser.parse(args : _*)
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
+
+    ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt))
 
     val regex = options.valueOf(topicWhiteListOpt)
     val topicWhiteListFiler = new Whitelist(regex)
 
@@ -114,6 +116,8 @@ object ReplicaVerificationTool extends Logging {
     val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue
     val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue
     val reportInterval = options.valueOf(reportIntervalOpt).longValue
+
+    // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
 
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 36314f4..fd80dae 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- *
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -21,7 +21,7 @@ import joptsimple._
 import kafka.utils._
 import kafka.consumer._
 import kafka.client.ClientUtils
-import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
+import kafka.api.{ OffsetRequest, FetchRequestBuilder, Request }
 import kafka.cluster.Broker
 import scala.collection.JavaConversions._
 import kafka.common.TopicAndPartition
@@ -37,69 +37,71 @@ object SimpleConsumerShell extends Logging {
     val parser = new OptionParser
     val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
-                          .withRequiredArg
-                          .describedAs("hostname:port,...,hostname:port")
-                          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("hostname:port,...,hostname:port")
+      .ofType(classOf[String])
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
-                          .withRequiredArg
-                          .describedAs("topic")
-                          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
     val partitionIdOpt = parser.accepts("partition", "The partition to consume from.")
-                          .withRequiredArg
-                          .describedAs("partition")
-                          .ofType(classOf[java.lang.Integer])
-                          .defaultsTo(0)
+      .withRequiredArg
+      .describedAs("partition")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
     val replicaIdOpt = parser.accepts("replica", "The replica id to consume from, default -1 means leader broker.")
-                          .withRequiredArg
-                          .describedAs("replica id")
-                          .ofType(classOf[java.lang.Integer])
-                          .defaultsTo(UseLeaderReplica)
+      .withRequiredArg
+      .describedAs("replica id")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(UseLeaderReplica)
     val offsetOpt = parser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end")
-                          .withRequiredArg
-                          .describedAs("consume offset")
-                          .ofType(classOf[java.lang.Long])
-                          .defaultsTo(OffsetRequest.EarliestTime)
+      .withRequiredArg
+      .describedAs("consume offset")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(OffsetRequest.EarliestTime)
     val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
-                          .withRequiredArg
-                          .describedAs("clientId")
-                          .ofType(classOf[String])
-                          .defaultsTo("SimpleConsumerShell")
+      .withRequiredArg
+      .describedAs("clientId")
+      .ofType(classOf[String])
+      .defaultsTo("SimpleConsumerShell")
     val fetchSizeOpt = parser.accepts("fetchsize", "The fetch size of each request.")
-                          .withRequiredArg
-                          .describedAs("fetchsize")
-                          .ofType(classOf[java.lang.Integer])
-                          .defaultsTo(1024 * 1024)
+      .withRequiredArg
+      .describedAs("fetchsize")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1024 * 1024)
     val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
-                          .withRequiredArg
-                          .describedAs("class")
-                          .ofType(classOf[String])
-                          .defaultsTo(classOf[DefaultMessageFormatter].getName)
+      .withRequiredArg
+      .describedAs("class")
+      .ofType(classOf[String])
+      .defaultsTo(classOf[DefaultMessageFormatter].getName)
     val messageFormatterArgOpt = parser.accepts("property")
-                          .withRequiredArg
-                          .describedAs("prop")
-                          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("prop")
+      .ofType(classOf[String])
     val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator")
     val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
-                          .withRequiredArg
-                          .describedAs("ms")
-                          .ofType(classOf[java.lang.Integer])
-                          .defaultsTo(1000)
+      .withRequiredArg
+      .describedAs("ms")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1000)
     val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume")
-                          .withRequiredArg
-                          .describedAs("max-messages")
-                          .ofType(classOf[java.lang.Integer])
-                          .defaultsTo(Integer.MAX_VALUE)
+      .withRequiredArg
+      .describedAs("max-messages")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(Integer.MAX_VALUE)
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
-                          "skip it instead of halt.")
+      "skip it instead of halt.")
     val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
-                          "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
-
-    if(args.length == 0)
+      "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
+
+    if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.")
 
-    val options = parser.parse(args : _*)
+    val options = parser.parse(args: _*)
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt)
 
+    ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt))
+
     val topic = options.valueOf(topicOpt)
     val partitionId = options.valueOf(partitionIdOpt).intValue()
     val replicaId = options.valueOf(replicaIdOpt).intValue()
@@ -110,23 +112,23 @@ object SimpleConsumerShell extends Logging {
     val maxMessages = options.valueOf(maxMessagesOpt).intValue
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
 
-    val printOffsets = if(options.has(printOffsetOpt)) true else false
+    val printOffsets = if (options.has(printOffsetOpt)) true else false
     val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt)
 
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
 
     val fetchRequestBuilder = new FetchRequestBuilder()
-                          .clientId(clientId)
-                          .replicaId(Request.DebuggingConsumerId)
-                          .maxWait(maxWaitMs)
-                          .minBytes(ConsumerConfig.MinFetchBytes)
+      .clientId(clientId)
+      .replicaId(Request.DebuggingConsumerId)
+      .maxWait(maxWaitMs)
+      .minBytes(ConsumerConfig.MinFetchBytes)
 
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+    if (topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)
     }
@@ -134,7 +136,7 @@ object SimpleConsumerShell extends Logging {
     // validating partition id
     val partitionsMetadata = topicsMetadata(0).partitionsMetadata
     val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
-    if(!partitionMetadataOpt.isDefined) {
+    if (!partitionMetadataOpt.isDefined) {
       System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
       System.exit(1)
     }
@@ -142,17 +144,16 @@ object SimpleConsumerShell extends Logging {
     // validating replica id and initializing target broker
     var fetchTargetBroker: Broker = null
     var replicaOpt: Option[Broker] = null
-    if(replicaId == UseLeaderReplica) {
+    if (replicaId == UseLeaderReplica) {
       replicaOpt = partitionMetadataOpt.get.leader
-      if(!replicaOpt.isDefined) {
+      if (!replicaOpt.isDefined) {
         System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId))
         System.exit(1)
       }
-    }
-    else {
+    } else {
       val replicasForPartition = partitionMetadataOpt.get.replicas
       replicaOpt = replicasForPartition.find(r => r.id == replicaId)
-      if(!replicaOpt.isDefined) {
+      if (!replicaOpt.isDefined) {
         System.err.println("Error: replica %d does not exist for partition (%s, %d)".format(replicaId, topic, partitionId))
         System.exit(1)
       }
@@ -160,16 +161,16 @@ object SimpleConsumerShell extends Logging {
     fetchTargetBroker = replicaOpt.get
 
     // initializing starting offset
-    if(startingOffset < OffsetRequest.EarliestTime) {
+    if (startingOffset < OffsetRequest.EarliestTime) {
       System.err.println("Invalid starting offset: %d".format(startingOffset))
       System.exit(1)
     }
     if (startingOffset < 0) {
       val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
-                                              ConsumerConfig.SocketBufferSize, clientId)
+        ConsumerConfig.SocketBufferSize, clientId)
       try {
         startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset,
-                                                               Request.DebuggingConsumerId)
+          Request.DebuggingConsumerId)
       } catch {
         case t: Throwable =>
           System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))
@@ -184,19 +185,19 @@ object SimpleConsumerShell extends Logging {
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
 
-    val replicaString = if(replicaId > 0) "leader" else "replica"
+    val replicaString = if (replicaId > 0) "leader" else "replica"
     info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
-        .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
-    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
+      .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
+    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64 * 1024, clientId)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
         var offset = startingOffset
         var numMessagesConsumed = 0
         try {
-          while(numMessagesConsumed < maxMessages) {
+          while (numMessagesConsumed < maxMessages) {
             val fetchRequest = fetchRequestBuilder
-                .addFetch(topic, partitionId, offset, fetchSize)
-                .build()
+              .addFetch(topic, partitionId, offset, fetchSize)
+              .build()
             val fetchResponse = simpleConsumer.fetch(fetchRequest)
             val messageSet = fetchResponse.messageSet(topic, partitionId)
             if (messageSet.validBytes <= 0 && noWaitAtEndOfLog) {
@@ -204,14 +205,14 @@ object SimpleConsumerShell extends Logging {
               return
             }
             debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
-            for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) {
+            for (messageAndOffset <- messageSet if (numMessagesConsumed < maxMessages)) {
               try {
                 offset = messageAndOffset.nextOffset
-                if(printOffsets)
+                if (printOffsets)
                   System.out.println("next offset = " + offset)
                 val message = messageAndOffset.message
-                val key = if(message.hasKey) Utils.readBytes(message.key) else null
-                formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out)
+                val key = if (message.hasKey) Utils.readBytes(message.key) else null
+                formatter.writeTo(key, if (message.isNull) null else Utils.readBytes(message.payload), System.out)
                 numMessagesConsumed += 1
               } catch {
                 case e: Throwable =>
@@ -220,7 +221,7 @@ object SimpleConsumerShell extends Logging {
                   else
                     throw e
               }
-              if(System.out.checkError()) {
+              if (System.out.checkError()) {
                 // This means no one is listening to our output stream any more, time to shutdown
                 System.err.println("Unable to write to standard out, closing consumer.")
                 formatter.close()
@@ -232,7 +233,7 @@ object SimpleConsumerShell extends Logging {
         } catch {
           case e: Throwable =>
             error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
-        }finally {
+        } finally {
           info("Consumed " + numMessagesConsumed + " messages")
         }
       }
--
1.8.5.2 (Apple Git-48)
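
Reviewer note (not part of the patch series): the second patch replaces the first patch's silent ":9091"
default-port fallback with a fail-fast check, and routes all five tools through
ConsoleProducer.PortValidator.validateAndDie. Below is a minimal, self-contained sketch of my reading of
that validation rule, handy for exercising the regex in isolation. The object name PortValidatorSketch,
the forall-based rewrite of the filter/size check, and the assertions are illustrative assumptions, not
code from the patch.

    import scala.util.matching.Regex

    object PortValidatorSketch {
      // Same rule as the patch's PortValidator.validate: every comma-separated
      // broker entry must contain ":<digit>" somewhere, otherwise the whole
      // broker list is rejected (the tools then print usage and exit).
      def validate(hostPort: String): Boolean = {
        val regex = new Regex(":[0-9]")
        val hostPorts = hostPort.split(",")
        hostPorts.nonEmpty && hostPorts.forall(hp => regex.findAllMatchIn(hp).nonEmpty)
      }

      def main(args: Array[String]): Unit = {
        assert(validate("host1:9091"))            // single entry with a port
        assert(validate("host1:9091,host2:9092")) // every entry carries a port
        assert(!validate("host1"))                // no port: rejected
        assert(!validate("host1:9091,host2"))     // one portless entry fails the whole list
      }
    }

For non-empty input, "all entries match and the matching subset is non-empty" (the patch's filter plus
size comparison) is equivalent to the forall used here; the sketch only restates it more directly.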