From a9e0a16b8584da055d156a2d2af1eb0c4a0aa266 Mon Sep 17 00:00:00 2001 From: raulcf Date: Tue, 19 Aug 2014 10:53:03 -0700 Subject: [PATCH] KAFKA-1526; producer perf tool with tx updated with new options to measure rtt and e2e latency --- .../producer/internals/RTTLoggingCallback.java | 37 +++++++++ .../main/scala/kafka/producer/BaseProducer.scala | 93 ++++++++++++++++++++++ .../scala/kafka/tools/ProducerPerformance.scala | 73 ++++++++++++++--- 3 files changed, 192 insertions(+), 11 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/RTTLoggingCallback.java diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RTTLoggingCallback.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RTTLoggingCallback.java new file mode 100644 index 0000000..f582fcb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RTTLoggingCallback.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RTTLoggingCallback implements Callback{ + private static final Logger log = LoggerFactory.getLogger(RTTLoggingCallback.class); + + private long startTime; + + public RTTLoggingCallback() { + this.startTime = System.currentTimeMillis(); + } + + public void onCompletion(RecordMetadata metadata, Exception e) { + long endTime = System.currentTimeMillis(); + log.trace("! {}", (endTime-startTime)); + } +} diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala index b020793..cc6ad86 100644 --- a/core/src/main/scala/kafka/producer/BaseProducer.scala +++ b/core/src/main/scala/kafka/producer/BaseProducer.scala @@ -17,15 +17,63 @@ package kafka.producer +import org.apache.kafka.clients.consumer.OffsetMetadata +import org.apache.kafka.clients.producer.Callback +; +import org.apache.kafka.common.TopicPartition; + import java.util.Properties // A base producer used whenever we need to have options for both old and new producers; // this class will be removed once we fully rolled out 0.9 trait BaseProducer { + def begin() + def begin(txTimeoutMs: Int) + def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) + def abort() def send(topic: String, key: Array[Byte], value: Array[Byte]) + def send(topic: String, key: Array[Byte], value: Array[Byte], callback: Callback) def close() } +class NewTxProducer(producerProps: Properties) extends BaseProducer { + import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} + import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback + + val producer = new KafkaProducer(producerProps) + + override def send(topic: String, key: Array[Byte], value: Array[Byte]) { + val record = new ProducerRecord(topic, key, value) + this.producer.send(record, + new ErrorLoggingCallback(topic, key, value, false)) + } + + override def send(topic: String, key: Array[Byte], value: Array[Byte], callback: Callback) { + val record = new ProducerRecord(topic, key, value) + this.producer.send(record, callback) + } + + override def begin() { + this.producer.begin(60000) + } + + override def begin(txTimeoutMs: Int) { + this.producer.begin(txTimeoutMs) + } + + override def abort() { + this.producer.abort() + } + + override def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) { + this.producer.commit(offsets) + } + + override def close() { + this.producer.close() + } +} + class NewShinyProducer(producerProps: Properties) extends BaseProducer { import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback @@ -45,6 +93,31 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer { } } + override def send(topic: String, key: Array[Byte], value: Array[Byte], callback: Callback) { + val record = new ProducerRecord(topic, key, value) + if(sync) { + this.producer.send(record).get() + } else { + this.producer.send(record, callback) + } + } + + override def begin() { + + } + + override def begin(txTimeoutMs: Int) { + + } + + override def abort() { + + } + + override def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) { + + } + override def close() { this.producer.close() } @@ -62,6 +135,26 @@ class OldProducer(producerProps: Properties) extends BaseProducer { this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value)) } + override def send(topic: String, key: Array[Byte], value: Array[Byte], callback: Callback) { + this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value)) + } + + override def begin() { + + } + + override def begin(txTimeoutMs: Int) { + + } + + override def abort() { + + } + + override def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) { + + } + override def close() { this.producer.close() } diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index fc3e724..2d468d8 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -18,7 +18,7 @@ package kafka.tools import kafka.metrics.KafkaMetricsReporter -import kafka.producer.{OldProducer, NewShinyProducer} +import kafka.producer.{OldProducer, NewShinyProducer, NewTxProducer} import kafka.utils.{VerifiableProperties, Logging, CommandLineUtils} import kafka.message.CompressionCodec import kafka.serializer._ @@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicLong import java.util._ import java.text.SimpleDateFormat import java.math.BigInteger +import org.apache.kafka.clients.producer.internals.RTTLoggingCallback + import scala.collection.immutable.List import org.apache.log4j.Logger @@ -63,6 +65,7 @@ object ProducerPerformance extends Logging { val endMs = System.currentTimeMillis val elapsedSecs = (endMs - startMs) / 1000.0 val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024) + println() println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format( config.dateFormat.format(startMs), config.dateFormat.format(endMs), config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent, @@ -122,6 +125,15 @@ object ProducerPerformance extends Logging { .ofType(classOf[java.lang.String]) val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") + val numMessagesPerTxOpt = parser.accepts("messages-per-tx", "Number of messages per transaction") + .withRequiredArg + .describedAs("# messages per transaction") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3) + val useTxProducerOpt = parser.accepts("tx-producer", "Use the transactional producer implementation") + val measureRTTOpt = parser.accepts("report-rtt", "Report RTT per message") + val measureE2ELatencyOpt = parser.accepts("e2e", "Measure e2e latency, WARN: producer and consumer must run under a same logical clock") + val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt) @@ -147,6 +159,10 @@ object ProducerPerformance extends Logging { val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue() val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue() val useNewProducer = options.has(useNewProducerOpt) + val numMessagesPerTx = options.valueOf(numMessagesPerTxOpt).intValue() + val useTxProducer = options.has(useTxProducerOpt) + val measureRTT = options.has(measureRTTOpt) + val measureE2ELatency = options.has(measureE2ELatencyOpt) val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) @@ -178,7 +194,19 @@ object ProducerPerformance extends Logging { debug("Messages per thread = " + messagesPerThread) val props = new Properties() val producer = - if (config.useNewProducer) { + if (config.useTxProducer) { + import org.apache.kafka.clients.producer.ProducerConfig + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance") + props.put(ProducerConfig.CLIENT_TX_GROUP_CONFIG, 0.toString) + props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) + new NewTxProducer(props) + } else if (config.useNewProducer) { import org.apache.kafka.clients.producer.ProducerConfig props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) @@ -224,7 +252,9 @@ object ProducerPerformance extends Logging { // . . . leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId)) - val msgHeader = topicLabel + SEP + + val currentTime = System.currentTimeMillis() + + val msgHeader = currentTime.toString + SEP + topicLabel + SEP + topic + SEP + threadIdLabel + SEP + threadId + SEP + @@ -252,22 +282,43 @@ object ProducerPerformance extends Logging { var i: Long = 0L var message: Array[Byte] = null + var msgSentInTx = 0 + + if (config.useTxProducer) producer.begin() // start a new transaction while (i < messagesPerThread) { + + if (config.useTxProducer && config.numMessagesPerTx > 0 && msgSentInTx == config.numMessagesPerTx){ + producer.commit(null) // commit in-flight tx + + producer.begin() // and start a new one + msgSentInTx = 0 // resent the counter for the new tx + } + try { config.topics.foreach( - topic => { - message = generateProducerData(topic, i) - producer.send(topic, BigInteger.valueOf(i).toByteArray, message) - bytesSent += message.size - nSends += 1 - if (config.messageSendGapMs > 0) - Thread.sleep(config.messageSendGapMs) - }) + topic => { + message = generateProducerData(topic, i) + if(config.measureRTT) { + producer.send(topic, "%d".format(i).getBytes(), message, new RTTLoggingCallback()) + } else { + producer.send(topic, "%d".format(i).getBytes(), message) + } + msgSentInTx += 1 // no need to check whether tx are active + bytesSent += message.size + nSends += 1 + if (config.messageSendGapMs > 0) + Thread.sleep(config.messageSendGapMs) + }) } catch { case e: Throwable => error("Error when sending message " + new String(message), e) } i += 1 } + + if (config.useTxProducer) { + producer.commit(null) + } // finish last transaction + try { producer.close() } catch { -- 1.7.12.4