From ea14495af2b5487d9daa6f18e2e414ee28caad5c Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Tue, 19 Aug 2014 10:50:03 -0700 Subject: [PATCH] KAFKA-1601; ConsoleConsumer/SimpleConsumerPerformance should be transaction-aware --- .../kafka/consumer/ConsumerTransactionBuffer.scala | 59 ++++++++++++++++++++++ .../scala/kafka/message/MessageAndMetadata.scala | 2 +- .../main/scala/kafka/tools/ConsoleConsumer.scala | 43 +++++++++------- .../kafka/tools/SimpleConsumerPerformance.scala | 6 ++- 4 files changed, 89 insertions(+), 21 deletions(-) create mode 100644 core/src/main/scala/kafka/consumer/ConsumerTransactionBuffer.scala diff --git a/core/src/main/scala/kafka/consumer/ConsumerTransactionBuffer.scala b/core/src/main/scala/kafka/consumer/ConsumerTransactionBuffer.scala new file mode 100644 index 0000000..52e04dd --- /dev/null +++ b/core/src/main/scala/kafka/consumer/ConsumerTransactionBuffer.scala @@ -0,0 +1,59 @@ +package kafka.consumer + +import kafka.message._ +import scala.collection.mutable +import kafka.utils._ + +object TxRequestTypes { + val Ongoing: Short = 0 + val Begin: Short = 1 + val PreCommit: Short = 2 + val Commit: Short = 3 + val Committed: Short = 4 + val PreAbort: Short = 5 + val Abort: Short = 6 + val Aborted: Short = 7 +} + +/** + * Provide buffer for transactional messages + * Queue messages per transaction + * Dequeue messages when their transaction is committed + */ +class ConsumerTransactionBuffer extends Logging{ + + private val bufferedMessagesPerTx = new mutable.HashMap[Int, mutable.Queue[MessageAndOffset]] + + def bufferAndMaybeGet(message: MessageAndOffset): Seq[MessageAndOffset] = { + val txId = message.message.txId + val txRequestType = message.message.txControl + + if (txId == -1) { + Seq(message) + } + else { + txRequestType match { + case TxRequestTypes.Ongoing => + val queue = bufferedMessagesPerTx.getOrElseUpdate(txId, new mutable.Queue[MessageAndOffset]) + queue += message + Seq.empty + case TxRequestTypes.Commit => + 
bufferedMessagesPerTx.remove(txId).getOrElse(Seq.empty) + case TxRequestTypes.Abort => + bufferedMessagesPerTx.remove(txId) + Seq.empty + case _ => + error("Unrecognized txRequest type: %d".format(txRequestType)) + Seq.empty + } + } + } + + def bufferAndMaybeGet(messages: ByteBufferMessageSet): Seq[MessageAndOffset] = { + val committedMessages = new mutable.Queue[MessageAndOffset] + for (message <- messages) { + committedMessages ++= bufferAndMaybeGet(message) + } + committedMessages + } +} diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala index d693abc..780c00f 100644 --- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala +++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala @@ -21,7 +21,7 @@ import kafka.serializer.Decoder import kafka.utils.Utils case class MessageAndMetadata[K, V](topic: String, partition: Int, - private val rawMessage: Message, offset: Long, + val rawMessage: Message, offset: Long, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) { /** diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 323fc85..5ec6884 100644 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -27,7 +27,7 @@ import kafka.message._ import kafka.serializer._ import kafka.utils._ import kafka.metrics.KafkaMetricsReporter -import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer} +import kafka.consumer._ /** * Consumer that dumps messages out to standard out.
@@ -157,6 +157,7 @@ object ConsoleConsumer extends Logging { } }) + val consumerTransactionBuffer = new ConsumerTransactionBuffer() var numMessages = 0L val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] formatter.init(formatterArgs) @@ -168,23 +169,29 @@ object ConsoleConsumer extends Logging { stream for(messageAndTopic <- iter) { - try { - formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) - numMessages += 1 - } catch { - case e: Throwable => - if (skipMessageOnError) - error("Error processing message, skipping this message: ", e) - else - throw e - } - if(System.out.checkError()) { - // This means no one is listening to our output stream any more, time to shutdown - System.err.println("Unable to write to standard out, closing consumer.") - System.err.println("Consumed %d messages".format(numMessages)) - formatter.close() - connector.shutdown() - System.exit(1) + val messageAndOffset = MessageAndOffset(messageAndTopic.rawMessage, messageAndTopic.offset); + for (committedMessageAndOffset <- consumerTransactionBuffer.bufferAndMaybeGet(messageAndOffset)) { + val committedMessageAndTopic = MessageAndMetadata(messageAndTopic.topic, messageAndTopic.partition, + committedMessageAndOffset.message, committedMessageAndOffset.offset, + messageAndTopic.keyDecoder, messageAndTopic.valueDecoder) + try { + formatter.writeTo(committedMessageAndTopic.key, committedMessageAndTopic.message, System.out) + numMessages += 1 + } catch { + case e: Throwable => + if (skipMessageOnError) + error("Error processing message, skipping this message: ", e) + else + throw e + } + if (System.out.checkError()) { + // This means no one is listening to our output stream any more, time to shutdown + System.err.println("Unable to write to standard out, closing consumer.") + System.err.println("Consumed %d messages".format(numMessages)) + formatter.close() + connector.shutdown() + System.exit(1) + } } } } catch { diff --git 
a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala index 7602b8d..9465afd 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala @@ -20,7 +20,7 @@ package kafka.tools import java.net.URI import java.text.SimpleDateFormat import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} -import kafka.consumer.SimpleConsumer +import kafka.consumer.{ConsumerTransactionBuffer, SimpleConsumer} import kafka.utils._ import org.apache.log4j.Logger import kafka.common.TopicAndPartition @@ -43,6 +43,7 @@ object SimpleConsumerPerformance { } val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId) + val consumerTransactionBuffer = new ConsumerTransactionBuffer() // reset to latest or smallest offset val topicAndPartition = TopicAndPartition(config.topic, config.partition) @@ -70,7 +71,8 @@ object SimpleConsumerPerformance { var messagesRead = 0 var bytesRead = 0 val messageSet = fetchResponse.messageSet(config.topic, config.partition) - for (message <- messageSet) { + val committedMessages = consumerTransactionBuffer.bufferAndMaybeGet(messageSet) + for (message <- committedMessages) { messagesRead += 1 bytesRead += message.message.payloadSize } -- 1.7.12.4