diff --git a/system_test/clean_shutdown_testsuite/__init__.py b/system_test/clean_shutdown_testsuite/__init__.py
new file mode 100644
index 0000000..8d1c8b6
--- /dev/null
+++ b/system_test/clean_shutdown_testsuite/__init__.py
@@ -0,0 +1 @@
+
diff --git a/system_test/clean_shutdown_testsuite/clean_shutdown_test.py b/system_test/clean_shutdown_testsuite/clean_shutdown_test.py
new file mode 100644
index 0000000..957afad
--- /dev/null
+++ b/system_test/clean_shutdown_testsuite/clean_shutdown_test.py
@@ -0,0 +1,323 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#!/usr/bin/env python
+
+# ===================================
+# clean_shutdown_test.py
+# ===================================
+
+import inspect
+import logging
+import os
+import pprint
+import signal
+import subprocess
+import sys
+import time
+import traceback
+
+from system_test_env import SystemTestEnv
+sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
+
+from setup_utils import SetupUtils
+from replication_utils import ReplicationUtils
+import system_test_utils
+from testcase_env import TestcaseEnv
+
+# product specific: Kafka
+import kafka_system_test_utils
+import metrics
+
+class CleanShutdownTest(ReplicationUtils, SetupUtils):
+
+    testModuleAbsPathName = os.path.realpath(__file__)
+    testSuiteAbsPathName  = os.path.abspath(os.path.dirname(testModuleAbsPathName))
+
+    def __init__(self, systemTestEnv):
+
+        # SystemTestEnv - provides cluster level environment settings
+        #     such as entity_id, hostname, kafka_home, java_home which
+        #     are available in a list of dictionary named
+        #     "clusterEntityConfigDictList"
+        self.systemTestEnv = systemTestEnv
+
+        super(CleanShutdownTest, self).__init__(self)
+
+        # dict to pass user-defined attributes to logger argument: "extra"
+        # (bind to self so the logging calls below report this class name
+        # rather than the SetupUtils default)
+        self.d = {'name_of_class': self.__class__.__name__}
+
+    def signal_handler(self, signal, frame):
+        self.log_message("Interrupt detected - User pressed Ctrl+c")
+
+        # perform the necessary cleanup here when the user presses Ctrl+c; this may be product specific
+        self.log_message("stopping all entities - please wait ...")
+        kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
+        sys.exit(1)
+
+    def runTest(self):
+
+        # ======================================================================
+        # get all testcase directories under this testsuite
+        # ======================================================================
+        testCasePathNameList = system_test_utils.get_dir_paths_with_prefix(
+            self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
+        testCasePathNameList.sort()
+
+        # =============================================================
+        # launch each testcase one by one: testcase_1, testcase_2, ...
+        # =============================================================
+        for testCasePathName in testCasePathNameList:
+
+            skipThisTestCase = False
+
+            try:
+                # ======================================================================
+                # A new instance of TestcaseEnv to keep track of this testcase's env vars
+                # and initialize some env vars as testCasePathName is available now
+                # ======================================================================
+                self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
+                self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
+                self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName)
+                self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"]
+
+                # ======================================================================
+                # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json
+                # ======================================================================
+                testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"]
+
+                if self.systemTestEnv.printTestDescriptionsOnly:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    continue
+                elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName):
+                    self.log_message("Skipping : " + testcaseDirName)
+                    skipThisTestCase = True
+                    continue
+                else:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName)
+
+
+                # ============================================================================== #
+                # ============================================================================== #
+                #                 Product Specific Testing Code Starts Here:                     #
+                # ============================================================================== #
+                # ============================================================================== #
+
+                # get optional testcase arguments
+                consumerMultiTopicsMode = "false"
+                try:
+                    consumerMultiTopicsMode = self.testcaseEnv.testcaseArgumentsDict["consumer_multi_topics_mode"]
+                except KeyError:
+                    pass
+
+                # initialize self.testcaseEnv with user-defined environment variables (product specific)
+                self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = False
+                self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False
+                self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"] = []
+
+                # initialize signal handler
+                signal.signal(signal.SIGINT, self.signal_handler)
+
+                # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file:
+                #   system_test/<suite>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
+                    self.testcaseEnv.testcasePropJsonPathName)
+
+                # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
+                kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+                # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase
+                # for collecting logs from remote machines
+                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
+
+                # TestcaseEnv - initialize producer & consumer config / log file pathnames
+                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
+
+                # generate remote hosts log/config dirs if they do not exist
+                kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+                # generate properties files for zookeeper, kafka, producer, consumer:
+                # 1. copy system_test/<suite>_testsuite/config/*.properties to
+                #    system_test/<suite>_testsuite/testcase_<n>/config/
+                # 2. update all properties files in system_test/<suite>_testsuite/testcase_<n>/config
+                #    by overriding the settings specified in:
+                #    system_test/<suite>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName,
+                    self.testcaseEnv, self.systemTestEnv)
+
+                # =============================================
+                # preparing all entities to start the test
+                # =============================================
+                self.log_message("starting zookeepers")
+                kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 2s")
+                time.sleep(2)
+
+                self.log_message("starting brokers")
+                kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                # =============================================
+                # A while-loop to bounce the brokers as specified
+                # by "num_iteration" in testcase_<n>_properties.json
+                # =============================================
+                i = 1
+                numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"])
+                brokerType = self.testcaseEnv.testcaseArgumentsDict["broker_type"]
+                bounceBrokerFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_broker"]
+
+                while i <= numIterations:
+                    self.log_message("Iteration " + str(i) + " of " + str(numIterations))
+
+                    # reset flags for producer
+                    self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = False
+                    self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False
+
+                    # =============================================
+                    # starting console consumer
+                    # =============================================
+                    self.log_message("starting consumer in the background")
+                    kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv, False)
+                    self.logger.info("sleep 5 sec", extra=self.d)
+                    time.sleep(5)
+
+                    # =============================================
+                    # starting producer
+                    # =============================================
+                    self.log_message("starting producer in the background")
+                    kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False)
+
+                    # a list of all brokers' entity ids
+                    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
+                        self.systemTestEnv.clusterEntityConfigDictList, "role", "broker", "entity_id")
+
+                    for brokerEntityId in brokerEntityIdList:
+                        brokerId = system_test_utils.get_data_by_lookup_keyval(
+                            self.testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "broker.id")
+
+                        self.log_message("calling ShutdownBroker for clean-shutdown of broker.id " + brokerId)
+                        kafka_system_test_utils.call_ShutdownBroker_for_clean_shutdown(
+                            self.systemTestEnv, self.testcaseEnv, brokerEntityId)
+                        self.log_message("sleep 1 min")
+                        time.sleep(60)
+
+                        self.log_message("bouncing broker in entity id: " + brokerEntityId)
+                        kafka_system_test_utils.bounce_broker(self.systemTestEnv, self.testcaseEnv, brokerEntityId)
+                        self.log_message("sleep 10 sec")
+                        time.sleep(10)
+
+                    self.anonLogger.info("sleeping for 30s")
+                    time.sleep(30)
+
+                    kafka_system_test_utils.start_preferred_replica_election(self.systemTestEnv, self.testcaseEnv)
+                    self.anonLogger.info("sleeping for 1 min")
+                    time.sleep(60)
+
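+                    # after the election settles, record a PASSED/FAILED entry for this
+                    # round in validationStatusDict based on the "isUnderReplicated"
+                    # flags reported by kafka-list-topic.sh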
underReplicatedPartitionsCount = kafka_system_test_utils.validate_under_replicated_partitions_count( + self.systemTestEnv, self.testcaseEnv, i) + + # ============================================= + # tell producer to stop + # ============================================= + self.testcaseEnv.lock.acquire() + self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True + time.sleep(1) + self.testcaseEnv.lock.release() + time.sleep(1) + + # ============================================= + # wait for producer thread's update of + # "backgroundProducerStopped" to be "True" + # ============================================= + while 1: + self.testcaseEnv.lock.acquire() + self.logger.info("status of backgroundProducerStopped : [" + \ + str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d) + if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]: + time.sleep(1) + self.logger.info("all producer threads completed", extra=self.d) + self.testcaseEnv.lock.release() + time.sleep(1) + break + time.sleep(1) + self.testcaseEnv.lock.release() + time.sleep(2) + + self.logger.info("sleep 1 min for consumer to timeout", extra=self.d) + time.sleep(60) + + # ============================================= + # collect logs from remote hosts + # ============================================= + kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # ============================================= + # validate the data matched and checksum + # ============================================= + self.log_message("calling validate_data_matched_in_multi_topics_from_single_consumer_producer") + kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer( + self.systemTestEnv, self.testcaseEnv, ReplicationUtils(self), i) + + # validate that there is no "Failed to send messages after ... 
tries" + kafka_system_test_utils.validate_producer_send_success(self.systemTestEnv, self.testcaseEnv, i) + + # remove the producer & consumer logs for next iteration of tests + kafka_system_test_utils.rename_remote_producer_log(self.systemTestEnv, self.testcaseEnv, i) + kafka_system_test_utils.rename_remote_consumer_log(self.systemTestEnv, self.testcaseEnv, i) + time.sleep(2) + + i += 1 + # while loop + + # ============================================= + # this testcase is completed - stop all entities + # ============================================= + self.log_message("stopping all entities") + for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + # make sure all entities are stopped + kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv) + + self.log_message("calling validate_broker_log_segment_checksum") + kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) + + # ============================================= + # draw graphs + # ============================================= + metrics.draw_all_graphs(self.systemTestEnv.METRICS_PATHNAME, + self.testcaseEnv, + self.systemTestEnv.clusterEntityConfigDictList) + + # build dashboard, one for each role + metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME, + self.testcaseEnv.testCaseDashboardsDir, + self.systemTestEnv.clusterEntityConfigDictList) + except Exception as e: + self.log_message("Exception while running test {0}".format(e)) + traceback.print_exc() + + finally: + if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly: + self.log_message("stopping all entities - please wait ...") + kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) + diff --git a/system_test/clean_shutdown_testsuite/config/server.properties b/system_test/clean_shutdown_testsuite/config/server.properties new file mode 100644 index 0000000..dacf158 --- /dev/null +++ b/system_test/clean_shutdown_testsuite/config/server.properties @@ -0,0 +1,139 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned +# from InetAddress.getLocalHost(). 
+# If there are multiple interfaces, getLocalHost may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9091
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_logs
+
+# The number of logical partitions per topic per server. More partitions allow greater parallelism
+# for consumption, but also mean more files.
+num.partitions=5
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=102400
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zk.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zk.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=1
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
diff --git a/system_test/clean_shutdown_testsuite/config/zookeeper.properties b/system_test/clean_shutdown_testsuite/config/zookeeper.properties
new file mode 100644
index 0000000..74cbf90
--- /dev/null
+++ b/system_test/clean_shutdown_testsuite/config/zookeeper.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
diff --git a/system_test/clean_shutdown_testsuite/testcase_9061/cluster_config.json b/system_test/clean_shutdown_testsuite/testcase_9061/cluster_config.json
new file mode 100644
index 0000000..2f95dbe
--- /dev/null
+++ b/system_test/clean_shutdown_testsuite/testcase_9061/cluster_config.json
@@ -0,0 +1,67 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9990"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9991"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9992"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9993"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9994"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9995"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9996"
+        }
+    ]
+}
diff --git a/system_test/clean_shutdown_testsuite/testcase_9061/testcase_9061_properties.json b/system_test/clean_shutdown_testsuite/testcase_9061/testcase_9061_properties.json
new file mode 100644
index 0000000..b4df431
--- /dev/null
+++ b/system_test/clean_shutdown_testsuite/testcase_9061/testcase_9061_properties.json
@@ -0,0 +1,104 @@
+{
+  "description": {"01":"To Test : 'Clean Shutdown'",
+                  "02":"Produce and consume messages to 5 topics - 2 partitions.",
+                  "03":"This test sends messages to 2 replicas",
+                  "04":"In each iteration, use kafka.admin.ShutdownBroker to perform a controlled shutdown of each broker",
+                  "05":"Restart the terminated broker",
+                  "06":"Verify that each broker reports a complete clean shutdown and that no partition is left under-replicated",
+                  "07":"At the end it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:async, acks:-1, comp:1",
+                  "10":"Log segment size : 1048576"
+  },
+  "testcase_args": {
+    "broker_type": "controller",
+    "bounce_broker": "true",
+    "replica_factor": "2",
+    "num_partition": "2",
+    "num_iteration": "2",
+    "producer_multi_topics_mode": "true",
+    "consumer_multi_topics_mode": "true",
+    "sleep_seconds_between_producer_calls": "10",
+    "num_topics_for_auto_generated_string": "5",
+    "broker_down_time_in_sec": "5",
+    "clean_shutdown_num_retries": "10",
+    "clean_shutdown_retry_interval_ms": "5000",
+    "num_messages_to_produce_per_producer_call": "500"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "num.partitions":"2",
"default.replication.factor": "2", + "log.segment.bytes": "1048576", + "log.dir": "/tmp/kafka_server_1_logs", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "broker.id": "2", + "num.partitions":"2", + "default.replication.factor": "2", + "log.segment.bytes": "1048576", + "log.dir": "/tmp/kafka_server_2_logs", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "broker.id": "3", + "num.partitions":"2", + "default.replication.factor": "2", + "log.segment.bytes": "1048576", + "log.dir": "/tmp/kafka_server_3_logs", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "port": "9094", + "broker.id": "4", + "num.partitions":"2", + "default.replication.factor": "2", + "log.segment.bytes": "1048576", + "log.dir": "/tmp/kafka_server_4_logs", + "log_filename": "kafka_server_9094.log", + "config_filename": "kafka_server_9094.properties" + }, + { + "entity_id": "5", + "topic": "t001", + "threads": "5", + "compression-codec": "1", + "message-size": "500", + "message": "500", + "request-num-acks": "-1", + "producer-retry-backoff-ms": "3500", + "producer-num-retries": "3", + "sync":"false", + "log_filename": "producer_performance_5.log", + "config_filename": "producer_performance_5.properties" + }, + { + "entity_id": "6", + "topic": "t001", + "groupid": "mytestgroup", + "consumer-timeout-ms": "10000", + "zookeeper": "localhost:2188", + "log_filename": "console_consumer_6.log", + "config_filename": "console_consumer_6.properties" + } + ] +} diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index dd082f5..d69b4ee 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -756,7 +756,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): testcaseEnv.entityMirrorMakerParentPidDict[entityId] = tokens[1] -def start_console_consumer(systemTestEnv, testcaseEnv): +def start_console_consumer(systemTestEnv, testcaseEnv, fromBeginning=True): clusterList = systemTestEnv.clusterEntityConfigDictList @@ -828,6 +828,10 @@ def start_console_consumer(systemTestEnv, testcaseEnv): logger.error("Invalid cluster name : " + clusterName, extra=d) sys.exit(1) + fromBeginningOption = "" + if fromBeginning: + fromBeginningOption = "--from-beginning " + cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, @@ -838,7 +842,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv): "--csv-reporter-enabled", #"--metrics-dir " + metricsDir, formatterOption, - "--from-beginning ", + fromBeginningOption, " >> " + consumerLogPathName, " & echo pid:$! 
> " + consumerLogPath + "/entity_" + entityId + "_pid'"] @@ -2053,7 +2057,7 @@ def validate_simple_consumer_data_matched_across_replicas(systemTestEnv, testcas validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED" -def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv, replicationUtils): +def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv, replicationUtils, round=1): logger.debug("#### Inside validate_data_matched_in_multi_topics_from_single_consumer_producer", extra=d) validationStatusDict = testcaseEnv.validationStatusDict @@ -2106,26 +2110,26 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe logger.info("no. of unique messages on topic [" + topic + "] sent from publisher : " + str(len(producerMsgIdSet)), extra=d) logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgIdSet)), extra=d) - validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet)) - validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet)) + validationStatusDict["Unique messages from producer on [" + topic + "] round [" + str(round) + "]"] = str(len(producerMsgIdSet)) + validationStatusDict["Unique messages from consumer on [" + topic + "] round [" + str(round) + "]"] = str(len(consumerMsgIdSet)) missingPercentage = len(missingUniqConsumerMsgId) * 100.00 / len(producerMsgIdSet) logger.info("Data loss threshold % : " + str(replicationUtils.ackOneDataLossThresholdPercent), extra=d) logger.warn("Data loss % on topic : " + topic + " : " + str(missingPercentage), extra=d) if ( len(missingUniqConsumerMsgId) == 0 and len(producerMsgIdSet) > 0 ): - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" + validationStatusDict["Validate for data matched on topic [" + topic + "] on round [" + str(round) + "]"] = "PASSED" elif (acks == "1"): if missingPercentage <= replicationUtils.ackOneDataLossThresholdPercent: - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" + validationStatusDict["Validate for data matched on topic [" + topic + "] on round [" + str(round) + "]"] = "PASSED" logger.warn("Test case (Acks = 1) passes with less than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d) else: - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" + validationStatusDict["Validate for data matched on topic [" + topic + "] on round [" + str(round) + "]"] = "FAILED" logger.error("Test case (Acks = 1) failed with more than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d) else: - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" + validationStatusDict["Validate for data matched on topic [" + topic + "] on round [" + str(round) + "]"] = "FAILED" logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d) @@ -2204,3 +2208,302 @@ def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"): else: validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "FAILED" + +def get_ISR(systemTestEnv, testcaseEnv): + logger.info("Querying Zookeeper for ISR 
info ...", extra=d)
+    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
+    tcConfigsList = testcaseEnv.testcaseConfigsList
+
+    zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
+    firstZkDict = zkDictList[0]
+    firstZkHost = firstZkDict["hostname"]
+    firstZkEntityId = firstZkDict["entity_id"]
+    clusterName = firstZkDict["cluster_name"]
+    firstZkClientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", firstZkEntityId, "clientPort")
+    firstZkKafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", firstZkEntityId, "kafka_home")
+    firstZkJavaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", firstZkEntityId, "java_home")
+    firstZkKafkaRunClassBin = firstZkKafkaHome + "/bin/kafka-run-class.sh"
+    numPartition = testcaseEnv.testcaseArgumentsDict["num_partition"]
+
+    zkConnectStr = ""
+    if clusterName == "source":
+        zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+    elif clusterName == "target":
+        zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
+    else:
+        logger.error("Invalid cluster name : " + clusterName, extra=d)
+        sys.exit(1)
+
+    topicPartitionISRDict = {}
+
+    producerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "producer_performance")
+    for producerConfig in producerConfigList:
+        entityId = producerConfig["entity_id"]
+        topicsStr = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", entityId, "topic")
+        topics = topicsStr.split(",")
+
+        for topic in topics:
+            # reset the partition index for every topic; otherwise only the
+            # first topic would ever be queried
+            partitionIndex = 0
+            while partitionIndex < int(numPartition):
+                cmdStrList = ["ssh " + firstZkHost,
+                              "\"JAVA_HOME=" + firstZkJavaHome,
+                              firstZkKafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain",
+                              "-server " + zkConnectStr,
+                              "'get /brokers/topics/" + topic + "/partitions/" + str(partitionIndex) + "/leaderAndISR' 2> /dev/null | tail -1\""]
+
+                cmdStr = " ".join(cmdStrList)
+                logger.debug("executing command [" + cmdStr + "]", extra=d)
+                subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+
+                for line in subproc.stdout.readlines():
+                    if "ISR" in line:
+                        matchObj = re.match(".*\"ISR\":\"(.*?)\",.*", line)
+                        isrList = matchObj.group(1)
+                        topicPartitionISRDict[topic + "-" + str(partitionIndex)] = isrList
+                        break
+
+                partitionIndex += 1
+
+    return topicPartitionISRDict
+
+
+def bounce_broker(systemTestEnv, testcaseEnv, entityId):
+    # bouncing this broker: stop it first
+    logger.info("stopping broker : " + entityId, extra=d)
+    stop_remote_entity(systemTestEnv, entityId, testcaseEnv.entityBrokerParentPidDict[entityId])
+
+    # wait here for the specified amount of time
+    brokerDownTimeInSec = 5
+    try:
+        brokerDownTimeInSec = int(testcaseEnv.testcaseArgumentsDict["broker_down_time_in_sec"])
+    except (KeyError, ValueError):
+        pass    # take default
+    logger.info("sleeping for : " + str(brokerDownTimeInSec), extra=d)
+    time.sleep(brokerDownTimeInSec)
+
+    # starting previously terminated broker
+    logger.info("starting the previously terminated broker", extra=d)
+    start_entity_in_background(systemTestEnv, testcaseEnv, entityId)
+
+def call_ShutdownBroker_for_clean_shutdown(systemTestEnv, testcaseEnv, shutdownEntityId):
+    logger.info("Sending ShutdownBroker Command for broker [" + shutdownEntityId + "]", extra=d)
+
+    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
+    tcConfigsList = testcaseEnv.testcaseConfigsList
+    validationStatusDict = testcaseEnv.validationStatusDict
+
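+    # the ShutdownBroker admin command is run over ssh from the first zookeeper
+    # entity's host, which has kafka_home and java_home defined in cluster_config.json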
+    # get the first zookeeper entity's attributes
+    zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
+    firstZkDict = zkDictList[0]
+    hostname = firstZkDict["hostname"]
+    zkEntityId = firstZkDict["entity_id"]
+    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home")
+    javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home")
+    clusterName = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "cluster_name")
+    kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
+
+    # get zk.connect
+    zkConnectStr = ""
+    if clusterName == "source":
+        zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+    elif clusterName == "target":
+        zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
+    else:
+        logger.error("Invalid cluster name : " + clusterName, extra=d)
+        sys.exit(1)
+
+    # get broker attributes
+    brokerId = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", shutdownEntityId, "broker.id")
+
+    # get optional ShutdownBroker command line args from testcase_xxxx_properties.json
+    cleanShutdownNumRetries = 10
+    try:
+        cleanShutdownNumRetries = int(testcaseEnv.testcaseArgumentsDict["clean_shutdown_num_retries"])
+    except (KeyError, ValueError):
+        pass    # take default
+    cleanShutdownRetryIntervalMs = 5000
+    try:
+        cleanShutdownRetryIntervalMs = int(testcaseEnv.testcaseArgumentsDict["clean_shutdown_retry_interval_ms"])
+    except (KeyError, ValueError):
+        pass    # take default
+
+    # set up the shell command to call ShutdownBroker
+    cmdStrList = ["ssh " + hostname,
+                  "\"JAVA_HOME=" + javaHome,
+                  kafkaRunClassBin + " kafka.admin.ShutdownBroker",
+                  "--broker " + brokerId,
+                  "--num.retries " + str(cleanShutdownNumRetries),
+                  "--zookeeper " + zkConnectStr,
+                  "--retry.interval.ms " + str(cleanShutdownRetryIntervalMs) + "\""]
+
+    # execute the command
+    cmdStr = " ".join(cmdStrList)
+    logger.debug("executing command [" + cmdStr + "]", extra=d)
+    subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+    for line in subproc.stdout.readlines():
+        rstripped_line = line.rstrip('\n')
+        if "INFO Shutdown status: complete" in rstripped_line:
+            logger.info("found clean shutdown status in log : " + rstripped_line, extra=d)
+            validationStatusDict["Validate for broker " + shutdownEntityId + " clean shutdown"] = "PASSED"
+            return
+
+    validationStatusDict["Validate for broker " + shutdownEntityId + " clean shutdown"] = "FAILED"
+
+def start_preferred_replica_election(systemTestEnv, testcaseEnv):
+
+    logger.info("starting kafka-preferred-replica-election.sh ...", extra=d)
+
+    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
+    tcConfigsList = testcaseEnv.testcaseConfigsList
+
+    zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
+    firstZkDict = zkDictList[0]
+    hostname = firstZkDict["hostname"]
+    zkEntityId = firstZkDict["entity_id"]
+    clientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort")
+    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home")
+    javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home")
+    clusterName = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "cluster_name")
+    kafkaToolBin = kafkaHome + "/bin/kafka-preferred-replica-election.sh"
+
+    # get zk.connect
+    zkConnectStr =
"" + if clusterName == "source": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + elif clusterName == "target": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + else: + logger.error("Invalid cluster name : " + clusterName, extra=d) + sys.exit(1) + + cmdStrList = ["ssh " + hostname, + "\"JAVA_HOME=" + javaHome, + kafkaToolBin + " --zookeeper " + zkConnectStr + "\""] + cmdStr = " ".join(cmdStrList) + logger.debug("executing command [" + cmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + line = line.rstrip('\n') + logger.debug("#### [" + line + "]", extra=d) + +def validate_under_replicated_partitions_count(systemTestEnv, testcaseEnv, round): + + logger.info("getting under_replicated_partitions_count ...", extra=d) + + underReplicatedPartitionsCount = 0 + + clusterConfigsList = systemTestEnv.clusterEntityConfigDictList + tcConfigsList = testcaseEnv.testcaseConfigsList + validationStatusDict = testcaseEnv.validationStatusDict + + zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") + firstZkDict = zkDictList[0] + hostname = firstZkDict["hostname"] + zkEntityId = firstZkDict["entity_id"] + clientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort") + kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home") + javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home") + clusterName = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "cluster_name") + kafkaToolBin = kafkaHome + "/bin/kafka-list-topic.sh" + + # get zk.connect + zkConnectStr = "" + if clusterName == "source": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + elif clusterName == "target": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + else: + logger.error("Invalid cluster name : " + clusterName, extra=d) + sys.exit(1) + + cmdStrList = ["ssh " + hostname, + "\"JAVA_HOME=" + javaHome, + kafkaToolBin + " --zookeeper " + zkConnectStr + "\""] + cmdStr = " ".join(cmdStrList) + logger.debug("executing command [" + cmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + line = line.rstrip('\n') + logger.debug("#### [" + line + "]", extra=d) + if "isUnderReplicated: true" in line: + underReplicatedPartitionsCount += 1 + + logger.debug("under replicated partitions count [" + str(underReplicatedPartitionsCount) + "]", extra=d) + if underReplicatedPartitionsCount > 0: + validationStatusDict["Validate for under replicated partitions count in round [" + str(round) + "]"] = "FAILED" + else: + validationStatusDict["Validate for under replicated partitions count in round [" + str(round) + "]"] = "PASSED" + +def validate_producer_send_success(systemTestEnv, testcaseEnv, round=1): + anonLogger.info("==================================================") + anonLogger.info("validating producer send all messages successfully") + anonLogger.info("==================================================") + + failuresCount = "" + + validationStatusDict = testcaseEnv.validationStatusDict + clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList + + prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", 
"producer_performance") + + for prodPerfCfg in prodPerfCfgList: + producerEntityId = prodPerfCfg["entity_id"] + producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default") + producerLogPathName = producerLogPath + "/producer_performance.log" + + cmdStr = "grep 'Failed to send messages after' " + producerLogPathName + " | wc -l" + + logger.debug("executing command [" + cmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + failuresCount = line.rstrip('\n') + logger.debug("#### [" + failuresCount + "]", extra=d) + + if int(failuresCount) > 0: + validationStatusDict["Validate for producer send all messages successful round [" + str(round) + "]"] = "FAILED" + else: + validationStatusDict["Validate for producer send all messages successful round [" + str(round) + "]"] = "PASSED" + +def rename_remote_producer_log(systemTestEnv, testcaseEnv, iteration): + clusterList = systemTestEnv.clusterEntityConfigDictList + producerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "producer_performance") + for producerConfig in producerConfigList: + host = producerConfig["hostname"] + entityId = producerConfig["entity_id"] + role = producerConfig["role"] + kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home") + + logger.info("removing remote producer_performance log in " + host, extra=d) + producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + + if host != "localhost": + producerrLogPath = replace_kafka_home(producerLogPath, kafkaHome) + + producerLogPathName = producerLogPath + "/producer_performance.log" + + cmdStr = "ssh " + host + " 'mv " + producerLogPathName + " " + producerLogPathName + "_" + str(iteration) + "'" + logger.debug("executing command: [" + cmdStr + "]", extra=d) + system_test_utils.async_sys_call(cmdStr) + +def rename_remote_consumer_log(systemTestEnv, testcaseEnv, iteration): + clusterList = systemTestEnv.clusterEntityConfigDictList + consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "console_consumer") + for consumerConfig in consumerConfigList: + host = consumerConfig["hostname"] + entityId = consumerConfig["entity_id"] + role = consumerConfig["role"] + kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home") + + logger.info("removing remote console_consumer log in " + host, extra=d) + consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + + if host != "localhost": + consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome) + + consumerLogPathName = consumerLogPath + "/console_consumer.log" + + cmdStr = "ssh " + host + " 'mv " + consumerLogPathName + " " + consumerLogPathName + "_" + str(iteration) + "'" + logger.debug("executing command: [" + cmdStr + "]", extra=d) + system_test_utils.async_sys_call(cmdStr) + +