Index: system_test/utils/kafka_system_test_utils.py
===================================================================
--- system_test/utils/kafka_system_test_utils.py	(revision 1400581)
+++ system_test/utils/kafka_system_test_utils.py	(working copy)
@@ -366,8 +366,13 @@
                         logger.error("Unknown cluster name: " + clusterName)
                         sys.exit(1)
 
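+                    # properties merged into the generated server.properties so each
+                    # broker writes CSV metrics under its per-entity "metrics" log dir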
+                    addedCSVConfig = {}
+                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics") 
+                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
+                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" 
+                    addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true"
                     copy_file_with_dict_values(cfgTemplatePathname + "/server.properties",
-                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, None)
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, addedCSVConfig)
 
                 elif ( clusterCfg["role"] == "zookeeper"):
                     if clusterCfg["cluster_name"] == "source":
@@ -391,6 +396,33 @@
                     tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
                     copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties",
                         cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None)
+                
+                elif ( clusterCfg["role"] == "producer_performance" ):
+                    copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties",
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, None)
+
+                elif ( clusterCfg["role"] == "console_consumer" ):
+                    copy_file_with_dict_values(cfgTemplatePathname + "/console_consumer.properties",
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, None)
+                
+                elif ( clusterCfg["role"] == "producer" ):
+                    addedCSVConfig = {}
+                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "producer", clusterCfg["entity_id"], "metrics") 
+                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
+                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" 
+                    addedCSVConfig["kfka.metrics.polling.interval.secsafka.csv.metrics.reporter.enabled"] = "true"
+                    copy_file_with_dict_values(cfgTemplatePathname + "/producer.properties",
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, addedCSVConfig)
+
+                elif ( clusterCfg["role"] == "consumer" ):
+                    addedCSVConfig = {}
+                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "consumer", clusterCfg["entity_id"], "metrics") 
+                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
+                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" 
+                    addedCSVConfig["kfka.metrics.polling.interval.secsafka.csv.metrics.reporter.enabled"] = "true"
+                    copy_file_with_dict_values(cfgTemplatePathname + "/consumer.properties",
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, addedCSVConfig)
                 else:
                     logger.debug("UNHANDLED role " + clusterCfg["role"], extra=d)
 
@@ -669,11 +701,7 @@
             elif role == "mirror_maker":
                 testcaseEnv.entityMirrorMakerParentPidDict[entityId] = tokens[1]
 
-    time.sleep(1)
-    if role != "mirror_maker":
-        metrics.start_metrics_collection(hostname, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
-
 def start_console_consumer(systemTestEnv, testcaseEnv):
 
     clusterList = systemTestEnv.clusterEntityConfigDictList
@@ -729,6 +757,8 @@
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
                    "--consumer-timeout-ms " + timeoutMs,
+                   "--csv-reporter-enable",
+                   "--metrics-dir " + get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "metrics"),
                    formatterOption,
                    "--from-beginning ",
                    " >> " + consumerLogPathName,
@@ -738,8 +768,6 @@
 
         logger.debug("executing command: [" + cmdStr + "]", extra=d)
         system_test_utils.async_sys_call(cmdStr)
-        time.sleep(2)
-        metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
         pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'"
         logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
@@ -780,8 +808,6 @@
         logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d)
         time.sleep(1)
         testcaseEnv.lock.release()
-        time.sleep(1)
-        metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
 def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client):
     host              = producerConfig["hostname"]
@@ -850,6 +876,8 @@
                        "--compression-codec " + compCodec,
                        "--message-size " + messageSize,
                        "--request-num-acks " + requestNumAcks,
+                       "--csv-reporter-enabled",
+                       "--metrics-dir " + get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "metrics"),
                        boolArgumentsStr,
                        " >> " + producerLogPathName,
                        " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
Index: system_test/utils/metrics.py
===================================================================
--- system_test/utils/metrics.py	(revision 1400581)
+++ system_test/utils/metrics.py	(working copy)
@@ -45,6 +45,26 @@
 thisClassName = '(metrics)'
 d = {'name_of_class': thisClassName}
 
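+# Maps the JMX attribute names used in metrics.json to the column headers
+# written by the CSV reporter, e.g. "OneMinuteRate" -> "1 min rate".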
+attributeNameToNameInReportedFileMap = {
+    'Min': 'min',
+    'Max': 'max',
+    'Mean': 'mean',
+    '50thPercentile': 'median',
+    'StdDev': 'stddev',
+    '95thPercentile': '95%',
+    '99thPercentile': '99%',
+    '999thPercentile': '99.9%',
+    'Count': 'count',
+    'OneMinuteRate': '1 min rate',
+    'MeanRate': 'mean rate',
+    'FiveMinuteRate': '5 min rate',
+    'FifteenMinuteRate': '15 min rate',
+    'Value': 'value'
+}
+
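+# e.g. "kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs" becomes
+# "kafka.log.LogFlushStats.LogFlushRateAndTimeMs.csv"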
+def getCSVFileNameFromMetricsMbeanName(mbeanName):
+    return mbeanName.replace(":type=", ".").replace(",name=", ".") + ".csv"
+
 def read_metrics_definition(metricsFile):
     metricsFileData = open(metricsFile, "r").read()
     metricsJsonData = json.loads(metricsFileData)
@@ -71,7 +91,7 @@
     return dashboardsForRole
 
 def ensure_valid_headers(headers, attributes):
-    if headers[0] != "time":
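+    # the CSV reporter writes "# time" as the first column header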
+    if headers[0] != "# time":
         raise Exception("First column should be time")
     for header in headers:
         logger.debug(header, extra=d)
@@ -108,14 +128,17 @@
         try:
             # read first line as the headers
             headers = csv_reader.pop(0)
-            attributeColumnIndex = ensure_valid_headers(headers, attribute)
+            attributeColumnIndex = ensure_valid_headers(headers, attributeNameToNameInReportedFileMap[attribute])
             logger.debug("Column index for attribute {0} is {1}".format(attribute, attributeColumnIndex), extra=d)
-            start_time = int(csv_reader[0][0])
+            # anchor the CSV's relative timestamps (seconds) to the file's creation time
+            start_time = int(os.path.getctime(inputCsvFile))
             for line in csv_reader:
+                if len(line) == 0:
+                    continue
                 yVal = float(line[attributeColumnIndex])                
-                xVal = (int(line[0])-start_time)/1000
+                xVal = int(line[0])
                 y.append(yVal)
-                epoch=int(line[0])/1000
+                epoch = start_time + int(line[0])
                 x.append(xVal)
                 xticks_labels.append(time.strftime("%H:%M:%S", time.localtime(epoch)))
                 coordinates.append(Coordinates(xVal, yVal))
@@ -160,7 +183,7 @@
         graphLegendLabels = []
         for entity in entities:
             entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entity['entity_id'], "metrics")
-            entityMetricCsvFile = entityMetricsDir + "/" + graph['bean_name'] + ".csv"
+            entityMetricCsvFile = entityMetricsDir + "/" + getCSVFileNameFromMetricsMbeanName(graph['bean_name'])
             inputCsvFiles.append(entityMetricCsvFile)
             graphLegendLabels.append(role + "-" + entity['entity_id'])
 #            print "Plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
@@ -173,7 +196,7 @@
             for labelAndAttribute in zip(labels, fullyQualifiedAttributeNames, attributes):            
                 outputGraphFile = testcaseEnv.testCaseDashboardsDir + "/" + role + "/" + labelAndAttribute[1] + ".svg"            
                 plot_graphs(inputCsvFiles, graphLegendLabels, graph['graph_name'] + '-' + labelAndAttribute[2], 
-                            "time", labelAndAttribute[0], labelAndAttribute[1], outputGraphFile)
+                            "time", labelAndAttribute[0], labelAndAttribute[2], outputGraphFile)
 #            print "Finished plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
         except Exception as e:
             logger.error("ERROR while plotting graph {0}: {1}".format(outputGraphFile, e), extra=d)
Index: system_test/metrics.json
===================================================================
--- system_test/metrics.json	(revision 1400581)
+++ system_test/metrics.json	(working copy)
@@ -90,14 +90,14 @@
                {
                   "graph_name": "ControllerLeaderElectionRateAndTime",
                   "y_label": "elections-per-sec,ms,ms",
-                  "bean_name": "kafka.server:type=ControllerStat,name=LeaderElectionRateAndTimeMs",
+                  "bean_name": "kafka.controller:type=ControllerStat,name=LeaderElectionRateAndTimeMs",
                   "attributes": "OneMinuteRate,Mean,99thPercentile"
                },
                {
                   "graph_name": "LogFlushRateAndTime",
                   "y_label": "flushes-per-sec,ms,ms",
-                  "bean_name": "kafka.message:type=LogFlushStats,name=LogFlushRateAndTimeMs",
-                  "attributes": "OneMinuteRate,Mean,99thPercentile"
+                  "bean_name": "kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs",
+                  "attributes": "Mean,99thPercentile,OneMinuteRate"
                },
                {
                   "graph_name": "AllBytesOutRate",
@@ -126,7 +126,7 @@
                   "graph_name": "ProduceRequestRateAndTime",
                   "y_label": "requests-per-sec,ms,ms",
                   "bean_name": "kafka.producer:type=ProducerRequestStat,name=ProduceRequestRateAndTimeMs",
-                  "attributes": "OneMinuteRate,Mean,99thPercentile"
+                  "attributes": "Mean,99thPercentile,OneMinuteRate"
                },
                {
                   "graph_name": "ProduceRequestSize",
@@ -143,7 +143,7 @@
                   "graph_name": "FetchRequestRateAndTime",
                   "y_label": "requests-per-sec,ms,ms",
                   "bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchRequestRateAndTimeMs",
-                  "attributes": "OneMinuteRate,Mean,99thPercentile"
+                  "attributes": "Mean,99thPercentile,OneMinuteRate"
                },
                {
                   "graph_name": "FetchResponseSize",
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision 1400581)
+++ core/src/main/scala/kafka/producer/Producer.scala	(working copy)
@@ -23,8 +23,9 @@
 import kafka.serializer.Encoder
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{QueueFullException, InvalidConfigException}
-import kafka.metrics.KafkaMetricsGroup
+import kafka.metrics._
 
 class Producer[K,V](config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V]) // for testing only
 extends Logging {
@@ -48,6 +49,8 @@
     case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
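+  // kick off CSV metrics reporting on first construction; subsequent calls are no-ops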
+  KafkaCSVMetricsReporter.startCSVMetricReporter(config.props)
+
   def this(config: ProducerConfig) =
     this(config,
          new DefaultEventHandler[K,V](config,
Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala	(revision 1400581)
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.consumer
 
-import scala.collection.mutable._
 import scala.collection.JavaConversions._
 import org.I0Itec.zkclient._
 import joptsimple._
@@ -25,10 +24,11 @@
 import java.util.Random
 import java.io.PrintStream
 import kafka.message._
-import kafka.utils.{Utils, Logging, ZkUtils, CommandLineUtils}
-import kafka.utils.ZKStringSerializer
 import kafka.serializer.StringDecoder
+import kafka.utils._
+import kafka.metrics.KafkaCSVMetricsReporter
 
 /**
  * Consumer that dumps messages out to standard out.
  *
@@ -107,7 +107,14 @@
             .ofType(classOf[java.lang.Integer])
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
             "skip it instead of halt.")
+    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
+    val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enabled is set, and this parameter is " +
+            "set, the csv metrics will be output here")
+      .withRequiredArg
+      .describedAs("metrics directory")
+      .ofType(classOf[java.lang.String])
 
     val options: OptionSet = tryParse(parser, args)
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
@@ -122,6 +129,20 @@
     else
       new Whitelist(topicArg)
 
+    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
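+    // when enabled, configure and start the CSV reporter before the connector is created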
+    if (csvMetricsReporterEnabled) {
+      val csvReporterProps = new Properties()
+      csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")
+      csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
+      if (options.has(metricsDirectoryOpt))
+        csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
+      else
+        csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics")
+      csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true")
+      val verifiableProps = new VerifiableProperties(csvReporterProps)
+      KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps)
+    }
+
     val props = new Properties()
     props.put("groupid", options.valueOf(groupIdOpt))
     props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerConfig.scala	(revision 1400581)
+++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala	(working copy)
@@ -45,7 +45,7 @@
   val DefaultClientId = ""
 }
 
-class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(props) {
+class ConsumerConfig(val props: VerifiableProperties) extends ZKConfig(props) {
   import ConsumerConfig._
 
   def this(originalProps: Properties) {
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 1400581)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -33,9 +33,8 @@
 import kafka.common._
 import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
-import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.Utils._
 import kafka.api.OffsetRequest
+import kafka.metrics._
 
 
 /**
@@ -118,6 +117,8 @@
       config.autoCommitIntervalMs, false)
   }
 
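+  // start metrics reporting (idempotent) as soon as a consumer connector is constructed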
+  KafkaCSVMetricsReporter.startCSVMetricReporter(config.props)
+
   def this(config: ConsumerConfig) = this(config, true)
 
   def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T])
Index: core/src/main/scala/kafka/Kafka.scala
===================================================================
--- core/src/main/scala/kafka/Kafka.scala	(revision 1400581)
+++ core/src/main/scala/kafka/Kafka.scala	(working copy)
@@ -17,7 +17,8 @@
 
 package kafka
 
-import metrics.{KafkaMetricsReporterMBean, KafkaMetricsReporter, KafkaMetricsConfig}
+import metrics.KafkaCSVMetricsReporter
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
 import utils.{Utils, Logging}
 
@@ -32,15 +33,7 @@
     try {
       val props = Utils.loadProps(args(0))
       val serverConfig = new KafkaConfig(props)
-      val verifiableProps = serverConfig.props
-      val metricsConfig = new KafkaMetricsConfig(verifiableProps)
-      metricsConfig.reporters.foreach(reporterType => {
-        val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
-        reporter.init(verifiableProps)
-        if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
-          Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
-      })
-
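+      // reporter startup is now centralized (and idempotent) in KafkaCSVMetricsReporter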
+      KafkaCSVMetricsReporter.startCSVMetricReporter(serverConfig.props)
       val kafkaServerStartble = new KafkaServerStartable(serverConfig)
 
       // attach shutdown handler to catch control-c
Index: core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
===================================================================
--- core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala	(revision 1400581)
+++ core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala	(working copy)
@@ -24,11 +24,33 @@
 import java.io.File
 import com.yammer.metrics.reporting.CsvReporter
 import java.util.concurrent.TimeUnit
-import kafka.utils.{VerifiableProperties, Logging}
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.utils.{Utils, VerifiableProperties, Logging}
 
 
 private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
 
+object KafkaCSVMetricsReporter {
+  val CSVReporterStarted: AtomicBoolean = new AtomicBoolean(false)
+
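+  // Starts the configured metrics reporters at most once per JVM, so every entry
+  // point (broker, producer, consumer, perf tools) can call this unconditionally.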
+  def startCSVMetricReporter(verifiableProps: VerifiableProperties) {
+    CSVReporterStarted synchronized {
+      if (!CSVReporterStarted.get()) {
+        val metricsConfig = new KafkaMetricsConfig(verifiableProps)
+        if (metricsConfig.reporters.size > 0) {
+          metricsConfig.reporters.foreach(reporterType => {
+            val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
+            reporter.init(verifiableProps)
+            if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
+              Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
+          })
+          CSVReporterStarted.set(true)
+        }
+      }
+    }
+  }
+}
+
 private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
                               with KafkaCSVMetricsReporterMBean
                               with Logging {
Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(revision 1400581)
+++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(working copy)
@@ -24,8 +24,10 @@
 import kafka.message.{CompressionCodec, Message}
 import java.text.SimpleDateFormat
 import java.util.{Random, Properties}
-import kafka.utils.Logging
+import kafka.utils.{VerifiableProperties, Logging}
+import kafka.metrics.KafkaCSVMetricsReporter
 
 /**
  * Load test for the producer
  */
@@ -113,6 +115,12 @@
       .describedAs("initial message id")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(0)
+    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
+    val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enabled is set, and this parameter is " +
+            "set, the csv metrics will be output here")
+      .withRequiredArg
+      .describedAs("metrics directory")
+      .ofType(classOf[java.lang.String])
 
     val options = parser.parse(args : _*)
     for(arg <- List(topicOpt, brokerListOpt, numMessagesOpt)) {
@@ -140,6 +148,21 @@
     val produceRequestTimeoutMs = options.valueOf(produceRequestTimeoutMsOpt).intValue()
     val produceRequestRequiredAcks = options.valueOf(produceRequestRequiredAcksOpt).intValue()
 
+    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
+
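+    // start the CSV reporter before the load test begins so every sample is captured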
+    if (csvMetricsReporterEnabled) {
+      val props = new Properties()
+      props.put("kafka.metrics.polling.interval.secs", "5")
+      props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
+      if (options.has(metricsDirectoryOpt))
+        props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
+      else
+        props.put("kafka.csv.metrics.dir", "kafka_metrics")
+      props.put("kafka.csv.metrics.reporter.enabled", "true")
+      val verifiableProps = new VerifiableProperties(props)
+      KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps)
+    }
+
     // override necessary flags in seqIdMode
     if (seqIdMode) { 
       batchSize = 1
Index: config/server.properties
===================================================================
--- config/server.properties	(revision 1400581)
+++ config/server.properties	(working copy)
@@ -115,8 +115,8 @@
 zk.connectiontimeout.ms=1000000
 
 # metrics reporter properties
-# kafka.metrics.polling.interval.secs=5
-# kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
-# kafka.csv.metrics.dir=kafka_metrics
-# kafka.csv.metrics.reporter.enabled=true
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
+kafka.csv.metrics.dir=kafka_metrics
+kafka.csv.metrics.reporter.enabled=true
 
