diff --git a/bin/kafka-simple-consumer-perf-test.sh b/bin/kafka-simple-consumer-perf-test.sh
new file mode 100644
index 0000000..91abacc
--- /dev/null
+++ b/bin/kafka-simple-consumer-perf-test.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Runs kafka.tools.SimpleConsumerPerformance; the heap defaults to 512M unless KAFKA_HEAP_OPTS is set.
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+  export KAFKA_HEAP_OPTS="-Xmx512M"
+fi
+exec "$(dirname "$0")"/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance "$@"
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 1cf2f62..1d41776 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -51,6 +51,7 @@ object ConsumerConfig extends Config {
   val ExcludeInternalTopics = true
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
+  val ReadCommitted = false
 
   def validate(config: ConsumerConfig) {
     validateClientId(config.clientId)
@@ -175,6 +176,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
   val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
 
+  /** Whether the consumer should only return committed messages ("read.committed", defaults to ReadCommitted). */
+  val readCommitted = props.getBoolean("read.committed", ReadCommitted)
+
   validate(this)
 }
 
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 0e64632..9172d6d 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -18,10 +18,14 @@
 package kafka.consumer
 
 import kafka.api._
+import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.network._
 import kafka.utils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
 
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
 /**
  * A consumer of kafka messages
  */
@@ -39,6 +43,15 @@ class SimpleConsumer(val host: String,
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
   private var isClosed = false
 
+  /** Messages of transactions that are still pending, keyed by (topic, partition, txId) */
+  private val pendingTxData = new mutable.HashMap[(String, Int, Int), ListBuffer[Message]]
+  /** Committed transactions per (topic, partition), queued in the order they can be consumed */
+  private val completedTx = new mutable.HashMap[(String, Int), mutable.Queue[ListBuffer[Message]]]
+  /** Non-transactional messages per (topic, partition) */
+  private val nonTxMessages = new mutable.HashMap[(String, Int), ListBuffer[Message]]
+  /** Last consumed offset per (topic, partition) */
+  private val lastConsumedOffset = new mutable.HashMap[(String, Int), Long]()
+
   private def connect(): BlockingChannel = {
     close
     blockingChannel.connect()
@@ -98,7 +111,7 @@ class SimpleConsumer(val host: String,
   }
 
   /**
-   * Fetch a set of messages from a topic.
+   * Fetch a set of messages from a topic
    *
    * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
    * @return a set of fetched messages
@@ -120,6 +133,166 @@ class SimpleConsumer(val host: String,
   }
 
   /**
+   * Fetch messages from the requested topic-partitions, exposing only non-transactional messages and
+   * messages of committed transactions. Messages of ongoing transactions are buffered in this consumer
+   * until their commit or abort marker is read; messages of aborted transactions are dropped.
+   *
+   * Fetching continues from the last offset read until there is something to deliver or a fetch
+   * returns no messages.
+   *
+   * @param request specifies the topic-partitions, starting offsets and maximum bytes to be fetched
+   * @return for every requested topic-partition, the deliverable messages (non-transactional messages
+   *         first, then committed transactions in commit order) and the offset to fetch from next
+   */
+  def fetchTx(request: FetchRequest): Map[(String, Int), (Seq[Message], Long)] = {
+    val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestTimer
+    val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer
+
+    // offset to resume from per topic-partition; starts at the offset the caller asked for
+    val nextOffsets = new mutable.HashMap[(String, Int), Long]
+    for ((tp, fetchInfo) <- request.requestInfo)
+      nextOffsets.put((tp.topic, tp.partition), fetchInfo.offset)
+
+    var currentRequest = request
+    // an empty request has nothing to fetch, so the broker is not contacted at all
+    var continueFetching = !request.requestInfo.isEmpty
+    while (continueFetching) {
+      var response: Receive = null
+      aggregateTimer.time {
+        specificTimer.time {
+          response = sendRequest(currentRequest)
+        }
+      }
+      val fetchResponse = FetchResponse.readFrom(response.buffer)
+      val fetchedSize = fetchResponse.sizeInBytes
+      fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize)
+      fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
+
+      // route every fetched message to its buffer and remember where to resume
+      var messagesSeen = 0
+      for (tp <- fetchResponse.data.keys; msg <- fetchResponse.messageSet(tp.topic, tp.partition)) {
+        messagesSeen += 1
+        nextOffsets.put((tp.topic, tp.partition), msg.nextOffset)
+        lastConsumedOffset.put((tp.topic, tp.partition), msg.nextOffset)
+        handleMessage(tp.topic, tp.partition, msg.message)
+      }
+
+      if (!completedTx.isEmpty || !nonTxMessages.isEmpty || messagesSeen == 0) {
+        // something can be delivered, or the fetch was empty and retrying would only spin
+        continueFetching = false
+      } else {
+        // only pending or aborted transaction data so far: continue right after the last message read
+        var builder = new FetchRequestBuilder()
+          .clientId(request.clientId)
+          .maxWait(request.maxWait)
+          .minBytes(request.minBytes)
+        for ((tp, fetchInfo) <- request.requestInfo)
+          builder = builder.addFetch(tp.topic, tp.partition, nextOffsets((tp.topic, tp.partition)), fetchInfo.fetchSize)
+        currentRequest = builder.build()
+      }
+    }
+    drainAvailableData(nextOffsets)
+  }
+
+  /**
+   * Moves everything that can be handed to the application out of the internal buffers
+   * @param nextOffsets offset to fetch from next, per topic-partition
+   * @return the deliverable messages and the next offset per topic-partition
+   */
+  private def drainAvailableData(nextOffsets: mutable.Map[(String, Int), Long]): Map[(String, Int), (Seq[Message], Long)] = {
+    val availableData = new mutable.HashMap[(String, Int), (Seq[Message], Long)]
+    val topicPartitions = (nextOffsets.keys ++ nonTxMessages.keys ++ completedTx.keys).toSet
+    for (tp <- topicPartitions) {
+      val messages = new ListBuffer[Message]
+      // non-transactional messages first, then committed transactions in commit order
+      nonTxMessages.remove(tp).foreach(messages ++= _)
+      completedTx.remove(tp).foreach(txs => txs.foreach(messages ++= _))
+      availableData.put(tp, (messages.toList, nextOffsets.getOrElse(tp, lastConsumedOffset(tp))))
+    }
+    availableData.toMap
+  }
+
+  /**
+   * Routes one fetched message to the buffer it belongs to
+   * @param topic
+   * @param partition
+   * @param message
+   */
+  private def handleMessage(topic: String, partition: Int, message: Message): Unit = {
+    val txId = message.txId
+    debug("Message of transaction: " + txId)
+    if (txId == -1) {
+      // a txId of -1 marks a non-transactional message, which can be delivered as it is
+      nonTxMessages.getOrElseUpdate((topic, partition), new ListBuffer[Message]).append(message)
+    } else {
+      val txControl = getTxControl(message.attributes)
+      debug("txControl: " + txControl)
+      txControl match {
+        case TxControl.Abort => abortTransaction(topic, partition, txId)
+        case TxControl.Commit => commitTransaction(topic, partition, txId)
+        case TxControl.Ongoing => dataOf(topic, partition, txId, message)
+        case unknown => warn("Ignoring message of transaction " + txId + " with unknown txControl: " + unknown)
+      }
+    }
+  }
+
+  object TxControl {
+    val Ongoing: Short = 0
+    val Commit: Short = 3
+    val Abort: Short = 6
+  }
+
+  /**
+   * Extracts the txControl flag from the message attributes
+   * @param attr message attributes
+   * @return value of the three txControl bits (bits 3 to 5 of attr)
+   */
+  private def getTxControl(attr: Short): Short = {
+    val txControlOffset = 3
+    val txControlMask = 0x07 << txControlOffset
+    ((attr & txControlMask) >> txControlOffset).toShort
+  }
+
+  /**
+   * Aborts a transaction, i.e. drops the messages buffered for it
+   * @param topic
+   * @param partition
+   * @param txId
+   */
+  private def abortTransaction(topic: String, partition: Int, txId: Int): Unit = {
+    trace("Aborting transaction: " + txId)
+    pendingTxData.remove((topic, partition, txId))
+  }
+
+  /**
+   * Commits a transaction, moving its buffered messages to completedTx so that they can be returned to the application
+   * @param topic
+   * @param partition
+   * @param txId
+   */
+  private def commitTransaction(topic: String, partition: Int, txId: Int): Unit = {
+    trace("Committing transaction: " + txId)
+    pendingTxData.remove((topic, partition, txId)) match {
+      case Some(messages) =>
+        completedTx.getOrElseUpdate((topic, partition), new mutable.Queue[ListBuffer[Message]]).enqueue(messages)
+      case None =>
+        // commit marker without buffered data: nothing to hand to the application
+        debug("No pending data for committed transaction: " + txId)
+    }
+  }
+
+  /**
+   * Buffers a message of a transaction that is still in progress
+   * @param topic
+   * @param partition
+   * @param txId
+   * @param message
+   */
+  private def dataOf(topic: String, partition: Int, txId: Int, message: Message): Unit = {
+    pendingTxData.getOrElseUpdate((topic, partition, txId), new ListBuffer[Message]).append(message)
+  }
+
+  /**
    * Get a list of valid offsets (up to maxSize) before the given time.
    * @param request a [[kafka.api.OffsetRequest]] object.
    * @return a [[kafka.api.OffsetResponse]] object.
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 7602b8d..0ff3d06 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -59,27 +59,50 @@ object SimpleConsumerPerformance {
     var lastReportTime: Long = startMs
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
+    if(! config.readCommitted)
+      println("Simple Consumer configured to READ UNCOMMITTED")
+    else
+      println("Simple Consumer configured to READ COMMITTED")
+
     while(!done) {
       // TODO: add in the maxWait and minBytes for performance
       val request = new FetchRequestBuilder()
         .clientId(config.clientId)
         .addFetch(config.topic, config.partition, offset, config.fetchSize)
         .build()
-      val fetchResponse = consumer.fetch(request)
 
       var messagesRead = 0
       var bytesRead = 0
-      val messageSet = fetchResponse.messageSet(config.topic, config.partition)
-      for (message <- messageSet) {
-        messagesRead += 1
-        bytesRead += message.message.payloadSize
-      }
-
-      if(messagesRead == 0 || totalMessagesRead > config.numMessages)
-        done = true
-      else
+      if(! config.readCommitted) {
+        val fetchResponse = consumer.fetch(request)
+        val messageSet = fetchResponse.messageSet(config.topic, config.partition)
+        for (message <- messageSet) {
+          messagesRead += 1
+          bytesRead += message.message.payloadSize
+        }
+        if(messagesRead == 0 || totalMessagesRead > config.numMessages)
+          done = true
+        else
         // we only did one fetch so we find the offset for the first (head) messageset
-        offset += messageSet.validBytes
+          offset += messageSet.validBytes
+      } else {
+        // fetchTx may not return an entry for this partition; treat that as "nothing read"
+        // instead of failing with NoSuchElementException on a direct map lookup
+        consumer.fetchTx(request).get((config.topic, config.partition)) match {
+          case Some((messages, lastConsumedOffset)) =>
+            println("Got "+messages.size+" messages from "+config.topic+" - "+config.partition+" with last offset: "+lastConsumedOffset)
+            for (message <- messages) {
+              messagesRead += 1
+              bytesRead += message.payloadSize
+            }
+            if(messagesRead == 0 || totalMessagesRead > config.numMessages)
+              done = true
+            else
+              offset = lastConsumedOffset
+          case None =>
+            done = true
+        }
+      }
 
       totalBytesRead += bytesRead
       totalMessagesRead += messagesRead
@@ -154,5 +177,8 @@ object SimpleConsumerPerformance {
     val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
     val hideHeader = options.has(hideHeaderOpt)
     val clientId = options.valueOf(clientIdOpt).toString
+    // NOTE(review): hard-coded, so the READ UNCOMMITTED path in main is never taken;
+    // expose this as a command line option if both modes are meant to be measured
+    val readCommitted = true
   }
 }