diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 75a3fc4..ba6e210 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -70,6 +70,11 @@ do CLASSPATH=$CLASSPATH:$file done +for file in $base_dir/clients/target/scala-${SCALA_VERSION}/clients*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + # classpath addition for release for file in $base_dir/libs/*.jar; do diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index b274e5e..56b93c7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer.internals; @@ -151,7 +147,7 @@ public class Sender implements Runnable { // do the I/O try { - this.selector.poll(5L, sends); + this.selector.poll(100L, sends); } catch (IOException e) { e.printStackTrace(); } @@ -188,6 +184,7 @@ public class Sender implements Runnable { public void initiateClose() { this.running = false; this.accumulator.close(); + this.wakeup(); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index 108d61e..3ebbb80 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -26,22 +26,25 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.record.Records; - public class ProducerPerformance { public static void main(String[] args) throws Exception { - if (args.length != 3) { - System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_records record_size"); + if (args.length != 5) { + System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks"); System.exit(1); } String url = args[0]; - int numRecords = Integer.parseInt(args[1]); - int recordSize = Integer.parseInt(args[2]); + String topicName = args[1]; + int numRecords = Integer.parseInt(args[2]); + int recordSize = Integer.parseInt(args[3]); + int acks = Integer.parseInt(args[4]); Properties props = new Properties(); - props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1"); + props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, Integer.toString(acks)); props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url); props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000)); props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); + props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024)); + props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024)); KafkaProducer producer = new KafkaProducer(props); Callback callback = new Callback() { @@ -52,7 +55,7 @@ public class ProducerPerformance { }; byte[] payload = new byte[recordSize]; Arrays.fill(payload, (byte) 1); - ProducerRecord record = new ProducerRecord("test", payload); + ProducerRecord record = new ProducerRecord(topicName, payload); long start = System.currentTimeMillis(); long maxLatency = -1L; long totalLatency = 0; @@ -75,8 +78,8 @@ public class ProducerPerformance { long ellapsed = System.currentTimeMillis() - start; double msgsSec = 1000.0 * numRecords / (double) ellapsed; double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0); - System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).", numRecords, ellapsed, msgsSec, mbSec); producer.close(); + System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec); } } diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala index c4aed10..7757f83 100644 --- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala +++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -24,23 +24,24 @@ import kafka.message._ object TestEndToEndLatency { def main(args: Array[String]) { - if(args.length != 3) { - System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect num_messages") + if (args.length != 4) { + System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages") System.exit(1) } val brokerList = args(0) val zkConnect = args(1) - val numMessages = args(2).toInt - val topic = "test" - + val topic = args(2) + val numMessages = args(3).toInt + val consumerProps = new Properties() consumerProps.put("group.id", topic) consumerProps.put("auto.commit", "true") consumerProps.put("auto.offset.reset", "largest") consumerProps.put("zookeeper.connect", zkConnect) consumerProps.put("socket.timeout.ms", 1201000.toString) - + //consumerProps.put("fetch.wait.max.ms", 500.toString) + val config = new ConsumerConfig(consumerProps) val connector = Consumer.create(config) var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head @@ -50,16 +51,16 @@ object TestEndToEndLatency { producerProps.put("metadata.broker.list", brokerList) producerProps.put("producer.type", "sync") val producer = new Producer[Any, Any](new ProducerConfig(producerProps)) - - val message = new Message("hello there beautiful".getBytes) + + val message = "hello there beautiful".getBytes var totalTime = 0.0 - for(i <- 0 until numMessages) { + for (i <- 0 until numMessages) { var begin = System.nanoTime producer.send(new KeyedMessage(topic, message)) val received = iter.next val elapsed = System.nanoTime - begin // poor man's progress bar - if(i % 10000 == 0) + if (i % 1000 == 0) println(i + "\t" + elapsed / 1000.0 / 1000.0) totalTime += elapsed } diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index 55ee01b..4dde468 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -23,7 +23,7 @@ import java.nio.channels.ClosedByInterruptException import org.apache.log4j.Logger import kafka.message.Message import kafka.utils.ZkUtils -import java.util.{Random, Properties} +import java.util.{ Random, Properties } import kafka.consumer._ import java.text.SimpleDateFormat @@ -40,8 +40,8 @@ object ConsumerPerformance { var totalMessagesRead = new AtomicLong(0) var totalBytesRead = new AtomicLong(0) - if(!config.hideHeader) { - if(!config.showDetailedStats) + if (!config.hideHeader) { + if (!config.showDetailedStats) println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") else println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") @@ -57,7 +57,7 @@ object ConsumerPerformance { for ((topic, streamList) <- topicMessageStreams) for (i <- 0 until streamList.length) threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, - totalMessagesRead, totalBytesRead) + totalMessagesRead, totalBytesRead) logger.info("Sleeping for 1 second.") Thread.sleep(1000) @@ -67,61 +67,61 @@ object ConsumerPerformance { thread.start for (thread <- threadList) - thread.shutdown + thread.join val endMs = System.currentTimeMillis val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0 - if(!config.showDetailedStats) { - val totalMBRead = (totalBytesRead.get*1.0)/(1024*1024) + if (!config.showDetailedStats) { + val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024) println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs), - config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get, - totalMessagesRead.get/elapsedSecs)) + config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, + totalMessagesRead.get / elapsedSecs)) } System.exit(0) } class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) val groupIdOpt = parser.accepts("group", "The group id to consume on.") - .withRequiredArg - .describedAs("gid") - .defaultsTo("perf-consumer-" + new Random().nextInt(100000)) - .ofType(classOf[String]) + .withRequiredArg + .describedAs("gid") + .defaultsTo("perf-consumer-" + new Random().nextInt(100000)) + .ofType(classOf[String]) val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024 * 1024) + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024 * 1024) val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + "offset to consume from, start with the latest message present in the log rather than the earliest message.") val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(2 * 1024 * 1024) + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(2 * 1024 * 1024) val numThreadsOpt = parser.accepts("threads", "Number of processing threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(10) + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(10) val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) - val options = parser.parse(args : _*) + val options = parser.parse(args: _*) - for(arg <- List(topicOpt, zkConnectOpt)) { - if(!options.has(arg)) { + for (arg <- List(topicOpt, zkConnectOpt)) { + if (!options.has(arg)) { System.err.println("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) @@ -132,7 +132,7 @@ object ConsumerPerformance { props.put("group.id", options.valueOf(groupIdOpt)) props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) - props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest") + props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest") props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) props.put("consumer.timeout.ms", "5000") props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString) @@ -147,13 +147,8 @@ object ConsumerPerformance { } class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]], - config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) + config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) extends Thread(name) { - private val shutdownLatch = new CountDownLatch(1) - - def shutdown(): Unit = { - shutdownLatch.await - } override def run() { var bytesRead = 0L @@ -164,43 +159,41 @@ object ConsumerPerformance { var lastMessagesRead = 0L try { - for (messageAndMetadata <- stream if messagesRead < config.numMessages) { + val iter = stream.iterator + while (iter.hasNext && messagesRead < config.numMessages) { + val messageAndMetadata = iter.next messagesRead += 1 bytesRead += messageAndMetadata.message.length if (messagesRead % config.reportingInterval == 0) { - if(config.showDetailedStats) + if (config.showDetailedStats) printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis) lastReportTime = System.currentTimeMillis lastMessagesRead = messagesRead lastBytesRead = bytesRead } } - } - catch { + } catch { case _: InterruptedException => case _: ClosedByInterruptException => case _: ConsumerTimeoutException => - case e: Throwable => throw e + case e: Throwable => e.printStackTrace() } totalMessagesRead.addAndGet(messagesRead) totalBytesRead.addAndGet(bytesRead) - if(config.showDetailedStats) + if (config.showDetailedStats) printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis) - shutdownComplete } private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long, - startMs: Long, endMs: Long) = { + startMs: Long, endMs: Long) = { val elapsedMs = endMs - startMs - val totalMBRead = (bytesRead*1.0)/(1024*1024) - val mbRead = ((bytesRead - lastBytesRead)*1.0)/(1024*1024) + val totalMBRead = (bytesRead * 1.0) / (1024 * 1024) + val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id, config.consumerConfig.fetchMessageMaxBytes, totalMBRead, - 1000.0*(mbRead/elapsedMs), messagesRead, ((messagesRead - lastMessagesRead)/elapsedMs)*1000.0)) + 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0)) } - - private def shutdownComplete() = shutdownLatch.countDown } } diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index ad2ac26..2490793 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -17,18 +17,19 @@ package kafka.perf -import java.util.concurrent.{CountDownLatch, Executors} +import java.util.concurrent.{ CountDownLatch, Executors } import java.util.concurrent.atomic.AtomicLong import kafka.producer._ import org.apache.log4j.Logger -import kafka.message.{CompressionCodec, Message} +import kafka.message.{ CompressionCodec, Message } import java.text.SimpleDateFormat import kafka.serializer._ import java.util._ import collection.immutable.List -import kafka.utils.{VerifiableProperties, Logging} +import kafka.utils.{ VerifiableProperties, Logging, Utils } import kafka.metrics.KafkaMetricsReporter - +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord /** * Load test for the producer @@ -39,7 +40,7 @@ object ProducerPerformance extends Logging { val logger = Logger.getLogger(getClass) val config = new ProducerPerfConfig(args) - if(!config.isFixSize) + if (!config.isFixedSize) logger.info("WARN: Throughput will be slower due to changing message size per request") val totalBytesSent = new AtomicLong(0) @@ -49,79 +50,80 @@ object ProducerPerformance extends Logging { val startMs = System.currentTimeMillis val rand = new java.util.Random - if(!config.hideHeader) - println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " + - "total.data.sent.in.nMsg, nMsg.sec") + if (!config.hideHeader) + println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " + + "total.data.sent.in.nMsg, nMsg.sec") - for(i <- 0 until config.numThreads) { + for (i <- 0 until config.numThreads) { executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand)) } allDone.await() val endMs = System.currentTimeMillis val elapsedSecs = (endMs - startMs) / 1000.0 - val totalMBSent = (totalBytesSent.get * 1.0)/ (1024 * 1024) + val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024) println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format( config.dateFormat.format(startMs), config.dateFormat.format(endMs), config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent, - totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs)) + totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs)) System.exit(0) } class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) { val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.") - .withRequiredArg - .describedAs("hostname:port,..,hostname:port") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("hostname:port,..,hostname:port") + .ofType(classOf[String]) val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to") .withRequiredArg .describedAs("topic1,topic2..") .ofType(classOf[String]) val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3000) + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3000) val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3) + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3) val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + - "to complete") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(-1) + "to complete") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(-1) val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.") val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.") val numThreadsOpt = parser.accepts("threads", "Number of sending threads.") - .withRequiredArg - .describedAs("number of threads") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) + .withRequiredArg + .describedAs("number of threads") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " + - "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + - "in the form of 'Message:000...1:xxx...'") - .withRequiredArg() - .describedAs("initial message id") - .ofType(classOf[java.lang.Integer]) + "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + + "in the form of 'Message:000...1:xxx...'") + .withRequiredArg() + .describedAs("initial message id") + .ofType(classOf[java.lang.Integer]) val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends") - .withRequiredArg() - .describedAs("message send time gap") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) + .withRequiredArg() + .describedAs("message send time gap") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - "set, the csv metrics will be outputed here") + "set, the csv metrics will be outputed here") .withRequiredArg .describedAs("metrics dictory") .ofType(classOf[java.lang.String]) + val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") - val options = parser.parse(args : _*) - for(arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) { - if(!options.has(arg)) { + val options = parser.parse(args: _*) + for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) { + if (!options.has(arg)) { System.err.println("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) @@ -135,19 +137,20 @@ object ProducerPerformance extends Logging { val hideHeader = options.has(hideHeaderOpt) val brokerList = options.valueOf(brokerListOpt) val messageSize = options.valueOf(messageSizeOpt).intValue - var isFixSize = !options.has(varyMessageSizeOpt) + var isFixedSize = !options.has(varyMessageSizeOpt) var isSync = options.has(syncOpt) var batchSize = options.valueOf(batchSizeOpt).intValue var numThreads = options.valueOf(numThreadsOpt).intValue val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) val seqIdMode = options.has(initialMessageIdOpt) var initialMessageId: Int = 0 - if(seqIdMode) + if (seqIdMode) initialMessageId = options.valueOf(initialMessageIdOpt).intValue() val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue() val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue() val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue() + val useNewProducer = options.has(useNewProducerOpt) val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) @@ -167,44 +170,80 @@ object ProducerPerformance extends Logging { val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue() } - class ProducerThread(val threadId: Int, - val config: ProducerPerfConfig, - val totalBytesSent: AtomicLong, - val totalMessagesSent: AtomicLong, - val allDone: CountDownLatch, - val rand: Random) extends Runnable { + trait Producer { + def send(topic: String, partition: Long, bytes: Array[Byte]) + def close() + } + + class OldRustyProducer(config: ProducerPerfConfig) extends Producer { val props = new Properties() props.put("metadata.broker.list", config.brokerList) props.put("compression.codec", config.compressionCodec.codec.toString) - props.put("reconnect.interval", Integer.MAX_VALUE.toString) - props.put("send.buffer.bytes", (64*1024).toString) - if(!config.isSync) { - props.put("producer.type","async") + props.put("send.buffer.bytes", (64 * 1024).toString) + if (!config.isSync) { + props.put("producer.type", "async") props.put("batch.num.messages", config.batchSize.toString) props.put("queue.enqueue.timeout.ms", "-1") } - props.put("client.id", "ProducerPerformance") + props.put("client.id", "") props.put("request.required.acks", config.producerRequestRequiredAcks.toString) props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) props.put("message.send.max.retries", config.producerNumRetries.toString) props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) props.put("serializer.class", classOf[DefaultEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString) + val producer = new kafka.producer.Producer[Long, Array[Byte]](new ProducerConfig(props)) - - val producerConfig = new ProducerConfig(props) - val producer = new Producer[Long, Array[Byte]](producerConfig) - val seqIdNumDigit = 10 // no. of digits for max int value + def send(topic: String, partition: Long, bytes: Array[Byte]) { + this.producer.send(new KeyedMessage[Long, Array[Byte]](topic, partition, bytes)) + } + + def close() { + this.producer.close() + } + } + + class NewShinyProducer(config: ProducerPerfConfig) extends Producer { + val props = new Properties() + props.put("metadata.broker.list", config.brokerList) + props.put("send.buffer.bytes", (64 * 1024).toString) + props.put("client.id", "") + props.put("request.required.acks", config.producerRequestRequiredAcks.toString) + props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) + val producer = new KafkaProducer(props) + + def send(topic: String, partition: Long, bytes: Array[Byte]) { + val part = partition % this.producer.partitionsFor(topic).size + this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)) + } + + def close() { + this.producer.close() + } + } + + class ProducerThread(val threadId: Int, + val config: ProducerPerfConfig, + val totalBytesSent: AtomicLong, + val totalMessagesSent: AtomicLong, + val allDone: CountDownLatch, + val rand: Random) extends Runnable { + val seqIdNumDigit = 10 // no. of digits for max int value val messagesPerThread = config.numMessages / config.numThreads debug("Messages per thread = " + messagesPerThread) + val producer = + if (config.useNewProducer) + new NewShinyProducer(config) + else + new OldRustyProducer(config) // generate the sequential message ID - private val SEP = ":" // message field separator + private val SEP = ":" // message field separator private val messageIdLabel = "MessageID" - private val threadIdLabel = "ThreadID" - private val topicLabel = "Topic" - private var leftPaddedSeqId : String = "" + private val threadIdLabel = "ThreadID" + private val topicLabel = "Topic" + private var leftPaddedSeqId: String = "" private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = { // Each thread gets a unique range of sequential no. for its ids. @@ -213,48 +252,43 @@ object ProducerPerformance extends Logging { // thread 1 IDs : 100 ~ 199 // thread 2 IDs : 200 ~ 299 // . . . - leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId)) + leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId)) - val msgHeader = topicLabel + SEP + - topic + SEP + - threadIdLabel + SEP + - threadId + SEP + - messageIdLabel + SEP + - leftPaddedSeqId + SEP + val msgHeader = topicLabel + SEP + + topic + SEP + + threadIdLabel + SEP + + threadId + SEP + + messageIdLabel + SEP + + leftPaddedSeqId + SEP - val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x') + val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x') debug(seqMsgString) return seqMsgString.getBytes() } - private def generateProducerData(topic: String, messageId: Long): (KeyedMessage[Long, Array[Byte]], Int) = { - val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize) - val message = - if(config.seqIdMode) { - val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId - generateMessageWithSeqId(topic, seqId, msgSize) - } else { - new Array[Byte](msgSize) - } - (new KeyedMessage[Long, Array[Byte]](topic, messageId, message), message.length) + private def generateProducerData(topic: String, messageId: Long): Array[Byte] = { + val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize) + if (config.seqIdMode) { + val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId + generateMessageWithSeqId(topic, seqId, msgSize) + } else { + new Array[Byte](msgSize) + } } override def run { var bytesSent = 0L var nSends = 0 var j: Long = 0L - while(j < messagesPerThread) { + while (j < messagesPerThread) { try { config.topics.foreach( topic => { - val (producerData, bytesSent_) = generateProducerData(topic, j) - bytesSent += bytesSent_ - producer.send(producerData) + producer.send(topic, j, generateProducerData(topic, j)) nSends += 1 - if(config.messageSendGapMs > 0) + if (config.messageSendGapMs > 0) Thread.sleep(config.messageSendGapMs) - } - ) + }) } catch { case e: Exception => error("Error sending messages", e) } diff --git a/project/Build.scala b/project/Build.scala index ddcfc41..12d84f8 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -140,7 +140,7 @@ object KafkaBuild extends Build { lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib, perf).settings((commonSettings ++ runRatTask ++ releaseTask ++ releaseZipTask ++ releaseTarTask): _*) - lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*) + lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*) dependsOn(clients) lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core) lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core)