diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala new file mode 100644 index 0000000..7f0d5a1 --- /dev/null +++ b/core/src/main/scala/kafka/producer/BaseProducer.scala @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.producer + +import java.util.Properties + +// A transit producer used whenever we need to have options for both old and new producers, +// and will be removed once we fully rolled out 0.9 +trait BaseProducer { + def send(topic: String, key: Array[Byte], value: Array[Byte]) + def close() +} + +class NewShinyProducer(producerProps: Properties) extends BaseProducer { + import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} + import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback + + // decide whether to sync send based on producer properties + val sync = producerProps.getProperty("producer.type", "async").equals("sync") + + // block on buffer full by default so messages are not dropped; respect a caller-supplied override + if (producerProps.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG) == null) producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") + val producer = new KafkaProducer(producerProps) + + override def send(topic: String, key: Array[Byte], value: Array[Byte]) { + val record = new ProducerRecord(topic, key, value) + if(sync) { + this.producer.send(record).get() + } else { + this.producer.send(record, + new ErrorLoggingCallback(topic, key, value, false)) + } + } + + override def close() { + this.producer.close() + } +} + +class OldProducer(producerProps: Properties) extends BaseProducer { + import kafka.producer.{KeyedMessage, ProducerConfig} + + // default to byte array partitioner + if (producerProps.getProperty("partitioner.class") == null) + producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName) + val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps)) + + override def send(topic: String, key: Array[Byte], value: Array[Byte]) { + this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value)) + } + + override def close() { + this.producer.close() + } +} + diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala index 
988e437..6a3b02e 100644 --- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala +++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala @@ -20,7 +20,7 @@ package kafka.producer import kafka.utils._ -private class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner { +class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner { def partition(key: Any, numPartitions: Int): Int = { Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions } diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index b19ab49..a2af988 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -17,16 +17,17 @@ package kafka.producer -import joptsimple._ -import java.util.Properties -import java.io._ import kafka.common._ import kafka.message._ import kafka.serializer._ -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import kafka.utils.{CommandLineUtils, Utils} +import kafka.utils.CommandLineUtils + +import java.util.Properties +import java.io._ + +import joptsimple._ -object ConsoleProducer { +object ConsoleProducer { def main(args: Array[String]) { @@ -39,8 +40,46 @@ object ConsoleProducer { try { val producer = - if(config.useNewProducer) new NewShinyProducer(config) - else new OldProducer(config) + if(config.useNewProducer) { + import org.apache.kafka.clients.producer.ProducerConfig + + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString) + props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString) + 
props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString) + props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) + props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) + if(config.queueEnqueueTimeoutMs != -1) + props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) + props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") + + new NewShinyProducer(props) + } else { + props.put("metadata.broker.list", config.brokerList) + props.put("compression.codec", config.compressionCodec) + props.put("producer.type", if(config.sync) "sync" else "async") + props.put("batch.num.messages", config.batchSize.toString) + props.put("message.send.max.retries", config.messageSendMaxRetries.toString) + props.put("retry.backoff.ms", config.retryBackoffMs.toString) + props.put("queue.buffering.max.ms", config.sendTimeout.toString) + props.put("queue.buffering.max.messages", config.queueSize.toString) + props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) + props.put("request.required.acks", config.requestRequiredAcks.toString) + props.put("request.timeout.ms", config.requestTimeoutMs.toString) + props.put("key.serializer.class", config.keyEncoderClass) + props.put("serializer.class", config.valueEncoderClass) + props.put("send.buffer.bytes", config.socketBuffer.toString) + props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) + props.put("client.id", "console-producer") + + new OldProducer(props) + } Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { @@ -257,72 +296,4 @@ object ConsoleProducer { } } } - - 
trait Producer { - def send(topic: String, key: Array[Byte], bytes: Array[Byte]) - def close() - } - - class NewShinyProducer(producerConfig: ProducerConfig) extends Producer { - import org.apache.kafka.clients.producer.ProducerConfig - val props = new Properties() - props.putAll(producerConfig.cmdLineProps) - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerConfig.brokerList) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfig.compressionCodec) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, producerConfig.socketBuffer.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, producerConfig.retryBackoffMs.toString) - props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, producerConfig.metadataExpiryMs.toString) - props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, producerConfig.metadataFetchTimeoutMs.toString) - props.put(ProducerConfig.ACKS_CONFIG, producerConfig.requestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, producerConfig.requestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, producerConfig.messageSendMaxRetries.toString) - props.put(ProducerConfig.LINGER_MS_CONFIG, producerConfig.sendTimeout.toString) - if(producerConfig.queueEnqueueTimeoutMs != -1) - props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerConfig.maxMemoryBytes.toString) - props.put(ProducerConfig.BATCH_SIZE_CONFIG, producerConfig.maxPartitionMemoryBytes.toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") - val producer = new KafkaProducer(props) - - def send(topic: String, key: Array[Byte], bytes: Array[Byte]) { - val response = this.producer.send(new ProducerRecord(topic, key, bytes)) - if(producerConfig.sync) { - response.get() - } - } - - def close() { - this.producer.close() - } - } - - class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer { - val props = new Properties() - 
props.putAll(producerConfig.cmdLineProps) - props.put("metadata.broker.list", producerConfig.brokerList) - props.put("compression.codec", producerConfig.compressionCodec) - props.put("producer.type", if(producerConfig.sync) "sync" else "async") - props.put("batch.num.messages", producerConfig.batchSize.toString) - props.put("message.send.max.retries", producerConfig.messageSendMaxRetries.toString) - props.put("retry.backoff.ms", producerConfig.retryBackoffMs.toString) - props.put("queue.buffering.max.ms", producerConfig.sendTimeout.toString) - props.put("queue.buffering.max.messages", producerConfig.queueSize.toString) - props.put("queue.enqueue.timeout.ms", producerConfig.queueEnqueueTimeoutMs.toString) - props.put("request.required.acks", producerConfig.requestRequiredAcks.toString) - props.put("request.timeout.ms", producerConfig.requestTimeoutMs.toString) - props.put("key.serializer.class", producerConfig.keyEncoderClass) - props.put("serializer.class", producerConfig.valueEncoderClass) - props.put("send.buffer.bytes", producerConfig.socketBuffer.toString) - props.put("topic.metadata.refresh.interval.ms", producerConfig.metadataExpiryMs.toString) - props.put("client.id", "console-producer") - val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new kafka.producer.ProducerConfig(props)) - - def send(topic: String, key: Array[Byte], bytes: Array[Byte]) { - this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, bytes)) - } - - def close() { - this.producer.close() - } - } } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index e4d1a86..72b4d0c 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,16 +17,18 @@ package kafka.tools -import joptsimple.OptionParser import kafka.utils.{Utils, CommandLineUtils, Logging} -import kafka.producer.{KeyedMessage, ProducerConfig, Producer} -import 
scala.collection.JavaConversions._ -import java.util.concurrent.CountDownLatch import kafka.consumer._ import kafka.serializer._ -import collection.mutable.ListBuffer -import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel} -import kafka.javaapi +import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer} +import org.apache.kafka.clients.producer.ProducerRecord + +import scala.collection.mutable.ListBuffer +import scala.collection.JavaConversions._ + +import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch} + +import joptsimple.OptionParser object MirrorMaker extends Logging { @@ -52,6 +54,9 @@ object MirrorMaker extends Logging { .describedAs("config file") .ofType(classOf[String]) + val useNewProducerOpt = parser.accepts("new.producer", + "Use the new producer implementation.") + val numProducersOpt = parser.accepts("num.producers", "Number of producer instances") .withRequiredArg() @@ -70,7 +75,7 @@ object MirrorMaker extends Logging { .withRequiredArg() .describedAs("Queue size in terms of number of messages") .ofType(classOf[java.lang.Integer]) - .defaultsTo(10000); + .defaultsTo(10000) val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to mirror.") @@ -99,24 +104,35 @@ object MirrorMaker extends Logging { System.exit(1) } - val numStreams = options.valueOf(numStreamsOpt) + val numProducers = options.valueOf(numProducersOpt).intValue() + val numStreams = options.valueOf(numStreamsOpt).intValue() val bufferSize = options.valueOf(bufferSizeOpt).intValue() - val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => { - val props = Utils.loadProps(options.valueOf(producerConfigOpt)) - val config = props.getProperty("partitioner.class") match { - case null => - new ProducerConfig(props) { - override val partitionerClass = "kafka.producer.ByteArrayPartitioner" - } - case pClass : String => - new ProducerConfig(props) - } - new Producer[Array[Byte], Array[Byte]](config) - }) + val 
useNewProducer = options.has(useNewProducerOpt) + val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) + + // create data channel + val mirrorDataChannel = new ArrayBlockingQueue[ProducerRecord](bufferSize) + // create producer threads + val producers = (1 to numProducers).map(_ => { + if (useNewProducer) + new NewShinyProducer(producerProps) + else + new OldProducer(producerProps) + }) + + producerThreads = new ListBuffer[ProducerThread]() + var i: Int = 1 + for(producer <- producers) { + val producerThread = new ProducerThread(mirrorDataChannel, producer, i) + producerThreads += producerThread + i += 1 + } + + // create consumer streams connectors = options.valuesOf(consumerConfigOpt).toList - .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString))) + .map(cfg => new ConsumerConfig(Utils.loadProps(cfg))) .map(new ZookeeperConsumerConnector(_)) val filterSpec = if (options.has(whitelistOpt)) @@ -126,18 +142,13 @@ object MirrorMaker extends Logging { var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil try { - streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten + streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder())).flatten } catch { case t: Throwable => fatal("Unable to create stream - shutting down mirror maker.") connectors.foreach(_.shutdown) } - - val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize); - - consumerThreads = streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, producers, streamAndIndex._2)) - - producerThreads = new ListBuffer[ProducerThread]() + consumerThreads = streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, mirrorDataChannel, producers, streamAndIndex._2)) Runtime.getRuntime.addShutdownHook(new Thread() { override 
def run() { @@ -145,15 +156,6 @@ object MirrorMaker extends Logging { } }) - // create producer threads - var i: Int = 1 - for(producer <- producers) { - val producerThread: KafkaMigrationTool.ProducerThread = new KafkaMigrationTool.ProducerThread(producerDataChannel, - new javaapi.producer.Producer[Array[Byte], Array[Byte]](producer), i) - producerThreads += producerThread - i += 1 - } - consumerThreads.foreach(_.start) producerThreads.foreach(_.start) @@ -173,8 +175,8 @@ object MirrorMaker extends Logging { } class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]], - producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]], - producers: Seq[Producer[Array[Byte], Array[Byte]]], + mirrorDataChannel: BlockingQueue[ProducerRecord], + producers: Seq[BaseProducer], threadId: Int) extends Thread with Logging { @@ -192,14 +194,13 @@ object MirrorMaker extends Logging { // Otherwise use a pre-assigned producer to send the message if (msgAndMetadata.key == null) { trace("Send the non-keyed message the producer channel.") - val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message) - producerDataChannel.sendRequest(pd) + val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message) + mirrorDataChannel.put(data) } else { val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size() trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId)) val producer = producers(producerId) - val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) - producer.send(pd) + producer.send(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) } } } catch { @@ -219,5 +220,62 @@ object MirrorMaker extends Logging { } } } + + class ProducerThread (val dataChannel: BlockingQueue[ProducerRecord], + val producer: BaseProducer, + val threadId: Int) 
extends Thread { + val threadName = "ProducerThread-" + threadId + val logger = org.apache.log4j.Logger.getLogger(classOf[ProducerThread].getName) + val shutdownComplete: CountDownLatch = new CountDownLatch(1) + + private final val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) + + setName(threadName) + + override def run { + try { + while (true) { + val data: ProducerRecord = dataChannel.take + logger.trace("Sending message with value size %d".format(data.value().size)) + + if(data eq shutdownMessage) { + logger.info("Producer thread " + threadName + " finished running") + return + } + producer.send(data.topic(), data.key(), data.value()) + } + } catch { + case t: Throwable => { + logger.fatal("Producer thread failure due to ", t) + } + } finally { + shutdownComplete.countDown + } + } + + def shutdown { + try { + logger.info("Producer thread " + threadName + " shutting down") + dataChannel.put(shutdownMessage) + } + catch { + case ie: InterruptedException => { + logger.warn("Interrupt during shutdown of ProducerThread", ie) + } + } + } + + def awaitShutdown { + try { + shutdownComplete.await + producer.close + logger.info("Producer thread " + threadName + " shutdown complete") + } catch { + case ie: InterruptedException => { + logger.warn("Interrupt during shutdown of ProducerThread") + } + } + } + } } diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index 1490bdb..2d5fca3 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -17,19 +17,20 @@ package kafka.perf -import java.util.concurrent.{ CountDownLatch, Executors } -import java.util.concurrent.atomic.AtomicLong -import kafka.producer._ -import org.apache.log4j.Logger +import kafka.metrics.KafkaMetricsReporter +import kafka.producer.{OldProducer, NewShinyProducer} +import 
kafka.utils.{VerifiableProperties, Logging} import kafka.message.CompressionCodec -import java.text.SimpleDateFormat import kafka.serializer._ + +import java.util.concurrent.{CountDownLatch, Executors} +import java.util.concurrent.atomic.AtomicLong import java.util._ -import collection.immutable.List -import kafka.utils.{VerifiableProperties, Logging, Utils} -import kafka.metrics.KafkaMetricsReporter -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback +import java.text.SimpleDateFormat +import java.math.BigInteger +import scala.collection.immutable.List + +import org.apache.log4j.Logger /** * Load test for the producer @@ -170,67 +171,6 @@ object ProducerPerformance extends Logging { val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue() } - trait Producer { - def send(topic: String, partition: Long, bytes: Array[Byte]) - def close() - } - - class OldRustyProducer(config: ProducerPerfConfig) extends Producer { - val props = new Properties() - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec.codec.toString) - props.put("send.buffer.bytes", (64 * 1024).toString) - if (!config.isSync) { - props.put("producer.type", "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("queue.enqueue.timeout.ms", "-1") - } - props.put("client.id", "perf-test") - props.put("request.required.acks", config.producerRequestRequiredAcks.toString) - props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) - props.put("message.send.max.retries", config.producerNumRetries.toString) - props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) - props.put("serializer.class", classOf[DefaultEncoder].getName.toString) - props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString) - val producer = new kafka.producer.Producer[Long, Array[Byte]](new 
ProducerConfig(props)) - - def send(topic: String, partition: Long, bytes: Array[Byte]) { - this.producer.send(new KeyedMessage[Long, Array[Byte]](topic, partition, bytes)) - } - - def close() { - this.producer.close() - } - } - - class NewShinyProducer(config: ProducerPerfConfig) extends Producer { - import org.apache.kafka.clients.producer.ProducerConfig - val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "perf-test") - props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) - val producer = new KafkaProducer(props) - - def send(topic: String, partition: Long, bytes: Array[Byte]) { - val part = partition % this.producer.partitionsFor(topic).size - if (config.isSync) { - this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get() - } else { - this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes), - new ErrorLoggingCallback(topic, null, bytes, if (config.seqIdMode) true else false)) - } - } - - def close() { - this.producer.close() - } - } - class ProducerThread(val threadId: Int, val config: ProducerPerfConfig, val totalBytesSent: AtomicLong, @@ -241,11 +181,37 @@ object ProducerPerformance extends Logging { val messagesPerThread = config.numMessages / config.numThreads debug("Messages per thread = " + messagesPerThread) + val props = new Properties() val producer = - if (config.useNewProducer) - new NewShinyProducer(config) - else - new OldRustyProducer(config) + if (config.useNewProducer) 
{ + import org.apache.kafka.clients.producer.ProducerConfig + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "perf-test") + props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) + new NewShinyProducer(props) + } else { + props.put("metadata.broker.list", config.brokerList) + props.put("compression.codec", config.compressionCodec.codec.toString) + props.put("send.buffer.bytes", (64 * 1024).toString) + if (!config.isSync) { + props.put("producer.type", "async") + props.put("batch.num.messages", config.batchSize.toString) + props.put("queue.enqueue.timeout.ms", "-1") + } + props.put("client.id", "perf-test") + props.put("request.required.acks", config.producerRequestRequiredAcks.toString) + props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) + props.put("message.send.max.retries", config.producerNumRetries.toString) + props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) + props.put("serializer.class", classOf[DefaultEncoder].getName) + props.put("key.serializer.class", classOf[NullEncoder[Long]].getName) + new OldProducer(props) + } // generate the sequential message ID private val SEP = ":" // message field separator @@ -288,15 +254,16 @@ object ProducerPerformance extends Logging { override def run { var bytesSent = 0L var nSends = 0 - var j: Long = 0L + var i: Long = 0L var message: Array[Byte] = null - while (j < messagesPerThread) { + while (i < messagesPerThread) { try { config.topics.foreach( topic => { - message = 
generateProducerData(topic, j) - producer.send(topic, j, message) + message = generateProducerData(topic, i) + producer.send(topic, BigInteger.valueOf(i).toByteArray, message) + bytesSent += message.size nSends += 1 if (config.messageSendGapMs > 0) Thread.sleep(config.messageSendGapMs) @@ -304,7 +271,7 @@ object ProducerPerformance extends Logging { } catch { case e: Throwable => error("Error when sending message " + new String(message), e) } - j += 1 + i += 1 } try { producer.close() diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 423b512..6917ea1 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -747,7 +747,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): cmdList = ["ssh " + hostname, "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, - kafkaHome + "/bin/kafka-run-class.sh kafka.tools.newproducer.MirrorMaker", + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker", "--consumer.config " + configPathName + "/" + mmConsumerConfigFile, "--producer.config " + configPathName + "/" + mmProducerConfigFile, "--whitelist=\".*\" >> ",