Index: system_test/clean_shutdown_testsuite/clean_shutdown_test.py =================================================================== --- system_test/clean_shutdown_testsuite/clean_shutdown_test.py (revision 0) +++ system_test/clean_shutdown_testsuite/clean_shutdown_test.py (revision 0) @@ -0,0 +1,351 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+#!/usr/bin/env python + +# =================================== +# clean_shutdown_test.py +# =================================== + +import inspect +import logging +import os +import pprint +import signal +import subprocess +import sys +import time +import traceback + +from system_test_env import SystemTestEnv +sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR) + +from setup_utils import SetupUtils +from replication_utils import ReplicationUtils +import system_test_utils +from testcase_env import TestcaseEnv + +# product specific: Kafka +import kafka_system_test_utils +import metrics + +class CleanShutdownTest(ReplicationUtils, SetupUtils): + + testModuleAbsPathName = os.path.realpath(__file__) + testSuiteAbsPathName = os.path.abspath(os.path.dirname(testModuleAbsPathName)) + + def __init__(self, systemTestEnv): + + # SystemTestEnv - provides cluster level environment settings + # such as entity_id, hostname, kafka_home, java_home which + # are available in a list of dictionary named + # "clusterEntityConfigDictList" + self.systemTestEnv = systemTestEnv + + super(CleanShutdownTest, self).__init__(self) + + # dict to pass user-defined attributes to logger argument: "extra" + d = {'name_of_class': self.__class__.__name__} + + def signal_handler(self, signal, frame): + self.log_message("Interrupt detected - User pressed Ctrl+c") + + # perform the necessary cleanup here when user presses Ctrl+c and it may be product specific + self.log_message("stopping all entities - please wait ...") + kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) + sys.exit(1) + + def runTest(self): + + # ====================================================================== + # get all testcase directories under this testsuite + # ====================================================================== + testCasePathNameList = system_test_utils.get_dir_paths_with_prefix( + self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX) + 
testCasePathNameList.sort() + + # ============================================================= + # launch each testcase one by one: testcase_1, testcase_2, ... + # ============================================================= + for testCasePathName in testCasePathNameList: + + skipThisTestCase = False + + try: + # ====================================================================== + # A new instance of TestcaseEnv to keep track of this testcase's env vars + # and initialize some env vars as testCasePathName is available now + # ====================================================================== + self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self) + self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName + self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName) + self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"] + + # ====================================================================== + # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json + # ====================================================================== + testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"] + + if self.systemTestEnv.printTestDescriptionsOnly: + self.testcaseEnv.printTestCaseDescription(testcaseDirName) + continue + elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName): + self.log_message("Skipping : " + testcaseDirName) + skipThisTestCase = True + continue + else: + self.testcaseEnv.printTestCaseDescription(testcaseDirName) + system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName) + + + # ============================================================================== # + # ============================================================================== # + # Product Specific Testing Code Starts Here: # + # ============================================================================== # 
+ # ============================================================================== # + + # get optional testcase arguments + logRetentionTest = "false" + try: + logRetentionTest = self.testcaseEnv.testcaseArgumentsDict["log_retention_test"] + except: + pass + consumerMultiTopicsMode = "false" + try: + consumerMultiTopicsMode = self.testcaseEnv.testcaseArgumentsDict["consumer_multi_topics_mode"] + except: + pass + autoCreateTopic = "false" + try: + autoCreateTopic = self.testcaseEnv.testcaseArgumentsDict["auto_create_topic"] + except: + pass + + + # initialize self.testcaseEnv with user-defined environment variables (product specific) + self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = "" + self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = False + self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False + self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"] = [] + + # initialize signal handler + signal.signal(signal.SIGINT, self.signal_handler) + + # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file: + # system_test/_testsuite/testcase_/testcase__properties.json + self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data( + self.testcaseEnv.testcasePropJsonPathName) + + # clean up data directories specified in zookeeper.properties and kafka_server_.properties + kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase + # for collecting logs from remote machines + kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv) + + # TestcaseEnv - initialize producer & consumer config / log file pathnames + kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv) + + # generate remote hosts log/config dirs if not exist + 
kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # generate properties files for zookeeper, kafka, producer, consumer: + # 1. copy system_test/_testsuite/config/*.properties to + # system_test/_testsuite/testcase_/config/ + # 2. update all properties files in system_test/_testsuite/testcase_/config + # by overriding the settings specified in: + # system_test/_testsuite/testcase_/testcase__properties.json + kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName, + self.testcaseEnv, self.systemTestEnv) + + # ============================================= + # preparing all entities to start the test + # ============================================= + self.log_message("starting zookeepers") + kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 2s") + time.sleep(2) + + self.log_message("starting brokers") + kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 5s") + time.sleep(5) + + if autoCreateTopic.lower() == "false": + self.log_message("creating topics") + kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 5s") + time.sleep(5) + + # ============================================= + # start ConsoleConsumer if this is a Log Retention test + # ============================================= + if logRetentionTest.lower() == "true": + self.log_message("starting consumer in the background") + kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv) + time.sleep(1) + + # ============================================= + # starting producer + # ============================================= + self.log_message("starting producer in the background") + kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False) + msgProducingFreeTimeSec = 
self.testcaseEnv.testcaseArgumentsDict["message_producing_free_time_sec"] + self.anonLogger.info("sleeping for " + msgProducingFreeTimeSec + " sec to produce some messages") + time.sleep(int(msgProducingFreeTimeSec)) + + # ============================================= + # A while-loop to bounce leader as specified + # by "num_iterations" in testcase_n_properties.json + # ============================================= + i = 1 + numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"]) + brokerType = self.testcaseEnv.testcaseArgumentsDict["broker_type"] + bounceBrokerFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_broker"] + + while i <= numIterations: + self.log_message("Iteration " + str(i) + " of " + str(numIterations)) + + # find controller + self.log_message("looking up controller") + controllerDict = kafka_system_test_utils.get_controller_attributes(self.systemTestEnv, self.testcaseEnv) + controllerEntityId = controllerDict['entity_id'] + self.logger.info("found controller in broker id : " + controllerDict['brokerid'], extra=self.d) + + # a list of all brokers entityId + brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( + self.systemTestEnv.clusterEntityConfigDictList, "role", "broker", "entity_id") + + for brokerEntityId in brokerEntityIdList: + # check to see if it's a controller -> leave controller to be the last broker to bounce + if brokerEntityId != controllerEntityId: + brokerId = system_test_utils.get_data_by_lookup_keyval( + self.testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "brokerid") + + self.log_message("calling ShutdownBroker for clean-shutdown") + cleanShutdownStatus = kafka_system_test_utils.call_ShutdownBroker_for_clean_shutdown( + self.systemTestEnv, self.testcaseEnv, brokerEntityId, controllerEntityId) + time.sleep(2) + + self.log_message("bouncing broker in entity id: " + brokerEntityId) + kafka_system_test_utils.bounce_broker(self.systemTestEnv, self.testcaseEnv, brokerEntityId) + 
time.sleep(2) + + # clean shutdown for controller + self.log_message("calling ShutdownBroker for clean-shutdown") + kafka_system_test_utils.call_ShutdownBroker_for_clean_shutdown( + self.systemTestEnv, self.testcaseEnv, controllerEntityId, controllerEntityId) + self.anonLogger.info("sleeping for 5s for replicas to catch up") + time.sleep(5) + # now bounce controller + self.log_message("bouncing controller in entity id : " + controllerEntityId) + kafka_system_test_utils.bounce_broker(self.systemTestEnv, self.testcaseEnv, controllerEntityId) + + self.anonLogger.info("sleeping for 15s") + time.sleep(15) + i += 1 + # while loop + + # ============================================= + # tell producer to stop + # ============================================= + self.testcaseEnv.lock.acquire() + self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True + time.sleep(1) + self.testcaseEnv.lock.release() + time.sleep(1) + + # ============================================= + # wait for producer thread's update of + # "backgroundProducerStopped" to be "True" + # ============================================= + while 1: + self.testcaseEnv.lock.acquire() + self.logger.info("status of backgroundProducerStopped : [" + \ + str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d) + if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]: + time.sleep(1) + self.logger.info("all producer threads completed", extra=self.d) + break + time.sleep(1) + self.testcaseEnv.lock.release() + time.sleep(2) + + # ============================================= + # starting debug consumer + # ============================================= + #self.log_message("starting debug consumers in the background") + #kafka_system_test_utils.start_simple_consumer(self.systemTestEnv, self.testcaseEnv, None) + #self.anonLogger.info("sleeping for 10s") + #time.sleep(10) + + # ============================================= + # starting console consumer + # 
============================================= + self.log_message("starting consumer in the background") + kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv) + time.sleep(10) + + # ============================================= + # this testcase is completed - stop all entities + # ============================================= + self.log_message("stopping all entities") + for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + # make sure all entities are stopped + kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv) + + # ============================================= + # collect logs from remote hosts + # ============================================= + kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # ============================================= + # validate the data matched and checksum + # ============================================= + self.log_message("calling validate_data_matched_in_multi_topics_from_single_consumer_producer") + kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv) + + #self.log_message("calling validate_simple_consumer_data_matched_across_replicas") + #kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv) + + self.log_message("calling validate_broker_log_segment_checksum") + kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) + + # ============================================= + # draw graphs + # ============================================= + 
metrics.draw_all_graphs(self.systemTestEnv.METRICS_PATHNAME, + self.testcaseEnv, + self.systemTestEnv.clusterEntityConfigDictList) + + # build dashboard, one for each role + metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME, + self.testcaseEnv.testCaseDashboardsDir, + self.systemTestEnv.clusterEntityConfigDictList) + except Exception as e: + self.log_message("Exception while running test {0}".format(e)) + traceback.print_exc() + + finally: + if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly: + self.log_message("stopping all entities - please wait ...") + kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) + Index: system_test/clean_shutdown_testsuite/config/server.properties =================================================================== --- system_test/clean_shutdown_testsuite/config/server.properties (revision 0) +++ system_test/clean_shutdown_testsuite/config/server.properties (revision 0) @@ -0,0 +1,139 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. 
This must be set to a unique integer for each broker. +brokerid=0 + +# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned +# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost +# may not be what you want. +#hostname= + + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9091 + +# The number of threads handling network requests +network.threads=2 + +# The number of threads doing disk I/O +io.threads=2 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +max.socket.request.bytes=104857600 + + +############################# Log Basics ############################# + +# The directory under which to store log files +log.dir=/tmp/kafka_server_logs + +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. +num.partitions=5 + +# Overrides for the default given by num.partitions on a per-topic basis +#topic.partition.count.map=topic1:3, topic2:4 + +############################# Log Flush Policy ############################# + +# The following configurations control the flush of data to disk. This is the most +# important performance knob in kafka. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. +# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). +# 3. Throughput: The flush is generally the most expensive operation. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). 
This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +log.default.flush.interval.ms=1000 + +# Per-topic overrides for log.default.flush.interval.ms +#topic.flush.intervals.ms=topic1:1000, topic2:3000 + +# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. +log.default.flush.scheduler.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.size. +#log.retention.size=1073741824 +log.retention.size=-1 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +#log.file.size=536870912 +log.file.size=102400 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.cleanup.interval.mins=1 + +############################# Zookeeper ############################# + +# Enable connecting to zookeeper +enable.zookeeper=true + +# Zk connection string (see zk docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. 
+zk.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 + +monitoring.period.secs=1 +max.message.size=1000000 +max.queued.requests=500 +log.roll.hours=168 +log.index.max.size=10485760 +log.index.interval.bytes=4096 +auto.create.topics=true +controller.socket.timeout.ms=30000 +controller.message.queue.size=10 +default.replication.factor=1 +replica.max.lag.time.ms=10000 +replica.max.lag.bytes=4000 +replica.socket.timeout.ms=30000 +replica.socket.buffersize=65536 +replica.fetch.size=1048576 +replica.fetch.wait.time.ms=500 +replica.fetch.min.bytes=4096 +replica.fetchers=1 Index: system_test/clean_shutdown_testsuite/config/zookeeper.properties =================================================================== --- system_test/clean_shutdown_testsuite/config/zookeeper.properties (revision 0) +++ system_test/clean_shutdown_testsuite/config/zookeeper.properties (revision 0) @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir=/tmp/zookeeper +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 Index: system_test/clean_shutdown_testsuite/__init__.py =================================================================== --- system_test/clean_shutdown_testsuite/__init__.py (revision 0) +++ system_test/clean_shutdown_testsuite/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: system_test/clean_shutdown_testsuite/testcase_9061/cluster_config.json =================================================================== --- system_test/clean_shutdown_testsuite/testcase_9061/cluster_config.json (revision 0) +++ system_test/clean_shutdown_testsuite/testcase_9061/cluster_config.json (revision 0) @@ -0,0 +1,67 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9990" + }, + { + "entity_id": "1", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9991" + }, + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9992" + }, + { + "entity_id": "3", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9993" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9994" + }, + { + "entity_id": "5", + "hostname": "localhost", + "role": "producer_performance", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9995" + }, + { + "entity_id": "6", + "hostname": "localhost", + "role": 
"console_consumer", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9996" + } + ] +} Index: system_test/clean_shutdown_testsuite/testcase_9061/testcase_9061_properties.json =================================================================== --- system_test/clean_shutdown_testsuite/testcase_9061/testcase_9061_properties.json (revision 0) +++ system_test/clean_shutdown_testsuite/testcase_9061/testcase_9061_properties.json (revision 0) @@ -0,0 +1,106 @@ +{ + "description": {"01":"To Test : 'Clean Shutdown'", + "02":"Produce and consume messages to 20 topics - 2 partitions.", + "03":"This test sends messages to 2 replicas", + "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)", + "05":"Restart the terminated broker", + "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully", + "07":"At the end it verifies the log size and contents", + "08":"Use a consumer to verify no message loss.", + "09":"Producer dimensions : mode:async, acks:-1, comp:1", + "10":"Log segment size : 1048576" + }, + "testcase_args": { + "broker_type": "controller", + "bounce_broker": "true", + "replica_factor": "2", + "num_partition": "2", + "num_iteration": "1", + "auto_create_topic": "true", + "producer_multi_topics_mode": "true", + "consumer_multi_topics_mode": "true", + "sleep_seconds_between_producer_calls": "10", + "message_producing_free_time_sec": "15", + "num_topics_for_auto_generated_string": "20", + "broker_down_time_in_sec": "5", + "clean_shutdown_num_retries": "10", + "clean_shutdown_retry_interval_ms": "5000", + "num_messages_to_produce_per_producer_call": "500" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "brokerid": "1", + "num.partitions":"2", + 
"default.replication.factor": "2", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_1_logs", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "brokerid": "2", + "num.partitions":"2", + "default.replication.factor": "2", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_2_logs", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "brokerid": "3", + "num.partitions":"2", + "default.replication.factor": "2", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_3_logs", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "port": "9094", + "brokerid": "4", + "num.partitions":"2", + "default.replication.factor": "2", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_4_logs", + "log_filename": "kafka_server_9094.log", + "config_filename": "kafka_server_9094.properties" + }, + { + "entity_id": "5", + "topic": "t001", + "threads": "5", + "compression-codec": "1", + "message-size": "500", + "message": "500", + "request-num-acks": "-1", + "producer-retry-backoff-ms": "3500", + "producer-num-retries": "3", + "sync":"false", + "log_filename": "producer_performance_5.log", + "config_filename": "producer_performance_5.properties" + }, + { + "entity_id": "6", + "topic": "t001", + "groupid": "mytestgroup", + "consumer-timeout-ms": "60000", + "zookeeper": "localhost:2188", + "log_filename": "console_consumer_6.log", + "config_filename": "console_consumer_6.properties" + } + ] +} Index: system_test/clean_shutdown_testsuite/testcase_9062/cluster_config.json =================================================================== --- system_test/clean_shutdown_testsuite/testcase_9062/cluster_config.json (revision 0) +++ system_test/clean_shutdown_testsuite/testcase_9062/cluster_config.json 
(revision 0) @@ -0,0 +1,67 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9990" + }, + { + "entity_id": "1", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9991" + }, + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9992" + }, + { + "entity_id": "3", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9993" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9994" + }, + { + "entity_id": "5", + "hostname": "localhost", + "role": "producer_performance", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9995" + }, + { + "entity_id": "6", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9996" + } + ] +} Index: system_test/clean_shutdown_testsuite/testcase_9062/testcase_9062_properties.json =================================================================== --- system_test/clean_shutdown_testsuite/testcase_9062/testcase_9062_properties.json (revision 0) +++ system_test/clean_shutdown_testsuite/testcase_9062/testcase_9062_properties.json (revision 0) @@ -0,0 +1,106 @@ +{ + "description": {"01":"To Test : 'Clean Shutdown'", + "02":"Produce and consume messages to 20 topics - 2 partitions.", + "03":"This test sends messages to 2 replicas", + "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)", + "05":"Restart the 
terminated broker", + "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully", + "07":"At the end it verifies the log size and contents", + "08":"Use a consumer to verify no message loss.", + "09":"Producer dimensions : mode:async, acks:-1, comp:1", + "10":"Log segment size : 1048576" + }, + "testcase_args": { + "broker_type": "controller", + "bounce_broker": "true", + "replica_factor": "4", + "num_partition": "2", + "num_iteration": "1", + "auto_create_topic": "true", + "producer_multi_topics_mode": "true", + "consumer_multi_topics_mode": "true", + "sleep_seconds_between_producer_calls": "10", + "message_producing_free_time_sec": "15", + "num_topics_for_auto_generated_string": "30", + "broker_down_time_in_sec": "5", + "clean_shutdown_num_retries": "10", + "clean_shutdown_retry_interval_ms": "5000", + "num_messages_to_produce_per_producer_call": "500" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "brokerid": "1", + "num.partitions":"2", + "default.replication.factor": "4", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_1_logs", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "brokerid": "2", + "num.partitions":"2", + "default.replication.factor": "4", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_2_logs", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "brokerid": "3", + "num.partitions":"2", + "default.replication.factor": "4", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_3_logs", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + 
"port": "9094", + "brokerid": "4", + "num.partitions":"2", + "default.replication.factor": "4", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_4_logs", + "log_filename": "kafka_server_9094.log", + "config_filename": "kafka_server_9094.properties" + }, + { + "entity_id": "5", + "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020,t021,t022,t023,t024,t025,t026,t027,t028,t029,t030", + "threads": "5", + "compression-codec": "1", + "message-size": "500", + "message": "500", + "request-num-acks": "-1", + "producer-retry-backoff-ms": "3500", + "producer-num-retries": "3", + "sync":"false", + "log_filename": "producer_performance_5.log", + "config_filename": "producer_performance_5.properties" + }, + { + "entity_id": "6", + "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020,t021,t022,t023,t024,t025,t026,t027,t028,t029,t030", + "groupid": "mytestgroup", + "consumer-timeout-ms": "60000", + "zookeeper": "localhost:2188", + "log_filename": "console_consumer_6.log", + "config_filename": "console_consumer_6.properties" + } + ] +} Index: system_test/logging.conf =================================================================== --- system_test/logging.conf (revision 1420500) +++ system_test/logging.conf (working copy) @@ -35,7 +35,7 @@ # ============================================== [handler_namedConsoleHandler] class=StreamHandler -level=INFO +level=DEBUG formatter=namedFormatter args=[] Index: system_test/testcase_to_run.json =================================================================== --- system_test/testcase_to_run.json (revision 1420500) +++ system_test/testcase_to_run.json (working copy) @@ -1,5 +1,5 @@ { - "ReplicaBasicTest" : [ - "testcase_0001" + "CleanShutdownTest" : [ + "testcase_9062" ] } Index: system_test/utils/kafka_system_test_utils.py =================================================================== --- 
system_test/utils/kafka_system_test_utils.py (revision 1420500) +++ system_test/utils/kafka_system_test_utils.py (working copy) @@ -777,9 +777,28 @@ # testcase configurations: testcaseList = testcaseEnv.testcaseConfigsList - topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic") + + # get testcase arguments + # 1. topics + numTopicsForAutoGenString = -1 + try: + numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"]) + except: + pass + + topic = "" + if numTopicsForAutoGenString < 0: + topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic") + else: + topic = generate_topics_string("topic", numTopicsForAutoGenString) + + # update this variable and will be used by data validation functions + testcaseEnv.consumerTopicsString = topic + + # 2. consumer timeout timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "consumer-timeout-ms") + # 3. consumer formatter formatterOption = "" try: formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "formatter") @@ -789,6 +808,7 @@ if len(formatterOption) > 0: formatterOption = " --formatter " + formatterOption + " " + # get zk.connect zkConnectStr = "" if clusterName == "source": zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] @@ -857,6 +877,34 @@ time.sleep(1) testcaseEnv.lock.release() +def generate_topics_string(topicPrefix, numOfTopics): + # return a topics string in the following format: + # _0001,_0002,... + # eg. "topic_0001,topic_0002,...,topic_xxxx" + + topicsStr = "" + counter = 1 + idx = "1" + while counter <= numOfTopics: + if counter <= 9: + idx = "000" + str(counter) + elif counter <= 99: + idx = "00" + str(counter) + elif counter <= 999: + idx = "0" + str(counter) + elif counter <= 9999: + idx = str(counter) + else: + raise Exception("Error: no. 
of topics must be under 10000 - current topics count : " + str(counter)) + + if len(topicsStr) == 0: + topicsStr = topicPrefix + "_" + idx + else: + topicsStr = topicsStr + "," + topicPrefix + "_" + idx + + counter += 1 + return topicsStr + def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client): host = producerConfig["hostname"] entityId = producerConfig["entity_id"] @@ -868,9 +916,24 @@ jmxPort = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" + # get optional testcase arguments + numTopicsForAutoGenString = -1 + try: + numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"]) + except: + pass + # testcase configurations: testcaseConfigsList = testcaseEnv.testcaseConfigsList - topic = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic") + topic = "" + if numTopicsForAutoGenString < 0: + topic = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic") + else: + topic = generate_topics_string("topic", numTopicsForAutoGenString) + + # update this variable and will be used by data validation functions + testcaseEnv.producerTopicsString = topic + threads = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "threads") compCodec = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "compression-codec") messageSize = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-size") @@ -1125,7 +1188,8 @@ for prodPerfCfg in prodPerfCfgList: producerEntityId = prodPerfCfg["entity_id"] - topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic") + #topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", producerEntityId, "topic") + topic = testcaseEnv.producerTopicsString acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks") consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \ @@ -1133,7 +1197,8 @@ matchingConsumerEntityId = None for consumerEntityId in consumerEntityIdList: - consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") + #consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") + consumerTopic = testcaseEnv.consumerTopicsString if consumerTopic in topic: matchingConsumerEntityId = consumerEntityId break @@ -1682,7 +1747,8 @@ # testcase configurations: testcaseList = testcaseEnv.testcaseConfigsList - topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic") + topicsStr = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic") + topicsList = topicsStr.split(',') brokerListStr = "" if clusterName == "source": @@ -1713,39 +1779,38 @@ startingOffset = -2 brokerPortList = brokerListStr.split(',') for brokerPort in brokerPortList: + for topic in topicsList: + partitionId = 0 + while (partitionId < numPartitions): + logger.info("starting debug consumer for replica on [" + brokerPort + "] partition [" + str(partitionId) + "]", extra=d) - partitionId = 0 - while (partitionId < numPartitions): - logger.info("starting debug consumer for replica on [" + brokerPort + "] partition [" + str(partitionId) + "]", extra=d) + if minStartingOffsetDict is not None: + topicPartition = topic + "-" + str(partitionId) + startingOffset = minStartingOffsetDict[topicPartition] - if minStartingOffsetDict is not None: - topicPartition = topic + "-" + str(partitionId) - startingOffset = minStartingOffsetDict[topicPartition] - - outputFilePathName = 
consumerLogPath + "/simple_consumer_" + topic + "-" + str(partitionId) + "_r" + str(replicaIndex) + ".log" - brokerPortLabel = brokerPort.replace(":", "_") - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, - kafkaRunClassBin + " kafka.tools.SimpleConsumerShell", - "--broker-list " + brokerListStr, - "--topic " + topic, - "--partition " + str(partitionId), - "--replica " + str(replicaIndex), - "--offset " + str(startingOffset), - "--no-wait-at-logend ", - " > " + outputFilePathName, - " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] + outputFilePathName = consumerLogPath + "/simple_consumer_" + topic + "-" + str(partitionId) + "_r" + str(replicaIndex) + ".log" + brokerPortLabel = brokerPort.replace(":", "_") + cmdList = ["ssh " + host, + "'JAVA_HOME=" + javaHome, + kafkaRunClassBin + " kafka.tools.SimpleConsumerShell", + "--broker-list " + brokerListStr, + "--topic " + topic, + "--partition " + str(partitionId), + "--replica " + str(replicaIndex), + "--offset " + str(startingOffset), + "--no-wait-at-logend ", + " > " + outputFilePathName, + " & echo pid:$! 
> " + consumerLogPath + "/entity_" + entityId + "_pid'"] - cmdStr = " ".join(cmdList) + cmdStr = " ".join(cmdList) - logger.debug("executing command: [" + cmdStr + "]", extra=d) - subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr) - # dummy for-loop to wait until the process is completed - for line in subproc_1.stdout.readlines(): - pass - time.sleep(1) - - partitionId += 1 + logger.debug("executing command: [" + cmdStr + "]", extra=d) + subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr) + # dummy for-loop to wait until the process is completed + for line in subproc_1.stdout.readlines(): + pass + time.sleep(1) + partitionId += 1 replicaIndex += 1 def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv): @@ -1966,42 +2031,46 @@ mismatchCounter = 0 for consumerEntityId in consumerEntityIdList: - topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") + topicsStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") + topicsList = topicsStr.split(',') consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", consumerEntityId, "default") - replicaIdxMsgCountDictList = [] - # replicaIdxMsgCountDictList is being used as follows: - # - # the above replica message count will be organized as follows: - # index of the list would map to the partitionId - # each element in the list maps to the replicaIdx-MessageCount - # to validate that : - # 1. there should be "no. of broker" of non-zero message count and they are equal - # 2. there should be "no. 
of broker - replication factor" of zero count - # [{"1": "750", "2": "750", "3": "0" }, - # {"1": "0" , "2": "750", "3": "750"}] + for topic in topicsList: + replicaIdxMsgCountDictList = [] + # replicaIdxMsgCountDictList is being used as follows: + # + # the above replica message count will be organized as follows: + # index of the list would map to the partitionId + # each element in the list maps to the replicaIdx-MessageCount + # to validate that : + # 1. there should be "no. of broker" of non-zero message count and they are equal + # 2. there should be "no. of broker - replication factor" of zero count + # [{"1": "750", "2": "750", "3": "0" }, + # {"1": "0" , "2": "750", "3": "750"}] - j = 0 - while j < int(numPartition): - newDict = {} - replicaIdxMsgCountDictList.append(newDict) - j += 1 + j = 0 + while j < int(numPartition): + newDict = {} + replicaIdxMsgCountDictList.append(newDict) + j += 1 - for logFile in sorted(os.listdir(consumerLogPath)): + for logFile in sorted(os.listdir(consumerLogPath)): - if logFile.startswith("simple_consumer_") and logFile.endswith(".log"): - matchObj = re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile) - partitionId = int(matchObj.group(1)) - replicaIdx = int(matchObj.group(2)) + if logFile.startswith("simple_consumer_"+topic) and logFile.endswith(".log"): + logger.debug("file : " + logFile, extra=d) + logger.debug("topic : " + topic, extra=d) + matchObj = re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile) + partitionId = int(matchObj.group(1)) + replicaIdx = int(matchObj.group(2)) - consumerLogPathName = consumerLogPath + "/" + logFile - consumerMsgIdList = get_message_id(consumerLogPathName) - consumerMsgIdSet = set(consumerMsgIdList) + consumerLogPathName = consumerLogPath + "/" + logFile + consumerMsgIdList = get_message_id(consumerLogPathName) + consumerMsgIdSet = set(consumerMsgIdList) - replicaIdxMsgCountDictList[partitionId][replicaIdx] = len(consumerMsgIdSet) + 
replicaIdxMsgCountDictList[partitionId][replicaIdx] = len(consumerMsgIdSet) - logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d) - validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet)) + logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d) + validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet)) pprint.pprint(replicaIdxMsgCountDictList) @@ -2042,14 +2111,14 @@ for prodPerfCfg in prodPerfCfgList: producerEntityId = prodPerfCfg["entity_id"] - topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic") + topicStr = testcaseEnv.producerTopicsString acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks") consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer", "entity_id") matchingConsumerEntityId = None for consumerEntityId in consumerEntityIdList: - consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") + consumerTopic = testcaseEnv.consumerTopicsString if consumerTopic in topicStr: matchingConsumerEntityId = consumerEntityId break @@ -2098,4 +2167,148 @@ logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d) +def get_ISR(systemTestEnv, testcaseEnv): + logger.info("Querying Zookeeper for ISR info ...", extra=d) + clusterConfigsList = systemTestEnv.clusterEntityConfigDictList + tcConfigsList = testcaseEnv.testcaseConfigsList + zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") + firstZkDict = zkDictList[0] + firstZkHost = 
firstZkDict["hostname"] + firstZkEntityId = firstZkDict["entity_id"] + clusterName = firstZkDict["cluster_name"] + firstZkClientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", firstZkEntityId, "clientPort") + firstZkKafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", firstZkEntityId, "kafka_home") + firstZkJavaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", firstZkEntityId, "java_home") + firstZkKafkaRunClassBin = firstZkKafkaHome + "/bin/kafka-run-class.sh" + numPartition = testcaseEnv.testcaseArgumentsDict["num_partition"] + + zkConnectStr = "" + if clusterName == "source": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + elif clusterName == "target": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + else: + logger.error("Invalid cluster name : " + clusterName, extra=d) + sys.exit(1) + + topicPartitionISRDict = {} + + producerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "producer_performance") + for producerConfig in producerConfigList: + entityId = producerConfig["entity_id"] + topicsStr = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", entityId, "topic") + topics = topicsStr.split(",") + + for topic in topics: + partitionIndex = 0 + while partitionIndex < int(numPartition): + cmdStrList = ["ssh " + firstZkHost, + "\"JAVA_HOME=" + firstZkJavaHome, + firstZkKafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain", + "-server " + zkConnectStr, + "'get /brokers/topics/" + topic + "/partitions/" + str(partitionIndex) + "/leaderAndISR' 2> /dev/null | tail -1\""] + + cmdStr = " ".join(cmdStrList) + logger.debug("executing command [" + cmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + + for line in subproc.stdout.readlines(): + if "ISR" in line: + matchObj = re.match(".*\"ISR\":\"(.*?)\",.*", line) + isrList = 
matchObj.group(1) + topicPartitionISRDict[topic + "-" + str(partitionIndex)] = isrList + break + + partitionIndex += 1 + + return topicPartitionISRDict + + +def bounce_broker(systemTestEnv, testcaseEnv, entityId): + # bouncing this broker + logger.info("stopping broker : " + entityId, extra=d) + stop_remote_entity(systemTestEnv, entityId, testcaseEnv.entityBrokerParentPidDict[entityId]) + + # wait here for the specified amount of time + brokerDownTimeInSec = 5 + try: + brokerDownTimeInSec = int(testcaseEnv.testcaseArgumentsDict["broker_down_time_in_sec"]) + except: + pass # take default + logger.info("sleeping for : " + str(brokerDownTimeInSec), extra=d) + time.sleep(brokerDownTimeInSec) + + # starting previously terminated broker + logger.info("starting the previously terminated broker", extra=d) + start_entity_in_background(systemTestEnv, testcaseEnv, entityId) + + +def call_ShutdownBroker_for_clean_shutdown(systemTestEnv, testcaseEnv, shutdownEntityId, controllerEntityId): + logger.info("Sending ShutdownBroker Command for broker [" + shutdownEntityId + "]", extra=d) + + clusterConfigsList = systemTestEnv.clusterEntityConfigDictList + tcConfigsList = testcaseEnv.testcaseConfigsList + validationStatusDict = testcaseEnv.validationStatusDict + + # get the first Zookeeper attributes + zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") + firstZkDict = zkDictList[0] + hostname = firstZkDict["hostname"] + zkEntityId = firstZkDict["entity_id"] + kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home") + javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home") + clusterName = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "cluster_name") + kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" + + # get zk.connect + zkConnectStr = "" + if clusterName == "source": + 
zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + elif clusterName == "target": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + else: + logger.error("Invalid cluster name : " + clusterName, extra=d) + sys.exit(1) + + # get broker attributes + brokerId = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", shutdownEntityId, "brokerid") + ctlHost = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", controllerEntityId, "hostname") + ctlJmxPort = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", controllerEntityId, "jmx_port") + + # get optional ShutdownBroker command line args from testcase_xxxx_properties.json + cleanShutdownNumRetries = 10 + try: + cleanShutdownNumRetries = int(testcaseEnv.testcaseArgumentsDict["clean_shutdown_num_retries"]) + except: + pass # take default + cleanShutdownRetryIntervalMs = 5000 + try: + cleanShutdownRetryIntervalMs = int(testcaseEnv.testcaseArgumentsDict["clean_shutdown_retry_interval_ms"]) + except: + pass # take default + + # set up the shell command to call ShutdownBroker + cmdStrList = ["ssh " + hostname, + "\"JAVA_HOME=" + javaHome, + kafkaRunClassBin + " kafka.admin.ShutdownBroker", + "--broker " + brokerId, + "--num.retries " + str(cleanShutdownNumRetries), + "--zookeeper " + zkConnectStr, + "--retry.interval.ms " + str(cleanShutdownRetryIntervalMs), + "--jmx.url service:jmx:rmi:///jndi/rmi://" + ctlHost + ":" + ctlJmxPort + "/jmxrmi\""] + + # execute the command + cmdStr = " ".join(cmdStrList) + logger.debug("executing command [" + cmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + rstripped_line = line.rstrip('\n') + #logger.debug("#### => " + rstripped_line, extra=d) + if "INFO Shutdown status: complete" in rstripped_line: + logger.info("found clean shutdown status in log : " + rstripped_line, extra=d) + 
validationStatusDict["Validate for broker " + shutdownEntityId + " clean shutdown"] = "PASSED" + return + + validationStatusDict["Validate for broker " + shutdownEntityId + " clean shutdown"] = "FAILED" +