From 5c5538f037cbb23a4f0de5d4b17a28fe8ac1deed Mon Sep 17 00:00:00 2001
From: Dong Lin <lindong@cis.upenn.edu>
Date: Wed, 3 Jun 2015 11:28:27 -0700
Subject: [PATCH] KAFKA-2171; System Test for Quotas

---
 .../kafka/clients/producer/internals/Sender.java   |   10 +
 system_test/quota_testsuite/cluster_config.json    |   76 ++++++
 .../config/console_consumer.properties             |    2 +
 .../quota_testsuite/config/server.properties       |  150 ++++++++++
 .../quota_testsuite/config/zookeeper.properties    |   23 ++
 .../quota_testsuite/quota_management_test.py       |  286 ++++++++++++++++++++
 .../testcase_6001/testcase_6001_properties.json    |  108 ++++++++
 .../testcase_6002/testcase_6002_properties.json    |  111 ++++++++
 .../testcase_6003/testcase_6003_properties.json    |  112 ++++++++
 system_test/testcase_to_run_all.json               |    6 +
 system_test/utils/kafka_system_test_utils.py       |  191 ++++++++++++-
 system_test/utils/replication_utils.py             |    3 +
 system_test/utils/testcase_env.py                  |   12 +
 13 files changed, 1084 insertions(+), 6 deletions(-)
 create mode 100644 system_test/quota_testsuite/cluster_config.json
 create mode 100644 system_test/quota_testsuite/config/console_consumer.properties
 create mode 100644 system_test/quota_testsuite/config/producer_performance.properties
 create mode 100644 system_test/quota_testsuite/config/server.properties
 create mode 100644 system_test/quota_testsuite/config/zookeeper.properties
 create mode 100644 system_test/quota_testsuite/quota_management_test.py
 create mode 100644 system_test/quota_testsuite/testcase_6001/testcase_6001_properties.json
 create mode 100644 system_test/quota_testsuite/testcase_6002/testcase_6002_properties.json
 create mode 100644 system_test/quota_testsuite/testcase_6003/testcase_6003_properties.json

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1e943d6..fef09b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ProduceRequest;
@@ -352,6 +353,7 @@ public class Sender implements Runnable {
         public final Sensor batchSizeSensor;
         public final Sensor compressionRateSensor;
         public final Sensor maxRecordSizeSensor;
+        public final Sensor bytesSentSensor;
 
         public SenderMetrics(Metrics metrics) {
             this.metrics = metrics;
@@ -401,6 +403,12 @@ public class Sender implements Runnable {
             m = new MetricName("record-size-avg", metricGrpName, "The average record size", metricTags);
             this.maxRecordSizeSensor.add(m, new Avg());
 
+            this.bytesSentSensor = metrics.sensor("producer-rate");
+            m = new MetricName("bytes-produced-rate", metricGrpName, "The average number of bytes produced per second", metricTags);
+            this.bytesSentSensor.add(m, new Rate());
+            m = new MetricName("bytes-produced-total", metricGrpName, "The total number of bytes produced", metricTags);
+            this.bytesSentSensor.add(m, new Total());
+
             m = new MetricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response.", metricTags);
             this.metrics.addMetric(m, new Measurable() {
                 public double measure(MetricConfig config, long now) {
@@ -481,6 +489,8 @@ public class Sender implements Runnable {
                     this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
                     this.compressionRateSensor.record(batch.records.compressionRate());
                     this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
+                    this.bytesSentSensor.record(batch.records.sizeInBytes(), now);
+
                     records += batch.recordCount;
                 }
                 this.recordsPerRequestSensor.record(records, now);
diff --git a/system_test/quota_testsuite/cluster_config.json b/system_test/quota_testsuite/cluster_config.json
new file mode 100644
index 0000000..5664d58
--- /dev/null
+++ b/system_test/quota_testsuite/cluster_config.json
@@ -0,0 +1,76 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9100"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9101"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9105"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9106"
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9107"
+        },
+        {
+            "entity_id": "8",
+            "hostname": "localhost",
+            "role": "jmx_tool_producer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9108"
+        },
+        {
+            "entity_id": "9",
+            "hostname": "localhost",
+            "role": "jmx_tool_consumer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9109"
+        },
+        {
+            "entity_id": "10",
+            "hostname": "localhost",
+            "role": "jmx_tool_consumer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9110"
+        }
+    ]
+}
diff --git a/system_test/quota_testsuite/config/console_consumer.properties b/system_test/quota_testsuite/config/console_consumer.properties
new file mode 100644
index 0000000..a2ab8b9
--- /dev/null
+++ b/system_test/quota_testsuite/config/console_consumer.properties
@@ -0,0 +1,2 @@
+auto.offset.reset=smallest
+auto.commit.interval.ms=1000
diff --git a/system_test/quota_testsuite/config/producer_performance.properties b/system_test/quota_testsuite/config/producer_performance.properties
new file mode 100644
index 0000000..e69de29
diff --git a/system_test/quota_testsuite/config/server.properties b/system_test/quota_testsuite/config/server.properties
new file mode 100644
index 0000000..a1ee838
--- /dev/null
+++ b/system_test/quota_testsuite/config/server.properties
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9091
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=16384
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=1
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+transaction.topic.num.partitions=2
+transaction.topic.replication.factor=4
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+
+quota.producer.default=20000
+quota.consumer.default=20000
+quota.producer.bytes.per.second.overrides=
+quota.consumer.bytes.per.second.overrides=
diff --git a/system_test/quota_testsuite/config/zookeeper.properties b/system_test/quota_testsuite/config/zookeeper.properties
new file mode 100644
index 0000000..adff28b
--- /dev/null
+++ b/system_test/quota_testsuite/config/zookeeper.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+syncLimit=5
+initLimit=10
+tickTime=2000
diff --git a/system_test/quota_testsuite/quota_management_test.py b/system_test/quota_testsuite/quota_management_test.py
new file mode 100644
index 0000000..eb43d5a
--- /dev/null
+++ b/system_test/quota_testsuite/quota_management_test.py
@@ -0,0 +1,286 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#!/usr/bin/env python
+
+# ===================================
+# quota_management_test.py
+# ===================================
+
+import os
+import signal
+import sys
+import time
+import traceback
+import random
+
+from   system_test_env    import SystemTestEnv
+sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
+
+from   setup_utils        import SetupUtils
+from   replication_utils  import ReplicationUtils
+import system_test_utils
+from   testcase_env       import TestcaseEnv
+
+# product specific: Kafka
+import kafka_system_test_utils
+import metrics
+
+class QuotaManagementTest(ReplicationUtils, SetupUtils):
+
+    testModuleAbsPathName = os.path.realpath(__file__)
+    testSuiteAbsPathName  = os.path.abspath(os.path.dirname(testModuleAbsPathName))
+
+    def __init__(self, systemTestEnv):
+
+        # SystemTestEnv - provides cluster level environment settings
+        #     such as entity_id, hostname, kafka_home, java_home which
+        #     are available in a list of dictionary named
+        #     "clusterEntityConfigDictList"
+        self.systemTestEnv = systemTestEnv
+
+        super(QuotaManagementTest, self).__init__(self)
+
+        # dict to pass user-defined attributes to logger argument: "extra"
+        d = {'name_of_class': self.__class__.__name__}
+
+    def signal_handler(self, signal, frame):
+        self.log_message("Interrupt detected - User pressed Ctrl+c")
+
+        # perform the necessary cleanup here when user presses Ctrl+c and it may be product specific
+        self.log_message("stopping all entities - please wait ...")
+        kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
+        sys.exit(1)
+
+    def runTest(self):
+
+        # ======================================================================
+        # get all testcase directories under this testsuite
+        # ======================================================================
+        testCasePathNameList = system_test_utils.get_dir_paths_with_prefix(
+            self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
+        testCasePathNameList.sort()
+
+        replicationUtils = ReplicationUtils(self)
+
+        # =============================================================
+        # launch each testcase one by one: testcase_1, testcase_2, ...
+        # =============================================================
+        for testCasePathName in testCasePathNameList:
+
+            skipThisTestCase = False
+
+            try:
+                # ======================================================================
+                # A new instance of TestcaseEnv to keep track of this testcase's env vars
+                # and initialize some env vars as testCasePathName is available now
+                # ======================================================================
+                self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
+                self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
+                self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName)
+                self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"]
+
+                # ======================================================================
+                # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json
+                # ======================================================================
+                testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"]
+
+                if self.systemTestEnv.printTestDescriptionsOnly:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    continue
+                elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName):
+                    self.log_message("Skipping : " + testcaseDirName)
+                    skipThisTestCase = True
+                    continue
+                else:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName)
+
+                # ============================================================================== #
+                # ============================================================================== #
+                #                   Product Specific Testing Code Starts Here:                   #
+                # ============================================================================== #
+                # ============================================================================== #
+
+                # initialize self.testcaseEnv with user-defined environment variables (product specific)
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]    = False
+                self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False
+
+                # initialize signal handler
+                signal.signal(signal.SIGINT, self.signal_handler)
+
+                # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file:
+                #   system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
+                    self.testcaseEnv.testcasePropJsonPathName)
+
+                # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
+                kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+                # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase
+                # for collecting logs from remote machines
+                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
+
+                # TestcaseEnv - initialize producer & consumer config / log file pathnames
+                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
+
+                # generate remote hosts log/config dirs if not exist
+                kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+                # generate properties files for zookeeper, kafka, producer, and consumer:
+                # 1. copy system_test/<suite_name>_testsuite/config/*.properties to
+                #    system_test/<suite_name>_testsuite/testcase_<n>/config/
+                # 2. update all properties files in system_test/<suite_name>_testsuite/testcase_<n>/config
+                #    by overriding the settings specified in:
+                #    system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName,
+                    self.testcaseEnv, self.systemTestEnv)
+
+                # =============================================
+                # preparing all entities to start the test
+                # =============================================
+                self.log_message("starting zookeepers")
+                kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 2s")
+                time.sleep(2)
+
+                self.log_message("starting brokers")
+                kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                self.log_message("creating test topic")
+                kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                # =============================================
+                # starting producer
+                # =============================================
+                self.log_message("starting producer in the background")
+                kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False)
+                msgProducingFreeTimeSec = self.testcaseEnv.testcaseArgumentsDict["message_producing_free_time_sec"]
+                self.anonLogger.info("sleeping for 2s")
+                time.sleep(2)
+                
+                # =============================================
+                # starting JmxTool to read producer rate
+                # =============================================
+                self.log_message("starting JmxTool to read producer rate")
+                kafka_system_test_utils.start_jmx_tool_producers(self.systemTestEnv, self.testcaseEnv)
+
+                self.anonLogger.info("sleeping for " + msgProducingFreeTimeSec + " sec to produce some messages")
+                time.sleep(int(msgProducingFreeTimeSec))
+
+                # =============================================
+                # tell producer to stop
+                # =============================================
+                self.log_message("stopping background producer")
+                self.testcaseEnv.lock.acquire()
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
+                time.sleep(1)
+                self.testcaseEnv.lock.release()
+                time.sleep(1)
+
+                # =============================================
+                # wait for producer thread's update of
+                # "backgroundProducerStopped" to be "True"
+                # =============================================
+                while 1:
+                    self.testcaseEnv.lock.acquire()
+                    self.logger.info("status of backgroundProducerStopped : [" + \
+                        str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d)
+                    if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
+                        time.sleep(1)
+                        self.logger.info("all producer threads completed", extra=self.d)
+                        break
+                    time.sleep(1)
+                    self.testcaseEnv.lock.release()
+                    time.sleep(2)
+
+                # =============================================
+                # stopping JmxTool from reading producer rate
+                # =============================================
+                self.log_message("stopping JmxTool from reading producer rate")
+                for entityId, parentPid in self.testcaseEnv.entityJmxToolProducerParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+                    
+                # =============================================
+                # starting consumer
+                # =============================================
+                self.log_message("starting console consumer")
+                kafka_system_test_utils.start_console_consumers(self.systemTestEnv, self.testcaseEnv)
+                msgConsumingFreeTimeSec = self.testcaseEnv.testcaseArgumentsDict["message_consuming_free_time_sec"]
+                self.anonLogger.info("sleeping for 2s")
+                time.sleep(2)
+                
+                # =============================================
+                # starting JmxTool to read consumer rate
+                # =============================================
+                self.log_message("starting JmxTool to read consumer rate")
+                kafka_system_test_utils.start_jmx_tool_consumers(self.systemTestEnv, self.testcaseEnv)
+
+                self.anonLogger.info("sleeping for " + msgConsumingFreeTimeSec + " sec to consume some messages")
+                time.sleep(int(msgConsumingFreeTimeSec))
+
+                # =============================================
+                # stopping JmxTool from reading consumer rate
+                # =============================================
+                self.log_message("stopping JmxTool from reading consumer rate")
+                for entityId, parentPid in self.testcaseEnv.entityJmxToolConsumerParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+
+                # =============================================
+                # this testcase is completed - stop all entities
+                # =============================================
+                self.log_message("stopping all entities")
+                for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+                for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+                # make sure all entities are stopped
+                kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)
+
+                # =============================================
+                # collect logs from remote hosts
+                # =============================================
+                kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+                # =============================================
+                # validate the data matched and checksum
+                # =============================================
+                '''
+                self.log_message("validating data matched")
+                kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv, replicationUtils)
+                '''
+
+                # =============================================
+                # validate that the rate matches the quota
+                # =============================================
+                self.log_message("validating data matched")
+                kafka_system_test_utils.validate_rate_matched_quota(self.systemTestEnv, self.testcaseEnv, replicationUtils)
+
+            except Exception as e:
+                self.log_message("Exception while running test {0}".format(e))
+                traceback.print_exc()
+
+            finally:
+                if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly:
+                    self.log_message("stopping all entities - please wait ...")
+                    kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
diff --git a/system_test/quota_testsuite/testcase_6001/testcase_6001_properties.json b/system_test/quota_testsuite/testcase_6001/testcase_6001_properties.json
new file mode 100644
index 0000000..5569e69
--- /dev/null
+++ b/system_test/quota_testsuite/testcase_6001/testcase_6001_properties.json
@@ -0,0 +1,108 @@
+{
+  "description": {"01":"To Test : 'Quota management test.'",
+                  "02":"Set up a Zk and Kafka cluster.",
+                  "03":"Produce messages to a single topics",
+                  "04":"Start JmxTool to read byte rate of producer",
+                  "05":"Stop JmxTool after producer stops",
+                  "06":"Start consumers.",
+                  "07":"Start JmxTool to read byte rate of consumers",
+                  "08":"Stop JmxTool after consumers stop",
+                  "09":"Verify that there are no duplicate messages or lost messages on any consumer group.",
+                  "10":"Verify that the recorded rate is close to quota",
+                  "11":"Producer dimensions : mode:async, acks:-1, comp:0"
+  },
+  "testcase_args": {
+    "bounce_leaders": "true",
+    "replica_factor": "1",
+    "num_partition": "6",
+    "topic": "topic_test",
+    "num_iteration": "0",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "30",
+    "message_consuming_free_time_sec": "300",
+    "num_messages_to_produce_per_producer_call": "50",
+    "num_topics_for_auto_generated_string": "-1",
+    "quota.producer.default": "20000",
+    "quota.consumer.default": "20000",
+    "quota.producer.bytes.per.second.overrides": "",
+    "quota.consumer.bytes.per.second.overrides": ""
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "16384",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "default.replication.factor": "1",
+      "num.partitions": "6",
+      "log_filename": "kafka_server_1.log",
+      "config_filename": "kafka_server_1.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "topic_test",
+      "threads": "1",
+      "compression-codec": "0",
+      "message-size": "2000",
+      "message": "3000",
+      "new-producer": "true",
+      "request-num-acks": "-1",
+      "message-send-gap-ms": "0",
+      "sync": "false",
+      "producer-num-retries": "5",
+      "log_filename": "producer_performance.log",
+      "config_filename": "producer_performance_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "topic_test",
+      "group.id": "group1",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "topic": "topic_test",
+      "group.id": "group2",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "log_filename": "jmx_tool.log",
+      "target_jmx_ip": "127.0.0.1",
+      "target_jmx_port": "9105",
+      "jmx_object_name": "kafka.producer:type=producer-metrics,client-id=producer-performance",
+      "jmx_attributes": "bytes-produced-rate,bytes-produced-total",
+      "monitored_clientId": "producer-performance"
+    },
+    {
+      "entity_id": "9",
+      "log_filename": "jmx_tool.log",
+      "target_jmx_ip": "127.0.0.1",
+      "target_jmx_port": "9106",
+      "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group1",
+      "jmx_attributes": "OneMinuteRate,Count",
+      "monitored_clientId": "group1"
+    },
+    {
+      "entity_id": "10",
+      "log_filename": "jmx_tool.log",
+      "target_jmx_ip": "127.0.0.1",
+      "target_jmx_port": "9107",
+      "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group2",
+      "jmx_attributes": "OneMinuteRate,Count",
+      "monitored_clientId": "group2"
+    }
+   ]
+}
diff --git a/system_test/quota_testsuite/testcase_6002/testcase_6002_properties.json b/system_test/quota_testsuite/testcase_6002/testcase_6002_properties.json
new file mode 100644
index 0000000..419d049
--- /dev/null
+++ b/system_test/quota_testsuite/testcase_6002/testcase_6002_properties.json
@@ -0,0 +1,111 @@
+{
+  "description": {"01":"To Test : 'Quota management test.'",
+                  "02":"Set up a Zk and Kafka cluster.",
+                  "03":"Produce messages to a single topics",
+                  "04":"Start JmxTool to read byte rate of producer",
+                  "05":"Stop JmxTool after producer stops",
+                  "06":"Start consumers.",
+                  "07":"Start JmxTool to read byte rate of consumers",
+                  "08":"Stop JmxTool after consumers stop",
+                  "09":"Verify that there are no duplicate messages or lost messages on any consumer group.",
+                  "10":"Verify that the recorded rate is close to quota",
+                  "11":"Producer dimensions : mode:async, acks:-1, comp:0",
+                  "12":"Override quota for producer and one of the consumer"
+  },
+  "testcase_args": {
+    "bounce_leaders": "true",
+    "replica_factor": "1",
+    "num_partition": "6",
+    "topic": "topic_test",
+    "num_iteration": "0",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "30",
+    "message_consuming_free_time_sec": "300",
+    "num_messages_to_produce_per_producer_call": "50",
+    "num_topics_for_auto_generated_string": "-1",
+    "quota.producer.default": "20000",
+    "quota.consumer.default": "20000",
+    "quota.producer.bytes.per.second.overrides": "producer-performance=15000",
+    "quota.consumer.bytes.per.second.overrides": "group1=15000"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "16384",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "default.replication.factor": "1",
+      "num.partitions": "6",
+      "log_filename": "kafka_server_1.log",
+      "config_filename": "kafka_server_1.properties",
+      "quota.producer.bytes.per.second.overrides": "producer-performance=15000",
+      "quota.consumer.bytes.per.second.overrides": "group1=15000"
+    },
+    {
+      "entity_id": "5",
+      "topic": "topic_test",
+      "threads": "1",
+      "compression-codec": "0",
+      "message-size": "2000",
+      "message": "3000",
+      "new-producer": "true",
+      "request-num-acks": "-1",
+      "message-send-gap-ms": "0",
+      "sync": "false",
+      "producer-num-retries": "5",
+      "log_filename": "producer_performance.log",
+      "config_filename": "producer_performance_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "topic_test",
+      "group.id": "group1",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "topic": "topic_test",
+      "group.id": "group2",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "log_filename": "jmx_tool.log",
+      "target_jmx_ip": "127.0.0.1",
+      "target_jmx_port": "9105",
+      "jmx_object_name": "kafka.producer:type=producer-metrics,client-id=producer-performance",
+      "jmx_attributes": "bytes-produced-rate,bytes-produced-total",
+      "monitored_clientId": "producer-performance"
+    },
+    {
+      "entity_id": "9",
+      "log_filename": "jmx_tool.log",
+      "target_jmx_ip": "127.0.0.1",
+      "target_jmx_port": "9106",
+      "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group1",
+      "jmx_attributes": "OneMinuteRate,Count",
+      "monitored_clientId": "group1"
+    },
+    {
+      "entity_id": "10",
+      "log_filename": "jmx_tool.log",
+      "target_jmx_ip": "127.0.0.1",
+      "target_jmx_port": "9107",
+      "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group2",
+      "jmx_attributes": "OneMinuteRate,Count",
+      "monitored_clientId": "group2"
+    }
+   ]
+}
diff --git a/system_test/quota_testsuite/testcase_6003/testcase_6003_properties.json b/system_test/quota_testsuite/testcase_6003/testcase_6003_properties.json
new file mode 100644
index 0000000..13caa46
--- /dev/null
+++ b/system_test/quota_testsuite/testcase_6003/testcase_6003_properties.json
@@ -0,0 +1,112 @@
+{
+  "description": {"01":"To Test : 'Quota management test.'",
+                  "02":"Set up a Zk and Kafka cluster.",
+                  "03":"Produce messages to a single topics",
+                  "04":"Start JmxTool to read byte rate of producer",
+                  "05":"Stop JmxTool after producer stops",
+                  "06":"Start consumers.",
+                  "07":"Start JmxTool to read byte rate of consumers",
+                  "08":"Stop JmxTool after consumers stop",
+                  "09":"Verify that there are no duplicate messages or lost messages on any consumer group.",
+                  "10":"Verify that the recorded rate is close to quota",
+                  "11":"Producer dimensions : mode:async, acks:-1, comp:0",
+                  "12":"Override quota for producer and consumers",
+                  "13":"consumers share the same clientId and groupId"
+  },
+  "testcase_args": {
+    "bounce_leaders": "true",
+    "replica_factor": "1",
+    "num_partition": "6",
+    "topic": "topic_test",
+    "num_iteration": "0",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "30",
+    "message_consuming_free_time_sec": "300",
+    "num_messages_to_produce_per_producer_call": "50",
+    "num_topics_for_auto_generated_string": "-1",
+    "quota.producer.default": "20000",
+    "quota.consumer.default": "20000",
+    "quota.producer.bytes.per.second.overrides": "producer-performance=15000",
+    "quota.consumer.bytes.per.second.overrides": "group1=15000"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "16384",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "default.replication.factor": "1",
+      "num.partitions": "6",
+      "log_filename": "kafka_server_1.log",
+      "config_filename": "kafka_server_1.properties",
+      "quota.producer.bytes.per.second.overrides": "producer-performance=15000",
+      "quota.consumer.bytes.per.second.overrides": "group1=15000"
+    },
+    {
+      "entity_id": "5",
+      "topic": "topic_test",
+      "threads": "1",
+      "compression-codec": "0",
+      "message-size": "2000",
+      "message": "3000",
+      "new-producer": "true",
+      "request-num-acks": "-1",
+      "message-send-gap-ms": "0",
+      "sync": "false",
+      "producer-num-retries": "5",
+      "log_filename": "producer_performance.log",
+      "config_filename": "producer_performance_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "topic_test",
+      "group.id": "group1",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "topic": "topic_test",
+      "group.id": "group1",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "log_filename": "jmx_tool.log",
+      "target_jmx_ip": "127.0.0.1",
+      "target_jmx_port": "9105",
+      "jmx_object_name": "kafka.producer:type=producer-metrics,client-id=producer-performance",
+      "jmx_attributes": "bytes-produced-rate,bytes-produced-total",
+      "monitored_clientId": "producer-performance"
+    },
+    {
+      "entity_id": "9",
+      "log_filename": "jmx_tool.log",
+      "target_jmx_ip": "127.0.0.1",
+      "target_jmx_port": "9106",
+      "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group1",
+      "jmx_attributes": "OneMinuteRate,Count",
+      "monitored_clientId": "group1"
+    },
+    {
+      "entity_id": "10",
+      "log_filename": "jmx_tool.log",
+      "target_jmx_ip": "127.0.0.1",
+      "target_jmx_port": "9107",
+      "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group1",
+      "jmx_attributes": "OneMinuteRate,Count",
+      "monitored_clientId": "group1"
+    }
+   ]
+}
diff --git a/system_test/testcase_to_run_all.json b/system_test/testcase_to_run_all.json
index 3e80a1f..5b02535 100644
--- a/system_test/testcase_to_run_all.json
+++ b/system_test/testcase_to_run_all.json
@@ -135,5 +135,11 @@
         "testcase_15004",
         "testcase_15005",
         "testcase_15006"
+    ],
+
+    "QuotaManagementTest"  : [
+        "testcase_6001",
+        "testcase_6002",
+        "testcase_6003"
     ]
 }
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index a9b73f7..2cc29fc 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -541,6 +541,7 @@ def start_brokers(systemTestEnv, testcaseEnv):
     for brokerEntityId in brokerEntityIdList:
         start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
 
+
 def start_console_consumers(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
 
     if onlyThisEntityId is not None:
@@ -552,6 +553,20 @@ def start_console_consumers(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
         for entityId in consoleConsumerEntityIdList:
             start_entity_in_background(systemTestEnv, testcaseEnv, entityId)
 
+def start_jmx_tool_producers(systemTestEnv, testcaseEnv):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    jmxToolProducerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
+        clusterEntityConfigDictList, "role", "jmx_tool_producer", "entity_id")
+    for entityId in jmxToolProducerEntityIdList:
+        start_entity_in_background(systemTestEnv, testcaseEnv, entityId)
+
+def start_jmx_tool_consumers(systemTestEnv, testcaseEnv):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    jmxToolConsumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
+        clusterEntityConfigDictList, "role", "jmx_tool_consumer", "entity_id")
+    for entityId in jmxToolConsumerEntityIdList:
+        start_entity_in_background(systemTestEnv, testcaseEnv, entityId)
+
 
 def start_mirror_makers(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
 
@@ -710,6 +725,10 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
     logFile    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename")
 
     useNewProducer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "new-producer")
+    targetJmxIP = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "target_jmx_ip")
+    targetJmxPort = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "target_jmx_port")
+    jmxObjectName = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "jmx_object_name")
+    jmxAttributes = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "jmx_attributes")
     mmConsumerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId,
                            "mirror_consumer_config_filename")
     mmProducerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId,
@@ -742,6 +761,19 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                   configPathName + "/" + configFile + " >> ",
                   logPathName + "/" + logFile + " & echo pid:$! > ",
                   logPathName + "/entity_" + entityId + "_pid'"]
+    elif role == 'jmx_tool_producer' or role == 'jmx_tool_consumer':
+        cmdList = ["ssh " + hostname,
+                  "'JAVA_HOME=" + javaHome,
+                  "JMX_PORT=" + jmxPort,
+                  "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/log4j.properties" % kafkaHome,
+                  kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
+                  "--jmx-url service:jmx:rmi:///jndi/rmi://" + targetJmxIP+ ":" + targetJmxPort + "/jmxrmi",
+                  "--reporting-interval 1000",
+                  "--object-name " + jmxObjectName, 
+                  "--attributes " + jmxAttributes + " >> ",
+                  logPathName + "/" + logFile + " & echo pid:$! > ",
+                  logPathName + "/entity_" + entityId + "_pid'"]
+
 
     elif role == "mirror_maker":
         if useNewProducer.lower() == "true":
@@ -802,7 +834,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
         except:
             pass
 
-        props_file_path=write_consumer_properties(consumerProperties)
+        props_file_path=write_consumer_properties(consumerProperties, entityId)
         scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/"
         logger.debug("executing command [" + scpCmdStr + "]", extra=d)
         system_test_utils.sys_call(scpCmdStr)
@@ -825,7 +857,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                    kafkaHome + "/bin/kafka-run-class.sh kafka.tools.ConsoleConsumer",
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
-                   "--consumer.config /tmp/consumer.properties",
+                   "--consumer.config /tmp/consumer_{0}.properties".format(entityId),
                    "--csv-reporter-enabled",
                    formatterOption,
                    "--from-beginning",
@@ -857,6 +889,10 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                 testcaseEnv.entityMirrorMakerParentPidDict[entityId] = tokens[1]
             elif role == "console_consumer":
                 testcaseEnv.entityConsoleConsumerParentPidDict[entityId] = tokens[1]
+            elif role == "jmx_tool_producer":
+                testcaseEnv.entityJmxToolProducerParentPidDict[entityId] = tokens[1]
+            elif role == "jmx_tool_consumer":
+                testcaseEnv.entityJmxToolConsumerParentPidDict[entityId] = tokens[1]
 
 
 def start_console_consumer(systemTestEnv, testcaseEnv):
@@ -933,7 +969,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
 
         consumerProperties = {}
         consumerProperties["consumer.timeout.ms"] = timeoutMs
-        props_file_path=write_consumer_properties(consumerProperties)
+        props_file_path=write_consumer_properties(consumerProperties, entityId)
         scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/"
         logger.debug("executing command [" + scpCmdStr + "]", extra=d)
         system_test_utils.sys_call(scpCmdStr)
@@ -944,7 +980,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
                    kafkaRunClassBin + " kafka.tools.ConsoleConsumer",
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
-                   "--consumer.config /tmp/consumer.properties",
+                   "--consumer.config /tmp/consumer_{0}.properties".format(entityId),
                    "--csv-reporter-enabled",
                    #"--metrics-dir " + metricsDir,
                    formatterOption,
@@ -1068,6 +1104,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
     useNewProducer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "new-producer")
     retryBackoffMs = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-retry-backoff-ms")
     numOfRetries   = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-num-retries")
+    msgSendGapMs   = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-send-gap-ms")
 
     # for optional properties in testcase_xxxx_properties.json,
     # check the length of returned value for those properties:
@@ -1133,6 +1170,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
                        "--request-num-acks " + requestNumAcks,
                        "--producer-retry-backoff-ms " + retryBackoffMs,
                        "--producer-num-retries " + numOfRetries,
+                       "--message-send-gap-ms " + msgSendGapMs,
                        "--csv-reporter-enabled",
                        "--metrics-dir " + metricsDir,
                        boolArgumentsStr,
@@ -1353,6 +1391,141 @@ def get_message_checksum(logPathName):
 
     return messageChecksumList
 
+def validate_rate_matched_quota(systemTestEnv, testcaseEnv, replicationUtils):
+    logger.info("#### validate_rate_matched_quota", extra=d)
+
+    validationStatusDict = testcaseEnv.validationStatusDict
+    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
+
+    producerQuotaDefault = float(testcaseEnv.testcaseArgumentsDict["quota.producer.default"])
+    consumerQuotaDefault = float(testcaseEnv.testcaseArgumentsDict["quota.consumer.default"])
+    producerQuota = {}
+    consumerQuota = {}
+
+    producerRate = {}
+    consumerRate = {}
+
+    # initialize producerQuota and consumerQuota
+    for field in testcaseEnv.testcaseArgumentsDict["quota.producer.bytes.per.second.overrides"].split(','):
+        field = field.strip()
+        if len(field) == 0:
+            continue
+        clientId = field.split('=')[0]
+        rate = float(field.split('=')[1])
+        producerQuota[clientId] = rate
+
+    for field in testcaseEnv.testcaseArgumentsDict["quota.consumer.bytes.per.second.overrides"].split(','):
+        field = field.strip()
+        if len(field) == 0:
+            continue
+        clientId = field.split('=')[0]
+        rate = float(field.split('=')[1])
+        consumerQuota[clientId] = rate
+
+    # collect producerRate and consumerRate
+    jmxToolProducerIdList = system_test_utils.get_data_from_list_of_dicts( \
+                    clusterConfigsList, "role", "jmx_tool_producer", "entity_id")
+
+    jmxToolConsumerIdList = system_test_utils.get_data_from_list_of_dicts( \
+                    clusterConfigsList, "role", "jmx_tool_consumer", "entity_id")
+
+    for entityId in jmxToolProducerIdList + jmxToolConsumerIdList:
+        if entityId in jmxToolProducerIdList:
+            role = "jmx_tool_producer"
+        else:
+            role = "jmx_tool_consumer"
+
+        logPath  = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "default")
+        logFile  = system_test_utils.get_data_by_lookup_keyval(
+                   testcaseEnv.testcaseConfigsList, "entity_id", entityId, "log_filename")
+        actualRate = get_rate(logPath + "/" + logFile)
+
+        monitoredClientId = system_test_utils.get_data_by_lookup_keyval(
+                            testcaseEnv.testcaseConfigsList, "entity_id", entityId, "monitored_clientId")
+
+        if role == 'jmx_tool_producer':
+            if monitoredClientId not in producerRate:
+                producerRate[monitoredClientId] = 0
+            producerRate[monitoredClientId] += actualRate
+        elif role == 'jmx_tool_consumer':
+            if monitoredClientId not in consumerRate:
+                consumerRate[monitoredClientId] = 0
+            consumerRate[monitoredClientId] += actualRate
+
+    # validate that rate matches quota per clientId
+    for clientId, rate in producerRate.iteritems():
+        quota = producerQuotaDefault
+        if clientId in producerQuota:
+            quota = producerQuota[clientId]
+
+        validationStatusDict["Expected rate by producer::" + clientId] = quota
+        validationStatusDict["Actual rate by producer::" + clientId] = rate
+
+        misMatchPercentage = abs(quota - rate) * 100.0 / quota
+        testName = "Validate for byte rate matched on producer::" + clientId
+
+        if misMatchPercentage <= replicationUtils.quotaRateMismatchThresholdPercent:
+            validationStatusDict[testName] = "PASSED"
+            logger.info(testName + " passes with quota = " + str(quota) + " and rate = " + str(rate), extra=d)
+        else:
+            validationStatusDict[testName] = "FAILED"
+            logger.error(testName + " failed with quota = " + str(quota) + " and rate = " + str(rate), extra=d)
+
+    for clientId, rate in consumerRate.iteritems():
+        quota = consumerQuotaDefault
+        if clientId in consumerQuota:
+            quota = consumerQuota[clientId]
+
+        validationStatusDict["Expected rate by consumer::" + clientId] = quota
+        validationStatusDict["Actual rate by consumer::" + clientId] = rate
+
+        misMatchPercentage = abs(quota - rate) * 100.0 / quota
+        testName = "Validate for byte rate matched on consumer::" + clientId
+
+        if misMatchPercentage <= replicationUtils.quotaRateMismatchThresholdPercent:
+            validationStatusDict[testName] = "PASSED"
+            logger.info(testName + " passes with quota = " + str(quota) + " and rate = " + str(rate), extra=d)
+        else:
+            validationStatusDict[testName] = "FAILED"
+            logger.error(testName + " failed with quota = " + str(quota) + " and rate = " + str(rate), extra=d)
+
+
+
+
+
+def get_rate(logPathName):
+  lastRate = None
+  lastCount = 0
+
+  rateIndex = None
+  countIndex = None
+
+  for line in open(logPathName, "r"):
+    # skip the first line
+    if "time" in line:
+      line = line.lower().strip()
+      matchObj = re.match("\"time\",\"(.*)\",\"(.*)\"", line)
+
+      if "rate" in matchObj.group(1):
+        rateIndex = 1
+        countIndex = 2
+      elif "rate" in matchObj.group(2):
+        rateIndex = 2
+        countIndex = 1
+      else:
+        raise Exception("rate is not found in either " + fields[1] + " or " + fields[2])
+    else:
+      # Assume the line is in format time,rate,count
+      fields = line.strip().split(',')
+      rate = float(fields[rateIndex])
+      count = float(fields[countIndex])
+      if count > lastCount:
+        lastCount = count
+        lastRate = rate
+
+  return lastRate
+
+
 
 def validate_data_matched(systemTestEnv, testcaseEnv, replicationUtils):
     logger.info("#### Inside validate_data_matched", extra=d)
@@ -1687,6 +1860,12 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
     for entityId, zkParentPid in testcaseEnv.entityZkParentPidDict.items():
         stop_remote_entity(systemTestEnv, entityId, zkParentPid)
 
+    for entityId, jmxToolParentPid in testcaseEnv.entityJmxToolProducerParentPidDict.items():
+        stop_remote_entity(systemTestEnv, entityId, jmxToolParentPid)
+
+    for entityId, jmxToolParentPid in testcaseEnv.entityJmxToolConsumerParentPidDict.items():
+        stop_remote_entity(systemTestEnv, entityId, jmxToolParentPid)
+
 
 def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
     clusterConfigList = systemTestEnv.clusterEntityConfigDictList
@@ -2501,9 +2680,9 @@ def get_leader_attributes(systemTestEnv, testcaseEnv):
     print leaderDict
     return leaderDict
 
-def write_consumer_properties(consumerProperties):
+def write_consumer_properties(consumerProperties, entityId):
     import tempfile
-    props_file_path = tempfile.gettempdir() + "/consumer.properties"
+    props_file_path = tempfile.gettempdir() + "/consumer_{0}.properties".format(entityId)
     consumer_props_file=open(props_file_path,"w")
     for key,value in consumerProperties.iteritems():
         consumer_props_file.write(key+"="+value+"\n")
diff --git a/system_test/utils/replication_utils.py b/system_test/utils/replication_utils.py
index cfd80b2..806d64d 100644
--- a/system_test/utils/replication_utils.py
+++ b/system_test/utils/replication_utils.py
@@ -68,3 +68,6 @@ class ReplicationUtils(object):
         # Data Loss Percentage Threshold in Ack = 1 cases
         self.ackOneDataLossThresholdPercent = 5.0
 
+        # Quota Rate Mismatch Percentage Threshold
+        self.quotaRateMismatchThresholdPercent = 10.0
+
diff --git a/system_test/utils/testcase_env.py b/system_test/utils/testcase_env.py
index 1d2fb57..8941696 100644
--- a/system_test/utils/testcase_env.py
+++ b/system_test/utils/testcase_env.py
@@ -59,6 +59,18 @@ class TestcaseEnv():
         # { 0: 12345, 1: 12389, ... }
         self.entityConsoleConsumerParentPidDict = {}
 
+        # dictionary of entity_id to ppid for jxm-tool-producer entities
+        # key: entity_id
+        # val: ppid of jmx tool associated to that entity_id
+        # { 0: 12345, 1: 12389, ... }
+        self.entityJmxToolProducerParentPidDict = {}
+
+        # dictionary of entity_id to ppid for jxm-tool-consumer entities
+        # key: entity_id
+        # val: ppid of jmx tool associated to that entity_id
+        # { 0: 12345, 1: 12389, ... }
+        self.entityJmxToolConsumerParentPidDict = {}
+
         # dictionary of entity_id to ppid for migration tool entities
         # key: entity_id
         # val: ppid of broker associated to that entity_id
-- 
1.7.9.5

