Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(revision 1399939)
+++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(working copy)
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -23,8 +23,11 @@
 import org.apache.log4j.Logger
 import kafka.message.{CompressionCodec, Message}
 import java.text.SimpleDateFormat
-import java.util.{Random, Properties}
-import kafka.utils.Logging
+import java.nio.ByteBuffer
+import kafka.metrics.{KafkaMetricsReporterMBean, KafkaMetricsReporter, KafkaMetricsConfig}
+import java.util._
+import kafka.utils.{VerifiableProperties, Utils, Logging}
+import collection.immutable.List
 
 /**
  * Load test for the producer
@@ -44,18 +47,20 @@
     val allDone = new CountDownLatch(config.numThreads)
     val startMs = System.currentTimeMillis
     val rand = new java.util.Random
+    val urand = new UniqueRandom(config.startIndex, config.numMessages + config.startIndex - 1)
+    urand.newList()
 
     if(!config.hideHeader) {
       if(!config.showDetailedStats)
         println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
-          "total.data.sent.in.nMsg, nMsg.sec")
+                        "total.data.sent.in.nMsg, nMsg.sec")
       else
         println("time, compression, thread.id, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
-          "total.data.sent.in.nMsg, nMsg.sec")
+                        "total.data.sent.in.nMsg, nMsg.sec")
     }
 
     for(i <- 0 until config.numThreads) {
-      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
+      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand, urand))
     }
 
     allDone.await()
@@ -64,65 +69,80 @@
     if(!config.showDetailedStats) {
       val totalMBSent = (totalBytesSent.get * 1.0)/ (1024 * 1024)
       println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
-        config.dateFormat.format(endMs), config.compressionCodec.codec, config.messageSize, config.batchSize,
-        totalMBSent, totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
+                                                                  config.dateFormat.format(endMs), config.compressionCodec.codec, config.messageSize, config.batchSize,
+                                                                  totalMBSent, totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
     }
     System.exit(0)
   }
 
   class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be specified.")
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.")
+            .withRequiredArg
+            .describedAs("hostname:port,..,hostname:port")
+            .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "REQUIRED: The set of topics to produce to / consume from.")
       .withRequiredArg
-      .describedAs("hostname:port")
+      .describedAs("topic1,topic2..")
       .ofType(classOf[String])
     val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3000)
+            .withRequiredArg()
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(3000)
     val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
-      "to complete")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(-1)
+            "to complete")
+            .withRequiredArg()
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(-1)
     val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100)
+            .withRequiredArg
+            .describedAs("size")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(100)
+    val startIndexOpt = parser.accepts("start-index", "The start index of message id")
+            .withRequiredArg
+            .describedAs("start index")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(0)
     val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
     val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
+            .withRequiredArg
+            .describedAs("size")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(200)
     val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
+            .withRequiredArg
+            .describedAs("count")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(1)
     val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
-      .withRequiredArg
-      .describedAs("compression codec ")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-    val initialMessageIdOpt = parser.accepts("initial-message-id", "If set, messages will be tagged with an " + 
-        "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + 
-        "in the form of 'Message:000...1:xxx...'")
-      .withRequiredArg()
-      .describedAs("initial message id")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
+            .withRequiredArg
+            .describedAs("compression codec ")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(0)
+    val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
+            "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
+            "in the form of 'Message:000...1:xxx...'")
+            .withRequiredArg()
+            .describedAs("initial message id")
+            .ofType(classOf[java.lang.Integer])
+    val messageSendGapMsOpt = parser.accepts("message-send-gap", "If set, the send thread will wait for specified time between two sends")
+            .withRequiredArg()
+            .describedAs("message send time gap")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(0)
+    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
 
     val options = parser.parse(args : _*)
-    for(arg <- List(topicOpt, brokerListOpt, numMessagesOpt)) {
+    for(arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
         System.exit(1)
       }
     }
-    val topic = options.valueOf(topicOpt)
+    val topicsStr = options.valueOf(topicsOpt)
+    val topics = topicsStr.split(",")
     val numMessages = options.valueOf(numMessagesOpt).longValue
     val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
     val showDetailedStats = options.has(showDetailedStatsOpt)
@@ -135,32 +155,64 @@
     var batchSize = options.valueOf(batchSizeOpt).intValue
     var numThreads = options.valueOf(numThreadsOpt).intValue
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
-    val initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
+    val startIndex = options.valueOf(startIndexOpt).longValue()
     val seqIdMode = options.has(initialMessageIdOpt)
+    var initialMessageId: Int = 0
+    if (seqIdMode)
+      initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
     val produceRequestTimeoutMs = options.valueOf(produceRequestTimeoutMsOpt).intValue()
     val produceRequestRequiredAcks = options.valueOf(produceRequestRequiredAcksOpt).intValue()
+    val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
+    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
 
-    // override necessary flags in seqIdMode
-    if (seqIdMode) { 
-      batchSize = 1
-      isFixSize = true
-
-      warn("seqIdMode - isAsync is overridden to:" + isAsync)
-      warn("seqIdMode - batchSize is overridden to:" + batchSize)
-      warn("seqIdMode - sFixSize is overridden to: " + isFixSize)
+    if (csvMetricsReporterEnabled) {
+      val props = new Properties()
+      props.put("kafka.metrics.polling.interval.secs", "5")
+      props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
+      props.put("kafka.csv.metrics.dir", "kafka_metrics")
+      props.put("kafka.csv.metrics.reporter.enabled", "true")
+      val verifiableProps = new VerifiableProperties(props)
+      val metricsConfig = new KafkaMetricsConfig(verifiableProps)
+      metricsConfig.reporters.foreach(reporterType => {
+        val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
+        reporter.init(verifiableProps)
+        if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
+          Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
+      })
     }
   }
 
-  private def getStringOfLength(len: Int) : String = {
-    val strArray = new Array[Char](len)
-    for (i <- 0 until len)
-      strArray(i) = 'x'
-    return new String(strArray)
+  private def getUniqRandByteArrayOfLength(len: Int, urand: UniqueRandom): Array[Byte] = {
+    var nextVal: Long = 0
+    urand.synchronized {
+      nextVal = urand.next()
+    }
+    ByteBuffer.allocate(len + 10).putLong(nextVal).array() // add 10 to len to prevent overflow
   }
 
-  private def getByteArrayOfLength(len: Int): Array[Byte] = {
-    //new Array[Byte](len)
-    new Array[Byte]( if (len == 0) 5 else len )
+  class UniqueRandom(start: Long, end: Long) {
+    val list = new ArrayList[Long]()
+    var index = 0
+
+    def newList() {
+      for (i <- start until end + 1) {
+        list.add(i)
+      }
+      Collections.shuffle(list)
+    }
+
+    def hasNext(): Boolean = {
+      if ( index < list.size() )
+        return true
+      else
+        return false
+    }
+
+    def next(): Long = {
+      val token = list.get(index)
+      index += 1
+      return token
+    }
   }
 
   class ProducerThread(val threadId: Int,
@@ -168,7 +220,8 @@
                        val totalBytesSent: AtomicLong,
                        val totalMessagesSent: AtomicLong,
                        val allDone: CountDownLatch,
-                       val rand: Random) extends Runnable {
+                       val rand: Random,
+                       val urand: UniqueRandom) extends Runnable {
     val props = new Properties()
     props.put("broker.list", config.brokerList)
     props.put("compression.codec", config.compressionCodec.codec.toString)
@@ -186,89 +239,83 @@
     val producer = new Producer[Message, Message](producerConfig)
     val seqIdNumDigit = 10   // no. of digits for max int value
 
+    val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize
+    else config.numMessages / config.numThreads
+    debug("Messages per thread = " + messagesPerThread)
+
+    // generate the sequential message ID
+    private val SEP            = ":"              // message field separator
+    private val messageIdLabel = "MessageID"
+    private val threadIdLabel  = "ThreadID"
+    private val topicLabel     = "Topic"
+    private var leftPaddedSeqId : String = ""
+    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Message = {
+      // Each thread gets a unique range of sequential no. for its ids.
+      // Eg. 1000 msg in 10 threads => 100 msg per thread
+      // thread 0 IDs :   0 ~  99
+      // thread 1 IDs : 100 ~ 199
+      // thread 2 IDs : 200 ~ 299
+      // . . .
+      leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
+
+      val msgHeader = topicLabel      + SEP +
+              topic           + SEP +
+              threadIdLabel   + SEP +
+              threadId        + SEP +
+              messageIdLabel  + SEP +
+              leftPaddedSeqId + SEP
+
+      val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x')
+      debug(seqMsgString)
+      return new Message(seqMsgString.getBytes())
+    }
+
+    private def generateProducerData(topic: String, sendBatchId: Long): (ProducerData[Message, Message], Int) = {
+      if(!config.isAsync) {
+        var messageSet: List[Message] = Nil
+        var bytesSent = 0
+        val batchStartMsgId = config.initialMessageId + (messagesPerThread * threadId) + config.batchSize * sendBatchId
+        for(k <- 0 until config.batchSize) {
+          val msg = if(config.seqIdMode)
+            generateMessageWithSeqId(topic, batchStartMsgId + k, if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize))
+          else
+            new Message(getUniqRandByteArrayOfLength(if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize), urand))
+          messageSet ::= msg
+          bytesSent += msg.payloadSize
+        }
+        (new ProducerData[Message, Message](topic, null, messageSet), bytesSent)
+      } else {
+        val message = if(config.seqIdMode)
+          generateMessageWithSeqId(topic, config.initialMessageId + (messagesPerThread * threadId) + sendBatchId, if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize))
+        else
+          new Message(getUniqRandByteArrayOfLength(if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize), urand))
+        (new ProducerData[Message, Message](topic, null, message), message.payloadSize)
+      }
+    }
+
+
     override def run {
       var bytesSent = 0L
       var lastBytesSent = 0L
       var nSends = 0
       var lastNSends = 0
-      var message = new Message(new Array[Byte](config.messageSize))
       var reportTime = System.currentTimeMillis()
       var lastReportTime = reportTime
-      val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize
-                              else config.numMessages / config.numThreads
-      debug("Messages per thread = " + messagesPerThread)
 
-      // generate the sequential message ID
-      val SEP            = ":"              // message field separator
-      val messageIdLabel = "MessageID"
-      val threadIdLabel  = "ThreadID"
-      val topicLabel     = "Topic"
-      var leftPaddedSeqId : String = ""
-      
       var j: Long = 0L
       while(j < messagesPerThread) {
-        var strLength = config.messageSize
-        
-        if (config.seqIdMode) {
-          // Each thread gets a unique range of sequential no. for its ids.
-          // Eg. 1000 msg in 10 threads => 100 msg per thread
-          // thread 0 IDs :   0 ~  99
-          // thread 1 IDs : 100 ~ 199
-          // thread 2 IDs : 200 ~ 299
-          // . . .
-          
-          val msgId = config.initialMessageId + (messagesPerThread * threadId) + j
-          leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
-          
-          val msgHeader = topicLabel      + SEP + 
-                          config.topic    + SEP + 
-                          threadIdLabel   + SEP + 
-                          threadId        + SEP + 
-                          messageIdLabel  + SEP + 
-                          leftPaddedSeqId + SEP
-                             
-          val seqMsgString = String.format("%1$-"+config.messageSize+"s", msgHeader).replace(' ', 'x')
-          
-          debug(seqMsgString)
-          message = new Message(seqMsgString.getBytes())
-        }
-
-        var messageSet: List[Message] = Nil
-        if(config.isFixSize) {
-          for(k <- 0 until config.batchSize) {
-            messageSet ::= message
-          }
-        }
-
-        if (!config.isFixSize) {
-          for(k <- 0 until config.batchSize) {
-            strLength = rand.nextInt(config.messageSize)
-            messageSet ::= message
-            bytesSent += message.payloadSize
-          }
-        }else if(!config.isAsync) {
-          bytesSent += config.batchSize*message.payloadSize
-        }
-        try  {
-          if(!config.isAsync) {
-            producer.send(new ProducerData[Message,Message](config.topic, null, messageSet))
-            if(!config.isFixSize) messageSet = Nil
-            nSends += config.batchSize
-          }else {
-            if(!config.isFixSize) {
-              strLength = rand.nextInt(config.messageSize)
-              val messageBytes = getByteArrayOfLength(strLength)
-              rand.nextBytes(messageBytes)
-              val message = new Message(messageBytes)
-              producer.send(new ProducerData[Message,Message](config.topic, message))
-              bytesSent += message.payloadSize
-            }else {
-              producer.send(new ProducerData[Message,Message](config.topic, message))
-              bytesSent += message.payloadSize
+        try {
+          config.topics.foreach(
+            topic =>{
+              val (producerData, bytesSent_) = generateProducerData(topic, j)
+              bytesSent += bytesSent_
+              producer.send(producerData)
+              nSends += (if(!config.isAsync) config.batchSize else 1)
+              if(config.messageSendGapMs > 0)
+                Thread.sleep(config.messageSendGapMs)
             }
-            nSends += 1
-          }
-        }catch {
+          )
+        } catch {
           case e: Exception => error("Error sending messages", e)
         }
         if(nSends % config.reportingInterval == 0) {
@@ -280,7 +327,7 @@
           val formattedReportTime = config.dateFormat.format(reportTime)
           if(config.showDetailedStats)
             println(("%s, %d, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(formattedReportTime, config.compressionCodec.codec,
-              threadId, config.messageSize, config.batchSize, (bytesSent*1.0)/(1024 * 1024), mbPerSec, nSends, numMessagesPerSec))
+                                                                        threadId, config.messageSize, config.batchSize, (bytesSent*1.0)/(1024 * 1024), mbPerSec, nSends, numMessagesPerSec))
           lastReportTime = reportTime
           lastBytesSent = bytesSent
           lastNSends = nSends
