From c24d314b4b53b11eb641df214b07a9e312fdd498 Mon Sep 17 00:00:00 2001
From: raulcf
Date: Tue, 19 Aug 2014 10:39:16 -0700
Subject: [PATCH] KAFKA-1527; simpleConsumer after review and updates to simpleConsumerPerformance

---
 bin/kafka-simple-consumer-perf-test.sh             |  20 ++
 .../main/scala/kafka/consumer/ConsumerConfig.scala |   4 +
 .../main/scala/kafka/consumer/SimpleConsumer.scala | 236 ++++++++++++++++++++-
 .../kafka/tools/SimpleConsumerPerformance.scala    | 180 ++++++++++++++--
 4 files changed, 424 insertions(+), 16 deletions(-)
 create mode 100755 bin/kafka-simple-consumer-perf-test.sh

diff --git a/bin/kafka-simple-consumer-perf-test.sh b/bin/kafka-simple-consumer-perf-test.sh
new file mode 100755
index 0000000..91abacc
--- /dev/null
+++ b/bin/kafka-simple-consumer-perf-test.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+  export KAFKA_HEAP_OPTS="-Xmx512M"
+fi
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance "$@"

diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 1cf2f62..1d41776 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -51,6 +51,7 @@ object ConsumerConfig extends Config {
   val ExcludeInternalTopics = true
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
+  val ReadCommitted = false
 
   def validate(config: ConsumerConfig) {
     validateClientId(config.clientId)
@@ -175,6 +176,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
   val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
 
+  /** Whether the consumer should only return committed messages. */
+  val readCommitted = props.getBoolean("read.committed", ReadCommitted)
+
   validate(this)
 }

diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 0e64632..16592b7 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -18,10 +18,14 @@ package kafka.consumer
 
 import kafka.api._
+import kafka.message.Message
 import kafka.network._
 import kafka.utils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
 
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
 /**
  * A consumer of kafka messages
  */
@@ -39,6 +43,15 @@ class SimpleConsumer(val host: String,
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
   private var isClosed = false
 
+  /** Keeps pending (ongoing) transaction data in memory, keyed by (topic, partition, txId). */
+  private val pendingTxData = new mutable.HashMap[(String, Int, Int), ListBuffer[Message]]
+  /** Maintains, per topic-partition, a queue of committed transactions that can be consumed in order. */
+  private val completedTx = new mutable.HashMap[(String, Int), mutable.Queue[ListBuffer[Message]]]
+  /** Keeps non-transactional messages per topic-partition. */
+  private val nonTxMessages = new mutable.HashMap[(String, Int), ListBuffer[Message]]
+  /** Keeps the last consumed offset per topic-partition. */
+  private val lastConsumedOffset = new mutable.HashMap[(String, Int), Long]()
+
   private def connect(): BlockingChannel = {
     close
     blockingChannel.connect()
@@ -98,7 +111,7 @@ class SimpleConsumer(val host: String,
   }
 
   /**
-   * Fetch a set of messages from a topic.
+   * Fetch a set of messages from a topic
    *
    * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
    * @return a set of fetched messages
@@ -120,6 +133,227 @@ class SimpleConsumer(val host: String,
   }
 
   /**
+   * Models a topic partition.
+   * @param topic
+   * @param partition
+   */
+  case class TopicPartition(topic: String, partition: Int)
+
+  /**
+   * Models a collection of messages and the latest offset corresponding to those messages, i.e. the latest offset read from the broker.
+   * @param messages
+   * @param offset
+   */
+  case class MessagesOffset(messages: Seq[Message], offset: Long)
+
+  /**
+   * Wraps the information required to build a fetch request.
+   * @param topic
+   * @param partition
+   * @param offset
+   * @param fetchSize
+   */
+  case class RequestData(topic: String, partition: Int, offset: Long, fetchSize: Int)
+
+  /**
+   * Fetch messages from a topic-partition. It returns all available messages (transactional and non-transactional) per topic-partition. Note that
+   * supporting only a single partition is an implementation artifact: in general it would be easy to read a complete transaction regardless of the
+   * number of partitions it spans, because each commit message contains the partitions involved in the committed transaction. When the consumer
+   * reads a commit message, it only needs to keep the number of partitions involved and make sure it reads a commit in each one of those
+   * partitions. Only then is the transaction returned to the application. This functionality is not implemented in this temporary consumer; it
+   * should rather be included in the new consumer.
+   * @param request
+   * @return the available messages and the last consumed offset, per topic-partition
+   */
+  def fetchTx(request: FetchRequest): Map[TopicPartition, MessagesOffset] = {
+    var response: Receive = null
+    var req = request
+    val reqData = new ListBuffer[RequestData]
+    val fetchSize = request.requestInfo.values.head.fetchSize
+    val availableData: mutable.Map[TopicPartition, MessagesOffset] = new mutable.HashMap[TopicPartition, MessagesOffset]
+    val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestTimer
+    val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer
+
+    var moreDataAvailable = false
+    var continueFetching = true
+    do {
+      reqData.clear()
+      moreDataAvailable = false
+      aggregateTimer.time {
+        specificTimer.time {
+          response = sendRequest(req)
+        }
+      }
+      val fetchResponse = FetchResponse.readFrom(response.buffer)
+      val fetchedSize = fetchResponse.sizeInBytes
+      fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize)
+      fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
+
+      // iterate over the fetched message sets and inspect each message
+      for (tp <- fetchResponse.data.keys) {
+        val topic = tp.topic
+        val partition = tp.partition
+        var offset: Long = 0
+        val messageSet = fetchResponse.messageSet(topic, partition)
+        if (messageSet.size > 0)
+          moreDataAvailable = true // every topic-partition must report no more data before we stop fetching
+        for (msg <- messageSet) {
+          offset = msg.nextOffset
+          maybeUpdateLastConsumedOffset(offset, topic, partition)
+          val txId = msg.message.txId
+          trace("Message of transaction: " + txId)
+          if (txId != -1) {
+            // handle a transactional message
+            val txControl = getTxControl(msg.message.attributes)
+            trace("txControl: " + txControl)
+            txControl match {
+              case TxControl.Abort =>
+                abortTransaction(topic, partition, txId)
+              case TxControl.Commit =>
+                commitTransaction(topic, partition, txId)
+              case TxControl.Ongoing =>
+                dataOf(topic, partition, txId, msg.message)
+            }
+          }
+          else {
+            // handle a non-transactional message
+            if (nonTxMessages.contains((topic, partition))) {
+              nonTxMessages((topic, partition)).append(msg.message)
+            } else {
+              val buffer = new ListBuffer[Message]()
+              buffer.append(msg.message)
+              nonTxMessages.put((topic, partition), buffer)
+            }
+          }
+        }
+        // incrementally create a potential next request
+        reqData.append(RequestData(topic, partition, offset, fetchSize))
+      }
+
+      // check if there is data to return to the application and, if so, remove it from the consumer state
+      if (completedTx.size > 0 || nonTxMessages.size > 0 || !moreDataAvailable) {
+        continueFetching = false
+        // populate with non-transactional messages
+        for ((topic, partition) <- nonTxMessages.keys.toList) {
+          val offset = lastConsumedOffset((topic, partition))
+          if (availableData.contains(TopicPartition(topic, partition))) {
+            val messages = availableData(TopicPartition(topic, partition)).messages ++ nonTxMessages((topic, partition))
+            availableData.put(TopicPartition(topic, partition), MessagesOffset(messages, offset))
+          } else {
+            availableData.put(TopicPartition(topic, partition), MessagesOffset(nonTxMessages((topic, partition)), offset))
+          }
+          nonTxMessages.remove((topic, partition))
+        }
+        // and then with transactional data
+        for ((topic, partition) <- completedTx.keys.toList) {
+          val offset = lastConsumedOffset((topic, partition))
+          if (availableData.contains(TopicPartition(topic, partition))) {
+            var txMsgs = new ListBuffer[Message]()
+            for (q <- completedTx((topic, partition)))
+              txMsgs = txMsgs ++ q
+            val messages: Seq[Message] = availableData(TopicPartition(topic, partition)).messages ++ txMsgs
+            availableData.put(TopicPartition(topic, partition), MessagesOffset(messages, offset))
+          } else {
+            var txMsgs = new ListBuffer[Message]()
+            for (q <- completedTx((topic, partition)))
+              txMsgs = txMsgs ++ q
+            availableData.put(TopicPartition(topic, partition), MessagesOffset(txMsgs, offset))
+          }
+          completedTx.remove((topic, partition))
+        }
+      }
+
+      // create a new fetch request if we continue fetching
+      if (continueFetching) {
+        val reqBuilder = new FetchRequestBuilder().clientId(request.clientId)
+        for (data <- reqData) {
+          reqBuilder.addFetch(data.topic, data.partition, data.offset, data.fetchSize)
+        }
+        req = reqBuilder.build()
+      }
+    } while (continueFetching)
+    availableData.toMap
+  }
+
+  /**
+   * Updates the last consumed offset of a topic-partition if the given offset is larger than the one recorded so far.
+   * @param offset
+   * @param topic
+   * @param partition
+   */
+  private def maybeUpdateLastConsumedOffset(offset: Long, topic: String, partition: Int) = {
+    if (!lastConsumedOffset.contains((topic, partition)) || offset > lastConsumedOffset((topic, partition))) {
+      lastConsumedOffset.put((topic, partition), offset)
+    }
+  }
+
+  object TxControl {
+    val Ongoing: Short = 0
+    val Commit: Short = 3
+    val Abort: Short = 6
+  }
+
+  /**
+   * Returns the txControl flag encoded in the message attributes.
+   * @param attr
+   */
+  private def getTxControl(attr: Short) = {
+    // mask out bits 3-5 of the attributes and shift them down to obtain the txControl value
+    val txControlMask = 0x07 << 3
+    val txControlOffset = 3
+    ((attr & txControlMask) >> txControlOffset).toShort
+  }
+
+  /**
+   * Aborts a transaction, i.e. discards all of its associated state.
+   * @param topic
+   * @param partition
+   * @param txId
+   */
+  private def abortTransaction(topic: String, partition: Int, txId: Int) = {
+    trace("Aborting transaction: " + txId)
+    pendingTxData.remove((topic, partition, txId))
+  }
+
+  /**
+   * Commits a transaction, moving its data to completedTx so that it can be returned to the application.
+   * @param topic
+   * @param partition
+   * @param txId
+   */
+  private def commitTransaction(topic: String, partition: Int, txId: Int) = {
+    trace("Committing transaction: " + txId)
+    if (completedTx.contains((topic, partition))) {
+      completedTx((topic, partition)).enqueue(pendingTxData((topic, partition, txId)))
+    } else {
+      val msgs = new mutable.Queue[ListBuffer[Message]]()
+      msgs.enqueue(pendingTxData((topic, partition, txId)))
+      completedTx.put((topic, partition), msgs)
+    }
+    pendingTxData.remove((topic, partition, txId))
+  }
+
+  /**
+   * Accumulates the data of an ongoing transaction.
+   * @param topic
+   * @param partition
+   * @param txId
+   * @param message
+   */
+  private def dataOf(topic: String, partition: Int, txId: Int, message: Message) = {
+    if (pendingTxData.contains((topic, partition, txId))) {
+      pendingTxData((topic, partition, txId)).append(message)
+    } else {
+      val buffer = new ListBuffer[Message]()
+      buffer.append(message)
+      pendingTxData.put((topic, partition, txId), buffer)
+    }
+  }
+
+  /**
    * Get a list of valid offsets (up to maxSize) before the given time.
    * @param request a [[kafka.api.OffsetRequest]] object.
    * @return a [[kafka.api.OffsetResponse]] object.
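Note on the transaction-control encoding used above: getTxControl masks out bits 3-5 of the message attributes and shifts them down, yielding TxControl.Ongoing (0), TxControl.Commit (3) or TxControl.Abort (6). A minimal self-contained sketch of that decoding follows; the encode helper is hypothetical, included only to round-trip the check, since the patch itself only decodes.

object TxControlDecodeSketch {
  val Ongoing: Short = 0
  val Commit: Short = 3
  val Abort: Short = 6

  // Hypothetical encoder, for illustration only: place a txControl value into bits 3-5 of the attributes.
  def encode(txControl: Short): Short = ((txControl & 0x07) << 3).toShort

  // Mirrors getTxControl in the patch: mask bits 3-5 and shift them down.
  def decode(attr: Short): Short = {
    val txControlMask = 0x07 << 3
    ((attr & txControlMask) >> 3).toShort
  }

  def main(args: Array[String]): Unit = {
    for (tc <- Seq(Ongoing, Commit, Abort))
      assert(decode(encode(tc)) == tc) // round-trips: 0 -> 0, 3 -> 3, 6 -> 6
    println("txControl decode sketch OK")
  }
}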
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 7602b8d..fc59db4 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -19,12 +19,16 @@ package kafka.tools
 
 import java.net.URI
 import java.text.SimpleDateFormat
-import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
+import joptsimple.{OptionException, OptionParser, OptionSet}
+import kafka.api._
 import kafka.consumer.SimpleConsumer
+import kafka.message.MessageAndOffset
 import kafka.utils._
 import org.apache.log4j.Logger
 import kafka.common.TopicAndPartition
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 /**
  * Performance test for the simple consumer
@@ -32,7 +36,7 @@ import kafka.common.TopicAndPartition
 object SimpleConsumerPerformance {
 
   def main(args: Array[String]) {
-    val logger = Logger.getLogger(getClass)
+    val log = Logger.getLogger(getClass)
     val config = new ConsumerPerfConfig(args)
 
     if(!config.hideHeader) {
@@ -44,6 +48,65 @@ object SimpleConsumerPerformance {
 
     val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId)
 
+    val messageFormatterClass = config.messageFormatter
+    val formatterArgs = config.formatterArgs
+    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    formatter.init(formatterArgs)
+
+    if(config.correctnessTest) {
+      println("Running correctness test:")
+
+      var offset: Long = 0
+      // Issue a metadata request to get all partitions of the given topic
+      val requests = new mutable.HashMap[Int, FetchRequest]()
+      val topics = new ArrayBuffer[String]()
+      topics += config.topic
+      val tmr: TopicMetadataRequest = new TopicMetadataRequest(topics, 0)
+      val topicMetadataResponse = consumer.send(tmr)
+      val topicMetadata = topicMetadataResponse.topicsMetadata
+      for (topicM <- topicMetadata) {
+        val partitions = topicM.partitionsMetadata
+        for (partition <- partitions) {
+          val request = new FetchRequestBuilder()
+            .clientId(config.clientId)
+            .addFetch(config.topic, partition.partitionId, 0, config.fetchSize)
+            .build()
+          requests.put(partition.partitionId, request)
+        }
+      }
+      log.debug("Request map: " + requests.mkString(", "))
+      // Read all data per partition
+      for (partitionId <- requests.keySet) {
+        log.debug("Reading messages from partitionId: " + partitionId)
+        var done = false
+        while (!done) {
+          val r = requests.get(partitionId)
+          log.debug("Fetch request: " + r.get)
+          var messagesRead = 0
+          val fetchTxResponse = consumer.fetchTx(r.get)
+          val messagesAndLastOffset = fetchTxResponse(consumer.TopicPartition(config.topic, partitionId))
+          for (message <- messagesAndLastOffset.messages) {
+            formatter.writeTo(message.key.array(), message.payload.array(), System.out)
+            messagesRead += 1
+          }
+          if (messagesRead == 0)
+            done = true
+          else {
+            // advance to the last offset returned by this fetch
+            offset = messagesAndLastOffset.offset
+            // update the request
+            val request = new FetchRequestBuilder()
+              .clientId(config.clientId)
+              .addFetch(config.topic, partitionId, offset, config.fetchSize)
+              .build()
+            requests.put(partitionId, request)
+          }
+        }
+      }
+      println("DONE!")
+      System.exit(0)
+    }
+
     // reset to latest or smallest offset
     val topicAndPartition = TopicAndPartition(config.topic, config.partition)
     val request = OffsetRequest(Map(
@@ -59,30 +122,62 @@ object SimpleConsumerPerformance {
     var lastReportTime: Long = startMs
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
 
+    if(config.readUncommitted)
+      println("Simple Consumer configured to READ UNCOMMITTED")
+    else
+      println("Simple Consumer configured to READ COMMITTED")
+
     while(!done) {
       // TODO: add in the maxWait and minBytes for performance
       val request = new FetchRequestBuilder()
         .clientId(config.clientId)
         .addFetch(config.topic, config.partition, offset, config.fetchSize)
         .build()
-      val fetchResponse = consumer.fetch(request)
 
       var messagesRead = 0
       var bytesRead = 0
-      val messageSet = fetchResponse.messageSet(config.topic, config.partition)
-      for (message <- messageSet) {
-        messagesRead += 1
-        bytesRead += message.message.payloadSize
+      if(config.readUncommitted) {
+        val fetchResponse = consumer.fetch(request)
+        val messageSet = fetchResponse.messageSet(config.topic, config.partition)
+        for (message <- messageSet) {
+          println(Utils.readString(message.message.payload))
+          if(config.measureE2ELatency) {
+            val emitTime = getEmitTime(message)
+            val currentTime = System.currentTimeMillis()
+            val latency = currentTime - emitTime
+            println("! " + latency)
+          }
+          messagesRead += 1
+          totalMessagesRead += 1
+          bytesRead += message.message.payloadSize
+        }
+        if(messagesRead == 0 || totalMessagesRead >= config.numMessages)
+          done = true
+        else
+          // we only did one fetch, so advance the offset by the valid bytes of this message set
+          offset += messageSet.validBytes
+      } else {
+        val fetchTxResponse = consumer.fetchTx(request)
+        val messagesAndLastOffset = fetchTxResponse(consumer.TopicPartition(config.topic, config.partition))
+        log.debug("Got " + messagesAndLastOffset.messages.size + " messages from " + config.topic + " - " + config.partition + " with last offset: " + messagesAndLastOffset.offset)
+        for (message <- messagesAndLastOffset.messages) {
+          messagesRead += 1
+          totalMessagesRead += 1
+          bytesRead += message.payloadSize
+        }
+        log.debug("messagesRead: " + messagesRead + " totalMessagesRead: " + totalMessagesRead + " config.numMessages: " + config.numMessages)
+        if (messagesRead == 0 || totalMessagesRead >= config.numMessages) {
+          done = true
+        } else {
+          offset = messagesAndLastOffset.offset
+        }
       }
 
-      if(messagesRead == 0 || totalMessagesRead > config.numMessages)
-        done = true
-      else
-        // we only did one fetch so we find the offset for the first (head) messageset
-        offset += messageSet.validBytes
-
       totalBytesRead += bytesRead
-      totalMessagesRead += messagesRead
       consumedInterval += messagesRead
 
       if(consumedInterval > config.reportingInterval) {
@@ -100,11 +195,15 @@ object SimpleConsumerPerformance {
         consumedInterval = 0
       }
     }
+
+    log.debug("Done reading messages")
+
     val reportTime = System.currentTimeMillis
     val elapsed = (reportTime - startMs) / 1000.0
 
     if(!config.showDetailedStats) {
       val totalMBRead = (totalBytesRead*1.0)/(1024*1024)
+      println("start, totalTime, fetchSize, totalMB, MB/s, totalMessages, messages/s")
       println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(reportTime),
         config.fetchSize, totalMBRead, totalMBRead/elapsed, totalMessagesRead, totalMessagesRead/elapsed))
@@ -112,6 +211,11 @@ object SimpleConsumerPerformance {
     System.exit(0)
   }
 
+  private def getEmitTime(message: MessageAndOffset): Long = {
+    val msgString = Utils.readString(message.message.payload)
+    msgString.split(":")(0).toLong
+  }
+
   class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
     val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
       .withRequiredArg
@@ -123,6 +227,17 @@ object SimpleConsumerPerformance {
       .ofType(classOf[String])
     val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
       "offset to consume from, start with the latest message present in the log rather than the earliest message.")
+
+    val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
+      .withRequiredArg
+      .describedAs("class")
+      .ofType(classOf[String])
+      .defaultsTo(classOf[DefaultMessageFormatter].getName)
+    val messageFormatterArgOpt = parser.accepts("property")
+      .withRequiredArg
+      .describedAs("prop")
+      .ofType(classOf[String])
+
     val partitionOpt = parser.accepts("partition", "The topic partition to consume from.")
       .withRequiredArg
       .describedAs("partition")
@@ -138,8 +253,19 @@ object SimpleConsumerPerformance {
       .describedAs("clientId")
       .ofType(classOf[String])
      .defaultsTo("SimpleConsumerPerformanceClient")
+    val readUncommittedOption = parser.accepts("read-uncommitted", "Whether to read all messages (true) or only committed ones (false).")
+      .withRequiredArg()
+      .describedAs("readUncommitted")
+      .ofType(classOf[java.lang.Boolean])
+      .defaultsTo(false)
+    val correctnessTestOption = parser.accepts("correctness-test", "Runs a correctness check, i.e. reads all messages from all partitions.")
+      .withOptionalArg()
+      .describedAs("correctness-test")
+      .ofType(classOf[java.lang.Boolean])
+      .defaultsTo(false)
+    val measureE2ELatencyOpt = parser.accepts("e2e", "Measure end-to-end latency. WARNING: producer and consumer must run under the same logical clock.")
 
-    val options = parser.parse(args : _*)
+    val options: OptionSet = tryParse(parser, args)
 
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, urlOpt)
 
@@ -154,5 +280,29 @@ object SimpleConsumerPerformance {
     val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
     val hideHeader = options.has(hideHeaderOpt)
     val clientId = options.valueOf(clientIdOpt).toString
+
+    val messageFormatter = Class.forName(options.valueOf(messageFormatterOpt))
+
+    val params = ArrayBuffer[String]()
+    val javaParams = options.valuesOf(messageFormatterArgOpt)
+    for (i <- 0 until javaParams.size()) {
+      params += javaParams.get(i)
+    }
+    val formatterArgs = CommandLineUtils.parseKeyValueArgs(params)
+    val readUncommitted = options.valueOf(readUncommittedOption).booleanValue()
+    val correctnessTest = options.valueOf(correctnessTestOption).booleanValue()
+    val measureE2ELatency = options.has(measureE2ELatencyOpt)
+  }
+
+  def tryParse(parser: OptionParser, args: Array[String]) = {
+    try {
+      parser.parse(args : _*)
+    } catch {
+      case e: OptionException => {
+        Utils.croak(e.getMessage)
+        null
+      }
+    }
+  }
 }
-- 
1.7.12.4
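A minimal usage sketch of the fetchTx API introduced above, stitched together from the calls this patch adds. The host, port, topic, and partition values are placeholders, and a running broker carrying transactional messages is assumed.

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer
import kafka.utils.Utils

object FetchTxSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings; a broker with transactional data is assumed.
    val consumer = new SimpleConsumer("localhost", 9092, 30 * 1000, 2 * 1024 * 1024, "fetchTxSketch")
    val request = new FetchRequestBuilder()
      .clientId("fetchTxSketch")
      .addFetch("my-topic", 0, 0L, 1024 * 1024)
      .build()
    // fetchTx keeps fetching internally until committed (or non-transactional) messages are
    // available, buffering ongoing transactions and discarding aborted ones along the way.
    val available = consumer.fetchTx(request)
    val messagesAndLastOffset = available(consumer.TopicPartition("my-topic", 0))
    for (message <- messagesAndLastOffset.messages)
      println(Utils.readString(message.payload))
    // A subsequent fetch would start from messagesAndLastOffset.offset.
    consumer.close()
  }
}

Note the design difference from fetch: a single fetchTx call may issue several network round trips, because it only returns once a complete committed transaction (or non-transactional data) is available for the topic-partition.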