From 1a5c40511171c41497f47ffc46478c66d6c30a51 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Wed, 27 May 2015 17:26:31 -0700 Subject: [PATCH] KAFKA-2171; Quotas for Kafka --- .../kafka/clients/producer/internals/Sender.java | 10 + .../scala/kafka/tools/ConsumerPerformance.scala | 2 +- .../transaction_testsuite/cluster_config.json | 103 +++++++ .../config/console_consumer.properties | 2 + .../config/producer_performance.properties | 0 .../transaction_testsuite/config/server.properties | 150 +++++++++++ .../config/zookeeper.properties | 23 ++ .../testcase_20001/testcase_20001_properties.json | 141 ++++++++++ .../testcase_20002/testcase_20002_properties.json | 150 +++++++++++ .../testcase_20003/testcase_20003_properties.json | 151 +++++++++++ .../transaction_testsuite/transaction_test.py | 284 ++++++++++++++++++++ system_test/utils/kafka_system_test_utils.py | 154 +++++++++++ system_test/utils/replication_utils.py | 3 + system_test/utils/testcase_env.py | 12 + 14 files changed, 1184 insertions(+), 1 deletion(-) create mode 100644 system_test/transaction_testsuite/cluster_config.json create mode 100644 system_test/transaction_testsuite/config/console_consumer.properties create mode 100644 system_test/transaction_testsuite/config/producer_performance.properties create mode 100644 system_test/transaction_testsuite/config/server.properties create mode 100644 system_test/transaction_testsuite/config/zookeeper.properties create mode 100644 system_test/transaction_testsuite/testcase_20001/testcase_20001_properties.json create mode 100644 system_test/transaction_testsuite/testcase_20002/testcase_20002_properties.json create mode 100644 system_test/transaction_testsuite/testcase_20003/testcase_20003_properties.json create mode 100644 system_test/transaction_testsuite/transaction_test.py diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 1e943d6..4fd4d67 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ProduceRequest; @@ -352,6 +353,7 @@ public class Sender implements Runnable { public final Sensor batchSizeSensor; public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; + public final Sensor bytesSentSensor; public SenderMetrics(Metrics metrics) { this.metrics = metrics; @@ -401,6 +403,12 @@ public class Sender implements Runnable { m = new MetricName("record-size-avg", metricGrpName, "The average record size", metricTags); this.maxRecordSizeSensor.add(m, new Avg()); + this.bytesSentSensor = metrics.sensor("producer-rate"); + m = new MetricName("bytes-produced-rate", metricGrpName, "The average number of bytes produced per second", metricTags); + this.bytesSentSensor.add(m, new Rate()); + m = new MetricName("bytes-produced-total", metricGrpName, "The total number of bytes produced", metricTags); + this.bytesSentSensor.add(m, new Total()); + m = new MetricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response.", metricTags);
this.metrics.addMetric(m, new Measurable() { public double measure(MetricConfig config, long now) { @@ -481,6 +489,8 @@ public class Sender implements Runnable { this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now); this.compressionRateSensor.record(batch.records.compressionRate()); this.maxRecordSizeSensor.record(batch.maxRecordSize, now); + this.bytesSentSensor.record(batch.records.sizeInBytes(), now); + records += batch.recordCount; } this.recordsPerRequestSensor.record(records, now); diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 903318d..0d0d678 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -196,7 +196,7 @@ object ConsumerPerformance { props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest") props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - props.put("consumer.timeout.ms", "1000") + props.put("consumer.timeout.ms", "10000") props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString) } val numThreads = options.valueOf(numThreadsOpt).intValue diff --git a/system_test/transaction_testsuite/cluster_config.json b/system_test/transaction_testsuite/cluster_config.json new file mode 100644 index 0000000..456c9f0 --- /dev/null +++ b/system_test/transaction_testsuite/cluster_config.json @@ -0,0 +1,103 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9100" + }, + { + "entity_id": "1", + "hostname": "localhost", + "role": "broker", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9101" + }, + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9102" + }, + { + "entity_id": "3", + "hostname": "localhost", + "role": "broker", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9103" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "broker", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9104" + }, + { + "entity_id": "5", + "hostname": "localhost", + "role": "producer_performance", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9105" + }, + { + "entity_id": "6", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9106" + }, + { + "entity_id": "7", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9107" + }, + { + "entity_id": "8", + "hostname": "localhost", + "role": "jmx_tool_producer", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9108" + }, + { + "entity_id": "9", + "hostname": "localhost", + "role": "jmx_tool_consumer", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9109" + }, + { + "entity_id": "10", + "hostname": "localhost", + "role": "jmx_tool_consumer", + "cluster_name":"source", + "kafka_home": 
"default", + "java_home": "default", + "jmx_port": "9110" + } + ] +} diff --git a/system_test/transaction_testsuite/config/console_consumer.properties b/system_test/transaction_testsuite/config/console_consumer.properties new file mode 100644 index 0000000..a2ab8b9 --- /dev/null +++ b/system_test/transaction_testsuite/config/console_consumer.properties @@ -0,0 +1,2 @@ +auto.offset.reset=smallest +auto.commit.interval.ms=1000 diff --git a/system_test/transaction_testsuite/config/producer_performance.properties b/system_test/transaction_testsuite/config/producer_performance.properties new file mode 100644 index 0000000..e69de29 diff --git a/system_test/transaction_testsuite/config/server.properties b/system_test/transaction_testsuite/config/server.properties new file mode 100644 index 0000000..a1ee838 --- /dev/null +++ b/system_test/transaction_testsuite/config/server.properties @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned +# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost +# may not be what you want. +#host.name= + + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9091 + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=2 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# The directory under which to store log files +log.dir=/tmp/kafka_server_logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=5 + +# Overrides for for the default given by num.partitions on a per-topic basis +#topic.partition.count.map=topic1:3, topic2:4 + +############################# Log Flush Policy ############################# + +# The following configurations control the flush of data to disk. This is the most +# important performance knob in kafka. +# There are a few important trade-offs here: +# 1. 
Durability: Unflushed data is at greater risk of loss in the event of a crash. +# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). +# 3. Throughput: The flush is generally the most expensive operation. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +log.flush.interval.ms=1000 + +# Per-topic overrides for log.flush.interval.ms +#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000 + +# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. +log.flush.scheduler.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 +log.retention.bytes=-1 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +#log.segment.size=536870912 +log.segment.bytes=16384 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.cleanup.interval.mins=1 + +############################# Zookeeper ############################# + +# Enable connecting to zookeeper +enable.zookeeper=true + +# Zk connection string (see zk docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. 
+zookeeper.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 + +monitoring.period.secs=1 +message.max.bytes=1000000 +queued.max.requests=500 +log.roll.hours=168 +log.index.size.max.bytes=10485760 +log.index.interval.bytes=4096 +auto.create.topics.enable=true +controller.socket.timeout.ms=30000 +controller.message.queue.size=10 +default.replication.factor=1 +replica.lag.time.max.ms=10000 +replica.lag.max.messages=4000 +replica.socket.timeout.ms=30000 +replica.socket.receive.buffer.bytes=65536 +replica.fetch.max.bytes=1048576 +replica.fetch.wait.max.ms=500 +replica.fetch.min.bytes=4096 +num.replica.fetchers=1 + +transaction.topic.num.partitions=2 +transaction.topic.replication.factor=4 +offsets.topic.num.partitions=2 +offsets.topic.replication.factor=4 + +quota.producer.default=20000 +quota.consumer.default=20000 +quota.producer.bytes.per.second.overrides= +quota.consumer.bytes.per.second.overrides= diff --git a/system_test/transaction_testsuite/config/zookeeper.properties b/system_test/transaction_testsuite/config/zookeeper.properties new file mode 100644 index 0000000..adff28b --- /dev/null +++ b/system_test/transaction_testsuite/config/zookeeper.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir=/tmp/zookeeper +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 +syncLimit=5 +initLimit=10 +tickTime=2000 diff --git a/system_test/transaction_testsuite/testcase_20001/testcase_20001_properties.json b/system_test/transaction_testsuite/testcase_20001/testcase_20001_properties.json new file mode 100644 index 0000000..0793eab --- /dev/null +++ b/system_test/transaction_testsuite/testcase_20001/testcase_20001_properties.json @@ -0,0 +1,141 @@ +{ + "description": {"01":"To Test : 'Quota management test.'", + "02":"Set up a Zk and Kafka cluster.", + "03":"Produce messages to a single topic", + "04":"Start JmxTool to read byte rate of producer", + "05":"Stop JmxTool after producer stops", + "06":"Start consumers.", + "07":"Start JmxTool to read byte rate of consumers", + "08":"Stop JmxTool after consumers stop", + "09":"Verify that there are no duplicate messages or lost messages on any consumer group.", + "10":"Verify that the recorded rate is close to quota", + "11":"Producer dimensions : mode:async, acks:-1, comp:0" + }, + "testcase_args": { + "bounce_leaders": "true", + "replica_factor": "2", + "num_partition": "3", + "topic": "topic_test", + "num_iteration": "0", + "sleep_seconds_between_producer_calls": "1", + "message_producing_free_time_sec": "60", + "message_consuming_free_time_sec": "300", + "num_messages_to_produce_per_producer_call": "50", + "num_topics_for_auto_generated_string": "-1", + "quota.producer.default": "20000", + "quota.consumer.default": "20000", + "quota.producer.bytes.per.second.overrides": "", + "quota.consumer.bytes.per.second.overrides": "" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2108", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_0.log", + "config_filename": "zookeeper_0.properties" + }, + { + "entity_id": "1", + "port": "9091", + "broker.id": "1", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_1_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_1.log", + "config_filename": "kafka_server_1.properties" + }, + { + "entity_id": "2", + "port": "9092", + "broker.id": "2", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_2_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_2.log", + "config_filename": "kafka_server_2.properties" + }, + { + "entity_id": "3", + "port": "9093", + "broker.id": "3", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_3_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_3.log", + "config_filename": "kafka_server_3.properties" + }, + { + "entity_id": "4", + "port": "9094", + "broker.id": "4", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_4_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_4.log", + "config_filename": "kafka_server_4.properties" + }, + { + "entity_id": "5", + "topic": "topic_test", + "threads": "1", + "compression-codec": "0", + "message-size": "2000", + "message": "2000", + "new-producer": "true", + "request-num-acks": "-1", + "message-send-gap-ms": "0", + "sync": "false", + "producer-num-retries": "5", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance_5.properties" + }, + { + "entity_id": "6", + "topic": "topic_test", + "group.id": "group1",
"consumer-timeout-ms": "30000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer_6.properties" + }, + { + "entity_id": "7", + "topic": "topic_test", + "group.id": "group2", + "consumer-timeout-ms": "30000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer_7.properties" + }, + { + "entity_id": "8", + "log_filename": "jmx_tool.log", + "target_jmx_ip": "127.0.0.1", + "target_jmx_port": "9105", + "jmx_object_name": "kafka.producer:type=producer-metrics,client-id=producer-performance", + "jmx_attributes": "bytes-produced-rate,bytes-produced-total", + "monitored_clientId": "producer-performance" + }, + { + "entity_id": "9", + "log_filename": "jmx_tool.log", + "target_jmx_ip": "127.0.0.1", + "target_jmx_port": "9106", + "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group1", + "jmx_attributes": "OneMinuteRate,Count", + "monitored_clientId": "group1" + }, + { + "entity_id": "10", + "log_filename": "jmx_tool.log", + "target_jmx_ip": "127.0.0.1", + "target_jmx_port": "9107", + "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group2", + "jmx_attributes": "OneMinuteRate,Count", + "monitored_clientId": "group2" + } + ] +} diff --git a/system_test/transaction_testsuite/testcase_20002/testcase_20002_properties.json b/system_test/transaction_testsuite/testcase_20002/testcase_20002_properties.json new file mode 100644 index 0000000..f7bda8a --- /dev/null +++ b/system_test/transaction_testsuite/testcase_20002/testcase_20002_properties.json @@ -0,0 +1,150 @@ +{ + "description": {"01":"To Test : 'Quota management test.'", + "02":"Set up a Zk and Kafka cluster.", + "03":"Produce messages to a single topics", + "04":"Start JmxTool to read byte rate of producer", + "05":"Stop JmxTool after producer stops", + "06":"Start consumers.", + "07":"Start JmxTool to read byte rate of consumers", + "08":"Stop JmxTool after consumers stop", + "09":"Verify that there are no duplicate messages or lost messages on any consumer group.", + "10":"Verify that the recorded rate is close to quota", + "11":"Producer dimensions : mode:async, acks:-1, comp:0", + "12":"Override quota for producer and one of the consumer" + }, + "testcase_args": { + "bounce_leaders": "true", + "replica_factor": "2", + "num_partition": "3", + "topic": "topic_test", + "num_iteration": "0", + "sleep_seconds_between_producer_calls": "1", + "message_producing_free_time_sec": "60", + "message_consuming_free_time_sec": "300", + "num_messages_to_produce_per_producer_call": "50", + "num_topics_for_auto_generated_string": "-1", + "quota.producer.default": "20000", + "quota.consumer.default": "20000", + "quota.producer.bytes.per.second.overrides": "producer-performance=15000", + "quota.consumer.bytes.per.second.overrides": "group1=15000" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2108", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_0.log", + "config_filename": "zookeeper_0.properties" + }, + { + "entity_id": "1", + "port": "9091", + "broker.id": "1", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_1_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_1.log", + "config_filename": "kafka_server_1.properties", + "quota.producer.bytes.per.second.overrides": "producer-performance=15000", + "quota.consumer.bytes.per.second.overrides": "group1=15000" + }, + { + "entity_id": "2", + "port": "9092", + "broker.id": "2", + 
"log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_2_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_2.log", + "config_filename": "kafka_server_2.properties", + "quota.producer.bytes.per.second.overrides": "producer-performance=15000", + "quota.consumer.bytes.per.second.overrides": "group1=15000" + }, + { + "entity_id": "3", + "port": "9093", + "broker.id": "3", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_3_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_3.log", + "config_filename": "kafka_server_3.properties", + "quota.producer.bytes.per.second.overrides": "producer-performance=15000", + "quota.consumer.bytes.per.second.overrides": "group1=15000" + }, + { + "entity_id": "4", + "port": "9094", + "broker.id": "4", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_4_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_4.log", + "config_filename": "kafka_server_4.properties", + "quota.producer.bytes.per.second.overrides": "producer-performance=15000", + "quota.consumer.bytes.per.second.overrides": "group1=15000" + }, + { + "entity_id": "5", + "topic": "topic_test", + "threads": "1", + "compression-codec": "0", + "message-size": "2000", + "message": "2000", + "new-producer": "true", + "request-num-acks": "-1", + "message-send-gap-ms": "0", + "sync": "false", + "producer-num-retries": "5", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance_5.properties" + }, + { + "entity_id": "6", + "topic": "topic_test", + "group.id": "group1", + "consumer-timeout-ms": "30000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer_6.properties" + }, + { + "entity_id": "7", + "topic": "topic_test", + "group.id": "group2", + "consumer-timeout-ms": "30000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer_7.properties" + }, + { + "entity_id": "8", + "log_filename": "jmx_tool.log", + "target_jmx_ip": "127.0.0.1", + "target_jmx_port": "9105", + "jmx_object_name": "kafka.producer:type=producer-metrics,client-id=producer-performance", + "jmx_attributes": "bytes-produced-rate,bytes-produced-total", + "monitored_clientId": "producer-performance" + }, + { + "entity_id": "9", + "log_filename": "jmx_tool.log", + "target_jmx_ip": "127.0.0.1", + "target_jmx_port": "9106", + "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group1", + "jmx_attributes": "OneMinuteRate,Count", + "monitored_clientId": "group1" + }, + { + "entity_id": "10", + "log_filename": "jmx_tool.log", + "target_jmx_ip": "127.0.0.1", + "target_jmx_port": "9107", + "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group2", + "jmx_attributes": "OneMinuteRate,Count", + "monitored_clientId": "group2" + } + ] +} diff --git a/system_test/transaction_testsuite/testcase_20003/testcase_20003_properties.json b/system_test/transaction_testsuite/testcase_20003/testcase_20003_properties.json new file mode 100644 index 0000000..1bdcfc5 --- /dev/null +++ b/system_test/transaction_testsuite/testcase_20003/testcase_20003_properties.json @@ -0,0 +1,151 @@ +{ + "description": {"01":"To Test : 'Quota management test.'", + "02":"Set up a Zk and Kafka cluster.", + "03":"Produce messages to a single topics", + "04":"Start JmxTool to read byte rate of producer", + "05":"Stop JmxTool after producer stops", + 
"06":"Start consumers.", + "07":"Start JmxTool to read byte rate of consumers", + "08":"Stop JmxTool after consumers stop", + "09":"Verify that there are no duplicate messages or lost messages on any consumer group.", + "10":"Verify that the recorded rate is close to quota", + "11":"Producer dimensions : mode:async, acks:-1, comp:0", + "12":"Override quota for producer and consumers", + "13":"consumers share the same clientId and groupId" + }, + "testcase_args": { + "bounce_leaders": "true", + "replica_factor": "2", + "num_partition": "3", + "topic": "topic_test", + "num_iteration": "0", + "sleep_seconds_between_producer_calls": "1", + "message_producing_free_time_sec": "60", + "message_consuming_free_time_sec": "300", + "num_messages_to_produce_per_producer_call": "50", + "num_topics_for_auto_generated_string": "-1", + "quota.producer.default": "20000", + "quota.consumer.default": "20000", + "quota.producer.bytes.per.second.overrides": "producer-performance=15000", + "quota.consumer.bytes.per.second.overrides": "group1=15000" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2108", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_0.log", + "config_filename": "zookeeper_0.properties" + }, + { + "entity_id": "1", + "port": "9091", + "broker.id": "1", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_1_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_1.log", + "config_filename": "kafka_server_1.properties", + "quota.producer.bytes.per.second.overrides": "producer-performance=15000", + "quota.consumer.bytes.per.second.overrides": "group1=15000" + }, + { + "entity_id": "2", + "port": "9092", + "broker.id": "2", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_2_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_2.log", + "config_filename": "kafka_server_2.properties", + "quota.producer.bytes.per.second.overrides": "producer-performance=15000", + "quota.consumer.bytes.per.second.overrides": "group1=15000" + }, + { + "entity_id": "3", + "port": "9093", + "broker.id": "3", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_3_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_3.log", + "config_filename": "kafka_server_3.properties", + "quota.producer.bytes.per.second.overrides": "producer-performance=15000", + "quota.consumer.bytes.per.second.overrides": "group1=15000" + }, + { + "entity_id": "4", + "port": "9094", + "broker.id": "4", + "log.segment.bytes": "16384", + "log.dir": "/tmp/kafka_server_4_logs", + "default.replication.factor": "2", + "num.partitions": "3", + "log_filename": "kafka_server_4.log", + "config_filename": "kafka_server_4.properties", + "quota.producer.bytes.per.second.overrides": "producer-performance=15000", + "quota.consumer.bytes.per.second.overrides": "group1=15000" + }, + { + "entity_id": "5", + "topic": "topic_test", + "threads": "1", + "compression-codec": "0", + "message-size": "2000", + "message": "2000", + "new-producer": "true", + "request-num-acks": "-1", + "message-send-gap-ms": "0", + "sync": "false", + "producer-num-retries": "5", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance_5.properties" + }, + { + "entity_id": "6", + "topic": "topic_test", + "group.id": "group1", + "consumer-timeout-ms": "30000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer_6.properties" + }, + { + 
"entity_id": "7", + "topic": "topic_test", + "group.id": "group1", + "consumer-timeout-ms": "30000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer_7.properties" + }, + { + "entity_id": "8", + "log_filename": "jmx_tool.log", + "target_jmx_ip": "127.0.0.1", + "target_jmx_port": "9105", + "jmx_object_name": "kafka.producer:type=producer-metrics,client-id=producer-performance", + "jmx_attributes": "bytes-produced-rate,bytes-produced-total", + "monitored_clientId": "producer-performance" + }, + { + "entity_id": "9", + "log_filename": "jmx_tool.log", + "target_jmx_ip": "127.0.0.1", + "target_jmx_port": "9106", + "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group1", + "jmx_attributes": "OneMinuteRate,Count", + "monitored_clientId": "group1" + }, + { + "entity_id": "10", + "log_filename": "jmx_tool.log", + "target_jmx_ip": "127.0.0.1", + "target_jmx_port": "9107", + "jmx_object_name": "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=group1", + "jmx_attributes": "OneMinuteRate,Count", + "monitored_clientId": "group1" + } + ] +} diff --git a/system_test/transaction_testsuite/transaction_test.py b/system_test/transaction_testsuite/transaction_test.py new file mode 100644 index 0000000..d92fa4d --- /dev/null +++ b/system_test/transaction_testsuite/transaction_test.py @@ -0,0 +1,284 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+#!/usr/bin/env python + +# =================================== +# transaction_test.py +# =================================== + +import os +import signal +import sys +import time +import traceback +import random + +from system_test_env import SystemTestEnv +sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR) + +from setup_utils import SetupUtils +from replication_utils import ReplicationUtils +import system_test_utils +from testcase_env import TestcaseEnv + +# product specific: Kafka +import kafka_system_test_utils +import metrics + +class TransactionManagementTest(ReplicationUtils, SetupUtils): + + testModuleAbsPathName = os.path.realpath(__file__) + testSuiteAbsPathName = os.path.abspath(os.path.dirname(testModuleAbsPathName)) + + def __init__(self, systemTestEnv): + + # SystemTestEnv - provides cluster level environment settings + # such as entity_id, hostname, kafka_home, java_home which + # are available in a list of dictionary named + # "clusterEntityConfigDictList" + self.systemTestEnv = systemTestEnv + + super(TransactionManagementTest, self).__init__(self) + + # dict to pass user-defined attributes to logger argument: "extra" + self.d = {'name_of_class': self.__class__.__name__} + + def signal_handler(self, signal, frame): + self.log_message("Interrupt detected - User pressed Ctrl+c") + + # perform the necessary cleanup here when user presses Ctrl+c and it may be product specific + self.log_message("stopping all entities - please wait ...") + kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) + sys.exit(1) + + def runTest(self): + + # ====================================================================== + # get all testcase directories under this testsuite + # ====================================================================== + testCasePathNameList = system_test_utils.get_dir_paths_with_prefix( + self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX) + testCasePathNameList.sort() + + replicationUtils = ReplicationUtils(self) + + # ============================================================= + # launch each testcase one by one: testcase_1, testcase_2, ...
+ # ============================================================= + for testCasePathName in testCasePathNameList: + + skipThisTestCase = False + + try: + # ====================================================================== + # A new instance of TestcaseEnv to keep track of this testcase's env vars + # and initialize some env vars as testCasePathName is available now + # ====================================================================== + self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self) + self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName + self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName) + self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"] + + # ====================================================================== + # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json + # ====================================================================== + testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"] + + if self.systemTestEnv.printTestDescriptionsOnly: + self.testcaseEnv.printTestCaseDescription(testcaseDirName) + continue + elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName): + self.log_message("Skipping : " + testcaseDirName) + skipThisTestCase = True + continue + else: + self.testcaseEnv.printTestCaseDescription(testcaseDirName) + system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName) + + # ============================================================================== # + # ============================================================================== # + # Product Specific Testing Code Starts Here: # + # ============================================================================== # + # ============================================================================== # + + # initialize self.testcaseEnv with user-defined environment variables (product specific) + self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = False + self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False + + # initialize signal handler + signal.signal(signal.SIGINT, self.signal_handler) + + # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file: + # system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json + self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data( + self.testcaseEnv.testcasePropJsonPathName) + + # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties + kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase + # for collecting logs from remote machines + kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv) + + # TestcaseEnv - initialize producer & consumer config / log file pathnames + kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv) + + # generate remote hosts log/config dirs if not exist + kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # generate properties files for zookeeper, kafka, producer, and consumer: + # 1. copy system_test/<suite_name>_testsuite/config/*.properties to + # system_test/<suite_name>_testsuite/testcase_<n>/config/ + # 2.
update all properties files in system_test/<suite_name>_testsuite/testcase_<n>/config + # by overriding the settings specified in: + # system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json + kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName, + self.testcaseEnv, self.systemTestEnv) + + # ============================================= + # preparing all entities to start the test + # ============================================= + self.log_message("starting zookeepers") + kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 2s") + time.sleep(2) + + self.log_message("starting brokers") + kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 5s") + time.sleep(5) + + self.log_message("creating test topic") + kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 5s") + time.sleep(5) + + # ============================================= + # starting producer + # ============================================= + self.log_message("starting producer in the background") + kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False) + msgProducingFreeTimeSec = self.testcaseEnv.testcaseArgumentsDict["message_producing_free_time_sec"] + self.anonLogger.info("sleeping for 2s") + time.sleep(2) + + # ============================================= + # starting JmxTool to read producer rate + # ============================================= + self.log_message("starting JmxTool to read producer rate") + kafka_system_test_utils.start_jmx_tool_producers(self.systemTestEnv, self.testcaseEnv) + + self.anonLogger.info("sleeping for " + msgProducingFreeTimeSec + " sec to produce some messages") + time.sleep(int(msgProducingFreeTimeSec)) + + # ============================================= + # tell producer to stop + # ============================================= + self.log_message("stopping background producer") + self.testcaseEnv.lock.acquire() + self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True + time.sleep(1) + self.testcaseEnv.lock.release() + time.sleep(1) + + # ============================================= + # wait for producer thread's update of + # "backgroundProducerStopped" to be "True" + # ============================================= + while 1: + self.testcaseEnv.lock.acquire() + self.logger.info("status of backgroundProducerStopped : [" + \ + str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d) + if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]: + time.sleep(1) + self.logger.info("all producer threads completed", extra=self.d) + self.testcaseEnv.lock.release() + break + time.sleep(1) + self.testcaseEnv.lock.release() + time.sleep(2) + + # ============================================= + # stopping JmxTool from reading producer rate + # ============================================= + self.log_message("stopping JmxTool from reading producer rate") + for entityId, parentPid in self.testcaseEnv.entityJmxToolProducerParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + # ============================================= + # starting consumer + # ============================================= + self.log_message("starting console consumer") + kafka_system_test_utils.start_console_consumers(self.systemTestEnv, self.testcaseEnv) + msgConsumingFreeTimeSec = 
self.testcaseEnv.testcaseArgumentsDict["message_consuming_free_time_sec"] + self.anonLogger.info("sleeping for 2s") + time.sleep(2) + + # ============================================= + # starting JmxTool to read consumer rate + # ============================================= + self.log_message("starting JmxTool to read consumer rate") + kafka_system_test_utils.start_jmx_tool_consumers(self.systemTestEnv, self.testcaseEnv) + + self.anonLogger.info("sleeping for " + msgConsumingFreeTimeSec + " sec to consume some messages") + time.sleep(int(msgConsumingFreeTimeSec)) + + # ============================================= + # stopping JmxTool from reading consumer rate + # ============================================= + self.log_message("stopping JmxTool from reading consumer rate") + for entityId, parentPid in self.testcaseEnv.entityJmxToolConsumerParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + + # ============================================= + # this testcase is completed - stop all entities + # ============================================= + self.log_message("stopping all entities") + for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + # make sure all entities are stopped + kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv) + + # ============================================= + # collect logs from remote hosts + # ============================================= + kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # ============================================= + # validate the data matched and checksum + # ============================================= + self.log_message("validating data matched") + kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv, replicationUtils) + + # ============================================= + # validate that the rate matches the quota + # ============================================= + self.log_message("validating rate matched quota") + kafka_system_test_utils.validate_rate_matched_quota(self.systemTestEnv, self.testcaseEnv, replicationUtils) + + except Exception as e: + self.log_message("Exception while running test {0}".format(e)) + traceback.print_exc() + + finally: + if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly: + self.log_message("stopping all entities - please wait ...") + kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index a9b73f7..1d7d86d 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -541,6 +541,7 @@ def start_brokers(systemTestEnv, testcaseEnv): for brokerEntityId in brokerEntityIdList: start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId) + def start_console_consumers(systemTestEnv, testcaseEnv, onlyThisEntityId=None): if onlyThisEntityId is not None: @@ -552,6 +553,20 @@ def start_console_consumers(systemTestEnv, testcaseEnv, onlyThisEntityId=None): for entityId in consoleConsumerEntityIdList:
start_entity_in_background(systemTestEnv, testcaseEnv, entityId) +def start_jmx_tool_producers(systemTestEnv, testcaseEnv): + clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList + jmxToolProducerEntityIdList = system_test_utils.get_data_from_list_of_dicts( + clusterEntityConfigDictList, "role", "jmx_tool_producer", "entity_id") + for entityId in jmxToolProducerEntityIdList: + start_entity_in_background(systemTestEnv, testcaseEnv, entityId) + +def start_jmx_tool_consumers(systemTestEnv, testcaseEnv): + clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList + jmxToolConsumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( + clusterEntityConfigDictList, "role", "jmx_tool_consumer", "entity_id") + for entityId in jmxToolConsumerEntityIdList: + start_entity_in_background(systemTestEnv, testcaseEnv, entityId) + def start_mirror_makers(systemTestEnv, testcaseEnv, onlyThisEntityId=None): @@ -710,6 +725,10 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): logFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename") useNewProducer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "new-producer") + targetJmxIP = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "target_jmx_ip") + targetJmxPort = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "target_jmx_port") + jmxObjectName = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "jmx_object_name") + jmxAttributes = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "jmx_attributes") mmConsumerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "mirror_consumer_config_filename") mmProducerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, @@ -742,6 +761,19 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): configPathName + "/" + configFile + " >> ", logPathName + "/" + logFile + " & echo pid:$! > ", logPathName + "/entity_" + entityId + "_pid'"] + elif role == 'jmx_tool_producer' or role == 'jmx_tool_consumer': + cmdList = ["ssh " + hostname, + "'JAVA_HOME=" + javaHome, + "JMX_PORT=" + jmxPort, + "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/log4j.properties" % kafkaHome, + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool", + "--jmx-url service:jmx:rmi:///jndi/rmi://" + targetJmxIP+ ":" + targetJmxPort + "/jmxrmi", + "--reporting-interval 1000", + "--object-name " + jmxObjectName, + "--attributes " + jmxAttributes + " >> ", + logPathName + "/" + logFile + " & echo pid:$! 
> ", + logPathName + "/entity_" + entityId + "_pid'"] + elif role == "mirror_maker": if useNewProducer.lower() == "true": @@ -857,6 +889,10 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): testcaseEnv.entityMirrorMakerParentPidDict[entityId] = tokens[1] elif role == "console_consumer": testcaseEnv.entityConsoleConsumerParentPidDict[entityId] = tokens[1] + elif role == "jmx_tool_producer": + testcaseEnv.entityJmxToolProducerParentPidDict[entityId] = tokens[1] + elif role == "jmx_tool_consumer": + testcaseEnv.entityJmxToolConsumerParentPidDict[entityId] = tokens[1] def start_console_consumer(systemTestEnv, testcaseEnv): @@ -1068,6 +1104,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk useNewProducer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "new-producer") retryBackoffMs = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-retry-backoff-ms") numOfRetries = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-num-retries") + msgSendGapMs = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-send-gap-ms") # for optional properties in testcase_xxxx_properties.json, # check the length of returned value for those properties: @@ -1133,6 +1170,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "--request-num-acks " + requestNumAcks, "--producer-retry-backoff-ms " + retryBackoffMs, "--producer-num-retries " + numOfRetries, + "--message-send-gap-ms " + msgSendGapMs, "--csv-reporter-enabled", "--metrics-dir " + metricsDir, boolArgumentsStr, @@ -1353,6 +1391,116 @@ def get_message_checksum(logPathName): return messageChecksumList +def validate_rate_matched_quota(systemTestEnv, testcaseEnv, replicationUtils): + logger.info("#### validate_rate_matched_quota", extra=d) + + validationStatusDict = testcaseEnv.validationStatusDict + clusterConfigsList = systemTestEnv.clusterEntityConfigDictList + + producerQuotaDefault = float(testcaseEnv.testcaseArgumentsDict["quota.producer.default"]) + consumerQuotaDefault = float(testcaseEnv.testcaseArgumentsDict["quota.consumer.default"]) + producerQuota = {} + consumerQuota = {} + + for field in testcaseEnv.testcaseArgumentsDict["quota.producer.bytes.per.second.overrides"].split(','): + field = field.strip() + if len(field) == 0: + continue + clientId = field.split('=')[0] + rate = float(field.split('=')[1]) + producerQuota[clientId] = rate + + for field in testcaseEnv.testcaseArgumentsDict["quota.consumer.bytes.per.second.overrides"].split(','): + field = field.strip() + if len(field) == 0: + continue + clientId = field.split('=')[0] + rate = float(field.split('=')[1]) + consumerQuota[clientId] = rate + + jmxToolProducerIdList = system_test_utils.get_data_from_list_of_dicts( \ + clusterConfigsList, "role", "jmx_tool_producer", "entity_id") + + jmxToolConsumerIdList = system_test_utils.get_data_from_list_of_dicts( \ + clusterConfigsList, "role", "jmx_tool_consumer", "entity_id") + + for entityId in jmxToolProducerIdList + jmxToolConsumerIdList: + if entityId in jmxToolProducerIdList: + role = "jmx_tool_producer" + else: + role = "jmx_tool_consumer" + + logPath = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "default") + logFile = system_test_utils.get_data_by_lookup_keyval( + testcaseEnv.testcaseConfigsList, "entity_id", entityId, "log_filename") + actualRate = 
get_rate(logPath + "/" + logFile) + + monitoredClientId = system_test_utils.get_data_by_lookup_keyval( + testcaseEnv.testcaseConfigsList, "entity_id", entityId, "monitored_clientId") + expectedRate = 0 + if role == "jmx_tool_producer": + if monitoredClientId in producerQuota: + expectedRate = producerQuota[monitoredClientId] + else: + expectedRate = producerQuotaDefault + else: + if monitoredClientId in consumerQuota: + expectedRate = consumerQuota[monitoredClientId] + else: + expectedRate = consumerQuotaDefault + + validationStatusDict["Expected rate by " + role[9:] + "::" + monitoredClientId] = expectedRate + validationStatusDict["Actual rate by " + role[9:] + "::" + monitoredClientId] = actualRate + + misMatchPercentage = abs(expectedRate - actualRate) * 100.0 / expectedRate + testName = "Validate for byte rate matched on role::clientId [" + \ + role[9:] + "::" + monitoredClientId + "]" + + if misMatchPercentage <= replicationUtils.quotaRateMismatchThresholdPercent: + validationStatusDict[testName] = "PASSED" + logger.info(testName + " passes with quota = " + \ + str(expectedRate) + " and rate = " + str(actualRate), extra=d) + else: + validationStatusDict[testName] = "FAILED" + logger.error(testName + " failed with quota = " + \ + str(expectedRate) + " and rate = " + str(actualRate), extra=d) + + + + +def get_rate(logPathName): + lastRate = None + lastCount = 0 + + rateIndex = None + countIndex = None + + for line in open(logPathName, "r"): + # the header line gives the column order of the rate and count attributes + if "time" in line: + line = line.lower().strip() + matchObj = re.match("\"time\",\"(.*)\",\"(.*)\"", line) + + if "rate" in matchObj.group(1): + rateIndex = 1 + countIndex = 2 + elif "rate" in matchObj.group(2): + rateIndex = 2 + countIndex = 1 + else: + raise Exception("rate is not found in either " + matchObj.group(1) + " or " + matchObj.group(2)) + else: + # Assume the line is in format time,rate,count + fields = line.strip().split(',') + rate = float(fields[rateIndex]) + count = float(fields[countIndex]) + if count > lastCount: + lastCount = count + lastRate = rate + + return lastRate + + def validate_data_matched(systemTestEnv, testcaseEnv, replicationUtils): logger.info("#### Inside validate_data_matched", extra=d) @@ -1687,6 +1835,12 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv): for entityId, zkParentPid in testcaseEnv.entityZkParentPidDict.items(): stop_remote_entity(systemTestEnv, entityId, zkParentPid) + for entityId, jmxToolParentPid in testcaseEnv.entityJmxToolProducerParentPidDict.items(): + stop_remote_entity(systemTestEnv, entityId, jmxToolParentPid) + + for entityId, jmxToolParentPid in testcaseEnv.entityJmxToolConsumerParentPidDict.items(): + stop_remote_entity(systemTestEnv, entityId, jmxToolParentPid) + def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None): clusterConfigList = systemTestEnv.clusterEntityConfigDictList diff --git a/system_test/utils/replication_utils.py b/system_test/utils/replication_utils.py index cfd80b2..806d64d 100644 --- a/system_test/utils/replication_utils.py +++ b/system_test/utils/replication_utils.py @@ -68,3 +68,6 @@ class ReplicationUtils(object): # Data Loss Percentage Threshold in Ack = 1 cases self.ackOneDataLossThresholdPercent = 5.0 + # Quota Rate Mismatch Percentage Threshold + self.quotaRateMismatchThresholdPercent = 10.0 +
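The 10% threshold above is applied by validate_rate_matched_quota as a relative mismatch bound between the configured quota and the JmxTool-observed byte rate. A worked example, with illustrative numbers only:

```python
# quotaRateMismatchThresholdPercent = 10.0, as set above.
expected_rate = 20000.0  # quota.producer.default from the testcase JSON
actual_rate = 18600.0    # hypothetical last steady rate reported by JmxTool
mismatch_pct = abs(expected_rate - actual_rate) * 100.0 / expected_rate
print(mismatch_pct)      # 7.0 -> within the 10.0 threshold, so the check PASSES
```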
diff --git a/system_test/utils/testcase_env.py b/system_test/utils/testcase_env.py index 1d2fb57..8941696 100644 --- a/system_test/utils/testcase_env.py +++ b/system_test/utils/testcase_env.py @@ -59,6 +59,18 @@ class TestcaseEnv(): # { 0: 12345, 1: 12389, ... } self.entityConsoleConsumerParentPidDict = {} + # dictionary of entity_id to ppid for jmx-tool-producer entities + # key: entity_id + # val: ppid of jmx tool associated to that entity_id + # { 0: 12345, 1: 12389, ... } + self.entityJmxToolProducerParentPidDict = {} + + # dictionary of entity_id to ppid for jmx-tool-consumer entities + # key: entity_id + # val: ppid of jmx tool associated to that entity_id + # { 0: 12345, 1: 12389, ... } + self.entityJmxToolConsumerParentPidDict = {} + # dictionary of entity_id to ppid for migration tool entities # key: entity_id # val: ppid of broker associated to that entity_id -- 1.7.9.5
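A note on how the JmxTool logs are interpreted: get_rate() in kafka_system_test_utils.py keeps the rate from the last sampling interval in which the cumulative count still grew, i.e. the final reading taken while traffic was flowing. A fabricated example, assuming JmxTool writes a quoted header followed by unquoted data values:

```python
# Fabricated JmxTool CSV output (not from a real run): quoted header,
# unquoted data values, columns are time, rate, cumulative count.
sample_log = '''"time","bytes-produced-rate","bytes-produced-total"
1432766000000,18210.4,1821040
1432766001000,19990.2,2020060
1432766002000,0.0,2020060
'''
# The count column stops growing on the last line (the producer has stopped),
# so get_rate() would return the rate of the last line where it grew: 19990.2,
# which is then compared against the configured quota as shown earlier.
```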