Index: core/src/main/scala/kafka/tools/ConsoleProducer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/tools/ConsoleProducer.scala	(date 1411445060000)
+++ core/src/main/scala/kafka/tools/ConsoleProducer.scala	(date 1411618627000)
@@ -229,7 +229,7 @@
     val useNewProducer = options.has(useNewProducerOpt)
     val topic = options.valueOf(topicOpt)
 
-    ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt))
+    ToolsUtils.validatePortOrDie(parser, options.valueOf(brokerListOpt))
 
     val brokerList = options.valueOf(brokerListOpt)
 
@@ -282,8 +282,8 @@
     override def readMessage() = {
       lineNumber += 1
       (reader.readLine(), parseKey) match {
-        case (null, _) => null
+        case(null, _) => null
-        case (line, true) =>
+        case(line, true) =>
           line.indexOf(keySeparator) match {
             case -1 =>
               if(ignoreError)
@@ -295,7 +295,7 @@
                 line.substring(0, n).getBytes,
                 (if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes())
           }
-        case (line, false) =>
+        case(line, false) =>
           new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
       }
     }
Index: core/src/main/scala/kafka/tools/GetOffsetShell.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/tools/GetOffsetShell.scala	(date 1411445060000)
+++ core/src/main/scala/kafka/tools/GetOffsetShell.scala	(date 1411618627000)
@@ -64,7 +64,7 @@
 
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)
 
-    ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt))
+    ToolsUtils.validatePortOrDie(parser, options.valueOf(brokerListOpt))
 
     val clientId = "GetOffsetShell"
 
Index: core/src/main/scala/kafka/tools/ProducerPerformance.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/tools/ProducerPerformance.scala	(date 1411445060000)
+++ core/src/main/scala/kafka/tools/ProducerPerformance.scala	(date 1411618627000)
@@ -125,7 +125,7 @@
     val options = parser.parse(args: _*)
     CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt)
 
-    ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt))
+    ToolsUtils.validatePortOrDie(parser, options.valueOf(brokerListOpt))
 
     val topicsStr = options.valueOf(topicsOpt)
     val topics = topicsStr.split(",")
@@ -136,22 +136,21 @@
     val brokerList = options.valueOf(brokerListOpt)
     val messageSize = options.valueOf(messageSizeOpt).intValue
-    var isFixedSize = !options.has(varyMessageSizeOpt)
-    var isSync = options.has(syncOpt)
-    var batchSize = options.valueOf(batchSizeOpt).intValue
-    var numThreads = options.valueOf(numThreadsOpt).intValue
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
     val seqIdMode = options.has(initialMessageIdOpt)
-    var initialMessageId: Int = 0
-    if(seqIdMode)
-      initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
     val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
     val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
     val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
     val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
     val useNewProducer = options.has(useNewProducerOpt)
-
+    if(seqIdMode)
+      initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
     val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
+    val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
+    var isFixedSize = !options.has(varyMessageSizeOpt)
+    var isSync = options.has(syncOpt)
+    var batchSize = options.valueOf(batchSizeOpt).intValue
+    var numThreads = options.valueOf(numThreadsOpt).intValue
 
     if(csvMetricsReporterEnabled) {
       val props = new Properties()
@@ -165,8 +164,7 @@
       val verifiableProps = new VerifiableProperties(props)
       KafkaMetricsReporter.startReporters(verifiableProps)
     }
-
-    val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
+    var initialMessageId: Int = 0
   }
 
   class ProducerThread(val threadId: Int,
@@ -219,44 +217,13 @@
     private val topicLabel = "Topic"
     private var leftPaddedSeqId: String = ""
 
-    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
-      // Each thread gets a unique range of sequential no. for its ids.
-      // Eg. 1000 msg in 10 threads => 100 msg per thread
-      // thread 0 IDs : 0 ~ 99
-      // thread 1 IDs : 100 ~ 199
-      // thread 2 IDs : 200 ~ 299
-      // . . .
-      leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
-
-      val msgHeader = topicLabel + SEP +
-        topic + SEP +
-        threadIdLabel + SEP +
-        threadId + SEP +
-        messageIdLabel + SEP +
-        leftPaddedSeqId + SEP
-
-      val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
-      debug(seqMsgString)
-      return seqMsgString.getBytes()
-    }
-
-    private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
-      val msgSize = if(config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
-      if(config.seqIdMode) {
-        val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
-        generateMessageWithSeqId(topic, seqId, msgSize)
-      } else {
-        new Array[Byte](msgSize)
-      }
-    }
-
     override def run {
       var bytesSent = 0L
       var nSends = 0
       var i: Long = 0L
       var message: Array[Byte] = null
 
-      while (i < messagesPerThread) {
+      while(i < messagesPerThread) {
        try {
          config.topics.foreach(
            topic => {
@@ -280,6 +247,37 @@
 
       totalBytesSent.addAndGet(bytesSent)
       totalMessagesSent.addAndGet(nSends)
       allDone.countDown()
+    }
+
+    private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
+      val msgSize = if(config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
+      if(config.seqIdMode) {
+        val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
+        generateMessageWithSeqId(topic, seqId, msgSize)
+      } else {
+        new Array[Byte](msgSize)
+      }
+    }
+
+    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
+      // Each thread gets a unique range of sequential no. for its ids.
+      // Eg. 1000 msg in 10 threads => 100 msg per thread
+      // thread 0 IDs : 0 ~ 99
+      // thread 1 IDs : 100 ~ 199
+      // thread 2 IDs : 200 ~ 299
+      // . . .
+      leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
+
+      val msgHeader = topicLabel + SEP +
+        topic + SEP +
+        threadIdLabel + SEP +
+        threadId + SEP +
+        messageIdLabel + SEP +
+        leftPaddedSeqId + SEP
+
+      val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
+      debug(seqMsgString)
+      return seqMsgString.getBytes()
     }
   }
Index: core/src/main/scala/kafka/tools/ReplayLogProducer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/tools/ReplayLogProducer.scala	(date 1411445060000)
+++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala	(date 1411618627000)
@@ -108,7 +108,7 @@
 
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt)
 
-    ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt))
+    ToolsUtils.validatePortOrDie(parser, options.valueOf(brokerListOpt))
 
     val zkConnect = options.valueOf(zkConnectOpt)
     val brokerList = options.valueOf(brokerListOpt)
Index: core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala	(date 1411445060000)
+++ core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala	(date 1411618627000)
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,33 +19,33 @@
 import joptsimple.OptionParser
 import kafka.cluster.Broker
-import kafka.message.{ MessageSet, MessageAndOffset, ByteBufferMessageSet }
+import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet}
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicReference
 import kafka.client.ClientUtils
-import java.util.regex.{ PatternSyntaxException, Pattern }
+import java.util.regex.{PatternSyntaxException, Pattern}
 import kafka.api._
 import java.text.SimpleDateFormat
 import java.util.Date
-import kafka.common.{ ErrorMapping, TopicAndPartition }
+import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.utils._
-import kafka.consumer.{ ConsumerConfig, Whitelist, SimpleConsumer }
+import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer}
 
 /**
- *  For verifying the consistency among replicas.
+ * For verifying the consistency among replicas.
  *
- *  1. start a fetcher on every broker.
+ * 1. start a fetcher on every broker.
- *  2. each fetcher does the following
+ * 2. each fetcher does the following
- *    2.1 issues fetch request
+ * 2.1 issues fetch request
- *    2.2 puts the fetched result in a shared buffer
+ * 2.2 puts the fetched result in a shared buffer
- *    2.3 waits for all other fetchers to finish step 2.2
+ * 2.3 waits for all other fetchers to finish step 2.2
- *    2.4 one of the fetchers verifies the consistency of fetched results among replicas
+ * 2.4 one of the fetchers verifies the consistency of fetched results among replicas
  *
- *  The consistency verification is up to the high watermark. The tool reports the
+ * The consistency verification is up to the high watermark. The tool reports the
- *  max lag between the verified offset and the high watermark among all partitions.
+ * max lag between the verified offset and the high watermark among all partitions.
  *
- *  If a broker goes down, the verification of the partitions on that broker is delayed
+ * If a broker goes down, the verification of the partitions on that broker is delayed
- *  until the broker is up again.
+ * until the broker is up again.
  *
  * Caveats:
  * 1. The tools needs all brokers to be up at startup time.
@@ -93,13 +93,13 @@
       .ofType(classOf[java.lang.Long])
       .defaultsTo(30 * 1000L)
 
-    if (args.length == 0)
+    if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")
 
     val options = parser.parse(args: _*)
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
 
-    ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt))
+    ToolsUtils.validatePortOrDie(parser, options.valueOf(brokerListOpt))
 
     val regex = options.valueOf(topicWhiteListOpt)
     val topicWhiteListFiler = new Whitelist(regex)
@@ -122,7 +122,7 @@
     val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
     val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
     val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
-        topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false))
+        topicMetadata => if(topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false))
           true
         else
           false
@@ -135,13 +135,13 @@
     debug("Selected topic partitions: " + topicPartitionReplicaList)
     val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId)
       .map {
-        case (brokerId, partitions) =>
+        case(brokerId, partitions) =>
-          brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) }
+          brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId)}
-      }
+      }
     debug("Topic partitions per broker: " + topicAndPartitionsPerBroker)
     val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId))
-      .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
+      .map { case(topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size}
     debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
     val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap(
       topicMetadataResponse =>
@@ -149,8 +149,8 @@
         partitionMetadata =>
           (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id))).groupBy(_._2)
       .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map {
-        case (topicAndPartition, leaderId) => topicAndPartition
+        case(topicAndPartition, leaderId) => topicAndPartition
-      })
+      })
     debug("Leaders per broker: " + leadersPerBroker)
 
     val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
@@ -162,7 +162,7 @@
     // create all replica fetcher threads
     val verificationBrokerId = topicAndPartitionsPerBroker.head._1
     val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map {
-      case (brokerId, topicAndPartitions) =>
+      case(brokerId, topicAndPartitions) =>
         new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId,
                            sourceBroker = brokerMap(brokerId),
                            topicAndPartitions = topicAndPartitions,
@@ -194,11 +194,11 @@
 private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)
 
 private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int],
-  leadersPerBroker: Map[Int, Seq[TopicAndPartition]],
-  expectedNumFetchers: Int,
-  brokerMap: Map[Int, Broker],
-  initialOffsetTime: Long,
-  reportInterval: Long) extends Logging {
+                            leadersPerBroker: Map[Int, Seq[TopicAndPartition]],
+                            expectedNumFetchers: Int,
+                            brokerMap: Map[Int, Broker],
+                            initialOffsetTime: Long,
+                            reportInterval: Long) extends Logging {
   private val fetchOffsetMap = new Pool[TopicAndPartition, Long]
   private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]]
   private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers))
@@ -229,7 +229,7 @@
 
   private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
     offsetResponse.partitionErrorAndOffsets.filter {
-      case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != ErrorMapping.NoError
+      case(topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != ErrorMapping.NoError
     }.mkString
   }
 
@@ -243,7 +243,7 @@
     val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
     assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse))
     offsetResponse.partitionErrorAndOffsets.foreach {
-      case (topicAndPartition, partitionOffsetResponse) =>
+      case(topicAndPartition, partitionOffsetResponse) =>
         fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
     }
   }
@@ -266,22 +266,22 @@
         "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
           + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
       val messageIteratorMap = fetchResponsePerReplica.map {
-        case (replicaId, fetchResponse) =>
+        case(replicaId, fetchResponse) =>
          replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator
       }
       val maxHw = fetchResponsePerReplica.values.map(_.hw).max
 
       // Iterate one message at a time from every replica, until high watermark is reached.
       var isMessageInAllReplicas = true
-      while (isMessageInAllReplicas) {
+      while(isMessageInAllReplicas) {
         var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
         for ((replicaId, messageIterator) <- messageIteratorMap) {
           try {
-            if (messageIterator.hasNext) {
+            if(messageIterator.hasNext) {
               val messageAndOffset = messageIterator.next()
 
               // only verify up to the high watermark
-              if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw)
+              if(messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw)
                 isMessageInAllReplicas = false
               else {
                 messageInfoFromFirstReplicaOpt match {
@@ -289,14 +289,14 @@
                     messageInfoFromFirstReplicaOpt = Some(
                       MessageInfo(replicaId, messageAndOffset.offset, messageAndOffset.nextOffset, messageAndOffset.message.checksum))
                   case Some(messageInfoFromFirstReplica) =>
-                    if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) {
+                    if(messageInfoFromFirstReplica.offset != messageAndOffset.offset) {
                       println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
                         + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
                         + messageInfoFromFirstReplica.offset + " doesn't match replica "
                         + replicaId + "'s offset " + messageAndOffset.offset)
                       System.exit(1)
                     }
-                    if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum)
+                    if(messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum)
                       println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
                         + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset
                         + "; replica " + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
@@ -311,14 +311,14 @@
                 .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t)
           }
         }
-        if (isMessageInAllReplicas) {
+        if(isMessageInAllReplicas) {
           val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
           fetchOffsetMap.put(topicAndPartition, nextOffset)
           debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " +
             nextOffset + " for " + topicAndPartition)
         }
       }
-      if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) {
+      if(maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) {
         offsetWithMaxLag = fetchOffsetMap.get(topicAndPartition)
         maxLag = maxHw - offsetWithMaxLag
         maxLagTopicAndPartition = topicAndPartition
@@ -326,7 +326,7 @@
       fetchResponsePerReplica.clear()
     }
     val currentTimeMs = SystemTime.milliseconds
-    if (currentTimeMs - lastReportTime > reportInterval) {
+    if(currentTimeMs - lastReportTime > reportInterval) {
       println(ReplicaVerificationTool.dateFormat.format(new Date(currentTimeMs)) + ": max lag is "
         + maxLag + " for partition " + maxLagTopicAndPartition + " at offset " + offsetWithMaxLag
         + " among " + messageSetCache.size + " paritions")
@@ -336,8 +336,8 @@
 }
 
 private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition],
-  replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
-  fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean)
+                             replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
+                             fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean)
   extends ShutdownableThread(name) {
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId)
   val fetchRequestBuilder = new FetchRequestBuilder().
@@ -363,13 +363,13 @@
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
       case t: Throwable =>
-        if (!isRunning.get)
+        if(!isRunning.get)
          throw t
     }
 
-    if (response != null) {
+    if(response != null) {
       response.data.foreach {
-        case (topicAndPartition, partitionData) =>
+        case(topicAndPartition, partitionData) =>
          replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
       }
     } else {
@@ -385,7 +385,7 @@
     debug("Ready for verification")
 
     // one of the fetchers will do the verification
-    if (doVerification) {
+    if(doVerification) {
       debug("Do verification")
       replicaBuffer.verifyCheckSum()
       replicaBuffer.createNewFetcherBarrier()
\ No newline at end of file
Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(date 1411445060000)
+++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(date 1411618627000)
@@ -100,7 +100,7 @@
     val options = parser.parse(args: _*)
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt)
 
-    ToolsUtils.validatePortAndDie(parser, options.valueOf(brokerListOpt))
+    ToolsUtils.validatePortOrDie(parser, options.valueOf(brokerListOpt))
 
     val topic = options.valueOf(topicOpt)
     val partitionId = options.valueOf(partitionIdOpt).intValue()
Index: core/src/main/scala/kafka/tools/ToolsUtils.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/tools/ToolsUtils.scala	(date 1411445060000)
+++ core/src/main/scala/kafka/tools/ToolsUtils.scala	(date 1411618627000)
@@ -6,7 +6,9 @@
 
 object ToolsUtils extends App {
 
-  private def validate(hostPort: String) = {
+
+
+  def validatePortOrDie(parser: OptionParser, hostPort: String) {
     val regex = new Regex(":[0-9]+$")
     val hostPorts: Array[String] =
       if(hostPort.contains(','))
@@ -18,11 +20,8 @@
         regex.findAllMatchIn(hostPortData).size > 0
     }
-    !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length
-  }
-
-  def validatePortAndDie(parser: OptionParser, hostPort: String) {
-    if(!validate(hostPort))
+    val isValid = !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length
+    if(!isValid)
      CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ")
   }
\ No newline at end of file
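For reference, the two ToolsUtils.scala hunks above fold the private validate helper into the renamed public entry point. The sketch below shows how the resulting object plausibly reads once the patch applies; it is assembled from the context and added lines in those hunks, while the import block and the filter wrapper around the hostPortData predicate are assumptions, since the patch never shows the top of the file.

import joptsimple.OptionParser
import scala.util.matching.Regex
import kafka.utils.CommandLineUtils

object ToolsUtils extends App {

  // Accepts a single "host:port" or a comma-separated list of them; every entry
  // must end with an explicit ":<port>", otherwise usage is printed and the tool exits.
  def validatePortOrDie(parser: OptionParser, hostPort: String) {
    val regex = new Regex(":[0-9]+$")
    val hostPorts: Array[String] =
      if(hostPort.contains(','))
        hostPort.split(",")
      else
        Array(hostPort)
    // keep only the entries that end with a numeric port (assumed filter wrapper)
    val validHostPort = hostPorts.filter { hostPortData =>
      regex.findAllMatchIn(hostPortData).size > 0
    }
    val isValid = !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length
    if(!isValid)
      CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092,check usage below for more \n ")
  }
}

Callers keep the same shape as in the other files' hunks, e.g. ToolsUtils.validatePortOrDie(parser, options.valueOf(brokerListOpt)).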