From 8913ecbfdcd4c95951c4f3055a5abb2b91d68824 Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Thu, 4 Sep 2014 15:37:44 -0600 Subject: [PATCH 1/8] Patch for KAFKA-1618 --- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index da4dad4..ac0bbdc 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -22,11 +22,10 @@ import kafka.message._ import kafka.serializer._ import kafka.utils.CommandLineUtils import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage} - import java.util.Properties import java.io._ - import joptsimple._ +import scala.util.matching.Regex object ConsoleProducer { @@ -112,6 +111,7 @@ object ConsoleProducer { .withRequiredArg .describedAs("broker-list") .ofType(classOf[String]) + val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." + "If specified without value, than it defaults to 'gzip'") @@ -219,7 +219,12 @@ object ConsoleProducer { import scala.collection.JavaConversions._ val useNewProducer = options.has(useNewProducerOpt) val topic = options.valueOf(topicOpt) - val brokerList = options.valueOf(brokerListOpt) + val regex = new Regex(":[0-9].*") + val brokerList = if(regex.findAllMatchIn(options.valueOf(brokerListOpt)).size==0) + options.valueOf(brokerListOpt)+":9091" + else + options.valueOf(brokerListOpt) + val sync = options.has(syncOpt) val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) val compressionCodec = if (options.has(compressionCodecOpt)) -- 1.8.5.2 (Apple Git-48) From e88510fefd9fa777cefd6a5e52eaf7bd0d1a17dd Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Mon, 15 Sep 2014 12:27:59 -0600 Subject: [PATCH 2/8] KAFKA-16188 Exception thrown when running console producer with no port number for the broker --- .../main/scala/kafka/tools/ConsoleProducer.scala | 172 ++++++++++++--------- .../main/scala/kafka/tools/GetOffsetShell.scala | 58 +++---- .../scala/kafka/tools/ProducerPerformance.scala | 9 +- .../main/scala/kafka/tools/ReplayLogProducer.scala | 27 ++-- .../kafka/tools/ReplicaVerificationTool.scala | 4 + .../scala/kafka/tools/SimpleConsumerShell.scala | 151 +++++++++--------- 6 files changed, 228 insertions(+), 193 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index ac0bbdc..19b7cfc 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -21,7 +21,7 @@ import kafka.common._ import kafka.message._ import kafka.serializer._ import kafka.utils.CommandLineUtils -import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage} +import kafka.producer.{ NewShinyProducer, OldProducer, KeyedMessage } import java.util.Properties import java.io._ import joptsimple._ @@ -39,60 +39,60 @@ object ConsoleProducer { reader.init(System.in, props) try { - val producer = - if(config.useNewProducer) { - import org.apache.kafka.clients.producer.ProducerConfig + val producer = + if (config.useNewProducer) { + import org.apache.kafka.clients.producer.ProducerConfig - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
config.brokerList) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString) - props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString) - props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString) - props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) - props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) - if(config.queueEnqueueTimeoutMs != -1) - props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) - props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString) + props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString) + props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString) + props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) + props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) + if (config.queueEnqueueTimeoutMs != -1) + props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) + props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") - new NewShinyProducer(props) - } else { - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec) - props.put("producer.type", if(config.sync) "sync" else "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("message.send.max.retries", config.messageSendMaxRetries.toString) - props.put("retry.backoff.ms", config.retryBackoffMs.toString) - props.put("queue.buffering.max.ms", config.sendTimeout.toString) - props.put("queue.buffering.max.messages", config.queueSize.toString) - props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) - props.put("request.required.acks", config.requestRequiredAcks.toString) - props.put("request.timeout.ms", config.requestTimeoutMs.toString) - props.put("key.serializer.class", config.keyEncoderClass) - props.put("serializer.class", config.valueEncoderClass) - props.put("send.buffer.bytes", config.socketBuffer.toString) - props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) - props.put("client.id", "console-producer") + new NewShinyProducer(props) + } else { + props.put("metadata.broker.list", config.brokerList) + props.put("compression.codec", config.compressionCodec) + props.put("producer.type", if (config.sync) "sync" else "async") + 
props.put("batch.num.messages", config.batchSize.toString) + props.put("message.send.max.retries", config.messageSendMaxRetries.toString) + props.put("retry.backoff.ms", config.retryBackoffMs.toString) + props.put("queue.buffering.max.ms", config.sendTimeout.toString) + props.put("queue.buffering.max.messages", config.queueSize.toString) + props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) + props.put("request.required.acks", config.requestRequiredAcks.toString) + props.put("request.timeout.ms", config.requestTimeoutMs.toString) + props.put("key.serializer.class", config.keyEncoderClass) + props.put("serializer.class", config.valueEncoderClass) + props.put("send.buffer.bytes", config.socketBuffer.toString) + props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) + props.put("client.id", "console-producer") - new OldProducer(props) - } + new OldProducer(props) + } - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run() { - producer.close() - } - }) + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + producer.close() + } + }) - var message: KeyedMessage[Array[Byte], Array[Byte]] = null - do { - message = reader.readMessage() - if(message != null) - producer.send(message.topic, message.key, message.message) - } while(message != null) + var message: KeyedMessage[Array[Byte], Array[Byte]] = null + do { + message = reader.readMessage() + if (message != null) + producer.send(message.topic, message.key, message.message) + } while (message != null) } catch { case e: Exception => e.printStackTrace @@ -111,13 +111,13 @@ object ConsoleProducer { .withRequiredArg .describedAs("broker-list") .ofType(classOf[String]) - + val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." + - "If specified without value, than it defaults to 'gzip'") - .withOptionalArg() - .describedAs("compression-codec") - .ofType(classOf[String]) + "If specified without value, than it defaults to 'gzip'") + .withOptionalArg() + .describedAs("compression-codec") + .ofType(classOf[String]) val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.") .withRequiredArg .describedAs("size") @@ -163,13 +163,13 @@ object ConsoleProducer { .withRequiredArg .describedAs("metadata expiration interval") .ofType(classOf[java.lang.Long]) - .defaultsTo(5*60*1000L) + .defaultsTo(5 * 60 * 1000L) val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms", "The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.") .withRequiredArg .describedAs("metadata fetch timeout") .ofType(classOf[java.lang.Long]) - .defaultsTo(60*1000L) + .defaultsTo(60 * 1000L) val maxMemoryBytesOpt = parser.accepts("max-memory-bytes", "The total memory used by the producer to buffer records waiting to be sent to the server.") .withRequiredArg @@ -203,7 +203,7 @@ object ConsoleProducer { .withRequiredArg .describedAs("size") .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024*100) + .defaultsTo(1024 * 100) val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. 
" + "This allows custom configuration for a user-defined message reader.") .withRequiredArg @@ -211,8 +211,8 @@ object ConsoleProducer { .ofType(classOf[String]) val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") - val options = parser.parse(args : _*) - if(args.length == 0) + val options = parser.parse(args: _*) + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.") CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt) @@ -220,18 +220,18 @@ object ConsoleProducer { val useNewProducer = options.has(useNewProducerOpt) val topic = options.valueOf(topicOpt) val regex = new Regex(":[0-9].*") - val brokerList = if(regex.findAllMatchIn(options.valueOf(brokerListOpt)).size==0) - options.valueOf(brokerListOpt)+":9091" - else - options.valueOf(brokerListOpt) - + + PortValidator.validateAndDie(parser,options.valueOf(brokerListOpt)) + + val brokerList = options.valueOf(brokerListOpt) + val sync = options.has(syncOpt) val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) val compressionCodec = if (options.has(compressionCodecOpt)) - if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty) - DefaultCompressionCodec.name - else compressionCodecOptionValue - else NoCompressionCodec.name + if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty) + DefaultCompressionCodec.name + else compressionCodecOptionValue + else NoCompressionCodec.name val batchSize = options.valueOf(batchSizeOpt) val sendTimeout = options.valueOf(sendTimeoutOpt) val queueSize = options.valueOf(queueSizeOpt) @@ -268,11 +268,11 @@ object ConsoleProducer { override def init(inputStream: InputStream, props: Properties) { topic = props.getProperty("topic") - if(props.containsKey("parse.key")) + if (props.containsKey("parse.key")) parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true") - if(props.containsKey("key.separator")) + if (props.containsKey("key.separator")) keySeparator = props.getProperty("key.separator") - if(props.containsKey("ignore.error")) + if (props.containsKey("ignore.error")) ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true") reader = new BufferedReader(new InputStreamReader(inputStream)) } @@ -284,18 +284,40 @@ object ConsoleProducer { case (line, true) => line.indexOf(keySeparator) match { case -1 => - if(ignoreError) + if (ignoreError) new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) else throw new KafkaException("No key found on line " + lineNumber + ": " + line) case n => new KeyedMessage[Array[Byte], Array[Byte]](topic, - line.substring(0, n).getBytes, - (if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes()) + line.substring(0, n).getBytes, + (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes()) } case (line, false) => new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) } } } + + object PortValidator { + def validate(hostPort: String) = { + val regex = new Regex(":[0-9]") + + val hostPorts: Array[String] = if (hostPort.contains(',')) + hostPort.split(",") + else + Array(hostPort) + val validHostPort = hostPorts.filter { + hostPortData => + regex.findAllMatchIn(hostPortData).size > 0 + } + + !(validHostPort.isEmpty) && validHostPort.size==hostPorts.length + } + + def validateAndDie(parser:OptionParser,hostPort:String) { + if (!validate(hostPort)) + 
CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ") + } + } } diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 9c6064e..1d4358f 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -20,52 +20,54 @@ package kafka.tools import kafka.consumer._ import joptsimple._ -import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} +import kafka.api.{ PartitionOffsetRequestInfo, OffsetRequest } import kafka.common.TopicAndPartition import kafka.client.ClientUtils import kafka.utils.CommandLineUtils - object GetOffsetShell { def main(args: Array[String]): Unit = { val parser = new OptionParser val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") - .withRequiredArg - .describedAs("hostname:port,...,hostname:port") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("hostname:port,...,hostname:port") + .ofType(classOf[String]) val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions") - .withRequiredArg - .describedAs("partition ids") - .ofType(classOf[String]) - .defaultsTo("") + .withRequiredArg + .describedAs("partition ids") + .ofType(classOf[String]) + .defaultsTo("") val timeOpt = parser.accepts("time", "timestamp of the offsets before that") - .withRequiredArg - .describedAs("timestamp/-1(latest)/-2(earliest)") - .ofType(classOf[java.lang.Long]) + .withRequiredArg + .describedAs("timestamp/-1(latest)/-2(earliest)") + .ofType(classOf[java.lang.Long]) val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) - - if(args.length == 0) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1000) + + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.") - val options = parser.parse(args : _*) + val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt) + ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + val clientId = "GetOffsetShell" + val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) val topic = options.valueOf(topicOpt) var partitionList = options.valueOf(partitionOpt) @@ -74,13 +76,13 @@ object GetOffsetShell { val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata - if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { + if (topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { 
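// --- Editor's aside (illustrative sketch, not part of the patch) ----------------------
// A self-contained version of the host:port check that PortValidator.validate performs
// above: split the --broker-list value on commas and require every entry to contain a
// colon followed by a digit. Object and value names below are hypothetical.
import scala.util.matching.Regex

object BrokerListCheckSketch extends App {
  private val portPattern = new Regex(":[0-9]")                 // same pattern as patch 2

  private def validate(hostPort: String): Boolean = {
    val hostPorts = if (hostPort.contains(',')) hostPort.split(",") else Array(hostPort)
    val withPort  = hostPorts.filter(hp => portPattern.findAllMatchIn(hp).size > 0)
    withPort.nonEmpty && withPort.length == hostPorts.length    // every entry needs a port
  }

  println(validate("host1:9091,host2:9092"))   // true
  println(validate("host1,host2:9092"))        // false (validateAndDie would print usage and exit)
}
// ---------------------------------------------------------------------------------------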
System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) + "kafka-list-topic.sh to verify") System.exit(1) } val partitions = - if(partitionList == "") { + if (partitionList == "") { topicsMetadata.head.partitionsMetadata.map(_.partitionId) } else { partitionList.split(",").map(_.toInt).toSeq diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index fc3e724..ea814a7 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -18,12 +18,12 @@ package kafka.tools import kafka.metrics.KafkaMetricsReporter -import kafka.producer.{OldProducer, NewShinyProducer} -import kafka.utils.{VerifiableProperties, Logging, CommandLineUtils} +import kafka.producer.{ OldProducer, NewShinyProducer } +import kafka.utils.{ VerifiableProperties, Logging, CommandLineUtils } import kafka.message.CompressionCodec import kafka.serializer._ -import java.util.concurrent.{CountDownLatch, Executors} +import java.util.concurrent.{ CountDownLatch, Executors } import java.util.concurrent.atomic.AtomicLong import java.util._ import java.text.SimpleDateFormat @@ -125,6 +125,8 @@ object ProducerPerformance extends Logging { val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt) + ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + val topicsStr = options.valueOf(topicsOpt) val topics = topicsStr.split(",") val numMessages = options.valueOf(numMessagesOpt).longValue @@ -132,6 +134,7 @@ object ProducerPerformance extends Logging { val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) val hideHeader = options.has(hideHeaderOpt) val brokerList = options.valueOf(brokerListOpt) + val messageSize = options.valueOf(messageSizeOpt).intValue var isFixedSize = !options.has(varyMessageSizeOpt) var isSync = options.has(syncOpt) diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 69be31c..13334bd 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -18,12 +18,12 @@ package kafka.tools import joptsimple.OptionParser -import java.util.concurrent.{Executors, CountDownLatch} +import java.util.concurrent.{ Executors, CountDownLatch } import java.util.Properties import kafka.consumer._ -import kafka.utils.{CommandLineUtils, Logging, ZkUtils} +import kafka.utils.{ CommandLineUtils, Logging, ZkUtils } import kafka.api.OffsetRequest -import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig} +import org.apache.kafka.clients.producer.{ ProducerRecord, KafkaProducer, ProducerConfig } object ReplayLogProducer extends Logging { @@ -45,7 +45,7 @@ object ReplayLogProducer extends Logging { consumerProps.put("zookeeper.connect", config.zkConnect) consumerProps.put("consumer.timeout.ms", "10000") consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString) - consumerProps.put("fetch.message.max.bytes", (1024*1024).toString) + consumerProps.put("fetch.message.max.bytes", (1024 * 1024).toString) consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString) val consumerConfig = new ConsumerConfig(consumerProps) val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) @@ 
-104,12 +104,15 @@ object ReplayLogProducer extends Logging { .ofType(classOf[String]) val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") - val options = parser.parse(args : _*) - + val options = parser.parse(args: _*) + CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt) + ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + val zkConnect = options.valueOf(zkConnectOpt) val brokerList = options.valueOf(brokerListOpt) + val numMessages = options.valueOf(numMessagesOpt).intValue val numThreads = options.valueOf(numThreadsOpt).intValue val inputTopic = options.valueOf(inputTopicOpt) @@ -130,28 +133,28 @@ object ReplayLogProducer extends Logging { var messageCount: Int = 0 try { val iter = - if(config.numMessages >= 0) + if (config.numMessages >= 0) stream.slice(0, config.numMessages) else stream for (messageAndMetadata <- iter) { try { val response = producer.send(new ProducerRecord(config.outputTopic, - messageAndMetadata.key(), messageAndMetadata.message())) - if(config.isSync) { + messageAndMetadata.key(), messageAndMetadata.message())) + if (config.isSync) { response.get() } messageCount += 1 - }catch { + } catch { case ie: Exception => error("Skipping this message", ie) } } - }catch { + } catch { case e: ConsumerTimeoutException => error("consumer thread timing out", e) } info("Sent " + messageCount + " messages") shutdownLatch.countDown - info("thread finished execution !" ) + info("thread finished execution !") } def shutdown() { diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index af47836..bf5aba1 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -98,6 +98,8 @@ object ReplicaVerificationTool extends Logging { val options = parser.parse(args : _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) + + ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) val regex = options.valueOf(topicWhiteListOpt) val topicWhiteListFiler = new Whitelist(regex) @@ -114,6 +116,8 @@ object ReplicaVerificationTool extends Logging { val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue val reportInterval = options.valueOf(reportIntervalOpt).longValue + + // getting topic metadata info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 36314f4..fd80dae 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -21,7 +21,7 @@ import joptsimple._ import kafka.utils._ import kafka.consumer._ import kafka.client.ClientUtils -import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} +import kafka.api.{ OffsetRequest, FetchRequestBuilder, Request } import kafka.cluster.Broker import scala.collection.JavaConversions._ import kafka.common.TopicAndPartition @@ -37,69 +37,71 @@ object SimpleConsumerShell extends Logging { val parser = new OptionParser val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") - .withRequiredArg - .describedAs("hostname:port,...,hostname:port") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("hostname:port,...,hostname:port") + .ofType(classOf[String]) val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) val partitionIdOpt = parser.accepts("partition", "The partition to consume from.") - .withRequiredArg - .describedAs("partition") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) + .withRequiredArg + .describedAs("partition") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) val replicaIdOpt = parser.accepts("replica", "The replica id to consume from, default -1 means leader broker.") - .withRequiredArg - .describedAs("replica id") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(UseLeaderReplica) + .withRequiredArg + .describedAs("replica id") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(UseLeaderReplica) val offsetOpt = parser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end") - .withRequiredArg - .describedAs("consume offset") - .ofType(classOf[java.lang.Long]) - .defaultsTo(OffsetRequest.EarliestTime) + .withRequiredArg + .describedAs("consume offset") + .ofType(classOf[java.lang.Long]) + .defaultsTo(OffsetRequest.EarliestTime) val clientIdOpt = parser.accepts("clientId", "The ID of this client.") - .withRequiredArg - .describedAs("clientId") - .ofType(classOf[String]) - .defaultsTo("SimpleConsumerShell") + .withRequiredArg + .describedAs("clientId") + .ofType(classOf[String]) + .defaultsTo("SimpleConsumerShell") val fetchSizeOpt = parser.accepts("fetchsize", "The fetch size of each request.") - .withRequiredArg - .describedAs("fetchsize") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024 * 1024) + .withRequiredArg + .describedAs("fetchsize") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024 * 1024) val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") - .withRequiredArg - .describedAs("class") - .ofType(classOf[String]) - .defaultsTo(classOf[DefaultMessageFormatter].getName) + .withRequiredArg + .describedAs("class") + .ofType(classOf[String]) + .defaultsTo(classOf[DefaultMessageFormatter].getName) val messageFormatterArgOpt = parser.accepts("property") - .withRequiredArg - .describedAs("prop") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("prop") + .ofType(classOf[String]) val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator") val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request 
waits.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1000) val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume") - .withRequiredArg - .describedAs("max-messages") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(Integer.MAX_VALUE) + .withRequiredArg + .describedAs("max-messages") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(Integer.MAX_VALUE) val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + - "skip it instead of halt.") + "skip it instead of halt.") val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend", - "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages") - - if(args.length == 0) + "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages") + + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.") - val options = parser.parse(args : _*) + val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt) + ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + val topic = options.valueOf(topicOpt) val partitionId = options.valueOf(partitionIdOpt).intValue() val replicaId = options.valueOf(replicaIdOpt).intValue() @@ -110,23 +112,23 @@ object SimpleConsumerShell extends Logging { val maxMessages = options.valueOf(maxMessagesOpt).intValue val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false - val printOffsets = if(options.has(printOffsetOpt)) true else false + val printOffsets = if (options.has(printOffsetOpt)) true else false val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt) val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)) val fetchRequestBuilder = new FetchRequestBuilder() - .clientId(clientId) - .replicaId(Request.DebuggingConsumerId) - .maxWait(maxWaitMs) - .minBytes(ConsumerConfig.MinFetchBytes) + .clientId(clientId) + .replicaId(Request.DebuggingConsumerId) + .maxWait(maxWaitMs) + .minBytes(ConsumerConfig.MinFetchBytes) // getting topic metadata info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata - if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { + if (topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata)) System.exit(1) } @@ -134,7 +136,7 @@ object SimpleConsumerShell extends Logging { // validating partition id val partitionsMetadata = topicsMetadata(0).partitionsMetadata val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId) - if(!partitionMetadataOpt.isDefined) { + if (!partitionMetadataOpt.isDefined) { System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic)) System.exit(1) } @@ -142,17 +144,16 @@ object SimpleConsumerShell 
extends Logging { // validating replica id and initializing target broker var fetchTargetBroker: Broker = null var replicaOpt: Option[Broker] = null - if(replicaId == UseLeaderReplica) { + if (replicaId == UseLeaderReplica) { replicaOpt = partitionMetadataOpt.get.leader - if(!replicaOpt.isDefined) { + if (!replicaOpt.isDefined) { System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId)) System.exit(1) } - } - else { + } else { val replicasForPartition = partitionMetadataOpt.get.replicas replicaOpt = replicasForPartition.find(r => r.id == replicaId) - if(!replicaOpt.isDefined) { + if (!replicaOpt.isDefined) { System.err.println("Error: replica %d does not exist for partition (%s, %d)".format(replicaId, topic, partitionId)) System.exit(1) } @@ -160,16 +161,16 @@ object SimpleConsumerShell extends Logging { fetchTargetBroker = replicaOpt.get // initializing starting offset - if(startingOffset < OffsetRequest.EarliestTime) { + if (startingOffset < OffsetRequest.EarliestTime) { System.err.println("Invalid starting offset: %d".format(startingOffset)) System.exit(1) } if (startingOffset < 0) { val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout, - ConsumerConfig.SocketBufferSize, clientId) + ConsumerConfig.SocketBufferSize, clientId) try { startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset, - Request.DebuggingConsumerId) + Request.DebuggingConsumerId) } catch { case t: Throwable => System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t)) @@ -184,19 +185,19 @@ object SimpleConsumerShell extends Logging { val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] formatter.init(formatterArgs) - val replicaString = if(replicaId > 0) "leader" else "replica" + val replicaString = if (replicaId > 0) "leader" else "replica" info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]" - .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) - val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId) + .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) + val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64 * 1024, clientId) val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() { def run() { var offset = startingOffset var numMessagesConsumed = 0 try { - while(numMessagesConsumed < maxMessages) { + while (numMessagesConsumed < maxMessages) { val fetchRequest = fetchRequestBuilder - .addFetch(topic, partitionId, offset, fetchSize) - .build() + .addFetch(topic, partitionId, offset, fetchSize) + .build() val fetchResponse = simpleConsumer.fetch(fetchRequest) val messageSet = fetchResponse.messageSet(topic, partitionId) if (messageSet.validBytes <= 0 && noWaitAtEndOfLog) { @@ -204,14 +205,14 @@ object SimpleConsumerShell extends Logging { return } debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset) - for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) { + for (messageAndOffset <- messageSet if (numMessagesConsumed < maxMessages)) { try { offset = 
messageAndOffset.nextOffset - if(printOffsets) + if (printOffsets) System.out.println("next offset = " + offset) val message = messageAndOffset.message - val key = if(message.hasKey) Utils.readBytes(message.key) else null - formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out) + val key = if (message.hasKey) Utils.readBytes(message.key) else null + formatter.writeTo(key, if (message.isNull) null else Utils.readBytes(message.payload), System.out) numMessagesConsumed += 1 } catch { case e: Throwable => @@ -220,7 +221,7 @@ object SimpleConsumerShell extends Logging { else throw e } - if(System.out.checkError()) { + if (System.out.checkError()) { // This means no one is listening to our output stream any more, time to shutdown System.err.println("Unable to write to standard out, closing consumer.") formatter.close() @@ -232,7 +233,7 @@ object SimpleConsumerShell extends Logging { } catch { case e: Throwable => error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e) - }finally { + } finally { info("Consumed " + numMessagesConsumed + " messages") } } -- 1.8.5.2 (Apple Git-48) From b1e292ae90658a6d7d30ce7f198b1a848cc22887 Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Sun, 21 Sep 2014 11:42:43 -0600 Subject: [PATCH 3/8] KAFKA-1618 Exception thrown when running console producer with no port number for the broker --- .../main/scala/kafka/tools/ConsoleConsumer.scala | 112 +++++++------ .../main/scala/kafka/tools/ConsoleProducer.scala | 26 +-- .../main/scala/kafka/tools/GetOffsetShell.scala | 2 +- .../scala/kafka/tools/ProducerPerformance.scala | 4 +- .../main/scala/kafka/tools/ReplayLogProducer.scala | 2 +- .../kafka/tools/ReplicaVerificationTool.scala | 176 ++++++++++----------- .../scala/kafka/tools/SimpleConsumerShell.scala | 2 +- core/src/main/scala/kafka/tools/ToolsUtils.scala | 29 ++++ 8 files changed, 178 insertions(+), 175 deletions(-) create mode 100644 core/src/main/scala/kafka/tools/ToolsUtils.scala diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 323fc85..855586d 100644 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -27,7 +27,7 @@ import kafka.message._ import kafka.serializer._ import kafka.utils._ import kafka.metrics.KafkaMetricsReporter -import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer} +import kafka.consumer.{ Blacklist, Whitelist, ConsumerConfig, Consumer } /** * Consumer that dumps messages out to standard out. 
@@ -38,55 +38,55 @@ object ConsoleConsumer extends Logging { def main(args: Array[String]) { val parser = new OptionParser val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") - .withRequiredArg - .describedAs("whitelist") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("whitelist") + .ofType(classOf[String]) val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.") - .withRequiredArg - .describedAs("blacklist") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("blacklist") + .ofType(classOf[String]) val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") - .withRequiredArg - .describedAs("config file") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("config file") + .ofType(classOf[String]) val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") - .withRequiredArg - .describedAs("class") - .ofType(classOf[String]) - .defaultsTo(classOf[DefaultMessageFormatter].getName) + .withRequiredArg + .describedAs("class") + .ofType(classOf[String]) + .defaultsTo(classOf[DefaultMessageFormatter].getName) val messageFormatterArgOpt = parser.accepts("property") - .withRequiredArg - .describedAs("prop") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("prop") + .ofType(classOf[String]) val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + - "start with the earliest message present in the log rather than the latest message.") + "start with the earliest message present in the log rather than the latest message.") val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. 
If not set, consumption is continual.") - .withRequiredArg - .describedAs("num_messages") - .ofType(classOf[java.lang.Integer]) + .withRequiredArg + .describedAs("num_messages") + .ofType(classOf[java.lang.Integer]) val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + - "skip it instead of halt.") + "skip it instead of halt.") val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - "set, the csv metrics will be outputed here") + "set, the csv metrics will be outputed here") .withRequiredArg .describedAs("metrics dictory") .ofType(classOf[java.lang.String]) - if(args.length == 0) + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") - + var groupIdPassed = true val options: OptionSet = tryParse(parser, args) CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) @@ -113,46 +113,44 @@ object ConsoleConsumer extends Logging { KafkaMetricsReporter.startReporters(verifiableProps) } - - val consumerProps = if (options.has(consumerConfigOpt)) Utils.loadProps(options.valueOf(consumerConfigOpt)) else new Properties() - if(!consumerProps.containsKey("group.id")) { - consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) - groupIdPassed=false + if (!consumerProps.containsKey("group.id")) { + consumerProps.put("group.id", "console-consumer-" + new Random().nextInt(100000)) + groupIdPassed = false } - consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") + consumerProps.put("auto.offset.reset", if (options.has(resetBeginningOpt)) "smallest" else "largest") consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - if(!consumerProps.containsKey("dual.commit.enabled")) - consumerProps.put("dual.commit.enabled","false") - if(!consumerProps.containsKey("offsets.storage")) - consumerProps.put("offsets.storage","zookeeper") + if (!consumerProps.containsKey("dual.commit.enabled")) + consumerProps.put("dual.commit.enabled", "false") + if (!consumerProps.containsKey("offsets.storage")) + consumerProps.put("offsets.storage", "zookeeper") if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) && - checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) { - System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id") - +". Please use --delete-consumer-offsets to delete previous offsets metadata") + checkZkPathExists(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id") + "/offsets")) { + System.err.println("Found previous offset information for this group " + consumerProps.getProperty("group.id") + + ". 
Please use --delete-consumer-offsets to delete previous offsets metadata") System.exit(1) } - if(options.has(deleteConsumerOffsetsOpt)) + if (options.has(deleteConsumerOffsetsOpt)) ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id")) val config = new ConsumerConfig(consumerProps) val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)) - val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 + val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 val connector = Consumer.create(config) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { connector.shutdown() // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - if(!groupIdPassed) + if (!groupIdPassed) ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id")) } }) @@ -162,12 +160,12 @@ object ConsoleConsumer extends Logging { formatter.init(formatterArgs) try { val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) - val iter = if(maxMessages >= 0) + val iter = if (maxMessages >= 0) stream.slice(0, maxMessages) else stream - for(messageAndTopic <- iter) { + for (messageAndTopic <- iter) { try { formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) numMessages += 1 @@ -178,7 +176,7 @@ object ConsoleConsumer extends Logging { else throw e } - if(System.out.checkError()) { + if (System.out.checkError()) { // This means no one is listening to our output stream any more, time to shutdown System.err.println("Unable to write to standard out, closing consumer.") System.err.println("Consumed %d messages".format(numMessages)) @@ -198,7 +196,7 @@ object ConsoleConsumer extends Logging { def tryParse(parser: OptionParser, args: Array[String]) = { try { - parser.parse(args : _*) + parser.parse(args: _*) } catch { case e: OptionException => { Utils.croak(e.getMessage) @@ -209,7 +207,7 @@ object ConsoleConsumer extends Logging { def checkZkPathExists(zkUrl: String, path: String): Boolean = { try { - val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); + val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, ZKStringSerializer); zk.exists(path) } catch { case _: Throwable => false @@ -229,16 +227,16 @@ class DefaultMessageFormatter extends MessageFormatter { var lineSeparator = "\n".getBytes override def init(props: Properties) { - if(props.containsKey("print.key")) + if (props.containsKey("print.key")) printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") - if(props.containsKey("key.separator")) + if (props.containsKey("key.separator")) keySeparator = props.getProperty("key.separator").getBytes - if(props.containsKey("line.separator")) + if (props.containsKey("line.separator")) lineSeparator = props.getProperty("line.separator").getBytes } def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { - if(printKey) { + if (printKey) { output.write(if (key == null) "null".getBytes() else key) output.write(keySeparator) } diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 19b7cfc..4a9175f 100644 --- 
a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -219,9 +219,8 @@ object ConsoleProducer { import scala.collection.JavaConversions._ val useNewProducer = options.has(useNewProducerOpt) val topic = options.valueOf(topicOpt) - val regex = new Regex(":[0-9].*") - - PortValidator.validateAndDie(parser,options.valueOf(brokerListOpt)) + + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val brokerList = options.valueOf(brokerListOpt) @@ -299,25 +298,4 @@ object ConsoleProducer { } } - object PortValidator { - def validate(hostPort: String) = { - val regex = new Regex(":[0-9]") - - val hostPorts: Array[String] = if (hostPort.contains(',')) - hostPort.split(",") - else - Array(hostPort) - val validHostPort = hostPorts.filter { - hostPortData => - regex.findAllMatchIn(hostPortData).size > 0 - } - - !(validHostPort.isEmpty) && validHostPort.size==hostPorts.length - } - - def validateAndDie(parser:OptionParser,hostPort:String) { - if (!validate(hostPort)) - CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ") - } - } } diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 1d4358f..2869aaa 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -64,7 +64,7 @@ object GetOffsetShell { CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt) - ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val clientId = "GetOffsetShell" diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index ea814a7..8458c28 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -125,8 +125,8 @@ object ProducerPerformance extends Logging { val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt) - ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) - + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) + val topicsStr = options.valueOf(topicsOpt) val topics = topicsStr.split(",") val numMessages = options.valueOf(numMessagesOpt).longValue diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 13334bd..43fe852 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -108,7 +108,7 @@ object ReplayLogProducer extends Logging { CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt) - ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val zkConnect = options.valueOf(zkConnectOpt) val brokerList = options.valueOf(brokerListOpt) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index bf5aba1..bfb739e 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
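(Editor's aside, not part of the patch: the short sketch below contrasts the unanchored ":[0-9]" pattern from patch 2 with the anchored ":[0-9]+$" pattern used by the kafka.tools.ToolsUtils helper added at the end of this patch; the object and method names in the sketch are hypothetical.)

import scala.util.matching.Regex

object AnchoredPortSketch extends App {
  val loose    = new Regex(":[0-9]")      // patch 2's PortValidator pattern
  val anchored = new Regex(":[0-9]+$")    // patch 3's ToolsUtils pattern: entry must end in a numeric port

  def ok(r: Regex, s: String): Boolean = r.findFirstIn(s).isDefined

  println(ok(loose, "host1:90ab"))        // true  (false positive under the loose pattern)
  println(ok(anchored, "host1:90ab"))     // false (rejected, as intended)
  println(ok(anchored, "host1:9092"))     // true
}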
@@ -19,17 +19,17 @@ package kafka.tools import joptsimple.OptionParser import kafka.cluster.Broker -import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet} +import kafka.message.{ MessageSet, MessageAndOffset, ByteBufferMessageSet } import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference import kafka.client.ClientUtils -import java.util.regex.{PatternSyntaxException, Pattern} +import java.util.regex.{ PatternSyntaxException, Pattern } import kafka.api._ import java.text.SimpleDateFormat import java.util.Date -import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.common.{ ErrorMapping, TopicAndPartition } import kafka.utils._ -import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer} +import kafka.consumer.{ ConsumerConfig, Whitelist, SimpleConsumer } /** * For verifying the consistency among replicas. @@ -53,7 +53,7 @@ import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer} */ object ReplicaVerificationTool extends Logging { - val clientId= "replicaVerificationTool" + val clientId = "replicaVerificationTool" val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" val dateFormat = new SimpleDateFormat(dateFormatString) @@ -64,50 +64,49 @@ object ReplicaVerificationTool extends Logging { def main(args: Array[String]): Unit = { val parser = new OptionParser val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") - .withRequiredArg - .describedAs("hostname:port,...,hostname:port") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("hostname:port,...,hostname:port") + .ofType(classOf[String]) val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.") - .withRequiredArg - .describedAs("bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.FetchSize) + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(ConsumerConfig.FetchSize) val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1000) val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. 
Defaults to all topics.") - .withRequiredArg - .describedAs("Java regex (String)") - .ofType(classOf[String]) - .defaultsTo(".*") + .withRequiredArg + .describedAs("Java regex (String)") + .ofType(classOf[String]) + .defaultsTo(".*") val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.") - .withRequiredArg - .describedAs("timestamp/-1(latest)/-2(earliest)") - .ofType(classOf[java.lang.Long]) - .defaultsTo(-1L) + .withRequiredArg + .describedAs("timestamp/-1(latest)/-2(earliest)") + .ofType(classOf[java.lang.Long]) + .defaultsTo(-1L) val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(30 * 1000L) - - if(args.length == 0) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(30 * 1000L) + + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") - val options = parser.parse(args : _*) + val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) - - ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val regex = options.valueOf(topicWhiteListOpt) val topicWhiteListFiler = new Whitelist(regex) try { Pattern.compile(regex) - } - catch { + } catch { case e: PatternSyntaxException => throw new RuntimeException(regex + " is an invalid regex.") } @@ -116,66 +115,64 @@ object ReplicaVerificationTool extends Logging { val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue val reportInterval = options.valueOf(reportIntervalOpt).longValue - - + // getting topic metadata info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( - topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) - true - else - false - ) + topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) + true + else + false) val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.flatMap( partitionMetadata => partitionMetadata.replicas.map(broker => - TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id)) - ) - ) + TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id)))) debug("Selected topic partitions: " + topicPartitionReplicaList) val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId) - .map { case (brokerId, partitions) => - brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } + .map { + case (brokerId, partitions) => + brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } + } debug("Topic 
partitions per broker: " + topicAndPartitionsPerBroker) val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = - topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) - .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } + topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) + .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map( partitionMetadata => - (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)) - ).groupBy(_._2) - .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { - case(topicAndPartition, leaderId) => topicAndPartition }) + (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id))).groupBy(_._2) + .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { + case (topicAndPartition, leaderId) => topicAndPartition + }) debug("Leaders per broker: " + leadersPerBroker) val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, - leadersPerBroker, - topicAndPartitionsPerBroker.size, - brokerMap, - initialOffsetTime, - reportInterval) + leadersPerBroker, + topicAndPartitionsPerBroker.size, + brokerMap, + initialOffsetTime, + reportInterval) // create all replica fetcher threads val verificationBrokerId = topicAndPartitionsPerBroker.head._1 val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map { case (brokerId, topicAndPartitions) => new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId, - sourceBroker = brokerMap(brokerId), - topicAndPartitions = topicAndPartitions, - replicaBuffer = replicaBuffer, - socketTimeout = 30000, - socketBufferSize = 256000, - fetchSize = fetchSize, - maxWait = maxWaitMs, - minBytes = 1, - doVerification = (brokerId == verificationBrokerId)) + sourceBroker = brokerMap(brokerId), + topicAndPartitions = topicAndPartitions, + replicaBuffer = replicaBuffer, + socketTimeout = 30000, + socketBufferSize = 256000, + fetchSize = fetchSize, + maxWait = maxWaitMs, + minBytes = 1, + doVerification = (brokerId == verificationBrokerId)) } Runtime.getRuntime.addShutdownHook(new Thread() { @@ -190,18 +187,18 @@ object ReplicaVerificationTool extends Logging { } } -private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) +private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset]) private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long) private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int], - leadersPerBroker: Map[Int, Seq[TopicAndPartition]], - expectedNumFetchers: Int, - brokerMap: Map[Int, Broker], - initialOffsetTime: Long, - reportInterval: Long) extends Logging { + leadersPerBroker: Map[Int, Seq[TopicAndPartition]], + expectedNumFetchers: Int, + brokerMap: Map[Int, Broker], + initialOffsetTime: Long, + reportInterval: Long) extends Logging { private val fetchOffsetMap = new Pool[TopicAndPartition, 
Long] private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]] private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers)) @@ -245,7 +242,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa val offsetRequest = OffsetRequest(initialOffsetMap) val offsetResponse = consumer.getOffsetsBefore(offsetRequest) assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse)) - offsetResponse.partitionErrorAndOffsets.foreach{ + offsetResponse.partitionErrorAndOffsets.foreach { case (topicAndPartition, partitionOffsetResponse) => fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head) } @@ -266,18 +263,19 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) { debug("Verifying " + topicAndPartition) assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition), - "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " - + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") + "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " + + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") val messageIteratorMap = fetchResponsePerReplica.map { - case(replicaId, fetchResponse) => - replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator} + case (replicaId, fetchResponse) => + replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator + } val maxHw = fetchResponsePerReplica.values.map(_.hw).max // Iterate one message at a time from every replica, until high watermark is reached. var isMessageInAllReplicas = true while (isMessageInAllReplicas) { var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None - for ( (replicaId, messageIterator) <- messageIteratorMap) { + for ((replicaId, messageIterator) <- messageIteratorMap) { try { if (messageIterator.hasNext) { val messageAndOffset = messageIterator.next() @@ -289,7 +287,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa messageInfoFromFirstReplicaOpt match { case None => messageInfoFromFirstReplicaOpt = Some( - MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum)) + MessageInfo(replicaId, messageAndOffset.offset, messageAndOffset.nextOffset, messageAndOffset.message.checksum)) case Some(messageInfoFromFirstReplica) => if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) { println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition @@ -310,14 +308,14 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } catch { case t: Throwable => throw new RuntimeException("Error in processing replica %d in partition %s at offset %d." 
- .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) + .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) } } if (isMessageInAllReplicas) { val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset fetchOffsetMap.put(topicAndPartition, nextOffset) debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " + - nextOffset + " for " + topicAndPartition) + nextOffset + " for " + topicAndPartition) } } if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) { @@ -338,15 +336,15 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition], - replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, - fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) + replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, + fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) extends ShutdownableThread(name) { val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId) val fetchRequestBuilder = new FetchRequestBuilder(). - clientId(ReplicaVerificationTool.clientId). - replicaId(Request.DebuggingConsumerId). - maxWait(maxWait). - minBytes(minBytes) + clientId(ReplicaVerificationTool.clientId). + replicaId(Request.DebuggingConsumerId). + maxWait(maxWait). + minBytes(minBytes) override def doWork() { @@ -371,7 +369,7 @@ private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartiti if (response != null) { response.data.foreach { - case(topicAndPartition, partitionData) => + case (topicAndPartition, partitionData) => replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData) } } else { diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index fd80dae..223d1b3 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -100,7 +100,7 @@ object SimpleConsumerShell extends Logging { val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt) - ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val topic = options.valueOf(topicOpt) val partitionId = options.valueOf(partitionIdOpt).intValue() diff --git a/core/src/main/scala/kafka/tools/ToolsUtils.scala b/core/src/main/scala/kafka/tools/ToolsUtils.scala new file mode 100644 index 0000000..6cebb67 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ToolsUtils.scala @@ -0,0 +1,29 @@ +package kafka.tools + +import scala.util.matching.Regex +import kafka.utils.CommandLineUtils +import joptsimple.OptionParser + +object ToolsUtils extends App { + + private def validate(hostPort: String) = { + val regex = new Regex(":[0-9]+$") + + val hostPorts: Array[String] = if(hostPort.contains(',')) + hostPort.split(",") + else + Array(hostPort) + val validHostPort = hostPorts.filter { + hostPortData => + regex.findAllMatchIn(hostPortData).size > 0 + } + + !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length + } + + def validatePortAndDie(parser: OptionParser, hostPort: String) { + if (!validate(hostPort)) + 
CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ") + } + +} \ No newline at end of file -- 1.8.5.2 (Apple Git-48) From 936cc153d5a20bc16cd4928f1bb111a692917d7f Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Sun, 21 Sep 2014 13:03:28 -0600 Subject: [PATCH 4/8] Revert "KAFKA-1618 Exception thrown when running console producer with no port number for the broker" This reverts commit b1e292ae90658a6d7d30ce7f198b1a848cc22887. --- .../main/scala/kafka/tools/ConsoleConsumer.scala | 112 ++++++------- .../main/scala/kafka/tools/ConsoleProducer.scala | 26 ++- .../main/scala/kafka/tools/GetOffsetShell.scala | 2 +- .../scala/kafka/tools/ProducerPerformance.scala | 4 +- .../main/scala/kafka/tools/ReplayLogProducer.scala | 2 +- .../kafka/tools/ReplicaVerificationTool.scala | 176 +++++++++++---------- .../scala/kafka/tools/SimpleConsumerShell.scala | 2 +- core/src/main/scala/kafka/tools/ToolsUtils.scala | 29 ---- 8 files changed, 175 insertions(+), 178 deletions(-) delete mode 100644 core/src/main/scala/kafka/tools/ToolsUtils.scala diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 855586d..323fc85 100644 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -27,7 +27,7 @@ import kafka.message._ import kafka.serializer._ import kafka.utils._ import kafka.metrics.KafkaMetricsReporter -import kafka.consumer.{ Blacklist, Whitelist, ConsumerConfig, Consumer } +import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer} /** * Consumer that dumps messages out to standard out. @@ -38,55 +38,55 @@ object ConsoleConsumer extends Logging { def main(args: Array[String]) { val parser = new OptionParser val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") - .withRequiredArg - .describedAs("whitelist") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("whitelist") + .ofType(classOf[String]) val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.") - .withRequiredArg - .describedAs("blacklist") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("blacklist") + .ofType(classOf[String]) val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. 
" + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") - .withRequiredArg - .describedAs("config file") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("config file") + .ofType(classOf[String]) val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") - .withRequiredArg - .describedAs("class") - .ofType(classOf[String]) - .defaultsTo(classOf[DefaultMessageFormatter].getName) + .withRequiredArg + .describedAs("class") + .ofType(classOf[String]) + .defaultsTo(classOf[DefaultMessageFormatter].getName) val messageFormatterArgOpt = parser.accepts("property") - .withRequiredArg - .describedAs("prop") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("prop") + .ofType(classOf[String]) val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + - "start with the earliest message present in the log rather than the latest message.") + "start with the earliest message present in the log rather than the latest message.") val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.") - .withRequiredArg - .describedAs("num_messages") - .ofType(classOf[java.lang.Integer]) + .withRequiredArg + .describedAs("num_messages") + .ofType(classOf[java.lang.Integer]) val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + - "skip it instead of halt.") + "skip it instead of halt.") val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - "set, the csv metrics will be outputed here") + "set, the csv metrics will be outputed here") .withRequiredArg .describedAs("metrics dictory") .ofType(classOf[java.lang.String]) - if (args.length == 0) + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") - + var groupIdPassed = true val options: OptionSet = tryParse(parser, args) CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) @@ -113,44 +113,46 @@ object ConsoleConsumer extends Logging { KafkaMetricsReporter.startReporters(verifiableProps) } + + val consumerProps = if (options.has(consumerConfigOpt)) Utils.loadProps(options.valueOf(consumerConfigOpt)) else new Properties() - if (!consumerProps.containsKey("group.id")) { - consumerProps.put("group.id", "console-consumer-" + new Random().nextInt(100000)) - groupIdPassed = false + if(!consumerProps.containsKey("group.id")) { + consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) + groupIdPassed=false } - consumerProps.put("auto.offset.reset", if (options.has(resetBeginningOpt)) "smallest" else "largest") + consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") 
consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - if (!consumerProps.containsKey("dual.commit.enabled")) - consumerProps.put("dual.commit.enabled", "false") - if (!consumerProps.containsKey("offsets.storage")) - consumerProps.put("offsets.storage", "zookeeper") + if(!consumerProps.containsKey("dual.commit.enabled")) + consumerProps.put("dual.commit.enabled","false") + if(!consumerProps.containsKey("offsets.storage")) + consumerProps.put("offsets.storage","zookeeper") if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) && - checkZkPathExists(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id") + "/offsets")) { - System.err.println("Found previous offset information for this group " + consumerProps.getProperty("group.id") - + ". Please use --delete-consumer-offsets to delete previous offsets metadata") + checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) { + System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id") + +". Please use --delete-consumer-offsets to delete previous offsets metadata") System.exit(1) } - if (options.has(deleteConsumerOffsetsOpt)) + if(options.has(deleteConsumerOffsetsOpt)) ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id")) val config = new ConsumerConfig(consumerProps) val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)) - val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 + val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 val connector = Consumer.create(config) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { connector.shutdown() // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - if (!groupIdPassed) + if(!groupIdPassed) ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id")) } }) @@ -160,12 +162,12 @@ object ConsoleConsumer extends Logging { formatter.init(formatterArgs) try { val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) - val iter = if (maxMessages >= 0) + val iter = if(maxMessages >= 0) stream.slice(0, maxMessages) else stream - for (messageAndTopic <- iter) { + for(messageAndTopic <- iter) { try { formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) numMessages += 1 @@ -176,7 +178,7 @@ object ConsoleConsumer extends Logging { else throw e } - if (System.out.checkError()) { + if(System.out.checkError()) { // This means no one is listening to our output stream any more, time to shutdown System.err.println("Unable to write to standard out, closing consumer.") System.err.println("Consumed %d messages".format(numMessages)) @@ -196,7 +198,7 @@ object ConsoleConsumer extends Logging { def tryParse(parser: OptionParser, args: Array[String]) = { try { - parser.parse(args: _*) + parser.parse(args : _*) } catch { case e: OptionException => { Utils.croak(e.getMessage) @@ -207,7 +209,7 @@ object ConsoleConsumer extends Logging { def checkZkPathExists(zkUrl: String, path: String): Boolean = { try { - val zk = 
new ZkClient(zkUrl, 30 * 1000, 30 * 1000, ZKStringSerializer); + val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); zk.exists(path) } catch { case _: Throwable => false @@ -227,16 +229,16 @@ class DefaultMessageFormatter extends MessageFormatter { var lineSeparator = "\n".getBytes override def init(props: Properties) { - if (props.containsKey("print.key")) + if(props.containsKey("print.key")) printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") - if (props.containsKey("key.separator")) + if(props.containsKey("key.separator")) keySeparator = props.getProperty("key.separator").getBytes - if (props.containsKey("line.separator")) + if(props.containsKey("line.separator")) lineSeparator = props.getProperty("line.separator").getBytes } def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { - if (printKey) { + if(printKey) { output.write(if (key == null) "null".getBytes() else key) output.write(keySeparator) } diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 4a9175f..19b7cfc 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -219,8 +219,9 @@ object ConsoleProducer { import scala.collection.JavaConversions._ val useNewProducer = options.has(useNewProducerOpt) val topic = options.valueOf(topicOpt) - - ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) + val regex = new Regex(":[0-9].*") + + PortValidator.validateAndDie(parser,options.valueOf(brokerListOpt)) val brokerList = options.valueOf(brokerListOpt) @@ -298,4 +299,25 @@ object ConsoleProducer { } } + object PortValidator { + def validate(hostPort: String) = { + val regex = new Regex(":[0-9]") + + val hostPorts: Array[String] = if (hostPort.contains(',')) + hostPort.split(",") + else + Array(hostPort) + val validHostPort = hostPorts.filter { + hostPortData => + regex.findAllMatchIn(hostPortData).size > 0 + } + + !(validHostPort.isEmpty) && validHostPort.size==hostPorts.length + } + + def validateAndDie(parser:OptionParser,hostPort:String) { + if (!validate(hostPort)) + CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ") + } + } } diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 2869aaa..1d4358f 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -64,7 +64,7 @@ object GetOffsetShell { CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt) - ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) + ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) val clientId = "GetOffsetShell" diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index 8458c28..ea814a7 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -125,8 +125,8 @@ object ProducerPerformance extends Logging { val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt) - ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) - + ConsoleProducer.PortValidator.validateAndDie(parser, 
options.valueOf(brokerListOpt)) + val topicsStr = options.valueOf(topicsOpt) val topics = topicsStr.split(",") val numMessages = options.valueOf(numMessagesOpt).longValue diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 43fe852..13334bd 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -108,7 +108,7 @@ object ReplayLogProducer extends Logging { CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt) - ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) + ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) val zkConnect = options.valueOf(zkConnectOpt) val brokerList = options.valueOf(brokerListOpt) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index bfb739e..bf5aba1 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -19,17 +19,17 @@ package kafka.tools import joptsimple.OptionParser import kafka.cluster.Broker -import kafka.message.{ MessageSet, MessageAndOffset, ByteBufferMessageSet } +import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet} import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference import kafka.client.ClientUtils -import java.util.regex.{ PatternSyntaxException, Pattern } +import java.util.regex.{PatternSyntaxException, Pattern} import kafka.api._ import java.text.SimpleDateFormat import java.util.Date -import kafka.common.{ ErrorMapping, TopicAndPartition } +import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.utils._ -import kafka.consumer.{ ConsumerConfig, Whitelist, SimpleConsumer } +import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer} /** * For verifying the consistency among replicas. 
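(Side note on the two patterns that appear in this series: PortValidator matches ":[0-9]" while ToolsUtils matches ":[0-9]+$". The anchored form is the stricter of the two, as the hypothetical REPL checks below suggest; each entry is assumed to be a single host:port string obtained by splitting the broker list on ','.)

    import scala.util.matching.Regex

    val loose  = new Regex(":[0-9]")    // PortValidator variant: a digit somewhere after a colon
    val strict = new Regex(":[0-9]+$")  // ToolsUtils variant: the entry must end in digits

    def hasPort(r: Regex, entry: String): Boolean = r.findAllMatchIn(entry).size > 0

    hasPort(loose,  "host1:9092")   // true
    hasPort(strict, "host1:9092")   // true
    hasPort(loose,  "host1:9abc")   // true  -- a malformed port slips through
    hasPort(strict, "host1:9abc")   // false
    hasPort(strict, "host1")        // false -- no port given at all
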
@@ -53,7 +53,7 @@ import kafka.consumer.{ ConsumerConfig, Whitelist, SimpleConsumer } */ object ReplicaVerificationTool extends Logging { - val clientId = "replicaVerificationTool" + val clientId= "replicaVerificationTool" val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" val dateFormat = new SimpleDateFormat(dateFormatString) @@ -64,49 +64,50 @@ object ReplicaVerificationTool extends Logging { def main(args: Array[String]): Unit = { val parser = new OptionParser val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") - .withRequiredArg - .describedAs("hostname:port,...,hostname:port") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("hostname:port,...,hostname:port") + .ofType(classOf[String]) val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.") - .withRequiredArg - .describedAs("bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.FetchSize) + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(ConsumerConfig.FetchSize) val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1000) val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.") - .withRequiredArg - .describedAs("Java regex (String)") - .ofType(classOf[String]) - .defaultsTo(".*") + .withRequiredArg + .describedAs("Java regex (String)") + .ofType(classOf[String]) + .defaultsTo(".*") val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.") - .withRequiredArg - .describedAs("timestamp/-1(latest)/-2(earliest)") - .ofType(classOf[java.lang.Long]) - .defaultsTo(-1L) + .withRequiredArg + .describedAs("timestamp/-1(latest)/-2(earliest)") + .ofType(classOf[java.lang.Long]) + .defaultsTo(-1L) val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(30 * 1000L) - - if (args.length == 0) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(30 * 1000L) + + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") - val options = parser.parse(args: _*) + val options = parser.parse(args : _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) - - ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) + + ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) val regex = options.valueOf(topicWhiteListOpt) val topicWhiteListFiler = new Whitelist(regex) try { Pattern.compile(regex) - } catch { + } + catch { case e: PatternSyntaxException => throw new RuntimeException(regex + " is an invalid regex.") } @@ -115,64 +116,66 @@ object ReplicaVerificationTool extends Logging { val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue val reportInterval = options.valueOf(reportIntervalOpt).longValue - + + // getting topic metadata info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) val topicsMetadataResponse = 
ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( - topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) - true - else - false) + topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) + true + else + false + ) val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.flatMap( partitionMetadata => partitionMetadata.replicas.map(broker => - TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id)))) + TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id)) + ) + ) debug("Selected topic partitions: " + topicPartitionReplicaList) val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId) - .map { - case (brokerId, partitions) => - brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } - } + .map { case (brokerId, partitions) => + brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } debug("Topic partitions per broker: " + topicAndPartitionsPerBroker) val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = - topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) - .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } + topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) + .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map( partitionMetadata => - (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id))).groupBy(_._2) - .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { - case (topicAndPartition, leaderId) => topicAndPartition - }) + (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)) + ).groupBy(_._2) + .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { + case(topicAndPartition, leaderId) => topicAndPartition }) debug("Leaders per broker: " + leadersPerBroker) val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, - leadersPerBroker, - topicAndPartitionsPerBroker.size, - brokerMap, - initialOffsetTime, - reportInterval) + leadersPerBroker, + topicAndPartitionsPerBroker.size, + brokerMap, + initialOffsetTime, + reportInterval) // create all replica fetcher threads val verificationBrokerId = topicAndPartitionsPerBroker.head._1 val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map { case (brokerId, topicAndPartitions) => new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId, - sourceBroker = brokerMap(brokerId), - topicAndPartitions = 
topicAndPartitions, - replicaBuffer = replicaBuffer, - socketTimeout = 30000, - socketBufferSize = 256000, - fetchSize = fetchSize, - maxWait = maxWaitMs, - minBytes = 1, - doVerification = (brokerId == verificationBrokerId)) + sourceBroker = brokerMap(brokerId), + topicAndPartitions = topicAndPartitions, + replicaBuffer = replicaBuffer, + socketTimeout = 30000, + socketBufferSize = 256000, + fetchSize = fetchSize, + maxWait = maxWaitMs, + minBytes = 1, + doVerification = (brokerId == verificationBrokerId)) } Runtime.getRuntime.addShutdownHook(new Thread() { @@ -187,18 +190,18 @@ object ReplicaVerificationTool extends Logging { } } -private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) +private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset]) private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long) private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int], - leadersPerBroker: Map[Int, Seq[TopicAndPartition]], - expectedNumFetchers: Int, - brokerMap: Map[Int, Broker], - initialOffsetTime: Long, - reportInterval: Long) extends Logging { + leadersPerBroker: Map[Int, Seq[TopicAndPartition]], + expectedNumFetchers: Int, + brokerMap: Map[Int, Broker], + initialOffsetTime: Long, + reportInterval: Long) extends Logging { private val fetchOffsetMap = new Pool[TopicAndPartition, Long] private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]] private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers)) @@ -242,7 +245,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa val offsetRequest = OffsetRequest(initialOffsetMap) val offsetResponse = consumer.getOffsetsBefore(offsetRequest) assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse)) - offsetResponse.partitionErrorAndOffsets.foreach { + offsetResponse.partitionErrorAndOffsets.foreach{ case (topicAndPartition, partitionOffsetResponse) => fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head) } @@ -263,19 +266,18 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) { debug("Verifying " + topicAndPartition) assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition), - "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " - + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") + "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " + + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") val messageIteratorMap = fetchResponsePerReplica.map { - case (replicaId, fetchResponse) => - replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator - } + case(replicaId, fetchResponse) => + replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator} val maxHw = fetchResponsePerReplica.values.map(_.hw).max // Iterate one message at a time from every replica, until high watermark is reached. 
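(The verification loop around this hunk advances every replica's message iterator in lockstep and compares each message against the first replica read in that round. A much-simplified sketch of that idea, using plain Scala types rather than Kafka's fetch-response classes, is:)

    case class Msg(offset: Long, checksum: Long)

    def replicasConsistent(iterators: Iterable[Iterator[Msg]]): Boolean = {
      var consistent = true
      var haveMore   = true
      while (consistent && haveMore) {
        var reference: Option[Msg] = None
        for (it <- iterators) {
          if (!it.hasNext) haveMore = false            // a replica ran out of messages
          else {
            val m = it.next()
            reference match {
              case None      => reference = Some(m)    // first replica sets the expectation
              case Some(ref) =>
                if (ref.offset != m.offset || ref.checksum != m.checksum)
                  consistent = false                   // offset or payload diverges
            }
          }
        }
        if (reference.isEmpty) haveMore = false        // nothing left on any replica
      }
      consistent
    }
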
var isMessageInAllReplicas = true while (isMessageInAllReplicas) { var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None - for ((replicaId, messageIterator) <- messageIteratorMap) { + for ( (replicaId, messageIterator) <- messageIteratorMap) { try { if (messageIterator.hasNext) { val messageAndOffset = messageIterator.next() @@ -287,7 +289,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa messageInfoFromFirstReplicaOpt match { case None => messageInfoFromFirstReplicaOpt = Some( - MessageInfo(replicaId, messageAndOffset.offset, messageAndOffset.nextOffset, messageAndOffset.message.checksum)) + MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum)) case Some(messageInfoFromFirstReplica) => if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) { println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition @@ -308,14 +310,14 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } catch { case t: Throwable => throw new RuntimeException("Error in processing replica %d in partition %s at offset %d." - .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) + .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) } } if (isMessageInAllReplicas) { val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset fetchOffsetMap.put(topicAndPartition, nextOffset) debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " + - nextOffset + " for " + topicAndPartition) + nextOffset + " for " + topicAndPartition) } } if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) { @@ -336,15 +338,15 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition], - replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, - fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) + replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, + fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) extends ShutdownableThread(name) { val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId) val fetchRequestBuilder = new FetchRequestBuilder(). - clientId(ReplicaVerificationTool.clientId). - replicaId(Request.DebuggingConsumerId). - maxWait(maxWait). - minBytes(minBytes) + clientId(ReplicaVerificationTool.clientId). + replicaId(Request.DebuggingConsumerId). + maxWait(maxWait). 
+ minBytes(minBytes) override def doWork() { @@ -369,7 +371,7 @@ private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartiti if (response != null) { response.data.foreach { - case (topicAndPartition, partitionData) => + case(topicAndPartition, partitionData) => replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData) } } else { diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 223d1b3..fd80dae 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -100,7 +100,7 @@ object SimpleConsumerShell extends Logging { val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt) - ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) + ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) val topic = options.valueOf(topicOpt) val partitionId = options.valueOf(partitionIdOpt).intValue() diff --git a/core/src/main/scala/kafka/tools/ToolsUtils.scala b/core/src/main/scala/kafka/tools/ToolsUtils.scala deleted file mode 100644 index 6cebb67..0000000 --- a/core/src/main/scala/kafka/tools/ToolsUtils.scala +++ /dev/null @@ -1,29 +0,0 @@ -package kafka.tools - -import scala.util.matching.Regex -import kafka.utils.CommandLineUtils -import joptsimple.OptionParser - -object ToolsUtils extends App { - - private def validate(hostPort: String) = { - val regex = new Regex(":[0-9]+$") - - val hostPorts: Array[String] = if(hostPort.contains(',')) - hostPort.split(",") - else - Array(hostPort) - val validHostPort = hostPorts.filter { - hostPortData => - regex.findAllMatchIn(hostPortData).size > 0 - } - - !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length - } - - def validatePortAndDie(parser: OptionParser, hostPort: String) { - if (!validate(hostPort)) - CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ") - } - -} \ No newline at end of file -- 1.8.5.2 (Apple Git-48) From f3613eb2ed9c7706cc4460ce3dab86d6ac305106 Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Sun, 21 Sep 2014 13:09:58 -0600 Subject: [PATCH 5/8] KAFKA-1618 Exception thrown when running console producer with no port --- .../main/scala/kafka/tools/ConsoleProducer.scala | 25 ++----------------- .../main/scala/kafka/tools/GetOffsetShell.scala | 2 +- .../scala/kafka/tools/ProducerPerformance.scala | 2 +- .../main/scala/kafka/tools/ReplayLogProducer.scala | 2 +- .../kafka/tools/ReplicaVerificationTool.scala | 2 +- .../scala/kafka/tools/SimpleConsumerShell.scala | 2 +- core/src/main/scala/kafka/tools/ToolsUtils.scala | 29 ++++++++++++++++++++++ 7 files changed, 36 insertions(+), 28 deletions(-) create mode 100644 core/src/main/scala/kafka/tools/ToolsUtils.scala diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 19b7cfc..8ac85c4 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -220,8 +220,8 @@ object ConsoleProducer { val useNewProducer = options.has(useNewProducerOpt) val topic = options.valueOf(topicOpt) val regex = new Regex(":[0-9].*") - - PortValidator.validateAndDie(parser,options.valueOf(brokerListOpt)) + + ToolsUtils.validatePortAndDie(parser, 
options.valueOf(brokerListOpt)) val brokerList = options.valueOf(brokerListOpt) @@ -299,25 +299,4 @@ object ConsoleProducer { } } - object PortValidator { - def validate(hostPort: String) = { - val regex = new Regex(":[0-9]") - - val hostPorts: Array[String] = if (hostPort.contains(',')) - hostPort.split(",") - else - Array(hostPort) - val validHostPort = hostPorts.filter { - hostPortData => - regex.findAllMatchIn(hostPortData).size > 0 - } - - !(validHostPort.isEmpty) && validHostPort.size==hostPorts.length - } - - def validateAndDie(parser:OptionParser,hostPort:String) { - if (!validate(hostPort)) - CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ") - } - } } diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 1d4358f..2869aaa 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -64,7 +64,7 @@ object GetOffsetShell { CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt) - ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val clientId = "GetOffsetShell" diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index ea814a7..ac00766 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -125,7 +125,7 @@ object ProducerPerformance extends Logging { val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt) - ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val topicsStr = options.valueOf(topicsOpt) val topics = topicsStr.split(",") diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 13334bd..43fe852 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -108,7 +108,7 @@ object ReplayLogProducer extends Logging { CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt) - ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val zkConnect = options.valueOf(zkConnectOpt) val brokerList = options.valueOf(brokerListOpt) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index bf5aba1..7b210df 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -99,7 +99,7 @@ object ReplicaVerificationTool extends Logging { val options = parser.parse(args : _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) - ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val regex = options.valueOf(topicWhiteListOpt) val topicWhiteListFiler = new Whitelist(regex) diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 
b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index fd80dae..223d1b3 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -100,7 +100,7 @@ object SimpleConsumerShell extends Logging { val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt) - ConsoleProducer.PortValidator.validateAndDie(parser, options.valueOf(brokerListOpt)) + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val topic = options.valueOf(topicOpt) val partitionId = options.valueOf(partitionIdOpt).intValue() diff --git a/core/src/main/scala/kafka/tools/ToolsUtils.scala b/core/src/main/scala/kafka/tools/ToolsUtils.scala new file mode 100644 index 0000000..6cebb67 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ToolsUtils.scala @@ -0,0 +1,29 @@ +package kafka.tools + +import scala.util.matching.Regex +import kafka.utils.CommandLineUtils +import joptsimple.OptionParser + +object ToolsUtils extends App { + + private def validate(hostPort: String) = { + val regex = new Regex(":[0-9]+$") + + val hostPorts: Array[String] = if(hostPort.contains(',')) + hostPort.split(",") + else + Array(hostPort) + val validHostPort = hostPorts.filter { + hostPortData => + regex.findAllMatchIn(hostPortData).size > 0 + } + + !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length + } + + def validatePortAndDie(parser: OptionParser, hostPort: String) { + if (!validate(hostPort)) + CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ") + } + +} \ No newline at end of file -- 1.8.5.2 (Apple Git-48) From 0ea224e8c5fea9c191855909b05f9120be89a750 Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Sun, 21 Sep 2014 13:14:48 -0600 Subject: [PATCH 6/8] KAFKA-1618 Exception thrown when running console producer with no port- removed regex variable --- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 8ac85c4..4a9175f 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -219,7 +219,6 @@ object ConsoleProducer { import scala.collection.JavaConversions._ val useNewProducer = options.has(useNewProducerOpt) val topic = options.valueOf(topicOpt) - val regex = new Regex(":[0-9].*") ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) -- 1.8.5.2 (Apple Git-48) From 9f6406358ee10846c2bf3887a7f8a0f50206d4ca Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Mon, 22 Sep 2014 21:53:32 -0600 Subject: [PATCH 7/8] KAFKA-1618 Exception thrown when running console producer with no port number for the broker --- .../main/scala/kafka/tools/ConsoleProducer.scala | 55 ++++--- .../main/scala/kafka/tools/GetOffsetShell.scala | 8 +- .../scala/kafka/tools/ProducerPerformance.scala | 46 +++--- .../main/scala/kafka/tools/ReplayLogProducer.scala | 15 +- .../kafka/tools/ReplicaVerificationTool.scala | 174 ++++++++++----------- .../scala/kafka/tools/SimpleConsumerShell.scala | 40 ++--- core/src/main/scala/kafka/tools/ToolsUtils.scala | 2 +- 7 files changed, 173 insertions(+), 167 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 4a9175f..175da1a 100644 --- 
a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,13 +19,13 @@ package kafka.tools import kafka.common._ import kafka.message._ +import kafka.producer.{KeyedMessage, NewShinyProducer, OldProducer} import kafka.serializer._ import kafka.utils.CommandLineUtils -import kafka.producer.{ NewShinyProducer, OldProducer, KeyedMessage } -import java.util.Properties import java.io._ +import java.util.Properties import joptsimple._ -import scala.util.matching.Regex + object ConsoleProducer { @@ -40,7 +40,7 @@ object ConsoleProducer { try { val producer = - if (config.useNewProducer) { + if(config.useNewProducer) { import org.apache.kafka.clients.producer.ProducerConfig props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) @@ -53,7 +53,7 @@ object ConsoleProducer { props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) - if (config.queueEnqueueTimeoutMs != -1) + if(config.queueEnqueueTimeoutMs != -1) props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) @@ -63,7 +63,7 @@ object ConsoleProducer { } else { props.put("metadata.broker.list", config.brokerList) props.put("compression.codec", config.compressionCodec) - props.put("producer.type", if (config.sync) "sync" else "async") + props.put("producer.type", if(config.sync) "sync" else "async") props.put("batch.num.messages", config.batchSize.toString) props.put("message.send.max.retries", config.messageSendMaxRetries.toString) props.put("retry.backoff.ms", config.retryBackoffMs.toString) @@ -90,7 +90,7 @@ object ConsoleProducer { var message: KeyedMessage[Array[Byte], Array[Byte]] = null do { message = reader.readMessage() - if (message != null) + if(message != null) producer.send(message.topic, message.key, message.message) } while (message != null) } catch { @@ -101,6 +101,14 @@ object ConsoleProducer { System.exit(0) } + trait MessageReader { + def init(inputStream: InputStream, props: Properties) {} + + def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] + + def close() {} + } + class ProducerConfig(args: Array[String]) { val parser = new OptionParser val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.") @@ -130,24 +138,24 @@ object ConsoleProducer { val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.") .withRequiredArg .ofType(classOf[java.lang.Long]) - .defaultsTo(100) + .defaultsTo(100L) val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + " a message will queue awaiting suffient batch size. 
The value is given in ms.") .withRequiredArg .describedAs("timeout_ms") .ofType(classOf[java.lang.Long]) - .defaultsTo(1000) + .defaultsTo(1000L) val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + " messages will queue awaiting suffient batch size.") .withRequiredArg .describedAs("queue_size") .ofType(classOf[java.lang.Long]) - .defaultsTo(10000) + .defaultsTo(10000L) val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue") .withRequiredArg .describedAs("queue enqueuetimeout ms") .ofType(classOf[java.lang.Long]) - .defaultsTo(Int.MaxValue) + .defaultsTo(Long.MaxValue) val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests") .withRequiredArg .describedAs("request required acks") @@ -212,11 +220,12 @@ object ConsoleProducer { val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") val options = parser.parse(args: _*) - if (args.length == 0) + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.") CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt) import scala.collection.JavaConversions._ + val useNewProducer = options.has(useNewProducerOpt) val topic = options.valueOf(topicOpt) @@ -226,8 +235,8 @@ object ConsoleProducer { val sync = options.has(syncOpt) val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) - val compressionCodec = if (options.has(compressionCodecOpt)) - if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty) + val compressionCodec = if(options.has(compressionCodecOpt)) + if(compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty) DefaultCompressionCodec.name else compressionCodecOptionValue else NoCompressionCodec.name @@ -251,12 +260,6 @@ object ConsoleProducer { val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt) } - trait MessageReader { - def init(inputStream: InputStream, props: Properties) {} - def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] - def close() {} - } - class LineMessageReader extends MessageReader { var topic: String = null var reader: BufferedReader = null @@ -267,11 +270,11 @@ object ConsoleProducer { override def init(inputStream: InputStream, props: Properties) { topic = props.getProperty("topic") - if (props.containsKey("parse.key")) + if(props.containsKey("parse.key")) parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true") - if (props.containsKey("key.separator")) + if(props.containsKey("key.separator")) keySeparator = props.getProperty("key.separator") - if (props.containsKey("ignore.error")) + if(props.containsKey("ignore.error")) ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true") reader = new BufferedReader(new InputStreamReader(inputStream)) } @@ -283,14 +286,14 @@ object ConsoleProducer { case (line, true) => line.indexOf(keySeparator) match { case -1 => - if (ignoreError) + if(ignoreError) new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) else throw new KafkaException("No key found on line " + lineNumber + ": " + line) case n => new KeyedMessage[Array[Byte], Array[Byte]](topic, line.substring(0, n).getBytes, - (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes()) + (if(n + keySeparator.size > line.size) "" else line.substring(n + 
keySeparator.size)).getBytes()) } case (line, false) => new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 2869aaa..81f9b4b 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -20,7 +20,7 @@ package kafka.tools import kafka.consumer._ import joptsimple._ -import kafka.api.{ PartitionOffsetRequestInfo, OffsetRequest } +import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} import kafka.common.TopicAndPartition import kafka.client.ClientUtils import kafka.utils.CommandLineUtils @@ -57,7 +57,7 @@ object GetOffsetShell { .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) - if (args.length == 0) + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.") val options = parser.parse(args: _*) @@ -76,13 +76,13 @@ object GetOffsetShell { val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata - if (topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { + if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) + "kafka-list-topic.sh to verify") System.exit(1) } val partitions = - if (partitionList == "") { + if(partitionList == "") { topicsMetadata.head.partitionsMetadata.map(_.partitionId) } else { partitionList.split(",").map(_.toInt).toSeq diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index ac00766..8f56353 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,12 +18,12 @@ package kafka.tools import kafka.metrics.KafkaMetricsReporter -import kafka.producer.{ OldProducer, NewShinyProducer } -import kafka.utils.{ VerifiableProperties, Logging, CommandLineUtils } +import kafka.producer.{OldProducer, NewShinyProducer} +import kafka.utils.{VerifiableProperties, Logging, CommandLineUtils} import kafka.message.CompressionCodec import kafka.serializer._ -import java.util.concurrent.{ CountDownLatch, Executors } +import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.atomic.AtomicLong import java.util._ import java.text.SimpleDateFormat @@ -41,7 +41,7 @@ object ProducerPerformance extends Logging { val logger = Logger.getLogger(getClass) val config = new ProducerPerfConfig(args) - if (!config.isFixedSize) + if(!config.isFixedSize) logger.info("WARN: Throughput will be slower due to changing message size per request") val totalBytesSent = new AtomicLong(0) @@ -51,7 +51,7 @@ object ProducerPerformance extends Logging { val startMs = System.currentTimeMillis val rand = new java.util.Random - if (!config.hideHeader) + if(!config.hideHeader) println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " + "total.data.sent.in.nMsg, nMsg.sec") @@ -125,8 +125,8 @@ object ProducerPerformance extends Logging { val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt) - ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) - + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) + val topicsStr = options.valueOf(topicsOpt) val topics = topicsStr.split(",") val numMessages = options.valueOf(numMessagesOpt).longValue @@ -143,7 +143,7 @@ object ProducerPerformance extends Logging { val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) val seqIdMode = options.has(initialMessageIdOpt) var initialMessageId: Int = 0 - if (seqIdMode) + if(seqIdMode) initialMessageId = options.valueOf(initialMessageIdOpt).intValue() val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue() @@ -153,11 +153,11 @@ object ProducerPerformance extends Logging { val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) - if (csvMetricsReporterEnabled) { + if(csvMetricsReporterEnabled) { val props = new Properties() props.put("kafka.metrics.polling.interval.secs", "1") props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") - if (options.has(metricsDirectoryOpt)) + if(options.has(metricsDirectoryOpt)) props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) else props.put("kafka.csv.metrics.dir", "kafka_metrics") @@ -170,18 +170,18 @@ object ProducerPerformance extends Logging { } class ProducerThread(val threadId: Int, - val config: ProducerPerfConfig, - val totalBytesSent: AtomicLong, - val totalMessagesSent: AtomicLong, - val allDone: CountDownLatch, - val rand: Random) extends Runnable { + val config: ProducerPerfConfig, + val totalBytesSent: AtomicLong, + val totalMessagesSent: AtomicLong, + val allDone: CountDownLatch, 
+ val rand: Random) extends Runnable { val seqIdNumDigit = 10 // no. of digits for max int value val messagesPerThread = config.numMessages / config.numThreads debug("Messages per thread = " + messagesPerThread) val props = new Properties() val producer = - if (config.useNewProducer) { + if(config.useNewProducer) { import org.apache.kafka.clients.producer.ProducerConfig props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) @@ -196,7 +196,7 @@ object ProducerPerformance extends Logging { props.put("metadata.broker.list", config.brokerList) props.put("compression.codec", config.compressionCodec.codec.toString) props.put("send.buffer.bytes", (64 * 1024).toString) - if (!config.isSync) { + if(!config.isSync) { props.put("producer.type", "async") props.put("batch.num.messages", config.batchSize.toString) props.put("queue.enqueue.timeout.ms", "-1") @@ -212,7 +212,8 @@ object ProducerPerformance extends Logging { } // generate the sequential message ID - private val SEP = ":" // message field separator + private val SEP = ":" + // message field separator private val messageIdLabel = "MessageID" private val threadIdLabel = "ThreadID" private val topicLabel = "Topic" @@ -240,8 +241,8 @@ object ProducerPerformance extends Logging { } private def generateProducerData(topic: String, messageId: Long): Array[Byte] = { - val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize) - if (config.seqIdMode) { + val msgSize = if(config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize) + if(config.seqIdMode) { val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId generateMessageWithSeqId(topic, seqId, msgSize) } else { @@ -263,7 +264,7 @@ object ProducerPerformance extends Logging { producer.send(topic, BigInteger.valueOf(i).toByteArray, message) bytesSent += message.size nSends += 1 - if (config.messageSendGapMs > 0) + if(config.messageSendGapMs > 0) Thread.sleep(config.messageSendGapMs) }) } catch { @@ -281,4 +282,5 @@ object ProducerPerformance extends Logging { allDone.countDown() } } + } diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 43fe852..b40c52e 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,12 +18,12 @@ package kafka.tools import joptsimple.OptionParser -import java.util.concurrent.{ Executors, CountDownLatch } +import java.util.concurrent.{Executors, CountDownLatch} import java.util.Properties import kafka.consumer._ -import kafka.utils.{ CommandLineUtils, Logging, ZkUtils } +import kafka.utils.{CommandLineUtils, Logging, ZkUtils} import kafka.api.OffsetRequest -import org.apache.kafka.clients.producer.{ ProducerRecord, KafkaProducer, ProducerConfig } +import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig} object ReplayLogProducer extends Logging { @@ -119,7 +119,9 @@ object ReplayLogProducer extends Logging { val outputTopic = options.valueOf(outputTopicOpt) val reportingInterval = options.valueOf(reportingIntervalOpt).intValue val isSync = options.has(syncOpt) + import scala.collection.JavaConversions._ + val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt)) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) } @@ -133,7 +135,7 @@ object ReplayLogProducer extends Logging { var messageCount: Int = 0 try { val iter = - if (config.numMessages >= 0) + if(config.numMessages >= 0) stream.slice(0, config.numMessages) else stream @@ -141,7 +143,7 @@ object ReplayLogProducer extends Logging { try { val response = producer.send(new ProducerRecord(config.outputTopic, messageAndMetadata.key(), messageAndMetadata.message())) - if (config.isSync) { + if(config.isSync) { response.get() } messageCount += 1 @@ -163,4 +165,5 @@ object ReplayLogProducer extends Logging { } } + } diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 7b210df..bfb739e 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -19,17 +19,17 @@ package kafka.tools import joptsimple.OptionParser import kafka.cluster.Broker -import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet} +import kafka.message.{ MessageSet, MessageAndOffset, ByteBufferMessageSet } import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference import kafka.client.ClientUtils -import java.util.regex.{PatternSyntaxException, Pattern} +import java.util.regex.{ PatternSyntaxException, Pattern } import kafka.api._ import java.text.SimpleDateFormat import java.util.Date -import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.common.{ ErrorMapping, TopicAndPartition } import kafka.utils._ -import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer} +import kafka.consumer.{ ConsumerConfig, Whitelist, SimpleConsumer } /** * For verifying the consistency among replicas. 
@@ -53,7 +53,7 @@ import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer} */ object ReplicaVerificationTool extends Logging { - val clientId= "replicaVerificationTool" + val clientId = "replicaVerificationTool" val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" val dateFormat = new SimpleDateFormat(dateFormatString) @@ -64,41 +64,41 @@ object ReplicaVerificationTool extends Logging { def main(args: Array[String]): Unit = { val parser = new OptionParser val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") - .withRequiredArg - .describedAs("hostname:port,...,hostname:port") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("hostname:port,...,hostname:port") + .ofType(classOf[String]) val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.") - .withRequiredArg - .describedAs("bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.FetchSize) + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(ConsumerConfig.FetchSize) val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1000) val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.") - .withRequiredArg - .describedAs("Java regex (String)") - .ofType(classOf[String]) - .defaultsTo(".*") + .withRequiredArg + .describedAs("Java regex (String)") + .ofType(classOf[String]) + .defaultsTo(".*") val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.") - .withRequiredArg - .describedAs("timestamp/-1(latest)/-2(earliest)") - .ofType(classOf[java.lang.Long]) - .defaultsTo(-1L) + .withRequiredArg + .describedAs("timestamp/-1(latest)/-2(earliest)") + .ofType(classOf[java.lang.Long]) + .defaultsTo(-1L) val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(30 * 1000L) - - if(args.length == 0) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(30 * 1000L) + + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") - val options = parser.parse(args : _*) + val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) - + ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt)) val regex = options.valueOf(topicWhiteListOpt) @@ -106,8 +106,7 @@ object ReplicaVerificationTool extends Logging { try { Pattern.compile(regex) - } - catch { + } catch { case e: PatternSyntaxException => throw new RuntimeException(regex + " is an invalid regex.") } @@ -116,66 +115,64 @@ object ReplicaVerificationTool extends Logging { val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue val reportInterval = options.valueOf(reportIntervalOpt).longValue - - + // getting topic metadata info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, 
clientId, maxWaitMs) val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( - topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) - true - else - false - ) + topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) + true + else + false) val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.flatMap( partitionMetadata => partitionMetadata.replicas.map(broker => - TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id)) - ) - ) + TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id)))) debug("Selected topic partitions: " + topicPartitionReplicaList) val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId) - .map { case (brokerId, partitions) => - brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } + .map { + case (brokerId, partitions) => + brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } + } debug("Topic partitions per broker: " + topicAndPartitionsPerBroker) val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = - topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) - .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } + topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) + .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map( partitionMetadata => - (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)) - ).groupBy(_._2) - .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { - case(topicAndPartition, leaderId) => topicAndPartition }) + (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id))).groupBy(_._2) + .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { + case (topicAndPartition, leaderId) => topicAndPartition + }) debug("Leaders per broker: " + leadersPerBroker) val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, - leadersPerBroker, - topicAndPartitionsPerBroker.size, - brokerMap, - initialOffsetTime, - reportInterval) + leadersPerBroker, + topicAndPartitionsPerBroker.size, + brokerMap, + initialOffsetTime, + reportInterval) // create all replica fetcher threads val verificationBrokerId = topicAndPartitionsPerBroker.head._1 val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map { case (brokerId, topicAndPartitions) => new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId, - sourceBroker = brokerMap(brokerId), - topicAndPartitions = topicAndPartitions, - replicaBuffer = replicaBuffer, - socketTimeout = 30000, - 
socketBufferSize = 256000, - fetchSize = fetchSize, - maxWait = maxWaitMs, - minBytes = 1, - doVerification = (brokerId == verificationBrokerId)) + sourceBroker = brokerMap(brokerId), + topicAndPartitions = topicAndPartitions, + replicaBuffer = replicaBuffer, + socketTimeout = 30000, + socketBufferSize = 256000, + fetchSize = fetchSize, + maxWait = maxWaitMs, + minBytes = 1, + doVerification = (brokerId == verificationBrokerId)) } Runtime.getRuntime.addShutdownHook(new Thread() { @@ -190,18 +187,18 @@ object ReplicaVerificationTool extends Logging { } } -private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) +private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset]) private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long) private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int], - leadersPerBroker: Map[Int, Seq[TopicAndPartition]], - expectedNumFetchers: Int, - brokerMap: Map[Int, Broker], - initialOffsetTime: Long, - reportInterval: Long) extends Logging { + leadersPerBroker: Map[Int, Seq[TopicAndPartition]], + expectedNumFetchers: Int, + brokerMap: Map[Int, Broker], + initialOffsetTime: Long, + reportInterval: Long) extends Logging { private val fetchOffsetMap = new Pool[TopicAndPartition, Long] private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]] private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers)) @@ -245,7 +242,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa val offsetRequest = OffsetRequest(initialOffsetMap) val offsetResponse = consumer.getOffsetsBefore(offsetRequest) assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse)) - offsetResponse.partitionErrorAndOffsets.foreach{ + offsetResponse.partitionErrorAndOffsets.foreach { case (topicAndPartition, partitionOffsetResponse) => fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head) } @@ -266,18 +263,19 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) { debug("Verifying " + topicAndPartition) assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition), - "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " - + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") + "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " + + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") val messageIteratorMap = fetchResponsePerReplica.map { - case(replicaId, fetchResponse) => - replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator} + case (replicaId, fetchResponse) => + replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator + } val maxHw = fetchResponsePerReplica.values.map(_.hw).max // Iterate one message at a time from every replica, until high watermark is reached. 
var isMessageInAllReplicas = true while (isMessageInAllReplicas) { var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None - for ( (replicaId, messageIterator) <- messageIteratorMap) { + for ((replicaId, messageIterator) <- messageIteratorMap) { try { if (messageIterator.hasNext) { val messageAndOffset = messageIterator.next() @@ -289,7 +287,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa messageInfoFromFirstReplicaOpt match { case None => messageInfoFromFirstReplicaOpt = Some( - MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum)) + MessageInfo(replicaId, messageAndOffset.offset, messageAndOffset.nextOffset, messageAndOffset.message.checksum)) case Some(messageInfoFromFirstReplica) => if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) { println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition @@ -310,14 +308,14 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } catch { case t: Throwable => throw new RuntimeException("Error in processing replica %d in partition %s at offset %d." - .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) + .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) } } if (isMessageInAllReplicas) { val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset fetchOffsetMap.put(topicAndPartition, nextOffset) debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " + - nextOffset + " for " + topicAndPartition) + nextOffset + " for " + topicAndPartition) } } if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) { @@ -338,15 +336,15 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition], - replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, - fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) + replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, + fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) extends ShutdownableThread(name) { val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId) val fetchRequestBuilder = new FetchRequestBuilder(). - clientId(ReplicaVerificationTool.clientId). - replicaId(Request.DebuggingConsumerId). - maxWait(maxWait). - minBytes(minBytes) + clientId(ReplicaVerificationTool.clientId). + replicaId(Request.DebuggingConsumerId). + maxWait(maxWait). + minBytes(minBytes) override def doWork() { @@ -371,7 +369,7 @@ private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartiti if (response != null) { response.data.foreach { - case(topicAndPartition, partitionData) => + case (topicAndPartition, partitionData) => replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData) } } else { diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 223d1b3..1ba314d 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,7 +21,7 @@ import joptsimple._ import kafka.utils._ import kafka.consumer._ import kafka.client.ClientUtils -import kafka.api.{ OffsetRequest, FetchRequestBuilder, Request } +import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} import kafka.cluster.Broker import scala.collection.JavaConversions._ import kafka.common.TopicAndPartition @@ -94,7 +94,7 @@ object SimpleConsumerShell extends Logging { val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend", "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages") - if (args.length == 0) + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.") val options = parser.parse(args: _*) @@ -111,8 +111,8 @@ object SimpleConsumerShell extends Logging { val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() val maxMessages = options.valueOf(maxMessagesOpt).intValue - val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false - val printOffsets = if (options.has(printOffsetOpt)) true else false + val skipMessageOnError = if(options.has(skipMessageOnErrorOpt)) true else false + val printOffsets = if(options.has(printOffsetOpt)) true else false val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt) val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) @@ -128,7 +128,7 @@ object SimpleConsumerShell extends Logging { info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata - if (topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { + if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata)) System.exit(1) } @@ -136,7 +136,7 @@ object SimpleConsumerShell extends Logging { // validating partition id val partitionsMetadata = topicsMetadata(0).partitionsMetadata val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId) - if (!partitionMetadataOpt.isDefined) { + if(!partitionMetadataOpt.isDefined) { System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic)) System.exit(1) } @@ -144,16 +144,16 @@ object SimpleConsumerShell extends Logging { // validating replica id and initializing target broker var fetchTargetBroker: Broker = null var replicaOpt: Option[Broker] = null - if (replicaId == UseLeaderReplica) { + if(replicaId == UseLeaderReplica) { replicaOpt = partitionMetadataOpt.get.leader - if (!replicaOpt.isDefined) { + if(!replicaOpt.isDefined) { System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId)) System.exit(1) } } else { val replicasForPartition = partitionMetadataOpt.get.replicas replicaOpt = replicasForPartition.find(r => r.id == replicaId) - if (!replicaOpt.isDefined) { + if(!replicaOpt.isDefined) { System.err.println("Error: replica %d does not exist for 
partition (%s, %d)".format(replicaId, topic, partitionId)) System.exit(1) } @@ -161,11 +161,11 @@ object SimpleConsumerShell extends Logging { fetchTargetBroker = replicaOpt.get // initializing starting offset - if (startingOffset < OffsetRequest.EarliestTime) { + if(startingOffset < OffsetRequest.EarliestTime) { System.err.println("Invalid starting offset: %d".format(startingOffset)) System.exit(1) } - if (startingOffset < 0) { + if(startingOffset < 0) { val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout, ConsumerConfig.SocketBufferSize, clientId) try { @@ -176,7 +176,7 @@ object SimpleConsumerShell extends Logging { System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t)) System.exit(1) } finally { - if (simpleConsumer != null) + if(simpleConsumer != null) simpleConsumer.close() } } @@ -185,7 +185,7 @@ object SimpleConsumerShell extends Logging { val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] formatter.init(formatterArgs) - val replicaString = if (replicaId > 0) "leader" else "replica" + val replicaString = if(replicaId > 0) "leader" else "replica" info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]" .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64 * 1024, clientId) @@ -200,7 +200,7 @@ object SimpleConsumerShell extends Logging { .build() val fetchResponse = simpleConsumer.fetch(fetchRequest) val messageSet = fetchResponse.messageSet(topic, partitionId) - if (messageSet.validBytes <= 0 && noWaitAtEndOfLog) { + if(messageSet.validBytes <= 0 && noWaitAtEndOfLog) { println("Terminating. 
Reached the end of partition (%s, %d) at offset %d".format(topic, partitionId, offset)) return } @@ -208,20 +208,20 @@ object SimpleConsumerShell extends Logging { for (messageAndOffset <- messageSet if (numMessagesConsumed < maxMessages)) { try { offset = messageAndOffset.nextOffset - if (printOffsets) + if(printOffsets) System.out.println("next offset = " + offset) val message = messageAndOffset.message - val key = if (message.hasKey) Utils.readBytes(message.key) else null - formatter.writeTo(key, if (message.isNull) null else Utils.readBytes(message.payload), System.out) + val key = if(message.hasKey) Utils.readBytes(message.key) else null + formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out) numMessagesConsumed += 1 } catch { case e: Throwable => - if (skipMessageOnError) + if(skipMessageOnError) error("Error processing message, skipping this message: ", e) else throw e } - if (System.out.checkError()) { + if(System.out.checkError()) { // This means no one is listening to our output stream any more, time to shutdown System.err.println("Unable to write to standard out, closing consumer.") formatter.close() diff --git a/core/src/main/scala/kafka/tools/ToolsUtils.scala b/core/src/main/scala/kafka/tools/ToolsUtils.scala index 6cebb67..c680496 100644 --- a/core/src/main/scala/kafka/tools/ToolsUtils.scala +++ b/core/src/main/scala/kafka/tools/ToolsUtils.scala @@ -22,7 +22,7 @@ object ToolsUtils extends App { } def validatePortAndDie(parser: OptionParser, hostPort: String) { - if (!validate(hostPort)) + if(!validate(hostPort)) CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ") } -- 1.8.5.2 (Apple Git-48) From cb4d070f716052ef07be3fdb838512149a7ede69 Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Mon, 22 Sep 2014 22:04:20 -0600 Subject: [PATCH 8/8] KAFKA-328 Write unit test for kafka server startup and shutdown API --- .../unit/kafka/server/ServerShutdownTest.scala | 31 ++++++++++++++++------ 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index ab60e9b..e5230d2 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -53,17 +53,17 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) // send some messages - producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*) + producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)): _*) // do a clean shutdown and check that offset checkpoint file exists server.shutdown() - for(logDir <- config.logDirs) { + for (logDir <- config.logDirs) { val OffsetCheckpointFile = new File(logDir, server.logManager.RecoveryPointCheckpointFile) assertTrue(OffsetCheckpointFile.exists) assertTrue(OffsetCheckpointFile.length() > 0) } producer.close() - + /* now restart the server and check that the written data is still readable and everything still works */ server = new KafkaServer(config) server.startup() @@ -74,10 +74,10 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[IntEncoder].getName) - val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") + val consumer = new SimpleConsumer(host, port, 1000000, 64 * 1024, "") var fetchedMessage: ByteBufferMessageSet = null - while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + while (fetchedMessage == null || fetchedMessage.validBytes == 0) { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build()) fetchedMessage = fetched.messageSet(topic, 0) } @@ -85,10 +85,10 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val newOffset = fetchedMessage.last.nextOffset // send some more messages - producer.send(sent2.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*) + producer.send(sent2.map(m => new KeyedMessage[Int, String](topic, 0, m)): _*) fetchedMessage = null - while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + while (fetchedMessage == null || fetchedMessage.validBytes == 0) { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build()) fetchedMessage = fetched.messageSet(topic, 0) } @@ -119,4 +119,19 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { .map(_.asInstanceOf[Thread]) .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka"))) } + + @Test + def testServerStartupConsecutively() { + val server = new KafkaServer(config) + server.startup() + try { + server.startup() + } + catch { + case ex: Throwable => { + assertTrue(ex.getMessage().contains("This scheduler has already been started!")) + } + } + server.shutdown() + } } -- 1.8.5.2 (Apple Git-48)
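
Note: several of the tools patched above now route their broker-list argument through ToolsUtils.validatePortAndDie, whose usage message expects entries like host1:9091,host2:9092. The body of the underlying validate(hostPort) check is not included in this series, so the following is only a minimal standalone sketch of that kind of check; the names BrokerListCheck, HostPort, and hasValidPorts are illustrative and are not part of the Kafka code base.

object BrokerListCheck {
  // Each comma-separated entry must look like host:port with an all-digit port, e.g. host1:9091.
  private val HostPort = """(.+):(\d+)""".r

  // True only if every entry in the list carries a port.
  def hasValidPorts(brokerList: String): Boolean =
    brokerList.split(",").forall {
      case HostPort(_, _) => true
      case _              => false
    }

  def main(args: Array[String]): Unit = {
    // "host1:9091,host2:9092" is accepted; "host1" and "host1:9091,host2" are rejected,
    // matching the usage text "Please provide valid host:port like host1:9091,host2:9092".
    Seq("host1:9091,host2:9092", "host1", "host1:9091,host2").foreach { s =>
      println(s + " -> " + hasValidPorts(s))
    }
  }
}

Under this sketch, a list is rejected as soon as any entry lacks a numeric port, which is the condition validatePortAndDie turns into a printUsageAndDie call in the tools above.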