Index: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
===================================================================
--- core/src/main/scala/kafka/tools/ConsumerPerformance.scala (revision 1205259)
+++ core/src/main/scala/kafka/tools/ConsumerPerformance.scala (working copy)
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.net.URI
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicLong
-import java.nio.channels.ClosedByInterruptException
-import joptsimple._
-import org.apache.log4j.Logger
-import kafka.utils.Utils
-import kafka.consumer.{ConsumerConfig, ConsumerConnector, Consumer}
-
-abstract class ShutdownableThread(name: String) extends Thread(name) {
-  def shutdown(): Unit
-}
-
-/**
- * Performance test for the full zookeeper consumer
- */
-object ConsumerPerformance {
-  private val logger = Logger.getLogger(getClass())
-
-  def main(args: Array[String]): Unit = {
-
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val consumerPropsOpt = parser.accepts("props", "REQUIRED: Properties file with the consumer properties.")
-      .withRequiredArg
-      .describedAs("properties")
-      .ofType(classOf[String])
-    val numThreadsOpt = parser.accepts("threads", "Number of processing threads.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(10)
-    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100000)
-    val sleepSecsOpt = parser.accepts("sleep", "Initial interval to wait before connecting.")
-      .withRequiredArg
-      .describedAs("secs")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(5)
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(topicOpt, consumerPropsOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val numThreads = options.valueOf(numThreadsOpt).intValue
-    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
-    val propsFile = options.valueOf(consumerPropsOpt)
-    val topic = options.valueOf(topicOpt)
-    val printInterval = options.valueOf(reportingIntervalOpt).intValue
-    val initialSleep = options.valueOf(sleepSecsOpt).intValue * 1000
-
-    println("Starting consumer...")
-    var totalNumMsgs = new AtomicLong(0)
-    var totalNumBytes = new AtomicLong(0)
-
-    val consumerConfig = new ConsumerConfig(Utils.loadProps(propsFile))
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
-
-    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> numThreads))
-    var threadList = List[ShutdownableThread]()
-    for ((topic, streamList) <- topicMessageStreams)
-      for (i <- 0 until streamList.length)
-        threadList ::= new ShutdownableThread("kafka-zk-consumer-" + i) {
-          private val shutdownLatch = new CountDownLatch(1)
-
-          def shutdown(): Unit = {
-            interrupt
-            shutdownLatch.await
-          }
-
-          override def run() {
-            var totalBytesRead = 0L
-            var nMessages = 0L
-            val startMs = System.currentTimeMillis
-
-            try {
-              for (message <- streamList(i)) {
-                nMessages += 1
-                totalBytesRead += message.payloadSize
-                if (nMessages % printInterval == 0) {
-                  val elapsedSecs = (System.currentTimeMillis - startMs) / 1000.0
-                  printMessage(totalBytesRead, nMessages, elapsedSecs)
-                }
-              }
-            }
-            catch {
-              case _: InterruptedException =>
-              case _: ClosedByInterruptException =>
-              case e => throw e
-            }
-            totalNumMsgs.addAndGet(nMessages)
-            totalNumBytes.addAndGet(totalBytesRead)
-            val elapsedSecs = (System.currentTimeMillis - startMs) / 1000.0
-            printMessage(totalBytesRead, nMessages, elapsedSecs)
-            shutdownComplete
-          }
-
-          private def printMessage(totalBytesRead: Long, nMessages: Long, elapsedSecs: Double) = {
-            logger.info("thread[" + i + "], nMsgs:" + nMessages + " bytes:" + totalBytesRead +
-              " nMsgs/sec:" + (nMessages / elapsedSecs).formatted("%.2f") +
-              " MB/sec:" + (totalBytesRead / elapsedSecs / (1024.0*1024.0)).formatted("%.2f"))
-
-          }
-          private def shutdownComplete() = shutdownLatch.countDown
-        }
-
-    logger.info("Sleeping for " + initialSleep / 1000 + " seconds.")
-    Thread.sleep(initialSleep)
-    logger.info("starting threads")
-    for (thread <- threadList)
-      thread.start
-
-    // attach shutdown handler to catch control-c
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run() = {
-        for (thread <- threadList)
-          thread.shutdown
-
-        try {
-          consumerConnector.shutdown
-        }
-        catch {
-          case _ =>
-        }
-        println("total nMsgs: " + totalNumMsgs)
-        println("totalBytesRead " + totalNumBytes)
-      }
-    });
-
-  }
-
-}
Index: core/src/main/scala/kafka/tools/ReplayLogProducer.scala
===================================================================
--- core/src/main/scala/kafka/tools/ReplayLogProducer.scala (revision 1205259)
+++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala (working copy)
@@ -169,6 +169,7 @@
     props.put("buffer.size", (64*1024).toString)
     props.put("compression.codec", config.compressionCodec.codec.toString)
     props.put("batch.size", config.batchSize.toString)
+    props.put("queue.enqueueTimeout.ms", "-1")
 
     if(config.isAsync)
       props.put("producer.type", "async")
Index: core/src/main/scala/kafka/tools/ProducerPerformance.scala
===================================================================
--- core/src/main/scala/kafka/tools/ProducerPerformance.scala (revision 1205259)
+++ core/src/main/scala/kafka/tools/ProducerPerformance.scala (working copy)
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import kafka.utils.Utils
-import java.util.concurrent.{CountDownLatch, Executors}
-import java.util.concurrent.atomic.AtomicLong
-import kafka.producer._
-import async.DefaultEventHandler
-import kafka.serializer.StringEncoder
-import org.apache.log4j.Logger
-import joptsimple.{OptionSet, OptionParser}
-import java.util.{Random, Properties}
-import kafka.message.{CompressionCodec, Message, ByteBufferMessageSet}
-
-/**
- * Load test for the producer
- */
-object ProducerPerformance {
-
-  def main(args: Array[String]) {
-
-    val logger = Logger.getLogger(getClass)
-    val config = new PerfConfig(args)
-    if(!config.isFixSize)
-      logger.info("WARN: Throughput will be slower due to changing message size per request")
-
-    val totalBytesSent = new AtomicLong(0)
-    val totalMessagesSent = new AtomicLong(0)
-    val executor = Executors.newFixedThreadPool(config.numThreads)
-    val allDone = new CountDownLatch(config.numThreads)
-    val startMs = System.currentTimeMillis
-    val rand = new java.util.Random
-
-    for(i <- 0 until config.numThreads) {
-      if(config.isAsync)
-        executor.execute(new AsyncProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
-      else
-        executor.execute(new SyncProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
-    }
-
-    allDone.await()
-    val elapsedSecs = (System.currentTimeMillis - startMs) / 1000.0
-    logger.info("Total Num Messages: " + totalMessagesSent.get + " bytes: " + totalBytesSent.get + " in " + elapsedSecs + " secs")
-    logger.info("Messages/sec: " + (1.0 * totalMessagesSent.get / elapsedSecs).formatted("%.4f"))
-    logger.info("MB/sec: " + (totalBytesSent.get / elapsedSecs / (1024.0*1024.0)).formatted("%.4f"))
-    System.exit(0)
-  }
-
-  class PerfConfig(args: Array[String]) {
-    val parser = new OptionParser
-    val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list.")
-      .withRequiredArg
-      .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
-      .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-    val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100)
-    val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
-    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
-    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
-    val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(10)
-    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(5000)
-    val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
-      .withRequiredArg
-      .describedAs("compression codec ")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-
-    val options = parser.parse(args : _*)
-    for(arg <- List(brokerInfoOpt, topicOpt, numMessagesOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-    val brokerInfo = options.valueOf(brokerInfoOpt)
-    val numMessages = options.valueOf(numMessagesOpt).intValue
-    val messageSize = options.valueOf(messageSizeOpt).intValue
-    val isFixSize = !options.has(varyMessageSizeOpt)
-    val isAsync = options.has(asyncOpt)
-    var batchSize = options.valueOf(batchSizeOpt).intValue
-    val numThreads = options.valueOf(numThreadsOpt).intValue
-    val topic = options.valueOf(topicOpt)
-    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
-    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
-  }
-
-  private def getStringOfLength(len: Int) : String = {
-    val strArray = new Array[Char](len)
-    for (i <- 0 until len)
-      strArray(i) = 'x'
-    return new String(strArray)
-  }
-
-  class AsyncProducerThread(val threadId: Int,
-                            val config: PerfConfig,
-                            val totalBytesSent: AtomicLong,
-                            val totalMessagesSent: AtomicLong,
-                            val allDone: CountDownLatch,
-                            val rand: Random) extends Runnable {
-    val logger = Logger.getLogger(getClass)
-    val brokerInfoList = config.brokerInfo.split("=")
-    val props = new Properties()
-    if (brokerInfoList(0) == "zk.connect")
-      props.put("zk.connect", brokerInfoList(1))
-    else
-      props.put("broker.list", brokerInfoList(1))
-
-    props.put("compression.codec", config.compressionCodec.codec.toString)
-    props.put("producer.type","async")
-    props.put("batch.size", config.batchSize.toString)
-    props.put("reconnect.interval", Integer.MAX_VALUE.toString)
-    props.put("buffer.size", (64*1024).toString)
-    props.put("queue.enqueueTimeout.ms", "-1")
-    logger.info("Producer properties = " + props.toString)
-
-    val producerConfig = new ProducerConfig(props)
-    val producer = new Producer[String, String](producerConfig, new StringEncoder,
-      new DefaultEventHandler[String](producerConfig, null), null, new DefaultPartitioner[String])
-
-    override def run {
-      var bytesSent = 0L
-      var lastBytesSent = 0L
-      var nSends = 0
-      var lastNSends = 0
-      var message = getStringOfLength(config.messageSize)
-      var reportTime = System.currentTimeMillis()
-      var lastReportTime = reportTime
-      val messagesPerThread = config.numMessages / config.numThreads
-      logger.info("Messages per thread = " + messagesPerThread)
-      for(j <- 0 until messagesPerThread) {
-        var strLength = config.messageSize
-        if (!config.isFixSize) {
-          strLength = rand.nextInt(config.messageSize)
-          message = getStringOfLength(strLength)
-          bytesSent += strLength
-        }else
-          bytesSent += config.messageSize
-        try {
-          producer.send(new ProducerData[String,String](config.topic, message))
-          nSends += 1
-        }catch {
-          case e: Exception => e.printStackTrace
-        }
-        if(nSends % config.reportingInterval == 0) {
-          reportTime = System.currentTimeMillis()
-          logger.info("thread " + threadId + ": " + nSends + " messages sent "
-            + (1000.0 * (nSends - lastNSends) / (reportTime - lastReportTime)).formatted("%.4f") + " nMsg/sec "
-            + (1000.0 * (bytesSent - lastBytesSent) / (reportTime - lastReportTime) / (1024 * 1024)).formatted("%.4f") + " MBs/sec")
-          lastReportTime = reportTime
-          lastBytesSent = bytesSent
-          lastNSends = nSends
-        }
-      }
-      producer.close()
-      totalBytesSent.addAndGet(bytesSent)
-      totalMessagesSent.addAndGet(nSends)
-      allDone.countDown()
-    }
-  }
-
-  class SyncProducerThread(val threadId: Int,
-                           val config: PerfConfig,
-                           val totalBytesSent: AtomicLong,
-                           val totalMessagesSent: AtomicLong,
-                           val allDone: CountDownLatch,
-                           val rand: Random) extends Runnable {
-    val logger = Logger.getLogger(getClass)
-    val props = new Properties()
-    val brokerInfoList = config.brokerInfo.split("=")
-    if (brokerInfoList(0) == "zk.connect")
-      props.put("zk.connect", brokerInfoList(1))
-    else
-      props.put("broker.list", brokerInfoList(1))
-    props.put("compression.codec", config.compressionCodec.codec.toString)
-    props.put("reconnect.interval", Integer.MAX_VALUE.toString)
-    props.put("buffer.size", (64*1024).toString)
-
-    val producerConfig = new ProducerConfig(props)
-    val producer = new Producer[String, String](producerConfig, new StringEncoder,
-      new DefaultEventHandler[String](producerConfig, null), null, new DefaultPartitioner[String])
-
-    override def run {
-      var bytesSent = 0L
-      var lastBytesSent = 0L
-      var nSends = 0
-      var lastNSends = 0
-      val message = getStringOfLength(config.messageSize)
-      var reportTime = System.currentTimeMillis()
-      var lastReportTime = reportTime
-      val messagesPerThread = config.numMessages / config.numThreads / config.batchSize
-      logger.info("Messages per thread = " + messagesPerThread)
-      var messageSet: List[String] = Nil
-      for(k <- 0 until config.batchSize) {
-        messageSet ::= message
-      }
-      for(j <- 0 until messagesPerThread) {
-        var strLength = config.messageSize
-        if (!config.isFixSize) {
-          for(k <- 0 until config.batchSize) {
-            strLength = rand.nextInt(config.messageSize)
-            messageSet ::= getStringOfLength(strLength)
-            bytesSent += strLength
-          }
-        }else
-          bytesSent += config.batchSize*config.messageSize
-        try {
-          producer.send(new ProducerData[String,String](config.topic, messageSet))
-          nSends += 1
-        }catch {
-          case e: Exception => e.printStackTrace
-        }
-        if(nSends % config.reportingInterval == 0) {
-          reportTime = System.currentTimeMillis()
-          logger.info("thread " + threadId + ": " + nSends + " messages sent "
-            + (1000.0 * (nSends - lastNSends) * config.batchSize / (reportTime - lastReportTime)).formatted("%.4f") + " nMsg/sec "
-            + (1000.0 * (bytesSent - lastBytesSent) / (reportTime - lastReportTime) / (1024 * 1024)).formatted("%.4f") + " MBs/sec")
          lastReportTime = reportTime
-          lastBytesSent = bytesSent
-          lastNSends = nSends
-        }
-      }
-      producer.close()
-      totalBytesSent.addAndGet(bytesSent)
-      totalMessagesSent.addAndGet(nSends*config.batchSize)
-      allDone.countDown()
-    }
-  }
-}
Index: core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala (revision 1205259)
+++ core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala (working copy)
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.net.URI
-import joptsimple._
-import kafka.api.FetchRequest
-import kafka.utils._
-import kafka.server._
-import kafka.consumer.SimpleConsumer
-
-/**
- * Performance test for the simple consumer
- */
-object SimpleConsumerPerformance {
-
-  def main(args: Array[String]) {
-
-    val parser = new OptionParser
-    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
-      .withRequiredArg
-      .describedAs("kafka://hostname:port")
-      .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.")
-      .withRequiredArg
-      .describedAs("bytes")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1024*1024)
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(urlOpt, topicOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val url = new URI(options.valueOf(urlOpt))
-    val topic = options.valueOf(topicOpt)
-    val fetchSize = options.valueOf(fetchSizeOpt).intValue
-
-    val consumer = new SimpleConsumer(url.getHost, url.getPort, 30*1000, 2*fetchSize)
-    val startMs = System.currentTimeMillis
-    var done = false
-    var totalRead = 0
-    val reportingInterval = 100000
-    var consumedInInterval = 0
-    var offset: Long = 0L
-    while(!done) {
-      val messages = consumer.fetch(new FetchRequest(topic, 0, offset, fetchSize))
-      var messagesRead = 0
-      for(message <- messages)
-        messagesRead += 1
-
-      if(messagesRead == 0)
-        done = true
-      else
-        offset += messages.validBytes
-
-      totalRead += messagesRead
-      consumedInInterval += messagesRead
-
-      if(consumedInInterval > reportingInterval) {
-        println("Bytes read: " + totalRead)
-        consumedInInterval = 0
-      }
-    }
-    val ellapsedSeconds = (System.currentTimeMillis - startMs) / 1000.0
-    println(totalRead + " messages read, " + offset + " bytes")
-    println("Messages/sec: " + totalRead / ellapsedSeconds)
-    println("MB/sec: " + offset / ellapsedSeconds / (1024.0*1024.0))
-    System.exit(0)
-  }
-
-}
Index: core/src/main/scala/kafka/utils/Utils.scala
===================================================================
--- core/src/main/scala/kafka/utils/Utils.scala (revision 1205259)
+++ core/src/main/scala/kafka/utils/Utils.scala (working copy)
@@ -29,6 +29,7 @@
 import scala.collection._
 import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
+import org.I0Itec.zkclient.ZkClient
 
 /**
  * Helper functions!
@@ -593,6 +594,18 @@
     else
       CompressionCodec.getCompressionCodec(codecValueString.toInt)
   }
+
+  def tryCleanupZookeeper(zkUrl: String, groupId: String) {
+    try {
+      val dir = "/consumers/" + groupId
+      logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _ => // swallow
+    }
+  }
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
Index: perf/config/log4j.properties
===================================================================
--- perf/config/log4j.properties (revision 0)
+++ perf/config/log4j.properties (revision 0)
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, fileAppender
+
+log4j.appender.fileAppender=org.apache.log4j.FileAppender
+log4j.appender.fileAppender.File=perf.log
+log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.fileAppender.layout.ConversionPattern=%m %n
+
+# Turn on all our debugging info
+log4j.logger.kafka=INFO
+
Index: perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (revision 0)
+++ perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (revision 0)
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.perf
+
+import java.net.URI
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicLong
+import java.nio.channels.ClosedByInterruptException
+import joptsimple._
+import org.apache.log4j.Logger
+import kafka.message.Message
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZKStringSerializer, Utils}
+import java.util.{Random, Properties}
+import kafka.consumer._
+import java.text.SimpleDateFormat
+
+/**
+ * Performance test for the full zookeeper consumer
+ */
+object ConsumerPerformance {
+  private val logger = Logger.getLogger(getClass())
+
+  def main(args: Array[String]): Unit = {
+
+    val config = new ConsumerPerfConfig(args)
+    logger.info("Starting consumer...")
+    var totalMessagesRead = new AtomicLong(0)
+    var totalBytesRead = new AtomicLong(0)
+
+    if(!config.hideHeader) {
+      if(!config.showDetailedStats)
+        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+      else
+        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+    }
+
+    // clean up zookeeper state for this group id for every perf run
+    Utils.tryCleanupZookeeper(config.consumerConfig.zkConnect, config.consumerConfig.groupId)
+
+    val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
+
+    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.topic -> config.numThreads))
+    var threadList = List[ConsumerPerfThread]()
+    for ((topic, streamList) <- topicMessageStreams)
+      for (i <- 0 until streamList.length)
+        threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config,
+          totalMessagesRead, totalBytesRead)
+
+    logger.info("Sleeping for 1 second.")
+    Thread.sleep(1000)
+    logger.info("starting threads")
+    val startMs = System.currentTimeMillis
+    for (thread <- threadList)
+      thread.start
+
+    for (thread <- threadList)
+      thread.shutdown
+
+    val endMs = System.currentTimeMillis
+    val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0
+    if(!config.showDetailedStats) {
+      val totalMBRead = (totalBytesRead.get*1.0)/(1024*1024)
+      println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
+        config.consumerConfig.fetchSize, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get,
+        totalMessagesRead.get/elapsedSecs))
+    }
+    System.exit(0)
+  }
+
+  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+      "Multiple URLs can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
+    val groupIdOpt = parser.accepts("group", "The group id to consume on.")
+      .withRequiredArg
+      .describedAs("gid")
+      .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
+      .ofType(classOf[String])
+    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1024 * 1024)
+    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
+      "offset to consume from, start with the latest message present in the log rather than the earliest message.")
+    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(2 * 1024 * 1024)
+    val numThreadsOpt = parser.accepts("threads", "Number of processing threads.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(10)
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(topicOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val props = new Properties
+    props.put("groupid", options.valueOf(groupIdOpt))
+    props.put("socket.buffer.size", options.valueOf(socketBufferSizeOpt).toString)
+    props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
+    props.put("autooffset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
+    props.put("zk.connect", options.valueOf(zkConnectOpt))
+    props.put("consumer.timeout.ms", "5000")
+    val consumerConfig = new ConsumerConfig(props)
+    val numThreads = options.valueOf(numThreadsOpt).intValue
+    val topic = options.valueOf(topicOpt)
+    val numMessages = options.valueOf(numMessagesOpt).longValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val showDetailedStats = options.has(showDetailedStatsOpt)
+    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
+    val hideHeader = options.has(hideHeaderOpt)
+  }
+
+  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaMessageStream[Message],
+                           config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
+    extends Thread(name) {
+    private val shutdownLatch = new CountDownLatch(1)
+
+    def shutdown(): Unit = {
+      shutdownLatch.await
+    }
+
+    override def run() {
+      var bytesRead = 0L
+      var messagesRead = 0L
+      val startMs = System.currentTimeMillis
+      var lastReportTime: Long = startMs
+      var lastBytesRead = 0L
+      var lastMessagesRead = 0L
+
+      try {
+        for (message <- stream if messagesRead < config.numMessages) {
+          messagesRead += 1
+          bytesRead += message.payloadSize
+
+          if (messagesRead % config.reportingInterval == 0) {
+            if(config.showDetailedStats)
+              printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis)
+            lastReportTime = System.currentTimeMillis
+            lastMessagesRead = messagesRead
+            lastBytesRead = bytesRead
+          }
+        }
+      }
+      catch {
+        case _: InterruptedException =>
+        case _: ClosedByInterruptException =>
+        case _: ConsumerTimeoutException =>
+        case e => throw e
+      }
+      totalMessagesRead.addAndGet(messagesRead)
+      totalBytesRead.addAndGet(bytesRead)
+      if(config.showDetailedStats)
+        printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis)
+      shutdownComplete
+    }
+
+    private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
+                             startMs: Long, endMs: Long) = {
+      val elapsedMs = endMs - startMs
+      val totalMBRead = (bytesRead*1.0)/(1024*1024)
+      val mbRead = ((bytesRead - lastBytesRead)*1.0)/(1024*1024)
+      println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
+        config.consumerConfig.fetchSize, totalMBRead,
+        1000.0*(mbRead/elapsedMs), messagesRead, ((messagesRead - lastMessagesRead)/elapsedMs)*1000.0))
+    }
+
+    private def shutdownComplete() = shutdownLatch.countDown
+  }
+
+}
Index: perf/src/main/scala/kafka/perf/PerfConfig.scala
===================================================================
--- perf/src/main/scala/kafka/perf/PerfConfig.scala (revision 0)
+++ perf/src/main/scala/kafka/perf/PerfConfig.scala (revision 0)
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.perf
+
+import joptsimple.OptionParser
+import java.text.SimpleDateFormat
+
+class PerfConfig(args: Array[String]) {
+  val parser = new OptionParser
+  val topicOpt = parser.accepts("topic", "REQUIRED: The topic to produce to or consume from.")
+    .withRequiredArg
+    .describedAs("topic")
+    .ofType(classOf[String])
+  val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
+    .withRequiredArg
+    .describedAs("count")
+    .ofType(classOf[java.lang.Long])
+    .defaultsTo(Long.MaxValue)
+  val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
+    .withRequiredArg
+    .describedAs("size")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(5000)
+  val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
+    "See java.text.SimpleDateFormat for options.")
+    .withRequiredArg
+    .describedAs("date format")
+    .ofType(classOf[String])
+    .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS")
+  val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
+    "interval as configured by reporting-interval.")
+  val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats.")
+}
Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/ProducerPerformance.scala (revision 0)
+++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala (revision 0)
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.perf
+
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.AtomicLong
+import kafka.producer._
+import async.DefaultEventHandler
+import org.apache.log4j.Logger
+import joptsimple.OptionParser
+import kafka.message.{CompressionCodec, Message}
+import kafka.serializer.DefaultEncoder
+import java.text.SimpleDateFormat
+import java.util.{Date, Random, Properties}
+
+/**
+ * Load test for the producer
+ */
+object ProducerPerformance {
+
+  def main(args: Array[String]) {
+
+    val logger = Logger.getLogger(getClass)
+    val config = new ProducerPerfConfig(args)
+    if(!config.isFixSize)
+      logger.info("WARN: Throughput will be slower due to changing message size per request")
+
+    val totalBytesSent = new AtomicLong(0)
+    val totalMessagesSent = new AtomicLong(0)
+    val executor = Executors.newFixedThreadPool(config.numThreads)
+    val allDone = new CountDownLatch(config.numThreads)
+    val startMs = System.currentTimeMillis
+    val rand = new java.util.Random
+
+    if(!config.hideHeader) {
+      if(!config.showDetailedStats)
+        println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
+          "total.data.sent.in.nMsg, nMsg.sec")
+      else
+        println("time, compression, thread.id, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
+          "total.data.sent.in.nMsg, nMsg.sec")
+    }
+
+    for(i <- 0 until config.numThreads) {
+      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
+    }
+
+    allDone.await()
+    val endMs = System.currentTimeMillis
+    val elapsedSecs = (endMs - startMs) / 1000.0
+    if(!config.showDetailedStats) {
+      val totalMBSent = (totalBytesSent.get * 1.0)/ (1024 * 1024)
+      println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
+        config.dateFormat.format(endMs), config.compressionCodec.codec, config.messageSize, config.batchSize,
+        totalMBSent, totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
+    }
+    System.exit(0)
+  }
+
+  class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
+    val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list).")
+      .withRequiredArg
+      .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
+      .ofType(classOf[String])
+    val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100)
+    val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
+    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
+    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
+    val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(10)
+    val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed.")
+      .withRequiredArg
+      .describedAs("compression codec")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+
+    val options = parser.parse(args : _*)
+    for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    val topic = options.valueOf(topicOpt)
+    val numMessages = options.valueOf(numMessagesOpt).longValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val showDetailedStats = options.has(showDetailedStatsOpt)
+    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
+    val hideHeader = options.has(hideHeaderOpt)
+    val brokerInfo = options.valueOf(brokerInfoOpt)
+    val messageSize = options.valueOf(messageSizeOpt).intValue
+    val isFixSize = !options.has(varyMessageSizeOpt)
+    val isAsync = options.has(asyncOpt)
+    var batchSize = options.valueOf(batchSizeOpt).intValue
+    val numThreads = options.valueOf(numThreadsOpt).intValue
+    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+  }
+
+  private def getStringOfLength(len: Int) : String = {
+    val strArray = new Array[Char](len)
+    for (i <- 0 until len)
+      strArray(i) = 'x'
+    return new String(strArray)
+  }
+
+  class ProducerThread(val threadId: Int,
+                       val config: ProducerPerfConfig,
+                       val totalBytesSent: AtomicLong,
+                       val totalMessagesSent: AtomicLong,
+                       val allDone: CountDownLatch,
+                       val rand: Random) extends Runnable {
+    val logger = Logger.getLogger(getClass)
+    val props = new Properties()
+    val brokerInfoList = config.brokerInfo.split("=")
+    if (brokerInfoList(0) == "zk.connect") {
+      props.put("zk.connect", brokerInfoList(1))
+      props.put("zk.sessiontimeout.ms", "300000")
+    }
+    else
+      props.put("broker.list", brokerInfoList(1))
+    props.put("compression.codec", config.compressionCodec.codec.toString)
+    props.put("reconnect.interval", Integer.MAX_VALUE.toString)
+    props.put("buffer.size", (64*1024).toString)
+    if(config.isAsync) {
+      props.put("producer.type","async")
+      props.put("batch.size", config.batchSize.toString)
+      props.put("queue.enqueueTimeout.ms", "-1")
+    }
+    val producerConfig = new ProducerConfig(props)
+    val producer = new Producer[Message, Message](producerConfig)
+
+    override def run {
+      var bytesSent = 0L
+      var lastBytesSent = 0L
+      var nSends = 0
+      var lastNSends = 0
+      val message = new Message(new Array[Byte](config.messageSize))
+      var reportTime = System.currentTimeMillis()
+      var lastReportTime = reportTime
+      val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize
+        else config.numMessages / config.numThreads
+      if(logger.isDebugEnabled) logger.debug("Messages per thread = " + messagesPerThread)
+      var messageSet: List[Message] = Nil
+      for(k <- 0 until config.batchSize) {
+        messageSet ::= message
+      }
+
+      var j: Long = 0L
+      while(j < messagesPerThread) {
+        var strLength = config.messageSize
+        if (!config.isFixSize) {
+          for(k <- 0 until config.batchSize) {
+            strLength = rand.nextInt(config.messageSize)
+            val message = new Message(getStringOfLength(strLength).getBytes)
+            messageSet ::= message
+            bytesSent += message.payloadSize
+          }
+        }else if(!config.isAsync) {
+          bytesSent += config.batchSize*message.payloadSize
+        }
+        try {
+          if(!config.isAsync) {
+            producer.send(new ProducerData[Message,Message](config.topic, null, messageSet))
+            nSends += config.batchSize
+          }else {
+            if(!config.isFixSize) {
+              strLength = rand.nextInt(config.messageSize)
+              val message = new Message(getStringOfLength(strLength).getBytes)
+              producer.send(new ProducerData[Message,Message](config.topic, message))
+              bytesSent += message.payloadSize
+            }else {
+              producer.send(new ProducerData[Message,Message](config.topic, message))
+              bytesSent += message.payloadSize
+            }
+            nSends += 1
+          }
+        }catch {
+          case e: Exception => e.printStackTrace
+        }
+        if(nSends % config.reportingInterval == 0) {
+          reportTime = System.currentTimeMillis()
+          val elapsed = (reportTime - lastReportTime)/ 1000.0
+          val mbBytesSent = ((bytesSent - lastBytesSent) * 1.0)/(1024 * 1024)
+          val numMessagesPerSec = (nSends - lastNSends) / elapsed
+          val mbPerSec = mbBytesSent / elapsed
+          val formattedReportTime = config.dateFormat.format(reportTime)
+          if(config.showDetailedStats)
+            println(("%s, %d, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(formattedReportTime, config.compressionCodec.codec,
+              threadId, config.messageSize, config.batchSize, (bytesSent*1.0)/(1024 * 1024), mbPerSec, nSends, numMessagesPerSec))
+          lastReportTime = reportTime
+          lastBytesSent = bytesSent
+          lastNSends = nSends
+        }
+        j += 1
+      }
+      producer.close()
+      totalBytesSent.addAndGet(bytesSent)
+      totalMessagesSent.addAndGet(nSends)
+      allDone.countDown()
+    }
+  }
+}
Index: perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala (revision 0)
+++ perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala (revision 0)
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.perf
+
+import java.net.URI
+import joptsimple._
+import kafka.utils._
+import kafka.server._
+import kafka.consumer.SimpleConsumer
+import org.apache.log4j.Logger
+import kafka.api.{OffsetRequest, FetchRequest}
+import java.text.SimpleDateFormat
+
+/**
+ * Performance test for the simple consumer
+ */
+object SimpleConsumerPerformance {
+
+  def main(args: Array[String]) {
+    val logger = Logger.getLogger(getClass)
+    val config = new ConsumerPerfConfig(args)
+
+    if(!config.hideHeader) {
+      if(!config.showDetailedStats)
+        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+      else
+        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+    }
+
+    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize)
+
+    // reset to latest or smallest offset
+    var offset: Long = if(config.fromLatest) consumer.getOffsetsBefore(config.topic, config.partition, OffsetRequest.LatestTime, 1).head
+      else consumer.getOffsetsBefore(config.topic, config.partition, OffsetRequest.EarliestTime, 1).head
+
+    val startMs = System.currentTimeMillis
+    var done = false
+    var totalBytesRead = 0L
+    var totalMessagesRead = 0L
+    var consumedInterval = 0
+    var lastReportTime: Long = startMs
+    var lastBytesRead = 0L
+    var lastMessagesRead = 0L
+    while(!done) {
+      val messages = consumer.fetch(new FetchRequest(config.topic, config.partition, offset, config.fetchSize))
+      var messagesRead = 0
+      var bytesRead = 0
+
+      for(message <- messages) {
+        messagesRead += 1
+        bytesRead += message.message.payloadSize
+      }
+
+      if(messagesRead == 0 || totalMessagesRead > config.numMessages)
+        done = true
+      else
+        offset += messages.validBytes
+
+      totalBytesRead += bytesRead
+      totalMessagesRead += messagesRead
+      consumedInterval += messagesRead
+
+      if(consumedInterval > config.reportingInterval) {
+        if(config.showDetailedStats) {
+          val reportTime = System.currentTimeMillis
+          val elapsed = (reportTime - lastReportTime)/1000.0
+          val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024)
+          println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize,
+            (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed,
+            totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed))
+        }
+        lastReportTime = SystemTime.milliseconds
+        lastBytesRead = totalBytesRead
+        lastMessagesRead = totalMessagesRead
+        consumedInterval = 0
+      }
+    }
+    val reportTime = System.currentTimeMillis
+    val elapsed = (reportTime - startMs) / 1000.0
+
+    if(!config.showDetailedStats) {
+      val totalMBRead = (totalBytesRead*1.0)/(1024*1024)
+      println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
+        config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed,
+        totalMessagesRead, totalMessagesRead/elapsed))
+    }
+    System.exit(0)
+  }
+
+  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
+    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+      .withRequiredArg
+      .describedAs("kafka://hostname:port")
+      .ofType(classOf[String])
+    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
+      "offset to consume from, start with the latest message present in the log rather than the earliest message.")
+    val partitionOpt = parser.accepts("partition", "The topic partition to consume from.")
+      .withRequiredArg
+      .describedAs("partition")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+    val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.")
+      .withRequiredArg
+      .describedAs("bytes")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1024*1024)
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(topicOpt, urlOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    val url = new URI(options.valueOf(urlOpt))
+    val fetchSize = options.valueOf(fetchSizeOpt).intValue
+    val fromLatest = options.has(resetBeginningOffsetOpt)
+    val partition = options.valueOf(partitionOpt).intValue
+    val topic = options.valueOf(topicOpt)
+    val numMessages = options.valueOf(numMessagesOpt).longValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val showDetailedStats = options.has(showDetailedStatsOpt)
+    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
+    val hideHeader = options.has(hideHeaderOpt)
+  }
+}
Index: project/build/KafkaProject.scala
===================================================================
--- project/build/KafkaProject.scala (revision 1205259)
+++ project/build/KafkaProject.scala (working copy)
@@ -23,6 +23,7 @@
   lazy val core = project("core", "core-kafka", new CoreKafkaProject(_))
   lazy val examples = project("examples", "java-examples", new KafkaExamplesProject(_), core)
   lazy val contrib = project("contrib", "contrib", new ContribProject(_))
+  lazy val perf = project("perf", "perf", new KafkaPerfProject(_))
 
   lazy val releaseZipTask = core.packageDistTask
 
@@ -81,9 +82,6 @@
       ZkClientDepAdder(pom)
     }
 
-    override def repositories = Set(ScalaToolsSnapshots, "JBoss Maven 2 Repository" at "http://repository.jboss.com/maven2",
-      "Oracle Maven 2 Repository" at "http://download.oracle.com/maven", "maven.org" at "http://repo2.maven.org/maven2/")
-
     override def artifactID = "kafka"
 
     override def filterScalaJars = false
@@ -146,6 +144,29 @@
 
   }
 
+  class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info)
+    with IdeaProject
+    with CoreDependencies {
+    val perfPackageAction = packageAllAction
+    val dependsOnCore = core
+
+    //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+    // some dependencies on various sun and javax packages.
+    override def ivyXML =
+      <dependencies>
+        <exclude module="javax"/>
+        <exclude module="jmxri"/>
+        <exclude module="jmxtools"/>
+        <exclude module="mail"/>
+        <exclude module="jms"/>
+      </dependencies>
+
+    override def artifactID = "kafka-perf"
+    override def filterScalaJars = false
+    override def javaCompileOptions = super.javaCompileOptions ++
+      List(JavaCompileOption("-Xlint:unchecked"))
+  }
+
   class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info)
     with IdeaProject
     with CoreDependencies {
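
---

A note for reviewers, not part of the patch: one quick way to smoke-test the new perf module after applying the diff and building is to drive a tool's main() directly. This is a minimal sketch; the zookeeper address, topic name, and counts below are placeholders (not taken from the patch), the option names come from ProducerPerfConfig/PerfConfig above, and each tool calls System.exit when it finishes, so run one tool per JVM.

// Hypothetical wrapper; values such as localhost:2181 and perf-test are placeholders.
object PerfSmokeTest {
  def main(args: Array[String]) {
    // Send 100,000 fixed-size 100-byte messages from 5 async producer threads,
    // discovering brokers through zookeeper. Summary stats print on completion.
    kafka.perf.ProducerPerformance.main(Array(
      "--brokerinfo", "zk.connect=localhost:2181",
      "--topic", "perf-test",
      "--messages", "100000",
      "--message-size", "100",
      "--threads", "5",
      "--async"))
  }
}

Running kafka.perf.ConsumerPerformance with --zookeeper localhost:2181 --topic perf-test in a second JVM should then consume the same messages and report MB.sec and nMsg.sec in the CSV format printed by the headers above.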