Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/ProducerPerformance.scala (revision 1241760)
+++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala (working copy)
@@ -26,6 +26,10 @@
 import java.util.{Random, Properties}
 import kafka.utils.Logging
 
+import java.nio.ByteBuffer
+import java.util.ArrayList
+import java.util.Collections
+
 /**
  * Load test for the producer
  */
@@ -45,6 +49,9 @@
     val startMs = System.currentTimeMillis
     val rand = new java.util.Random
 
+    @volatile var urand = new UniqueRandom(config.startIndex, config.numMessages + config.startIndex - 1)
+    urand.newList()
+
     if(!config.hideHeader) {
       if(!config.showDetailedStats)
         println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
@@ -55,7 +62,7 @@
     }
 
     for(i <- 0 until config.numThreads) {
-      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
+      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand, urand))
     }
 
     allDone.await()
@@ -97,6 +104,12 @@
       .describedAs("compression codec ")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(0)
+    val startIndexOption = parser.accepts("start-index", "starting index for the range of random numbers to generate" +
+      " for message, the ending index is no. of messages + starting index - 1")
+      .withRequiredArg()
+      .describedAs("index")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(1)
 
     val options = parser.parse(args : _*)
     for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
@@ -119,6 +132,7 @@
     var batchSize = options.valueOf(batchSizeOpt).intValue
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+    val startIndex = options.valueOf(startIndexOption).longValue()
   }
 
   private def getStringOfLength(len: Int) : String = {
@@ -128,28 +142,65 @@
     return new String(strArray)
   }
 
+  private def getUniqRandByteArrayOfLength(len: Int, urand: UniqueRandom): Array[Byte] = {
+    var nextVal: Long = 0
+    urand.synchronized {
+      nextVal = urand.next()
+    }
+    ByteBuffer.allocate(len + 10).putLong(nextVal).array() // add 10 to len to prevent overflow
+  }
+
   private def getByteArrayOfLength(len: Int): Array[Byte] = {
-    //new Array[Byte](len)
     new Array[Byte]( if (len == 0) 5 else len )
   }
 
+  class UniqueRandom(start: Long, end: Long) {
+    val list = new ArrayList[Long]()
+    var index = 0
+
+    def newList() {
+      for (i <- start until end + 1) {
+        list.add(i)
+      }
+      Collections.shuffle(list)
+    }
+
+    def hasNext(): Boolean = {
+      if ( index < list.size() )
+        return true
+      else
+        return false
+    }
+
+    def next(): Long = {
+      val token = list.get(index)
+      index += 1
+      return token
+    }
+  }
+
   class ProducerThread(val threadId: Int,
                        val config: ProducerPerfConfig,
                        val totalBytesSent: AtomicLong,
                        val totalMessagesSent: AtomicLong,
                        val allDone: CountDownLatch,
-                       val rand: Random) extends Runnable {
+                       val rand: Random,
+                       val urand: UniqueRandom) extends Runnable {
+
     val props = new Properties()
     val brokerInfoList = config.brokerInfo.split("=")
+
     if (brokerInfoList(0) == "zk.connect") {
      props.put("zk.connect", brokerInfoList(1))
      props.put("zk.sessiontimeout.ms", "300000")
    }
    else
      props.put("broker.list", brokerInfoList(1))
+
    props.put("compression.codec", config.compressionCodec.codec.toString)
    props.put("reconnect.interval", Integer.MAX_VALUE.toString)
    props.put("buffer.size", (64*1024).toString)
+
    if(config.isAsync) {
      props.put("producer.type","async")
      props.put("batch.size", config.batchSize.toString)
@@ -169,40 +220,53 @@
       val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize
                               else config.numMessages / config.numThreads
       debug("Messages per thread = " + messagesPerThread)
-      var messageSet: List[Message] = Nil
-      if(config.isFixSize) {
-        for(k <- 0 until config.batchSize) {
-          messageSet ::= message
-        }
-      }
+
       var j: Long = 0L
       while(j < messagesPerThread) {
         var strLength = config.messageSize
-        if (!config.isFixSize) {
-          for(k <- 0 until config.batchSize) {
-            strLength = rand.nextInt(config.messageSize)
-            val message = new Message(getByteArrayOfLength(strLength))
-            messageSet ::= message
-            bytesSent += message.payloadSize
+        var messageSet: List[Message] = Nil
+
+        // if NOT Async, generate messages in batch and put them into messageSet
+        if (!config.isAsync) {
+          // Fix Size => initialize messageSet with unique random numbers
+          // of size determined by config.messageSize
+          if(config.isFixSize) {
+            for(k <- 0 until config.batchSize) {
+              val msg = new Message(getUniqRandByteArrayOfLength(config.messageSize, urand))
+              messageSet ::= msg
+            }
          }
-        }else if(!config.isAsync) {
-          bytesSent += config.batchSize*message.payloadSize
+
+          // NOT Fix Size => initialize messageSet with unique random numbers
+          // of size determined by rand.nextInt(config.messageSize)
+          if (!config.isFixSize) {
+            for(k <- 0 until config.batchSize) {
+              strLength = rand.nextInt(config.messageSize)
+              val message = new Message(getUniqRandByteArrayOfLength(strLength, urand))
+              messageSet ::= message
+              bytesSent += message.payloadSize
+            }
+          }else if(!config.isAsync) {
+            bytesSent += config.batchSize*message.payloadSize
+          }
        }
+
        try {
-          if(!config.isAsync) {
+          if(!config.isAsync) { // NOT Async => send messageSet in each loop
            producer.send(new ProducerData[Message,Message](config.topic, null, messageSet))
            if(!config.isFixSize) messageSet = Nil
            nSends += config.batchSize
-          }else {
-            if(!config.isFixSize) {
+          }
+          else { // Async => send 1 message in each loop
+            if(!config.isFixSize) { // Async & NOT Fix size
              strLength = rand.nextInt(config.messageSize)
-              val messageBytes = getByteArrayOfLength(strLength)
-              rand.nextBytes(messageBytes)
-              val message = new Message(messageBytes)
+              val message = new Message(getUniqRandByteArrayOfLength(strLength, urand))
              producer.send(new ProducerData[Message,Message](config.topic, message))
              debug(config.topic + "-checksum:" + message.checksum)
              bytesSent += message.payloadSize
-            }else {
+            }
+            else { // Async & Fix size
+              val message = new Message(getUniqRandByteArrayOfLength(config.messageSize, urand))
              producer.send(new ProducerData[Message,Message](config.topic, message))
              debug(config.topic + "-checksum:" + message.checksum)
              bytesSent += message.payloadSize
@@ -212,6 +276,7 @@
        }catch {
          case e: Exception => e.printStackTrace
        }
+
        if(nSends % config.reportingInterval == 0) {
          reportTime = System.currentTimeMillis()
          val elapsed = (reportTime - lastReportTime)/ 1000.0
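
Note on the approach: the patch makes payloads unique by shuffling the index range [start-index, start-index + messages - 1] once (UniqueRandom.newList) and then handing out the shuffled values sequentially, with producer threads synchronizing on the shared UniqueRandom instance. Below is a minimal standalone sketch of the same shuffle-then-consume technique, written with scala.util.Random for brevity; the names are illustrative only and are not part of the patch.

object UniqueIdSketch {
  def main(args: Array[String]): Unit = {
    val startIndex  = 1L   // corresponds to the new start-index option
    val numMessages = 10L  // corresponds to config.numMessages
    // Shuffle the closed range [startIndex, startIndex + numMessages - 1] once up front.
    val ids = scala.util.Random.shuffle((startIndex to startIndex + numMessages - 1).toVector)
    val it = ids.iterator
    while (it.hasNext) {
      val id = it.next()
      // Encode the unique id into the first 8 bytes of the payload, as getUniqRandByteArrayOfLength does.
      val payload = java.nio.ByteBuffer.allocate(18).putLong(id).array() // 8 bytes for the Long + 10 spare
      println("id=" + id + " payloadLen=" + payload.length)
    }
  }
}

Like UniqueRandom in the patch, this keeps the whole index range in memory, so heap usage grows linearly with the number of messages.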