Index: system_test/mirror_maker/bin/run-test.sh
===================================================================
--- system_test/mirror_maker/bin/run-test.sh	(revision 1401495)
+++ system_test/mirror_maker/bin/run-test.sh	(working copy)
@@ -131,7 +131,7 @@
     topic=$1
     zk=$2
     info "start producing messages for topic $topic to zookeeper $zk ..."
-    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=$zk --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
+    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=$zk --topics $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
     pid_producer=$!
 }
 
Index: system_test/broker_failure/bin/run-test.sh
===================================================================
--- system_test/broker_failure/bin/run-test.sh	(revision 1401495)
+++ system_test/broker_failure/bin/run-test.sh	(working copy)
@@ -450,7 +450,7 @@
         $base_dir/bin/kafka-run-class.sh \
             kafka.perf.ProducerPerformance \
             --brokerinfo zk.connect=localhost:2181 \
-            --topic $topic \
+            --topics $topic \
             --messages $num_msg_per_batch \
             --message-size $message_size \
             --threads $num_producer_threads \
Index: system_test/producer_perf/bin/run-compression-test.sh
===================================================================
--- system_test/producer_perf/bin/run-compression-test.sh	(revision 1401495)
+++ system_test/producer_perf/bin/run-compression-test.sh	(working copy)
@@ -28,7 +28,7 @@
 
 sleep 4
 echo "start producing $num_messages messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --compression-codec 1 
+$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 --async --compression-codec 1 
 
 echo "wait for data to be persisted" 
 cur_offset="-1"
Index: system_test/producer_perf/bin/run-test.sh
===================================================================
--- system_test/producer_perf/bin/run-test.sh	(revision 1401495)
+++ system_test/producer_perf/bin/run-test.sh	(working copy)
@@ -28,7 +28,7 @@
 
 sleep 4
 echo "start producing $num_messages messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async
+$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 --async
 
 echo "wait for data to be persisted" 
 cur_offset="-1"
Index: system_test/utils/kafka_system_test_utils.py
===================================================================
--- system_test/utils/kafka_system_test_utils.py	(revision 1401495)
+++ system_test/utils/kafka_system_test_utils.py	(working copy)
@@ -871,7 +871,7 @@
                        "--broker-list " + brokerListStr,
                        "--initial-message-id " + str(initMsgId),
                        "--messages " + noMsgPerBatch,
-                       "--topic " + topic,
+                       "--topics " + topic,
                        "--threads " + threads,
                        "--compression-codec " + compCodec,
                        "--message-size " + messageSize,
Index: perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/ConsumerPerformance.scala	(revision 1401495)
+++ perf/src/main/scala/kafka/perf/ConsumerPerformance.scala	(working copy)
@@ -86,6 +86,10 @@
                            .withRequiredArg
                            .describedAs("urls")
                            .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
     val groupIdOpt = parser.accepts("group", "The group id to consume on.")
                            .withRequiredArg
                            .describedAs("gid")
Index: perf/src/main/scala/kafka/perf/PerfConfig.scala
===================================================================
--- perf/src/main/scala/kafka/perf/PerfConfig.scala	(revision 1401495)
+++ perf/src/main/scala/kafka/perf/PerfConfig.scala	(working copy)
@@ -22,10 +22,6 @@
 
 class PerfConfig(args: Array[String]) {
   val parser = new OptionParser
-  val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
-    .withRequiredArg
-    .describedAs("topic")
-    .ofType(classOf[String])
   val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume")
     .withRequiredArg
     .describedAs("count")
Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(revision 1401495)
+++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(working copy)
@@ -23,9 +23,11 @@
 import org.apache.log4j.Logger
 import kafka.message.{CompressionCodec, Message}
 import java.text.SimpleDateFormat
-import java.util.{Random, Properties}
+import java.util._
+import java.nio.ByteBuffer
+import collection.immutable.List
 import kafka.utils.{VerifiableProperties, Logging}
-import kafka.metrics.{KafkaCSVMetricsReporter, KafkaMetricsReporterMBean, KafkaMetricsReporter, KafkaMetricsConfig}
+import kafka.metrics.KafkaCSVMetricsReporter
 
 
 /**
@@ -50,10 +52,10 @@
     if(!config.hideHeader) {
       if(!config.showDetailedStats)
         println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
-          "total.data.sent.in.nMsg, nMsg.sec")
+                        "total.data.sent.in.nMsg, nMsg.sec")
       else
         println("time, compression, thread.id, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
-          "total.data.sent.in.nMsg, nMsg.sec")
+                        "total.data.sent.in.nMsg, nMsg.sec")
     }
 
     for(i <- 0 until config.numThreads) {
@@ -66,55 +68,63 @@
     if(!config.showDetailedStats) {
       val totalMBSent = (totalBytesSent.get * 1.0)/ (1024 * 1024)
       println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
-        config.dateFormat.format(endMs), config.compressionCodec.codec, config.messageSize, config.batchSize,
-        totalMBSent, totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
+                                                                  config.dateFormat.format(endMs), config.compressionCodec.codec, config.messageSize, config.batchSize,
+                                                                  totalMBSent, totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
     }
     System.exit(0)
   }
 
   class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be specified.")
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap).")
+            .withRequiredArg
+            .describedAs("hostname:port,..,hostname:port")
+            .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "REQUIRED: The set of topics to produce to / consume from.")
       .withRequiredArg
-      .describedAs("hostname:port")
+      .describedAs("topic1,topic2..")
       .ofType(classOf[String])
     val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3000)
+            .withRequiredArg()
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(3000)
     val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
-      "to complete")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(-1)
+            "to complete")
+            .withRequiredArg()
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(-1)
     val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100)
+            .withRequiredArg
+            .describedAs("size")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(100)
     val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
     val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
+            .withRequiredArg
+            .describedAs("size")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(200)
     val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
+            .withRequiredArg
+            .describedAs("count")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(1)
     val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
-      .withRequiredArg
-      .describedAs("compression codec ")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-    val initialMessageIdOpt = parser.accepts("initial-message-id", "If set, messages will be tagged with an " + 
-        "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + 
-        "in the form of 'Message:000...1:xxx...'")
-      .withRequiredArg()
-      .describedAs("initial message id")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
+            .withRequiredArg
+            .describedAs("compression codec ")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(0)
+    val initialMessageIdOpt = parser.accepts("initial-message-id", "This is used for generating test data. If set, messages will be tagged with an " +
+            "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
+            "in the form of 'Message:000...1:xxx...'")
+            .withRequiredArg()
+            .describedAs("initial message id")
+            .ofType(classOf[java.lang.Integer])
+    val messageSendGapMsOpt = parser.accepts("message-send-gap", "If set, the send thread will wait for the specified time between two sends")
+            .withRequiredArg()
+            .describedAs("message send time gap")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(0)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
             "set, the csv metrics will be outputed here")
@@ -123,14 +133,15 @@
       .ofType(classOf[java.lang.String])
 
     val options = parser.parse(args : _*)
-    for(arg <- List(topicOpt, brokerListOpt, numMessagesOpt)) {
+    for(arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
         System.exit(1)
       }
     }
-    val topic = options.valueOf(topicOpt)
+    val topicsStr = options.valueOf(topicsOpt)
+    val topics = topicsStr.split(",")
     val numMessages = options.valueOf(numMessagesOpt).longValue
     val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
     val showDetailedStats = options.has(showDetailedStatsOpt)
@@ -143,8 +154,10 @@
     var batchSize = options.valueOf(batchSizeOpt).intValue
     var numThreads = options.valueOf(numThreadsOpt).intValue
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
-    val initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
     val seqIdMode = options.has(initialMessageIdOpt)
+    var initialMessageId: Int = 0
+    if (seqIdMode)
+      initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
     val produceRequestTimeoutMs = options.valueOf(produceRequestTimeoutMsOpt).intValue()
     val produceRequestRequiredAcks = options.valueOf(produceRequestRequiredAcksOpt).intValue()
 
@@ -163,29 +176,9 @@
       KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps)
     }
 
-    // override necessary flags in seqIdMode
-    if (seqIdMode) { 
-      batchSize = 1
-      isFixSize = true
-
-      warn("seqIdMode - isAsync is overridden to:" + isAsync)
-      warn("seqIdMode - batchSize is overridden to:" + batchSize)
-      warn("seqIdMode - sFixSize is overridden to: " + isFixSize)
-    }
+    val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
   }
 
-  private def getStringOfLength(len: Int) : String = {
-    val strArray = new Array[Char](len)
-    for (i <- 0 until len)
-      strArray(i) = 'x'
-    return new String(strArray)
-  }
-
-  private def getByteArrayOfLength(len: Int): Array[Byte] = {
-    //new Array[Byte](len)
-    new Array[Byte]( if (len == 0) 5 else len )
-  }
-
   class ProducerThread(val threadId: Int,
                        val config: ProducerPerfConfig,
                        val totalBytesSent: AtomicLong,
@@ -209,89 +202,66 @@
     val producer = new Producer[Message, Message](producerConfig)
     val seqIdNumDigit = 10   // no. of digits for max int value
 
+    val messagesPerThread = config.numMessages / config.numThreads
+    debug("Messages per thread = " + messagesPerThread)
+
+    // generate the sequential message ID
+    private val SEP            = ":"              // message field separator
+    private val messageIdLabel = "MessageID"
+    private val threadIdLabel  = "ThreadID"
+    private val topicLabel     = "Topic"
+    private var leftPaddedSeqId : String = ""
+    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Message = {
+      // Each thread gets a unique range of sequential no. for its ids.
+      // Eg. 1000 msg in 10 threads => 100 msg per thread
+      // thread 0 IDs :   0 ~  99
+      // thread 1 IDs : 100 ~ 199
+      // thread 2 IDs : 200 ~ 299
+      // . . .
+      leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
+
+      val msgHeader = topicLabel      + SEP +
+              topic           + SEP +
+              threadIdLabel   + SEP +
+              threadId        + SEP +
+              messageIdLabel  + SEP +
+              leftPaddedSeqId + SEP
+
+      val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x')
+      debug(seqMsgString)
+      return new Message(seqMsgString.getBytes())
+    }
+
+    private def generateProducerData(topic: String, messageId: Long): (ProducerData[Message, Message], Int) = {
+      val message = if(config.seqIdMode)
+        generateMessageWithSeqId(topic, config.initialMessageId + (messagesPerThread * threadId) + messageId, if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize))
+      else
+        new Message(ByteBuffer.allocate(if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)).array())
+      (new ProducerData[Message, Message](topic, null, message), message.payloadSize)
+    }
+
     override def run {
       var bytesSent = 0L
       var lastBytesSent = 0L
       var nSends = 0
       var lastNSends = 0
-      var message = new Message(new Array[Byte](config.messageSize))
       var reportTime = System.currentTimeMillis()
       var lastReportTime = reportTime
-      val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize
-                              else config.numMessages / config.numThreads
-      debug("Messages per thread = " + messagesPerThread)
 
-      // generate the sequential message ID
-      val SEP            = ":"              // message field separator
-      val messageIdLabel = "MessageID"
-      val threadIdLabel  = "ThreadID"
-      val topicLabel     = "Topic"
-      var leftPaddedSeqId : String = ""
-      
       var j: Long = 0L
       while(j < messagesPerThread) {
-        var strLength = config.messageSize
-        
-        if (config.seqIdMode) {
-          // Each thread gets a unique range of sequential no. for its ids.
-          // Eg. 1000 msg in 10 threads => 100 msg per thread
-          // thread 0 IDs :   0 ~  99
-          // thread 1 IDs : 100 ~ 199
-          // thread 2 IDs : 200 ~ 299
-          // . . .
-          
-          val msgId = config.initialMessageId + (messagesPerThread * threadId) + j
-          leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
-          
-          val msgHeader = topicLabel      + SEP + 
-                          config.topic    + SEP + 
-                          threadIdLabel   + SEP + 
-                          threadId        + SEP + 
-                          messageIdLabel  + SEP + 
-                          leftPaddedSeqId + SEP
-                             
-          val seqMsgString = String.format("%1$-"+config.messageSize+"s", msgHeader).replace(' ', 'x')
-          
-          debug(seqMsgString)
-          message = new Message(seqMsgString.getBytes())
-        }
-
-        var messageSet: List[Message] = Nil
-        if(config.isFixSize) {
-          for(k <- 0 until config.batchSize) {
-            messageSet ::= message
-          }
-        }
-
-        if (!config.isFixSize) {
-          for(k <- 0 until config.batchSize) {
-            strLength = rand.nextInt(config.messageSize)
-            messageSet ::= message
-            bytesSent += message.payloadSize
-          }
-        }else if(!config.isAsync) {
-          bytesSent += config.batchSize*message.payloadSize
-        }
-        try  {
-          if(!config.isAsync) {
-            producer.send(new ProducerData[Message,Message](config.topic, null, messageSet))
-            if(!config.isFixSize) messageSet = Nil
-            nSends += config.batchSize
-          }else {
-            if(!config.isFixSize) {
-              strLength = rand.nextInt(config.messageSize)
-              val messageBytes = getByteArrayOfLength(strLength)
-              rand.nextBytes(messageBytes)
-              val message = new Message(messageBytes)
-              producer.send(new ProducerData[Message,Message](config.topic, message))
-              bytesSent += message.payloadSize
-            }else {
-              producer.send(new ProducerData[Message,Message](config.topic, message))
-              bytesSent += message.payloadSize
+        try {
+          config.topics.foreach(
+            topic =>{
+              val (producerData, bytesSent_) = generateProducerData(topic, j)
+              bytesSent += bytesSent_
+              producer.send(producerData)
+              nSends += 1
+              if(config.messageSendGapMs > 0)
+                Thread.sleep(config.messageSendGapMs)
             }
-            nSends += 1
-          }
-        }catch {
+          )
+        } catch {
           case e: Exception => error("Error sending messages", e)
         }
         if(nSends % config.reportingInterval == 0) {
@@ -302,8 +272,9 @@
           val mbPerSec = mbBytesSent / elapsed
           val formattedReportTime = config.dateFormat.format(reportTime)
           if(config.showDetailedStats)
-            println(("%s, %d, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(formattedReportTime, config.compressionCodec.codec,
-              threadId, config.messageSize, config.batchSize, (bytesSent*1.0)/(1024 * 1024), mbPerSec, nSends, numMessagesPerSec))
+            println(("%s, %d, %d, %d, %d, %.2f, %.4f, %d, %.4f").
+                            format(formattedReportTime, config.compressionCodec.codec,threadId,
+                                   config.messageSize, config.batchSize, (bytesSent*1.0)/(1024 * 1024), mbPerSec, nSends, numMessagesPerSec))
           lastReportTime = reportTime
           lastBytesSent = bytesSent
           lastNSends = nSends
Index: perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala	(revision 1401495)
+++ perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala	(working copy)
@@ -117,6 +117,10 @@
                            .withRequiredArg
                            .describedAs("kafka://hostname:port")
                            .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
     val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
       "offset to consume from, start with the latest message present in the log rather than the earliest message.")
     val partitionOpt = parser.accepts("partition", "The topic partition to consume from.")
