diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala
index b020793..d791b1c 100644
--- a/core/src/main/scala/kafka/producer/BaseProducer.scala
+++ b/core/src/main/scala/kafka/producer/BaseProducer.scala
@@ -17,15 +17,55 @@ package kafka.producer
 
+import org.apache.kafka.clients.consumer.OffsetMetadata
+import org.apache.kafka.common.TopicPartition
+
 import java.util.Properties
 
 // A base producer used whenever we need to have options for both old and new producers;
 // this class will be removed once we fully rolled out 0.9
 trait BaseProducer {
+  def begin()
+  def begin(txTimeoutMs: Int)
+  def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata])
+  def abort()
   def send(topic: String, key: Array[Byte], value: Array[Byte])
   def close()
 }
 
+class NewTxProducer(producerProps: Properties) extends BaseProducer {
+  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+  import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+
+  val producer = new KafkaProducer(producerProps)
+
+  override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
+    val record = new ProducerRecord(topic, key, value)
+    this.producer.send(record,
+      new ErrorLoggingCallback(topic, key, value, false))
+  }
+
+  override def begin() {
+    this.producer.begin()
+  }
+
+  override def begin(txTimeoutMs: Int) {
+    this.producer.begin(txTimeoutMs)
+  }
+
+  override def abort() {
+    this.producer.abort()
+  }
+
+  override def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) {
+    this.producer.commit(offsets)
+  }
+
+  override def close() {
+    this.producer.close()
+  }
+}
+
 class NewShinyProducer(producerProps: Properties) extends BaseProducer {
   import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
   import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 
   // decide whether to send synchronously based on producer properties
@@ -45,6 +85,22 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer {
     }
   }
 
+  override def begin() {
+
+  }
+
+  override def begin(txTimeoutMs: Int) {
+
+  }
+
+  override def abort() {
+
+  }
+
+  override def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) {
+
+  }
+
   override def close() {
     this.producer.close()
   }
@@ -62,6 +118,22 @@ class OldProducer(producerProps: Properties) extends BaseProducer {
     this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value))
   }
 
+  override def begin() {
+
+  }
+
+  override def begin(txTimeoutMs: Int) {
+
+  }
+
+  override def abort() {
+
+  }
+
+  override def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) {
+
+  }
+
   override def close() {
     this.producer.close()
   }
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index fc3e724..416cb1c 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -18,7 +18,7 @@ package kafka.tools
 
 import kafka.metrics.KafkaMetricsReporter
-import kafka.producer.{OldProducer, NewShinyProducer}
+import kafka.producer.{OldProducer, NewShinyProducer, NewTxProducer}
 import kafka.utils.{VerifiableProperties, Logging, CommandLineUtils}
 import kafka.message.CompressionCodec
 import kafka.serializer._
 
@@ -121,6 +121,12 @@ object ProducerPerformance extends Logging {
       .describedAs("metrics dictory")
       .ofType(classOf[java.lang.String])
     val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
+    val numMessagesPerTxOpt = parser.accepts("messages-per-tx", "Number of messages per transaction")
+      .withRequiredArg
+      .describedAs("# messages per transaction")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3)
+    val useTxProducerOpt = parser.accepts("tx-producer", "Use the transactional producer implementation")
 
     val options = parser.parse(args: _*)
     CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt)
@@ -147,6 +153,8 @@ object ProducerPerformance extends Logging {
     val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
     val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
     val useNewProducer = options.has(useNewProducerOpt)
+    val numMessagesPerTx = options.valueOf(numMessagesPerTxOpt).intValue()
+    val useTxProducer = options.has(useTxProducerOpt)
 
     val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
 
@@ -178,7 +186,19 @@ object ProducerPerformance extends Logging {
     debug("Messages per thread = " + messagesPerThread)
     val props = new Properties()
     val producer =
-      if (config.useNewProducer) {
+      if (config.useTxProducer) {
+        import org.apache.kafka.clients.producer.ProducerConfig
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
+        props.put(ProducerConfig.CLIENT_TX_GROUP_CONFIG, 0.toString)
+        props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
+        props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
+        props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
+        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
+        new NewTxProducer(props)
+      } else if (config.useNewProducer) {
         import org.apache.kafka.clients.producer.ProducerConfig
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
@@ -252,22 +272,36 @@ object ProducerPerformance extends Logging {
       var i: Long = 0L
       var message: Array[Byte] = null
+      var msgSentInTx = 0
+
+      if (config.useTxProducer) producer.begin() // start a new transaction
       while (i < messagesPerThread) {
+
+        if (config.useTxProducer && config.numMessagesPerTx > 0 && msgSentInTx == config.numMessagesPerTx) {
+          producer.commit(null) // commit in-flight tx
+          producer.begin() // and start a new one
+          msgSentInTx = 0 // reset the counter for the new tx
+        }
+
         try {
           config.topics.foreach(
-          topic => {
-            message = generateProducerData(topic, i)
-            producer.send(topic, BigInteger.valueOf(i).toByteArray, message)
-            bytesSent += message.size
-            nSends += 1
-            if (config.messageSendGapMs > 0)
-              Thread.sleep(config.messageSendGapMs)
-          })
+            topic => {
+              message = generateProducerData(topic, i)
+              producer.send(topic, "%d".format(i).getBytes(), message)
+              msgSentInTx += 1 // no need to check whether tx are active
+              bytesSent += message.size
+              nSends += 1
+              if (config.messageSendGapMs > 0)
+                Thread.sleep(config.messageSendGapMs)
+            })
         } catch {
           case e: Throwable => error("Error when sending message " + new String(message), e)
         }
         i += 1
       }
 
+
+      if (config.useTxProducer) producer.commit(null) // finish last transaction
+
       try {
         producer.close()
       } catch {