Index: system_test/single_host_multi_brokers/config/server.properties
===================================================================
--- system_test/single_host_multi_brokers/config/server.properties	(revision 1378227)
+++ system_test/single_host_multi_brokers/config/server.properties	(working copy)
@@ -1,119 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-############################# Server Basics #############################
-
-# The id of the broker. This must be set to a unique integer for each broker.
-brokerid=0
-
-# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
-# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
-# may not be what you want.
-#hostname=
-
-
-############################# Socket Server Settings #############################
-
-# The port the socket server listens on
-port=9091
-
-# The number of threads handling network requests
-network.threads=2
- 
-# The number of threads doing disk I/O
-io.threads=2
-
-# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer=1048576
-
-# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer=1048576
-
-# The maximum size of a request that the socket server will accept (protection against OOM)
-max.socket.request.bytes=104857600
-
-
-############################# Log Basics #############################
-
-# The directory under which to store log files
-log.dir=/tmp/kafka_server_1_logs
-
-# The number of logical partitions per topic per server. More partitions allow greater parallelism
-# for consumption, but also mean more files.
-num.partitions=1
-
-# Overrides for for the default given by num.partitions on a per-topic basis
-#topic.partition.count.map=topic1:3, topic2:4
-
-############################# Log Flush Policy #############################
-
-# The following configurations control the flush of data to disk. This is the most
-# important performance knob in kafka.
-# There are a few important trade-offs here:
-#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
-#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
-#    3. Throughput: The flush is generally the most expensive operation. 
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
-# The number of messages to accept before forcing a flush of data to disk
-log.flush.interval=10000
-
-# The maximum amount of time a message can sit in a log before we force a flush
-log.default.flush.interval.ms=1000
-
-# Per-topic overrides for log.default.flush.interval.ms
-#topic.flush.intervals.ms=topic1:1000, topic2:3000
-
-# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
-log.default.flush.scheduler.interval.ms=1000
-
-############################# Log Retention Policy #############################
-
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-
-# The minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.size.
-#log.retention.size=1073741824
-
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-#log.file.size=536870912
-log.file.size=100
-
-# The interval at which log segments are checked to see if they can be deleted according 
-# to the retention policies
-log.cleanup.interval.mins=1
-
-############################# Zookeeper #############################
-
-# Enable connecting to zookeeper
-enable.zookeeper=true
-
-# Zk connection string (see zk docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
-zk.connect=localhost:2181
-
-# Timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
Index: system_test/single_host_multi_brokers/config/consumer.properties
===================================================================
--- system_test/single_host_multi_brokers/config/consumer.properties	(revision 1378227)
+++ system_test/single_host_multi_brokers/config/consumer.properties	(working copy)
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.consumer.ConsumerConfig for more details
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=127.0.0.1:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
-
-#consumer group id
-groupid=test-consumer-group
-
-#consumer timeout
-#consumer.timeout.ms=5000
Index: system_test/single_host_multi_brokers/config/log4j.properties
===================================================================
--- system_test/single_host_multi_brokers/config/log4j.properties	(revision 1378227)
+++ system_test/single_host_multi_brokers/config/log4j.properties	(working copy)
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-#log4j.appender.fileAppender=org.apache.log4j.FileAppender
-#log4j.appender.fileAppender.File=kafka-request.log
-#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
-#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
-
-
-# Turn on all our debugging info
-#log4j.logger.kafka=INFO
-#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
-
-# to print message checksum from ProducerPerformance
-log4j.logger.kafka.perf=DEBUG
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
-log4j.logger.kafka.producer.async=DEBUG
-log4j.logger.kafka.server.KafkaApis=TRACE
-log4j.logger.kafka.server.ReplicaManager=DEBUG
-
-# to print message checksum from ProducerPerformance
-log4j.logger.kafka.perf=DEBUG
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
-
Index: system_test/single_host_multi_brokers/config/producer.properties
===================================================================
--- system_test/single_host_multi_brokers/config/producer.properties	(revision 1378227)
+++ system_test/single_host_multi_brokers/config/producer.properties	(working copy)
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.producer.ProducerConfig for more details
-
-############################# Producer Basics #############################
-
-# configure brokers statically
-# format: brokerid1:host1:port1,brokerid2:host2:port2 ...
-broker.list=localhost:9091,localhost:9092,localhost:9093
-
-# name of the partitioner class for partitioning events; default partition spreads data randomly
-#partitioner.class=
-
-# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
-producer.type=sync
-
-# specify the compression codec for all data generated: 0: no compression, 1: gzip
-compression.codec=0
-
-# message encoder
-serializer.class=kafka.serializer.StringEncoder
-
-# allow topic level compression
-#compressed.topics=
-
-# max message size; messages larger than that size are discarded; default is 1000000
-#max.message.size=
-
-
-############################# Async Producer #############################
-# maximum time, in milliseconds, for buffering data on the producer queue 
-#queue.time=
-
-# the maximum size of the blocking queue for buffering on the producer 
-#queue.size=
-
-# Timeout for event enqueue:
-# 0: events will be enqueued immediately or dropped if the queue is full
-# -ve: enqueue will block indefinitely if the queue is full
-# +ve: enqueue will block up to this many milliseconds if the queue is full
-#queue.enqueueTimeout.ms=
-
-# the number of messages batched at the producer 
-#batch.size=
-
-# the callback handler for one or multiple events 
-#callback.handler=
-
-# properties required to initialize the callback handler 
-#callback.handler.props=
-
-# the handler for events 
-#event.handler=
-
-# properties required to initialize the event handler 
-#event.handler.props=
-
Index: system_test/single_host_multi_brokers/config/zookeeper.properties
===================================================================
--- system_test/single_host_multi_brokers/config/zookeeper.properties	(revision 1378227)
+++ system_test/single_host_multi_brokers/config/zookeeper.properties	(working copy)
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=/tmp/zookeeper_source
-# the port at which the clients will connect
-clientPort=2181
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0
Index: system_test/single_host_multi_brokers/bin/run-test.sh
===================================================================
--- system_test/single_host_multi_brokers/bin/run-test.sh	(revision 1378227)
+++ system_test/single_host_multi_brokers/bin/run-test.sh	(working copy)
@@ -1,529 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# ==================================================================
-# run-test.sh
-# 
-# ==================================================================
-
-# ====================================
-# Do not change the followings
-# (keep this section at the beginning
-# of this script)
-# ====================================
-readonly osname=`uname -s`                            # OS name
-readonly system_test_root=$(dirname $0)/../..         # path of <kafka install>/system_test
-readonly common_dir=${system_test_root}/common        # common util scripts for system_test
-source   ${common_dir}/util.sh                        # include the util script
-
-readonly base_dir=$(dirname $0)/..                    # root of this test suite
-readonly base_dir_full_path=`cd $base_dir; pwd`       # full path of the root of this test suite
-readonly config_dir=${base_dir}/config
-
-readonly test_start_time="$(date +%s)"                # time starting the test
-max_reelection_latency_ms=0
-min_reelection_latency_ms=10000
-sum_reelection_latency_ms=0
-reelection_counter=0
-
-# ====================================
-# No need to change the following
-# configurations in most cases
-# ====================================
-readonly num_kafka_server=3                           # same no. of property files such as server_{1..n}.properties
-                                                      # will be automatically generated
-readonly replica_factor=3                             # should be less than or equal to "num_kafka_server"
-readonly my_brokerid_to_start=0                       # this should be '0' for now
-readonly my_server_port_to_start=9091                 # if using this default, the ports to be used will be 9091, 9092, ...
-readonly producer_msg_batch_size=100                  # batch no. of messsages by producer
-readonly consumer_timeout_ms=10000                    # elapsed time for consumer to timeout and exit
-
-readonly test_topic=mytest
-readonly max_wait_for_consumer_complete=30
-readonly zk_prop_pathname=${config_dir}/zookeeper.properties
-readonly zk_log4j_log_pathname=${base_dir}/zookeeper.log
-
-readonly producer_prop_pathname=${config_dir}/producer.properties
-readonly consumer_prop_pathname=${config_dir}/consumer.properties
-
-readonly producer_perf_log_pathname=${base_dir}/producer_perf_output.log
-readonly producer_perf_mid_log_pathname=${base_dir}/producer_perf_mid.log
-readonly producer_perf_mid_sorted_log_pathname=${base_dir}/producer_perf_mid_sorted.log
-readonly producer_perf_mid_sorted_uniq_log_pathname=${base_dir}/producer_perf_mid_sorted_uniq.log
-
-readonly console_consumer_log_pathname=${base_dir}/console_consumer.log
-readonly console_consumer_mid_log_pathname=${base_dir}/console_consumer_mid.log
-readonly console_consumer_mid_sorted_log_pathname=${base_dir}/console_consumer_mid_sorted.log
-readonly console_consumer_mid_sorted_uniq_log_pathname=${base_dir}/console_consumer_mid_sorted_uniq.log
-
-readonly this_test_stderr_output_log_pathname=${base_dir}/this_test_stderr_output.log
-
-# ====================================
-# arrays for kafka brokers properties
-# ====================================
-kafka_data_log_dirs=
-kafka_log4j_log_pathnames=
-kafka_prop_pathnames=
-kafka_brokerids=
-kafka_sock_ports=
-#kafka_first_data_file_sizes=
-#kafka_first_data_file_checksums=
-
-# ====================================
-# Misc
-# ====================================
-zk_port=
-zk_data_log_dir=
-pid_zk=
-kafka_pids=
-test_failure_counter=0
-leader_elected_timestamp=
-jmx_ports=
-jmx_ports[1]=9990
-jmx_ports[2]=9991
-jmx_ports[3]=9992
-
-initialize() {
-    info "initializing ..."
-
-    zk_port=`grep clientPort ${zk_prop_pathname} | awk -F '=' '{print $2}'`
-    zk_data_log_dir=`grep dataDir ${zk_prop_pathname} | awk -F '=' '{print $2}'`
-
-    for ((i=1; i<=$num_kafka_server; i++))
-    do
-        kafka_log4j_log_pathnames[${i}]=$base_dir/kafka_server_${i}.log
-        kafka_prop_pathnames[${i}]=${config_dir}/server_${i}.properties
-
-        kafka_data_log_dirs[${i}]=`grep ^log.dir ${kafka_prop_pathnames[${i}]} | awk -F '=' '{print $2}'`
-        kafka_brokerids[${i}]=`grep ^brokerid= ${kafka_prop_pathnames[${i}]} | awk -F '=' '{print $2}'`
-        kafka_sock_ports[${i}]=`grep ^port= ${kafka_prop_pathnames[${i}]} | awk -F '=' '{print $2}'`
-
-        info "kafka $i data dir   : ${kafka_data_log_dirs[$i]}"
-        info "kafka $i log4j log  : ${kafka_log4j_log_pathnames[$i]}"
-        info "kafka $i prop file  : ${kafka_prop_pathnames[$i]}"
-        info "kafka $i brokerid   : ${kafka_brokerids[$i]}"
-        info "kafka $i socket     : ${kafka_sock_ports[$i]}"
-        echo
-    done
-
-    info "zookeeper port     : $zk_port"
-    info "zookeeper data dir : $zk_data_log_dir"
-    echo
-}
-
-cleanup() {
-    info "cleaning up kafka server log/data dir"
-    for ((i=1; i<=$num_kafka_server; i++))
-    do
-        rm -rf ${kafka_data_log_dirs[$i]}
-        rm -f ${kafka_log4j_log_pathnames[$i]}
-    done
-
-    rm -rf $zk_data_log_dir
-    rm -f $zk_log4j_log_pathname
-    rm -f $this_test_stderr_output_log_pathname
-
-    rm -f $producer_perf_log_pathname
-    rm -f $producer_perf_mid_log_pathname
-    rm -f $producer_perf_mid_sorted_log_pathname
-    rm -f $producer_perf_mid_sorted_uniq_log_pathname
-
-    rm -f $console_consumer_log_pathname
-    rm -f $console_consumer_mid_log_pathname
-    rm -f $console_consumer_mid_sorted_log_pathname
-    rm -f $console_consumer_mid_sorted_uniq_log_pathname
-}
-
-get_leader_brokerid() {
-    log_line=`grep -i -h 'completed the leader state transition' ${base_dir}/kafka_server_*.log | sort | tail -1`
-    info "found the log line: $log_line"
-    broker_id=`echo $log_line | sed s'/^.*INFO Replica Manager on Broker //g' | awk -F ':' '{print $1}'`
-
-    return $broker_id
-}
-
-get_elected_leader_unix_timestamp() {
-    log_line=`grep -i -h 'completed the leader state transition' ${base_dir}/kafka_server_*.log | sort | tail -1`
-    info "found the log line: $log_line"
-
-    this_timestamp=`echo $log_line | cut -f2 -d '[' | cut -f1 -d ']'`
-    elected_leader_unix_timestamp_ms=`echo $this_timestamp | cut -f2 -d ','`
-    this_timestamp=`echo ${this_timestamp%,*}`
-
-    if [ "x${osname}" == "xDarwin" ]; then
-        elected_leader_unix_timestamp=`date -j -f "%Y-%M-%d %H:%M:%S" "$this_timestamp" +%s`
-    else
-        elected_leader_unix_timestamp=`date -d "$this_timestamp" +%s`
-    fi
-
-    full_elected_leader_unix_timestamp="${elected_leader_unix_timestamp}.${elected_leader_unix_timestamp_ms}"
-}
-
-get_server_shutdown_unix_timestamp() {
-    s_idx=$1
-
-    log_line=`grep -i -h 'shut down completed' ${kafka_log4j_log_pathnames[$s_idx]} | tail -1`
-    info "found the log line: $log_line"
-
-    this_timestamp=`echo $log_line | cut -f2 -d '[' | cut -f1 -d ']'`
-    server_shutdown_unix_timestamp_ms=`echo $this_timestamp | cut -f2 -d ','`
-    this_timestamp=`echo ${this_timestamp%,*}`
-
-    if [ "x${osname}" == "xDarwin" ]; then
-        server_shutdown_unix_timestamp=`date -j -f "%Y-%M-%d %H:%M:%S" "$this_timestamp" +%s`
-    else
-        server_shutdown_unix_timestamp=`date -d "$this_timestamp" +%s`
-    fi
-
-    full_server_shutdown_unix_timestamp="${server_shutdown_unix_timestamp}.${server_shutdown_unix_timestamp_ms}"
-}
-
-start_zk() {
-    info "starting zookeeper"
-    $base_dir/../../bin/zookeeper-server-start.sh $zk_prop_pathname \
-        2>&1 > ${zk_log4j_log_pathname} &
-    pid_zk=$!
-}
-
-stop_server() {
-    s_idx=$1
-
-    info "stopping server: $s_idx"
-
-    if [ "x${kafka_pids[${s_idx}]}" != "x" ]; then
-        kill_child_processes 0 ${kafka_pids[${s_idx}]};
-    fi
-
-    kafka_pids[${s_idx}]=
-}
-
-start_server() {
-    s_idx=$1
-
-    info "starting kafka server on jmx port ${jmx_ports[${s_idx}]}"
-    JMX_PORT=${jmx_ports[${s_idx}]} $base_dir/bin/kafka-run-class.sh kafka.Kafka ${kafka_prop_pathnames[$s_idx]} \
-        2>&1 >> ${kafka_log4j_log_pathnames[$s_idx]} &
-    kafka_pids[${s_idx}]=$!
-    info "  -> kafka_pids[$s_idx]: ${kafka_pids[$s_idx]}"
-}
-
-start_servers_cluster() {
-    info "starting cluster"
-
-    for ((i=1; i<=$num_kafka_server; i++)) 
-    do
-        start_server $i
-    done
-}
-
-start_producer_perf() {
-    this_topic=$1
-    broker_list_str=$2
-    no_msg_to_produce=$3
-    init_msg_id=$4
-
-    info "starting producer performance"
-
-    ${base_dir}/bin/kafka-run-class.sh kafka.perf.ProducerPerformance \
-        --broker-list ${broker_list_str} \
-        --topic ${this_topic} \
-        --messages $no_msg_to_produce \
-        --message-size 100 \
-        --threads 1 \
-        --initial-message-id $init_msg_id \
-        2>&1 >> $producer_perf_log_pathname
-}
-
-start_console_consumer() {
-    this_consumer_topic=$1
-    this_zk_conn_str=$2
-
-    info "starting console consumer"
-    $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
-        --zookeeper $this_zk_conn_str \
-        --topic $this_consumer_topic \
-        --formatter 'kafka.consumer.ConsoleConsumer$DecodedMessageFormatter' \
-        --consumer-timeout-ms $consumer_timeout_ms \
-        --from-beginning \
-        2>&1 >> $console_consumer_log_pathname &
-}
-
-shutdown_servers() {
-
-    info "shutting down servers"
-    for ((i=1; i<=$num_kafka_server; i++))
-    do
-        if [ "x${kafka_pids[$i]}" != "x" ]; then
-            kill_child_processes 0 ${kafka_pids[$i]};
-        fi
-    done
-
-    info "shutting down zookeeper servers"
-    if [ "x${pid_zk}" != "x" ]; then kill_child_processes 0 ${pid_zk}; fi
-
-    force_shutdown_producer
-    force_shutdown_consumer
-
-     # running processes are not terminated properly in a Hudson job,
-     # this is a temporary workaround to kill all processes
-     `ps axuw | grep "kafka\|run\-" | grep -v grep | grep -v slave | grep -v vi | grep -v "run\-test\.sh" | awk '{print $2}' | xargs kill -9`
-
-}
-
-force_shutdown_producer() {
-    info "force shutdown producer"
-    `ps auxw | grep ProducerPerformance | awk '{print $2}' | xargs kill -9 2> /dev/null`
-}
-
-force_shutdown_consumer() {
-    info "force shutdown consumer"
-    `ps auxw | grep ConsoleConsumer | awk '{print $2}' | xargs kill -9 2> /dev/null`
-}
-
-create_topic() {
-    this_topic_to_create=$1
-    this_zk_conn_str=$2
-    this_replica_factor=$3
-
-    info "creating topic [$this_topic_to_create] on [$this_zk_conn_str]"
-    $base_dir/../../bin/kafka-create-topic.sh --topic $this_topic_to_create \
-        --zookeeper $this_zk_conn_str --replica $this_replica_factor
-}
-
-validate_results() {
-
-    echo
-    info "========================================================"
-    info "VALIDATING TEST RESULTS"
-    info "========================================================"
-
-    # get the checksums and sizes of the replica data files
-    for ((i=1; i<=$num_kafka_server; i++))
-    do
-        first_data_file_dir=${kafka_data_log_dirs[$i]}/${test_topic}-0
-        first_data_file=`ls ${first_data_file_dir} | head -1`
-        first_data_file_pathname=${first_data_file_dir}/$first_data_file
-        kafka_first_data_file_sizes[$i]=`ls -l ${first_data_file_pathname} | awk '{print $5}'`
-        kafka_first_data_file_checksums[$i]=`cksum ${first_data_file_pathname} | awk '{print $1}'`
-        info "## broker[$i] data file: ${first_data_file_pathname} : [${kafka_first_data_file_sizes[$i]}]"
-        info "##     ==> crc ${kafka_first_data_file_checksums[$i]}"
-    done
-
-    # get the MessageID from messages produced and consumed, sort them and output to log files
-    grep MessageID $console_consumer_log_pathname | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $console_consumer_mid_log_pathname
-    grep MessageID $producer_perf_log_pathname    | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $producer_perf_mid_log_pathname
-
-    sort $console_consumer_mid_log_pathname > $console_consumer_mid_sorted_log_pathname
-    sort $producer_perf_mid_log_pathname    > $producer_perf_mid_sorted_log_pathname
-
-    sort -u $console_consumer_mid_sorted_log_pathname > $console_consumer_mid_sorted_uniq_log_pathname
-    sort -u $producer_perf_mid_sorted_log_pathname    > $producer_perf_mid_sorted_uniq_log_pathname
-
-    msg_count_from_console_consumer=`cat $console_consumer_mid_log_pathname | wc -l | tr -d ' '`
-    uniq_msg_count_from_console_consumer=`cat $console_consumer_mid_sorted_uniq_log_pathname | wc -l | tr -d ' '`
-
-    msg_count_from_producer_perf=`cat $producer_perf_mid_log_pathname | wc -l | tr -d ' '`
-    uniq_msg_count_from_producer_perf=`cat $producer_perf_mid_sorted_uniq_log_pathname | wc -l | tr -d ' '`
-
-    # report the findings
-    echo
-    info "## no. of messages published            : $msg_count_from_producer_perf"
-    info "## producer unique msg published        : $uniq_msg_count_from_producer_perf"
-    info "## console consumer msg rec'd           : $msg_count_from_console_consumer"
-    info "## console consumer unique msg rec'd    : $uniq_msg_count_from_console_consumer"
-    echo
-
-    validation_start_unix_ts=`date +%s`
-    curr_unix_ts=`date +%s`
-    size_unmatched_idx=1
-
-    # do a while-loop to check every 5 sec if the replica file sizes are matched
-    # (up to the value of $max_wait_for_consumer_complete)
-    while [[ $(( $curr_unix_ts - $validation_start_unix_ts )) -le $max_wait_for_consumer_complete && $size_unmatched_idx -gt 0 ]]
-    do
-        info "wait 5s (up to ${max_wait_for_consumer_complete}s) and check replicas data sizes"
-        sleep 5
-        
-        first_element_value=${kafka_first_data_file_sizes[1]}
-        for ((i=2; i<=${#kafka_first_data_file_sizes[@]}; i++))
-        do
-            if [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
-                size_unmatched_idx=1
-                break
-            else
-                size_unmatched_idx=0
-            fi
-        done
-
-        curr_unix_ts=`date +%s`
-    done
-    echo
-
-    # validate that sizes of all replicas should match
-    first_element_value=${kafka_first_data_file_sizes[1]}
-    for ((i=2; i<=${#kafka_first_data_file_sizes[@]}; i++))
-    do
-        if [ ${kafka_first_data_file_sizes[$i]} -eq 0 ]; then
-            info "## FAILURE: File[$i] zero file size found"
-        elif [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
-             info "## FAILURE: Unmatched size found"
-             test_failure_counter=$(( $test_failure_counter + 1 ))
-        else
-            info "## PASSED: Data files sizes matched"
-         fi
-    done
-
-    # validate that checksums of all replicas should match
-    first_element_value=${kafka_first_data_file_checksums[1]}
-    for ((i=2; i<=${#kafka_first_data_file_checksums[@]}; i++))
-    do
-        if [ ${kafka_first_data_file_sizes[$i]} -eq 0 ]; then
-            info "## FAILURE: Checksum cannot be validated because file[$i] zero file size found"
-        elif [ $first_element_value -ne ${kafka_first_data_file_checksums[$i]} ]; then
-             info "## FAILURE: Unmatched checksum found"
-             test_failure_counter=$(( $test_failure_counter + 1 ))
-        else
-            info "## PASSED: Data files checksums matched"
-         fi
-    done
-
-    # validate that there is no data loss
-    if [ $uniq_msg_count_from_producer_perf -ne $uniq_msg_count_from_console_consumer ]; then
-        info "## FAILURE: Data loss found"
-        test_failure_counter=$(( $test_failure_counter + 1 ))
-    else
-        info "## PASSED: Message counts matched"
-    fi
-
-    avg_reelection_latency_ms=`echo "$sum_reelection_latency_ms / $reelection_counter" | bc`
-    info "## Max latency : $max_reelection_latency_ms ms"
-    info "## Min latency : $min_reelection_latency_ms ms"
-    info "## Avg latency : $avg_reelection_latency_ms ms"
-
-    # report PASSED or FAILED
-    info "========================================================"
-    if [ $test_failure_counter -eq 0 ]; then
-        info "## Test PASSED"
-        exit 0
-    else
-        info "## Test FAILED"
-        exit 1
-    fi
-    info "========================================================"
-}
-
-
-start_test() {
-    echo
-    info "======================================="
-    info "####  Kafka Replicas System Test   ####"
-    info "======================================="
-    echo
-
-    # Ctrl-c trap. Catches INT signal
-    trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" INT
-    trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" TERM
-    trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" KILL
-
-
-    generate_kafka_properties_files $base_dir_full_path $num_kafka_server $my_brokerid_to_start $my_server_port_to_start 
-
-    initialize
-
-    cleanup
-    sleep 2
-
-    start_zk
-    sleep 2
-
-    start_servers_cluster
-    sleep 2
-
-    create_topic $test_topic localhost:$zk_port $replica_factor 2> $this_test_stderr_output_log_pathname
-
-    info "sleeping for 5s"
-    sleep 5 
-    echo
-
-    for ((i=1; i<=$num_kafka_server; i++))
-    do
-        echo
-        info "======================================="
-        info "Iteration $i of $num_kafka_server"
-        info "======================================="
-        echo
-        info "looking up leader"
-        get_leader_brokerid
-        ldr_bkr_id=$?
-        info "current leader's broker id : $ldr_bkr_id"
-
-        svr_idx=$(($ldr_bkr_id))
-
-        stop_server $svr_idx
-        info "sleeping for 10s"
-        sleep 10
-
-        get_server_shutdown_unix_timestamp $svr_idx
-        get_elected_leader_unix_timestamp
-
-        reelected_leader_latency=`echo "$full_elected_leader_unix_timestamp - $full_server_shutdown_unix_timestamp" | bc`
-        reelected_leader_latency_ms_float=`echo "$reelected_leader_latency * 1000" | bc`
-        reelected_leader_latency_ms=${reelected_leader_latency_ms_float/.*}
-
-        info "---------------------------------------"
-        info "leader re-election latency : $reelected_leader_latency_ms ms"
-        info "---------------------------------------"
-
-        sum_reelection_latency_ms=$(($sum_reelection_latency_ms + $reelected_leader_latency_ms))
-        reelection_counter=$(($reelection_counter + 1))
-
-        # update $max_reelection_latency_ms
-        if [ $reelected_leader_latency_ms -gt $max_reelection_latency_ms ]; then
-            max_reelection_latency_ms=$reelected_leader_latency_ms
-        fi
-
-        # update $min_reelection_latency_ms
-        if [ $reelected_leader_latency_ms -le $min_reelection_latency_ms ]; then
-            min_reelection_latency_ms=$reelected_leader_latency_ms
-        fi
-
-        init_id=$(( ($i - 1) * $producer_msg_batch_size ))
-        start_producer_perf $test_topic localhost:9091,localhost:9092,localhost:9093 $producer_msg_batch_size $init_id
-        info "sleeping for 15s"
-        sleep 15
-        echo
-
-        start_server $svr_idx
-        info "sleeping for 30s"
-        sleep 30
-    done
-
-    start_console_consumer $test_topic localhost:$zk_port
-    info "sleeping for 30s"
-    sleep 30
-
-    shutdown_servers
-    echo
-
-    validate_results
-    echo
-}
-
-# =================================================
-# Main Test
-# =================================================
-
-start_test
Index: system_test/single_host_multi_brokers/bin/kafka-run-class.sh
===================================================================
--- system_test/single_host_multi_brokers/bin/kafka-run-class.sh	(revision 1378227)
+++ system_test/single_host_multi_brokers/bin/kafka-run-class.sh	(working copy)
@@ -1,67 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ $# -lt 1 ];
-then
-  echo "USAGE: $0 classname [opts]"
-  exit 1
-fi
-
-base_dir=$(dirname $0)/..
-kafka_inst_dir=${base_dir}/../..
-
-for file in $kafka_inst_dir/project/boot/scala-2.8.0/lib/*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-for file in $kafka_inst_dir/core/target/scala_2.8.0/*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-for file in $kafka_inst_dir/core/lib/*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-for file in $kafka_inst_dir/perf/target/scala_2.8.0/kafka*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-for file in $kafka_inst_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
-do
-  if [ ${file##*/} != "sbt-launch.jar" ]; then
-    CLASSPATH=$CLASSPATH:$file
-  fi
-done
-if [ -z "$KAFKA_JMX_OPTS" ]; then
-  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "
-fi
-if [ -z "$KAFKA_OPTS" ]; then
-  KAFKA_OPTS="-Xmx512M -server  -Dlog4j.configuration=file:$base_dir/config/log4j.properties"
-fi
-if [  $JMX_PORT ]; then
-  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
-fi
-if [ -z "$JAVA_HOME" ]; then
-  JAVA="java"
-else
-  JAVA="$JAVA_HOME/bin/java"
-fi
-
-$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@
Index: system_test/single_host_multi_brokers/README
===================================================================
--- system_test/single_host_multi_brokers/README	(revision 1378227)
+++ system_test/single_host_multi_brokers/README	(working copy)
@@ -1,39 +0,0 @@
-==================
-Test Descriptions
-==================
-This test suite performs Kafka system test on the replication feature as described here:
-1. Start the Kafka cluster
-2. Create topic
-3. Find the leader
-4. Stop the leader in Step 3
-5. Send n messages
-6. Consume the messages
-7. Start the leader in Step 3
-8. Goto Step 3 for all servers in the cluster
-9. Validate test results
-
-==================
-Quick Start
-==================
-1. Modify the values of "num_kafka_server" & "replica_factor" as needed.
-2. Execute the test as: 
-   <kafka home>/system_test/single_host_multi_brokers $ bin/run-test.sh
-
-==================
-Expected Results
-==================
-The following items should match:
-1. The checksums of the data files in all replicas
-2. The sizes of the data files in all replicas
-3. The no. of messages produced and consumed
-
-==================
-Notes
-==================
-1. There is no need to copy and paste the config/server.properties
-   files to match the no. of brokers in the Kafka cluster.
-2. The required no. of server properties files will be automatically
-   generated according to the value of "num_kafka_server".
-3. The default values in the generated properties files will be
-   copied from the settings in config/server.properties while the
-   brokerid and server port will be incremented accordingly.
Index: system_test/utils/kafka_system_test_utils.py
===================================================================
--- system_test/utils/kafka_system_test_utils.py	(revision 1378227)
+++ system_test/utils/kafka_system_test_utils.py	(working copy)
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 # ===================================
@@ -16,6 +32,7 @@
 import traceback
 
 import system_test_utils
+import metrics
 
 from datetime  import datetime
 from time      import mktime
@@ -79,19 +96,22 @@
 
     if not os.path.exists(testcasePathName + "/config") : os.makedirs(testcasePathName + "/config")
     if not os.path.exists(testcasePathName + "/logs")   : os.makedirs(testcasePathName + "/logs")
+    if not os.path.exists(testcasePathName + "/dashboards")   : os.makedirs(testcasePathName + "/dashboards")
 
-    dashboardsPathName = testcaseEnv.testCaseLogsDir + "/dashboards"
+    dashboardsPathName = testcasePathName + "/dashboards"
     if not os.path.exists(dashboardsPathName) : os.makedirs(dashboardsPathName)
 
     for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
         entityId = clusterEntityConfigDict["entity_id"]
         role     = clusterEntityConfigDict["role"]
 
-        logger.debug("entity_id : " + entityId, extra=d)
-        logger.debug("role      : " + role,     extra=d)
-
         metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
         if not os.path.exists(metricsPathName) : os.makedirs(metricsPathName)
+        
+        # create the role directory under dashboards
+        dashboardsRoleDir = dashboardsPathName + "/" + role
+        if not os.path.exists(dashboardsRoleDir) : os.makedirs(dashboardsRoleDir)
+        
 
 
 def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
@@ -192,40 +212,6 @@
         outfile.write(line)
     outfile.close()
 
-
-def start_metrics_collection(jmxHost, jmxPort, mBeanObjectName, mBeanAttributes, entityId, clusterEntityConfigDictList, testcaseEnv):
-    logger.info("starting metrics collection on jmx port: " + jmxPort, extra=d)
-    jmxUrl    = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
-    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
-    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
-    metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", entityId, "metrics")
-
-    startMetricsCmdList = ["ssh " + jmxHost,
-                           "'JAVA_HOME=" + javaHome,
-                           "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
-                           "--jmx-url " + jmxUrl,
-                           "--object-name " + mBeanObjectName + " &> ",
-                           metricsPathName + "/metrics.csv & echo pid:$! > ",
-                           metricsPathName + "/entity_pid'"]
-   
-    startMetricsCommand = " ".join(startMetricsCmdList) 
-    logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
-    system_test_utils.async_sys_call(startMetricsCommand)
-
-    pidCmdStr = "ssh " + jmxHost + " 'cat " + metricsPathName + "/entity_pid'"
-    logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
-    subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
-
-    # keep track of the remote entity pid in a dictionary
-    for line in subproc.stdout.readlines():
-        if line.startswith("pid"):
-            line = line.rstrip('\n')
-            logger.debug("found pid line: [" + line + "]", extra=d)
-            tokens  = line.split(':')
-            thisPid = tokens[1]
-            testcaseEnv.entityParentPidDict[thisPid] = thisPid
-
-
 def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
     logger.info("calling generate_properties_files", extra=d)
 
@@ -402,6 +388,7 @@
     if role == "zookeeper":
         cmdList = ["ssh " + hostname,
                   "'JAVA_HOME=" + javaHome,
+                  "JMX_PORT=" + jmxPort,
                   kafkaHome + "/bin/zookeeper-server-start.sh ",
                   configPathName + "/" + configFile + " &> ",
                   logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -418,7 +405,7 @@
     elif role == "broker":
         cmdList = ["ssh " + hostname,
                   "'JAVA_HOME=" + javaHome,
-                  "JMX_PORT=" + jmxPort,
+                 "JMX_PORT=" + jmxPort,
                   kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
                   configPathName + "/" + configFile + " &> ",
                   logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -446,10 +433,8 @@
             tokens = line.split(':')
             testcaseEnv.entityParentPidDict[entityId] = tokens[1]
 
-    # if it is a broker, start metric collection
-    if role == "broker":
-        start_metrics_collection(hostname, jmxPort, "kafka:type=kafka.SocketServerStats", \
-            "AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv)
+    time.sleep(1)
+    metrics.start_metrics_collection(hostname, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
 
 def start_console_consumer(systemTestEnv, testcaseEnv):
@@ -460,6 +445,8 @@
     for consumerConfig in consumerConfigList:
         host              = consumerConfig["hostname"]
         entityId          = consumerConfig["entity_id"]
+        jmxPort           = consumerConfig["jmx_port"] 
+        role              = consumerConfig["role"] 
         kafkaHome         = system_test_utils.get_data_by_lookup_keyval( \
                                 clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
         javaHome          = system_test_utils.get_data_by_lookup_keyval( \
@@ -476,15 +463,17 @@
         commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"])
         cmdList = ["ssh " + host,
                    "'JAVA_HOME=" + javaHome,
+                   "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
                    commandArgs + " &> " + consumerLogPathName + "'"]
 
         cmdStr = " ".join(cmdList)
         logger.debug("executing command: [" + cmdStr + "]", extra=d)
-        system_test_utils.sys_call(cmdStr)
+        system_test_utils.async_sys_call(cmdStr)
+        time.sleep(2)
+        metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
 
-
 def start_producer_performance(systemTestEnv, testcaseEnv):
 
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
@@ -494,6 +483,8 @@
     for producerConfig in producerConfigList:
         host              = producerConfig["hostname"]
         entityId          = producerConfig["entity_id"]
+        jmxPort           = producerConfig["jmx_port"] 
+        role              = producerConfig["role"] 
         kafkaHome         = system_test_utils.get_data_by_lookup_keyval( \
                                 clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
         javaHome          = system_test_utils.get_data_by_lookup_keyval( \
@@ -510,12 +501,15 @@
         commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"])
         cmdList = ["ssh " + host,
                    "'JAVA_HOME=" + javaHome,
+                   "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.perf.ProducerPerformance",
                    commandArgs + " &> " + producerLogPathName + "'"]
 
         cmdStr = " ".join(cmdList)
         logger.debug("executing command: [" + cmdStr + "]", extra=d)
-        system_test_utils.sys_call(cmdStr) 
+        system_test_utils.async_sys_call(cmdStr)
+        time.sleep(1)
+        metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
 
 def stop_remote_entity(systemTestEnv, entityId, parentPid):
@@ -525,8 +519,8 @@
     pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
 
     logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
-    system_test_utils.sigterm_remote_process(hostname, pidStack)
-    time.sleep(1)
+#    system_test_utils.sigterm_remote_process(hostname, pidStack)
+#    time.sleep(1)
     system_test_utils.sigkill_remote_process(hostname, pidStack)
 
 
@@ -632,10 +626,9 @@
             validationStatusDict["Validate leader election successful"] = "PASSED"
             return True
         except Exception, e:
-            logger.error("leader info not completed:", extra=d)
+            logger.error("leader info not completed: {0}".format(e), extra=d)
+            traceback.print_exc()
             validationStatusDict["Validate leader election successful"] = "FAILED"
-            print leaderDict
-            print e
             return False
     else:
         validationStatusDict["Validate leader election successful"] = "FAILED"
@@ -677,5 +670,14 @@
         logger.debug("executing command [" + cmdStr + "]", extra=d)
         system_test_utils.sys_call(cmdStr)
 
+def get_entity_log_directory(testCaseBaseDir, entity_id, role):  # path is <testCaseBaseDir>/logs/<role>-<entity_id>
+    return "".join([testCaseBaseDir, "/logs/", role, "-", entity_id])
+
+def get_entities_for_role(clusterConfig, role):  # entries of clusterConfig whose 'role' equals role, in order
+    return [entity for entity in clusterConfig if entity['role'] == role]
     
+def stop_consumer():  # hands sys_call a shell pipeline that sends kill -15 (SIGTERM) to every PID whose `ps -ef` line mentions ConsoleConsumer
+    system_test_utils.sys_call("ps -ef | grep ConsoleConsumer | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")  # NOTE(review): no ssh prefix here, so presumably only local processes are reached - confirm
 
+def stop_producer():  # hands sys_call a shell pipeline that sends kill -15 (SIGTERM) to every PID whose `ps -ef` line mentions ProducerPerformance
+    system_test_utils.sys_call("ps -ef | grep ProducerPerformance | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")  # NOTE(review): no ssh prefix here, so presumably only local processes are reached - confirm
Index: system_test/utils/setup_utils.py
===================================================================
--- system_test/utils/setup_utils.py	(revision 1378227)
+++ system_test/utils/setup_utils.py	(working copy)
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 import logging
Index: system_test/utils/pyh.py
===================================================================
--- system_test/utils/pyh.py	(revision 0)
+++ system_test/utils/pyh.py	(revision 0)
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# @file: pyh.py
+# @purpose: a HTML tag generator
+# @author: Emmanuel Turlay <turlay@cern.ch>
+
+__doc__ = """The pyh.py module is the core of the PyH package. PyH lets you
+generate HTML tags from within your python code.
+See http://code.google.com/p/pyh/ for documentation.
+"""
+__author__ = "Emmanuel Turlay <turlay@cern.ch>"
+__version__ = '$Revision: 63 $'
+__date__ = '$Date: 2010-05-21 03:09:03 +0200 (Fri, 21 May 2010) $'
+
+from sys import _getframe, stdout, modules, version
+nOpen={}
+
+nl = '\n'
+doctype = '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">\n'
+charset = '<meta http-equiv="Content-Type" content="text/html;charset=utf-8" />\n'
+
+tags = ['html', 'body', 'head', 'link', 'meta', 'div', 'p', 'form', 'legend', 
+        'input', 'select', 'span', 'b', 'i', 'option', 'img', 'script',
+        'table', 'tr', 'td', 'th', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
+        'fieldset', 'a', 'title', 'body', 'head', 'title', 'script', 'br', 'table',
+        'ul', 'li', 'ol', 'embed']  # names later turned into Tag subclasses; some names appear twice
+
+selfClose = ['input', 'img', 'link', 'br']  # tags rendered as <name ... /> with no children and no closing tag
+
+class Tag(list):  # an HTML element: the list items are its children (Tag instances or strings)
+    tagname = ''  # an empty tagname marks an untagged sequence (see __init__)
+    
+    def __init__(self, *arg, **kw):  # positional args become children, keyword args become HTML attributes
+        self.attributes = kw
+        if self.tagname : 
+            name = self.tagname
+            self.isSeq = False
+        else: 
+            name = 'sequence'
+            self.isSeq = True
+        self.id = kw.get('id', name)  # default id is the tag name, or 'sequence' for an untagged Tag
+        #self.extend(arg)
+        for a in arg: self.addObj(a)
+
+    def __iadd__(self, obj):  # "+=": add obj; the children of an untagged sequence are added one by one
+        if isinstance(obj, Tag) and obj.isSeq:
+            for o in obj: self.addObj(o)
+        else: self.addObj(obj)
+        return self
+    
+    def addObj(self, obj):  # append obj as a child and also expose it as the attribute self.<id>
+        if not isinstance(obj, Tag): obj = str(obj)  # non-Tag content is stored as its str() form
+        id=self.setID(obj)
+        setattr(self, id, obj)
+        self.append(obj)
+
+    def setID(self, obj):  # choose the id for obj; a "_NNN" suffix is added when earlier children already match
+        if isinstance(obj, Tag):
+            id = obj.id
+            n = len([t for t in self if isinstance(t, Tag) and t.id.startswith(id)])
+        else:
+            id = 'content'
+            n = len([t for t in self if not isinstance(t, Tag)])
+        if n: id = '%s_%03i' % (id, n)
+        if isinstance(obj, Tag): obj.id = id
+        return id
+
+    def __add__(self, obj):  # "+": a tagged element returns a new untagged Tag holding (self, obj); a sequence appends in place
+        if self.tagname: return Tag(self, obj)
+        self.addObj(obj)
+        return self
+
+    def __lshift__(self, obj):  # "<<": add obj and return it when it is a Tag (otherwise the result is None)
+        self += obj
+        if isinstance(obj, Tag): return obj
+
+    def render(self):  # serialize this element and its children to a string ending in a newline
+        result = ''
+        if self.tagname:
+            result = '<%s%s%s>' % (self.tagname, self.renderAtt(), self.selfClose()*' /')  # bool * str: ' /' only for self-closing tags
+        if not self.selfClose():
+            for c in self:
+                if isinstance(c, Tag):
+                    result += c.render()
+                else: result += c
+            if self.tagname: 
+                result += '</%s>' % self.tagname
+        result += '\n'
+        return result
+
+    def renderAtt(self):  # format attributes as ' name="value"' pairs; 'txt' and 'open' are skipped, 'cl' is emitted as 'class'
+        result = ''
+        for n, v in self.attributes.iteritems():
+            if n != 'txt' and n != 'open':
+                if n == 'cl': n = 'class'
+                result += ' %s="%s"' % (n, v)
+        return result
+
+    def selfClose(self):  # True when tagname is listed in the module-level selfClose list
+        return self.tagname in selfClose        
+    
+def TagFactory(name):
+    # Create a fresh Tag subclass whose class-level tagname and __name__ are both `name`.
+    tagClass = type(name, (Tag,), {'tagname': name})
+    tagClass.__name__ = name
+    return tagClass
+
+thisModule = modules[__name__]  # this module's own entry in sys.modules
+
+for t in tags: setattr(thisModule, t, TagFactory(t))  # expose one generated Tag subclass per name in `tags` as a module attribute
+
+def ValidW3C():  # anchor to the W3C validator that wraps the "Valid XHTML 1.0 Strict" badge image
+    badge = img(src='http://www.w3.org/Icons/valid-xhtml10', alt='Valid XHTML 1.0 Strict')
+    return a(badge, href='http://validator.w3.org/check?uri=referer')
+
+class PyH(Tag):  # the <html> root element: holds a head and a body and can write itself out as a page
+    tagname = 'html'
+    
+    def __init__(self, name='MyPyHPage'):  # `name` becomes the text of the page <title>
+        self += head()
+        self += body()
+        self.attributes = dict(xmlns='http://www.w3.org/1999/xhtml', lang='en')
+        self.head += title(name)
+
+    def __iadd__(self, obj):  # route obj: head/body attach to the page, meta/link go to head, anything else to body
+        if isinstance(obj, head) or isinstance(obj, body): self.addObj(obj)
+        elif isinstance(obj, meta) or isinstance(obj, link): self.head += obj
+        else:
+            self.body += obj
+            id=self.setID(obj)
+            setattr(self, id, obj)
+        return self
+
+    def addJS(self, *arg):  # add one <script type="text/javascript"> per given src to head
+        for f in arg: self.head += script(type='text/javascript', src=f)
+
+    def addCSS(self, *arg):  # add one stylesheet <link> per given href to head
+        for f in arg: self.head += link(rel='stylesheet', type='text/css', href=f)
+    
+    def printOut(self,file=''):  # write doctype + rendered page to the path `file`, or to stdout when `file` is empty
+        f = open(file, 'w') if file else stdout
+        try:
+            f.write(doctype + self.render())
+            f.flush()
+        finally:
+            if file: f.close()  # release the handle even when rendering or writing raises
+    
Index: system_test/utils/system_test_utils.py
===================================================================
--- system_test/utils/system_test_utils.py	(revision 1378227)
+++ system_test/utils/system_test_utils.py	(working copy)
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 # ===================================
@@ -314,7 +330,6 @@
 
     return True
 
-
 def copy_source_to_remote_hosts(hostname, sourceDir, destDir):
 
     cmdStr = "rsync -avz --delete-before " + sourceDir + "/ " + hostname + ":" + destDir
Index: system_test/utils/testcase_env.py
===================================================================
--- system_test/utils/testcase_env.py	(revision 1378227)
+++ system_test/utils/testcase_env.py	(working copy)
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 # ===================================
@@ -42,10 +58,10 @@
         self.systemTestBaseDir = systemTestEnv.SYSTEM_TEST_BASE_DIR
 
         # to be initialized in the Test Module
-        self.testSuiteBaseDir  = ""
-        self.testCaseBaseDir   = ""
-        self.testCaseLogsDir   = ""
-
+        self.testSuiteBaseDir      = ""
+        self.testCaseBaseDir       = ""
+        self.testCaseLogsDir       = ""
+        self.testCaseDashboardsDir = ""
         # ================================
         # dictionary to keep track of
         # user-defined environment variables
Index: system_test/utils/metrics.py
===================================================================
--- system_test/utils/metrics.py	(revision 0)
+++ system_test/utils/metrics.py	(revision 0)
@@ -0,0 +1,256 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#!/usr/bin/env python
+
+# ===================================
+# file: metrics.py
+# ===================================
+
+import inspect
+import json
+import logging
+import os
+import signal
+import subprocess
+import sys
+import traceback
+
+import csv
+import time 
+import matplotlib.pyplot as plt
+from collections import namedtuple
+import numpy
+
+from pyh import *
+import kafka_system_test_utils
+import system_test_utils
+
+logger     = logging.getLogger("namedLogger")
+thisClassName = '(metrics)'
+d = {'name_of_class': thisClassName}  # passed as `extra=d` to the logger calls below
+
+def read_metrics_definition(metricsFile):
+    """Return every graph definition found in the metrics JSON file, flattened
+    across dashboards in file order.
+
+    metricsFile -- path to a JSON document whose top-level 'dashboards' list
+                   holds dicts that each carry a 'graphs' list.
+    """
+    # 'with' guarantees the handle is closed, even when the JSON is malformed
+    with open(metricsFile, "r") as metricsFileHandle:
+        metricsJsonData = json.load(metricsFileHandle)
+    allGraphs = []
+    for dashboard in metricsJsonData['dashboards']:
+        allGraphs.extend(dashboard['graphs'])
+    return allGraphs
+            
+def get_dashboard_definition(metricsFile, role):
+    """Return the dashboards in the metrics JSON file whose 'role' equals `role`.
+
+    Order follows the file; the result is an empty list when no dashboard matches.
+    """
+    # 'with' guarantees the handle is closed, even when the JSON is malformed
+    with open(metricsFile, "r") as metricsFileHandle:
+        allDashboards = json.load(metricsFileHandle)['dashboards']
+    return [dashboard for dashboard in allDashboards if dashboard['role'] == role]
+
+def ensure_valid_headers(headers, attributes):
+    """Return the index of the first header equal to `attributes`; raise Exception when the
+    row is empty, its first column is not "time", or no column matches."""
+    if not headers or headers[0] != "time":  # guard the empty row instead of failing with IndexError
+        raise Exception("First column should be time")
+    for header in headers:
+        logger.debug(header, extra=d)
+    try:
+        return headers.index(attributes)
+    except ValueError:
+        raise Exception("There should be exactly one column that matches attribute: {0} in".format(attributes) +  
+                        " headers: {0}".format(",".join(headers)))
+        
+def plot_graphs(inputCsvFiles, labels, title, xLabel, yLabel, attribute, outputGraphFile):
+    """Plot column `attribute` of every CSV in `inputCsvFiles` against elapsed seconds
+    and save the figure to `outputGraphFile`.
+
+    Each CSV must start with a header row whose first column is "time"; the time
+    column is read as epoch milliseconds. `labels` gives the legend entry for the
+    file at the same position. A file that cannot be read or parsed is logged and
+    skipped. Raises ValueError when no file yields a data point.
+    """
+    fig = plt.figure()
+    try:
+        fig.subplots_adjust(bottom=0.2)
+        ax = fig.add_subplot(111)
+        ax.set_xlabel(xLabel)
+        ax.set_ylabel(yLabel)
+        ax.grid()
+        allX, allY = [], []
+        tickBaseEpoch = 0
+        for inputCsvFile, label in zip(inputCsvFiles, labels):
+            try:
+                # read everything up front so the handle is closed before plotting
+                with open(inputCsvFile, "rb") as csvFile:
+                    rows = list(csv.reader(csvFile))
+                # the first row holds the headers
+                attributeColumnIndex = ensure_valid_headers(rows.pop(0), attribute)
+                logger.debug("Column index for attribute {0} is {1}".format(attribute, attributeColumnIndex), extra=d)
+                start_time = int(rows[0][0])
+                x = [(int(row[0]) - start_time) / 1000 for row in rows]
+                y = [float(row[attributeColumnIndex]) for row in rows]
+                # label the line itself so the legend stays aligned when another file is skipped
+                ax.plot(x, y, label=label)
+                allX.extend(x)
+                allY.extend(y)
+                tickBaseEpoch = start_time / 1000
+            except Exception:
+                logger.error("ERROR while plotting data for " + inputCsvFile, extra=d)
+                traceback.print_exc()
+        if not allX:
+            raise ValueError("no data points to plot for attribute: {0}".format(attribute))
+        # axis limits span the data of all plotted files
+        xmin, xmax = min(allX), max(allX)
+        ax.set_xlim(xmin, xmax)
+        ax.set_ylim(min(allY), max(allY))
+        # ticks every 20% of xmax; a zero step (single sample) would make numpy.arange fail
+        step = 0.2 * xmax
+        xticks = numpy.arange(xmin, xmax, step) if step > 0 else [xmin]
+        # one wall-clock label per tick, offset from the start of the last plotted file
+        xticks_labels = [time.strftime("%H:%M:%S", time.localtime(tickBaseEpoch + tick)) for tick in xticks]
+        plt.xticks(xticks, xticks_labels, rotation=17)
+        ax.legend(loc=2)
+        ax.set_title(title)
+        fig.savefig(outputGraphFile)
+    finally:
+        # pyplot keeps every figure alive until it is closed explicitly
+        plt.close(fig)
+
+def draw_all_graphs(metricsDescriptionFile, testcaseEnv, clusterConfig):
+    """Draw the graphs of every dashboard defined for each role present in `clusterConfig`."""
+    # the set visits each distinct role once
+    distinctRoles = set(config['role'] for config in clusterConfig)
+    for role in distinctRoles:
+        roleDashboards = get_dashboard_definition(metricsDescriptionFile, role)
+        roleEntities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
+        for dashboard in roleDashboards:
+            # every graph of the dashboard is drawn across all entities of the role
+            draw_graph_for_role(dashboard['graphs'], roleEntities, role, testcaseEnv)
+        
+def draw_graph_for_role(graphs, entities, role, testcaseEnv):
+    """Plot every graph definition in `graphs`, one SVG per mbean attribute, with one
+    line per entity of `role`.
+
+    Each SVG is written to <testCaseDashboardsDir>/<role>/<bean_name>:<attribute>.svg.
+    A graph that fails to plot is logged and the remaining graphs are still attempted.
+    """
+    for graph in graphs:
+        graphName = graph['graph_name']
+        inputCsvFiles = []
+        graphLegendLabels = []
+        for entity in entities:
+            entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entity['entity_id'], "metrics")
+            inputCsvFiles.append(entityMetricsDir + "/" + graph['bean_name'] + ".csv")
+            graphLegendLabels.append(role + "-" + entity['entity_id'])
+        try:
+            # 'y_label' and 'attributes' are parallel comma-separated lists: one plot per pair
+            for yLabel, attribute in zip(graph['y_label'].split(','), graph['attributes'].split(',')):
+                fullyQualifiedAttributeName = graph['bean_name'] + ':' + attribute
+                outputGraphFile = testcaseEnv.testCaseDashboardsDir + "/" + role + "/" + fullyQualifiedAttributeName + ".svg"
+                plot_graphs(inputCsvFiles, graphLegendLabels, graphName + '-' + attribute,
+                            "time", yLabel, fullyQualifiedAttributeName, outputGraphFile)
+        except Exception as e:
+            # report by graph name: the output path may not have been computed when the failure happened
+            logger.error("ERROR while plotting graph {0}: {1}".format(graphName, e), extra=d)
+            traceback.print_exc()
+
+def build_all_dashboards(metricsDefinitionFile, testcaseDashboardsDir, clusterConfig):
+    """Write <testcaseDashboardsDir>/metrics.html linking to one dashboard page per role in clusterConfig."""
+    metricsHtmlFile = testcaseDashboardsDir + "/metrics.html"
+    centralDashboard = PyH('Kafka Metrics Dashboard')
+    centralDashboard << h1('Kafka Metrics Dashboard', cl='center')
+    # sorted() keeps the link order stable across runs; iterating a bare set gives an arbitrary order
+    for role in sorted(set(config['role'] for config in clusterConfig)):
+        entities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
+        dashboardPagePath = build_dashboard_for_role(metricsDefinitionFile, role, 
+                                                     entities, testcaseDashboardsDir)
+        centralDashboard << a(role, href = dashboardPagePath)
+        centralDashboard << br()
+    centralDashboard.printOut(metricsHtmlFile)
+
+def build_dashboard_for_role(metricsDefinitionFile, role, entities, testcaseDashboardsDir):
+    """Write <testcaseDashboardsDir>/<role>-dashboards.html embedding one SVG per graph attribute
+    listed for `role` in the metrics definition file, and return that HTML file's path.
+    The `entities` argument is accepted but not used here.
+    """
+    dashboards = get_dashboard_definition(metricsDefinitionFile, role)
+    pageTitle = 'Kafka Metrics Dashboard for ' + role
+    entityDashboard = PyH(pageTitle)
+    entityDashboard << h1(pageTitle, cl='center')
+    entityDashboardHtml = testcaseDashboardsDir + "/" + role + "-dashboards.html"
+    for dashboard in dashboards:
+        for graph in dashboard['graphs']:
+            # one SVG per "<bean_name>:<attribute>", the same file naming draw_graph_for_role writes
+            for attribute in graph['attributes'].split(','):
+                graphFileLocation = testcaseDashboardsDir + "/" + role + "/" + graph['bean_name'] + ':' + attribute + ".svg"
+                entityDashboard << embed(src = graphFileLocation, type = "image/svg+xml")
+    entityDashboard.printOut(entityDashboardHtml)
+    return entityDashboardHtml
+
+def start_metrics_collection(jmxHost, jmxPort, role, entityId, systemTestEnv, testcaseEnv):
+    """Launch JmxTool on jmxHost (via ssh) once per mbean defined for `role`, each writing a CSV into the entity's metrics dir."""
+    logger.info("starting metrics collection on jmx port : " + jmxPort, extra=d)
+    jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
+    clusterConfig = systemTestEnv.clusterEntityConfigDictList
+    metricsDefinitionFile = systemTestEnv.METRICS_PATHNAME
+    entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
+    dashboardsForRole = get_dashboard_definition(metricsDefinitionFile, role)
+    mbeansForRole = get_mbeans_for_role(dashboardsForRole)
+    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "kafka_home")
+    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "java_home")
+    # one JmxTool invocation (and one <mbean>.csv) per mbean; "JMX_PORT= " sets that variable empty for the command
+    for mbean in mbeansForRole:
+        outputCsvFile = entityMetricsDir + "/" + mbean + ".csv"
+        startMetricsCmdList = ["ssh " + jmxHost,
+                               "'JAVA_HOME=" + javaHome,
+                               "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
+                               "--jmx-url " + jmxUrl,
+                               "--object-name " + mbean + " 1> ",
+                                outputCsvFile + " & echo pid:$! > ",
+                                entityMetricsDir + "/entity_pid'"]
+        # the remote shell backgrounds JmxTool and then writes "pid:<pid>" (from $!) into entity_pid
+        startMetricsCommand = " ".join(startMetricsCmdList) 
+        logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
+        system_test_utils.async_sys_call(startMetricsCommand)
+        # NOTE(review): async_sys_call presumably returns before entity_pid is written, and every mbean rewrites the same file - the cat below may see a missing or stale pid; confirm
+        pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid'"
+        logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+        subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+        # keep track of the remote entity pid in a dictionary
+        # NOTE(review): the pid is stored as both key and value - confirm readers of entityParentPidDict expect that
+        for line in subproc.stdout.readlines():
+            if line.startswith("pid"):
+                line = line.rstrip('\n')
+                logger.debug("found pid line: [" + line + "]", extra=d)
+                tokens  = line.split(':')
+                thisPid = tokens[1]
+                testcaseEnv.entityParentPidDict[thisPid] = thisPid
+
+def stop_metrics_collection(jmxHost, jmxPort):  # kills the JmxTool collectors on jmxHost, where start_metrics_collection launched them over ssh
+    logger.info("stopping metrics collection on " + jmxHost + ":" + jmxPort, extra=d)
+    system_test_utils.sys_call("ssh " + jmxHost + " \"ps -ef | grep JmxTool | grep -v grep | grep " + jmxPort + " | awk '{print \\$2}' | xargs kill -9\"")
+
+def get_mbeans_for_role(dashboardsForRole):
+    """Return the set of distinct bean_name values over every graph of the given dashboards (empty set when there are none)."""
+    return set(graph['bean_name'] for dashboard in dashboardsForRole for graph in dashboard['graphs'])
Index: system_test/system_test_env.py
===================================================================
--- system_test/system_test_env.py	(revision 1378227)
+++ system_test/system_test_env.py	(working copy)
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 # ===================================
@@ -23,6 +39,8 @@
     SYSTEM_TEST_MODULE_EXT    = ".py"
     CLUSTER_CONFIG_FILENAME   = "cluster_config.json"
     CLUSTER_CONFIG_PATHNAME   = os.path.abspath(SYSTEM_TEST_BASE_DIR + "/" + CLUSTER_CONFIG_FILENAME)
+    METRICS_FILENAME          = "metrics.json"
+    METRICS_PATHNAME          = os.path.abspath(SYSTEM_TEST_BASE_DIR + "/" + METRICS_FILENAME)
 
     clusterEntityConfigDictList  = []
     systemTestResultsList        = []
Index: system_test/replication_testsuite/config/server.properties
===================================================================
--- system_test/replication_testsuite/config/server.properties	(revision 1378227)
+++ system_test/replication_testsuite/config/server.properties	(working copy)
@@ -118,3 +118,5 @@
 
 # Timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000
+
+monitoring.period.secs=1
Index: system_test/replication_testsuite/replica_basic_test.py
===================================================================
--- system_test/replication_testsuite/replica_basic_test.py	(revision 1378227)
+++ system_test/replication_testsuite/replica_basic_test.py	(working copy)
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 # ===================================
@@ -11,6 +27,7 @@
 import subprocess
 import sys
 import time
+import traceback
 
 from   system_test_env    import SystemTestEnv
 sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
@@ -20,6 +37,7 @@
 
 # product specific: Kafka
 import kafka_system_test_utils
+import metrics
 
 class ReplicaBasicTest(SetupUtils):
 
@@ -85,7 +103,8 @@
                 #### => update testcaseEnv
                 self.testcaseEnv.testCaseBaseDir = testCasePathName
                 self.testcaseEnv.testCaseLogsDir = self.testcaseEnv.testCaseBaseDir + "/logs"
-    
+                self.testcaseEnv.testCaseDashboardsDir = self.testcaseEnv.testCaseBaseDir + "/dashboards"
+
                 # get testcase description
                 testcaseDescription = ""
                 for k,v in testcaseNonEntityDataDict.items():
@@ -214,8 +233,9 @@
                 # starting producer 
                 self.log_message("starting producer")
                 kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
-                self.anonLogger.info("sleeping for 5s")
-                time.sleep(5)
+                self.anonLogger.info("sleeping for 10s")
+                time.sleep(10)
+                kafka_system_test_utils.stop_producer()
     
                 # starting previously terminated broker 
                 if (bounceLeaderFlag.lower() == "true" and not self.testcaseEnv.entityParentPidDict[leaderEntityId]):
@@ -231,26 +251,45 @@
                 # starting consumer
                 self.log_message("starting consumer")
                 kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
-    
+                time.sleep(10)
+                kafka_system_test_utils.stop_consumer()
+                
                 # this testcase is completed - so stopping all entities
                 self.log_message("stopping all entities")
                 for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
                     kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
-    
+                    
                 # validate the data matched
                 self.log_message("validating data matched")
                 result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
-    
+                
                 # =============================================
                 # collect logs from remote hosts
                 # =============================================
                 kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
     
+                # ==========================
+                # draw graphs
+                # ==========================
+                metrics.draw_all_graphs(self.systemTestEnv.METRICS_PATHNAME, 
+                                        self.testcaseEnv, 
+                                        self.systemTestEnv.clusterEntityConfigDictList)
+                
+                # build dashboard, one for each role
+                metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME,
+                                             self.testcaseEnv.testCaseDashboardsDir,
+                                             self.systemTestEnv.clusterEntityConfigDictList)
+                
+                # stop metrics processes
+                for entity in self.systemTestEnv.clusterEntityConfigDictList:
+                    metrics.stop_metrics_collection(entity['hostname'], entity['jmx_port'])
+                                        
             except Exception as e:
-                self.log_message("Exception caught : ")
-                print e
+                self.log_message("Exception while running test {0}".format(e))
+                traceback.print_exc()
                 self.log_message("stopping all entities")
                 for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
                     kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
 
+
  
Index: system_test/system_test_runner.py
===================================================================
--- system_test/system_test_runner.py	(revision 1378227)
+++ system_test/system_test_runner.py	(working copy)
@@ -1,3 +1,19 @@
-#!/usr/bin/evn python
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 # ===================================
Index: system_test/cluster_config.json
===================================================================
--- system_test/cluster_config.json	(revision 1378227)
+++ system_test/cluster_config.json	(working copy)
@@ -4,7 +4,7 @@
             "entity_id": "0",
             "hostname": "localhost",
             "role": "zookeeper",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9990"
         },
@@ -12,7 +12,7 @@
             "entity_id": "1",
             "hostname": "localhost",
             "role": "broker",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9991"
         },
@@ -20,7 +20,7 @@
             "entity_id": "2",
             "hostname": "localhost",
             "role": "broker",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9992"
         },
@@ -28,7 +28,7 @@
             "entity_id": "3",
             "hostname": "localhost",
             "role": "broker",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9993"
         },
@@ -36,7 +36,7 @@
             "entity_id": "4",
             "hostname": "localhost",
             "role": "producer_performance",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9994"
         },
@@ -44,7 +44,7 @@
             "entity_id": "5",
             "hostname": "localhost",
             "role": "console_consumer",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9995"
         }
Index: system_test/metrics.json
===================================================================
--- system_test/metrics.json	(revision 0)
+++ system_test/metrics.json	(revision 0)
@@ -0,0 +1,174 @@
+{
+    "dashboards": [
+        {
+            "role": "broker",
+            "graphs": [
+               { 
+                  "graph_name": "SocketServerThroughput",
+                  "y_label": "bytes-read-per-second,bytes-written-per-second",
+                  "bean_name": "kafka:type=kafka.SocketServerStats",
+                  "attributes": "BytesReadPerSecond,BytesWrittenPerSecond"
+               },
+               { 
+                  "graph_name": "FetchRequestPurgatoryNumDelayedRequests",
+                  "y_label": "num-delayed-requests",
+                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests",
+                  "attributes": "Value"
+               },
+               { 
+                  "graph_name": "MeanFetchRequestPurgatorySatisfactionRate",
+                  "y_label": "mean-request-satisfaction-rate",
+                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=SatisfactionRate",
+                  "attributes": "MeanRate"
+               },
+               { 
+                  "graph_name": "FetchRequestPurgatoryTimeToSatisfy",
+                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=TimeToSatisfyInNs",
+                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               },
+               { 
+                  "graph_name": "FetchRequestPurgatoryExpirationRate",
+                  "y_label": "expiration-rate",
+                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=ExpirationRate",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "ProducerRequestPurgatoryNumDelayedRequests",
+                  "y_label": "num-delayed-requests",
+                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests",
+                  "attributes": "Value"
+               },
+               {
+                  "graph_name": "MeanProducerRequestPurgatorySatisfactionRate",
+                  "y_label": "mean-request-satisfaction-rate",
+                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=SatisfactionRate",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "ProducerRequestPurgatoryExpirationRate",
+                  "y_label": "expiration-rate",
+                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=ExpirationRate",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-CaughtUpFollowerFetchRequestsPerSecond",
+                  "y_label": "mean-caught-up-follower-fetch-requests-per-second",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=CaughtUpFollowerRequestsPerSecond-all",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-ExpiredRequestRate",
+                  "y_label": "mean-expired-request-rate",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=ExpiredRequestsPerSecond-all",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-FollowerCatchUpLatency",
+                  "y_label": "mean-follower-catchup-time-ns,95th-percentile-follower-catchup-time-ns,99th-percentile-follower-catchup-time-ns,999th-percentile-follower-catchup-time-ns",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=FollowerCatchUpTimeInNs",
+                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-SatisfactionTimeInNs",
+                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfactionTimeInNs",
+                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-SatisfiedRequestsPerSecond",
+                  "y_label": "mean-satisfaction-requests-per-second",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfiedRequestsPerSecond",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-Throughput-all",
+                  "y_label": "mean-purgatory-throughput-all",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=Throughput-all",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-Follower-ExpiredRequestRate",
+                  "y_label": "mean-expired-request-rate",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-ExpiredRequestsPerSecond",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-Follower-SatisfactionTimeInNs",
+                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfactionTimeInNs",
+                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-Follower-SatisfiedRequestsPerSecond",
+                  "y_label": "mean-satisfaction-requests-per-second",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfiedRequestsPerSecond",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-Follower-Throughput-all",
+                  "y_label": "mean-purgatory-throughput-all",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-Throughput-all",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-NonFollower-ExpiredRequestRate",
+                  "y_label": "mean-expired-request-rate",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-ExpiredRequestsPerSecond",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-NonFollower-SatisfactionTimeInNs",
+                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfactionTimeInNs",
+                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-NonFollower-SatisfiedRequestsPerSecond",
+                  "y_label": "mean-satisfaction-requests-per-second",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfiedRequestsPerSecond",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-NonFollower-Throughput-all",
+                  "y_label": "mean-purgatory-throughput-all",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-Throughput-all",
+                  "attributes": "MeanRate"
+               }
+             ]
+       },
+        {
+            "role": "producer_performance",
+            "graphs": [
+               {
+                  "graph_name": "ProducerStats",
+                  "y_label": "avg-producer-latency-ms,max-producer-latency-ms,produce-request-throughput",
+                  "bean_name": "kafka:type=kafka.KafkaProducerStats",
+                  "attributes": "AvgProduceRequestMs,MaxProduceRequestMs,ProduceRequestsPerSecond"
+               }
+             ]
+       },
+       {
+            "role": "console_consumer",
+            "graphs": [
+               {
+                  "graph_name": "SimpleConsumerRequestStats",
+                  "y_label": "simple-consumer-throughput,simple-consumer-throughput-bytes,simple-consumer-latency-ms",
+                  "bean_name": "kafka:type=kafka.SimpleConsumerStats",
+                  "attributes": "FetchRequestsPerSecond,ConsumerThroughput,AvgFetchRequestMs"
+               }
+             ]
+       },
+        {
+            "role": "zookeeper",
+            "graphs": [
+               {
+                  "graph_name": "ZookeeperServerStats",
+                  "y_label": "zookeeper-latency-ms",
+                  "bean_name": "org.apache.ZooKeeperService:name0=StandaloneServer_port-1",
+                  "attributes": "AvgRequestLatency"
+               }
+             ]
+       }
+    ]
+}
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1378227)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -192,8 +192,8 @@
 }
 
 @threadsafe
-class SyncProducerStats extends SyncProducerStatsMBean {
-  private val produceRequestStats = new SnapshotStats
+class SyncProducerStats(monitoringDurationNs: Long) extends SyncProducerStatsMBean {
+  private val produceRequestStats = new SnapshotStats(monitoringDurationNs)
 
   def recordProduceRequest(requestNs: Long) = produceRequestStats.recordRequestMetric(requestNs)
 
@@ -208,8 +208,9 @@
 
 object SyncProducerStats extends Logging {
   private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
-  private val stats = new SyncProducerStats
+  // monitoring duration passed through to SnapshotStats, in ns: 1L * 1000 * 1000 * 1000 ns = 1 second
+  private val stats = new SyncProducerStats(1L * 1000 * 1000 * 1000)
   swallow(Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
 
   def recordProduceRequest(requestMs: Long) = stats.recordProduceRequest(requestMs)
 }
Index: core/src/main/scala/kafka/message/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/FileMessageSet.scala	(revision 1378227)
+++ core/src/main/scala/kafka/message/FileMessageSet.scala	(working copy)
@@ -252,8 +252,8 @@
 }
 
 @threadsafe
-class LogFlushStats extends LogFlushStatsMBean {
-  private val flushRequestStats = new SnapshotStats
+class LogFlushStats(monitorDurationNs: Long) extends LogFlushStatsMBean {
+  private val flushRequestStats = new SnapshotStats(monitorDurationNs)
 
   def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs)
 
@@ -270,7 +270,7 @@
 
 object LogFlushStats extends Logging {
   private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats"
-  private val stats = new LogFlushStats
+  private val stats = new LogFlushStats(1L * 1000 * 1000 * 1000)
   Utils.registerMBean(stats, LogFlushStatsMBeanName)
 
   def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs)
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(revision 1378227)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(working copy)
@@ -134,8 +134,8 @@
 }
 
 @threadsafe
-class SimpleConsumerStats extends SimpleConsumerStatsMBean {
-  private val fetchRequestStats = new SnapshotStats
+class SimpleConsumerStats(monitoringDurationNs: Long) extends SimpleConsumerStatsMBean {
+  private val fetchRequestStats = new SnapshotStats(monitoringDurationNs)
 
   def recordFetchRequest(requestNs: Long) = fetchRequestStats.recordRequestMetric(requestNs)
 
@@ -154,7 +154,7 @@
 
 object SimpleConsumerStats extends Logging {
   private val simpleConsumerstatsMBeanName = "kafka:type=kafka.SimpleConsumerStats"
-  private val stats = new SimpleConsumerStats
+  private val stats = new SimpleConsumerStats(1 * 1000L * 1000L * 1000L)
   Utils.registerMBean(stats, simpleConsumerstatsMBeanName)
 
   def recordFetchRequest(requestMs: Long) = stats.recordFetchRequest(requestMs)
Index: core/src/main/scala/kafka/tools/JmxTool.scala
===================================================================
--- core/src/main/scala/kafka/tools/JmxTool.scala	(revision 1378227)
+++ core/src/main/scala/kafka/tools/JmxTool.scala	(working copy)
@@ -26,37 +26,43 @@
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.math._
+import kafka.utils.Logging
 
+object JmxTool extends Logging {
 
-object JmxTool {
-
   def main(args: Array[String]) {
     // Parse command line
     val parser = new OptionParser
-    val objectNameOpt = 
+    val objectNameOpt =
       parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
-                                    "can be given multiple times to specify more than one query. If no objects are specified " +   
-                                    "all objects will be queried.")
-      .withRequiredArg
-      .describedAs("name")
-      .ofType(classOf[String])
+        "can be given multiple times to specify more than one query. If no objects are specified " +
+        "all objects will be queried.")
+        .withRequiredArg
+        .describedAs("name")
+        .ofType(classOf[String])
+    val attributesOpt =
+      parser.accepts("attributes", "The whitelist of attributes to query. This is a comma-separated list. If no " +
+        "attributes are specified all objects will be queried.")
+        .withOptionalArg()
+        .describedAs("name")
+        .ofType(classOf[String])
     val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats.")
       .withRequiredArg
       .describedAs("ms")
       .ofType(classOf[java.lang.Integer])
-      .defaultsTo(5000)
+      .defaultsTo(2000)
     val helpOpt = parser.accepts("help", "Print usage information.")
-    val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + 
-                                                      "See java.text.SimpleDateFormat for options.")
+    val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
+      "See java.text.SimpleDateFormat for options.")
       .withOptionalArg()
       .describedAs("format")
       .ofType(classOf[String])
     val jmxServiceUrlOpt =
       parser.accepts("jmx-url", "The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
-      .withRequiredArg
-      .describedAs("service-url")
-      .ofType(classOf[String])
-      .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
+        .withRequiredArg
+        .describedAs("service-url")
+        .ofType(classOf[String])
+        .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
 
     val options = parser.parse(args : _*)
 
@@ -67,44 +73,62 @@
 
     val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
     val interval = options.valueOf(reportingIntervalOpt).intValue
+    val attributesWhitelist = Option(options.valueOf(attributesOpt)).map(_.split(",")) // None if --attributes is absent or has no value
+    val attributesWhitelistExists = attributesWhitelist.isDefined
     val dateFormatExists = options.has(dateFormatOpt)
     val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
     val jmxc = JMXConnectorFactory.connect(url, null)
     val mbsc = jmxc.getMBeanServerConnection()
 
-    val queries: Iterable[ObjectName] = 
+    val queries: Iterable[ObjectName] =
       if(options.has(objectNameOpt))
         options.valuesOf(objectNameOpt).map(new ObjectName(_))
       else
         List(null)
+
     val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
-    val attributes: Iterable[(ObjectName, Array[String])] = 
+    val allAttributes: Iterable[(ObjectName, Array[String])] =
       names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
 
+    // Expected attribute count: keyed per query when a whitelist is given, per MBean name otherwise; its total gates the output below.
+    val numExpectedAttributes: Map[ObjectName, Int] =
+      attributesWhitelist match {
+        case Some(allowedAttributes) => queries.map((_, allowedAttributes.size)).toMap
+        case None => names.map((name: ObjectName) =>
+          (name, mbsc.getMBeanInfo(name).getAttributes().size)).toMap
+      }
+
     // print csv header
-    val keys = List("time") ++ queryAttributes(mbsc, names).keys.toArray.sorted
-    println(keys.map("\"" + _ + "\"").mkString(", "))
+    val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
+    if(keys.size == numExpectedAttributes.values.sum + 1) // + 1 accounts for the "time" column
+      println(keys.map("\"" + _ + "\"").mkString(","))
 
     while(true) {
       val start = System.currentTimeMillis
-      val attributes = queryAttributes(mbsc, names)
+      val attributes = queryAttributes(mbsc, names, attributesWhitelist)
       attributes("time") = dateFormat match {
         case Some(dFormat) => dFormat.format(new Date)
         case None => System.currentTimeMillis().toString
       }
-      println(keys.map(attributes(_)).mkString(", "))
+      if(attributes.keySet.size == numExpectedAttributes.values.sum + 1) // emit the row only when the attribute count matches
+        println(keys.map(attributes(_)).mkString(","))
       val sleep = max(0, interval - (System.currentTimeMillis - start))
       Thread.sleep(sleep)
     }
   }
 
-  def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName]) = {
+  def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = {
     var attributes = new mutable.HashMap[String, Any]()
-	for(name <- names) {
-	  val mbean = mbsc.getMBeanInfo(name)
+    for(name <- names) {
+      val mbean = mbsc.getMBeanInfo(name)
       for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))) {
         val attr = attrObj.asInstanceOf[Attribute]
-        attributes(name + ":" + attr.getName) = attr.getValue
+        attributesWhitelist match {
+          case Some(allowedAttributes) =>
+            if(allowedAttributes.contains(attr.getName))
+              attributes(name + ":" + attr.getName) = attr.getValue
+          case None => attributes(name + ":" + attr.getName) = attr.getValue
+        }
       }
     }
     attributes
Index: core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
===================================================================
--- core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala	(revision 1378227)
+++ core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala	(working copy)
@@ -33,7 +33,7 @@
    *
    * @return The sub-group identifier.
    */
-  def metricsGroupIdent: String
+  def metricsGroupIdent: String = ""
 
   /**
    * Creates a new MetricName object for gauges, meters, etc. created for this
Index: core/src/main/scala/kafka/server/RequestPurgatory.scala
===================================================================
--- core/src/main/scala/kafka/server/RequestPurgatory.scala	(revision 1378227)
+++ core/src/main/scala/kafka/server/RequestPurgatory.scala	(working copy)
@@ -72,8 +72,6 @@
   private val satisfactionRateBeanName = "SatisfactionRate"
   private val expirationRateBeanName = "ExpirationRate"
 
-  override def metricsGroupIdent = ""
-
   val satisfactionRateMeter = newMeter(
       satisfactionRateBeanName,
       "requests",
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1378227)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -459,9 +459,8 @@
    */
   class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
 
-    this.logIdent = "[etchRequestPurgatory-%d], ".format(brokerId)
+    this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId)
 
-    override def metricsGroupIdent = metricsGroup
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
@@ -598,8 +597,6 @@
 
     this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId)
 
-    override def metricsGroupIdent = metricsGroup
-
     protected def checkSatisfied(followerFetchRequestKey: RequestKey,
                                  delayedProduce: DelayedProduce) =
       delayedProduce.isSatisfied(followerFetchRequestKey)
@@ -617,7 +614,6 @@
 
   private class DelayedRequestMetrics {
     private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
-      override def metricsGroupIdent = metricsGroup
       val caughtUpFollowerFetchRequestMeter =
         newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
       val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel)
@@ -645,7 +641,6 @@
                                              keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
       private val metricPrefix = if (forFollower) "Follower" else "NonFollower"
 
-      override def metricsGroupIdent = metricsGroup
       val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
         Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond",
           "requests", TimeUnit.SECONDS))
@@ -705,7 +700,6 @@
       aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
     }
 
-
     private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) {
       val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
         else aggregateNonFollowerFetchRequestMetrics
Index: config/log4j.properties
===================================================================
--- config/log4j.properties	(revision 1378227)
+++ config/log4j.properties	(working copy)
@@ -25,7 +25,7 @@
 
 
 # Turn on all our debugging info
-#log4j.logger.kafka=INFO
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
 #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
 
 log4j.logger.kafka.perf=DEBUG
