Index: system_test/utils/kafka_system_test_utils.py =================================================================== --- system_test/utils/kafka_system_test_utils.py (revision 1376321) +++ system_test/utils/kafka_system_test_utils.py (working copy) @@ -5,6 +5,7 @@ # =================================== import datetime +import getpass import inspect import json import logging @@ -198,7 +199,8 @@ jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi" kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home") - metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", entityId, "metrics") + role = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "role") + metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics") startMetricsCmdList = ["ssh " + jmxHost, "'JAVA_HOME=" + javaHome, @@ -218,12 +220,13 @@ # keep track of the remote entity pid in a dictionary for line in subproc.stdout.readlines(): + #logger.debug("line: [" + line + "]", extra=d) if line.startswith("pid"): line = line.rstrip('\n') logger.debug("found pid line: [" + line + "]", extra=d) tokens = line.split(':') thisPid = tokens[1] - testcaseEnv.entityParentPidDict[thisPid] = thisPid + testcaseEnv.entityJmxParentPidDict[entityId] = thisPid def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv): @@ -273,7 +276,7 @@ cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg) elif ( clusterCfg["role"] == "producer_performance"): - tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr + #tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties", \ cfgDestPathname + "/" + tcCfg["config_filename"], 
tcCfg) @@ -371,6 +374,7 @@ leaderDict["partition"] = matchObj.group(4) except: logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d) + raise #else: # logger.debug("unmatched line found [" + line + "]", extra=d) @@ -402,12 +406,12 @@ if role == "zookeeper": cmdList = ["ssh " + hostname, "'JAVA_HOME=" + javaHome, + "JMX_PORT=" + jmxPort, kafkaHome + "/bin/zookeeper-server-start.sh ", configPathName + "/" + configFile + " &> ", logPathName + "/" + logFile + " & echo pid:$! > ", logPathName + "/entity_" + entityId + "_pid'"] - # construct zk.connect str and update it to testcaseEnv.userDefinedEnvVarDict.zkConnectStr if ( len(testcaseEnv.userDefinedEnvVarDict["zkConnectStr"]) > 0 ): testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = \ @@ -446,10 +450,9 @@ tokens = line.split(':') testcaseEnv.entityParentPidDict[entityId] = tokens[1] - # if it is a broker, start metric collection - if role == "broker": - start_metrics_collection(hostname, jmxPort, "kafka:type=kafka.SocketServerStats", \ - "AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv) + # start metric collection + start_metrics_collection(hostname, jmxPort, "kafka:type=kafka.SocketServerStats", \ + "AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv) def start_console_consumer(systemTestEnv, testcaseEnv): @@ -464,31 +467,62 @@ clusterEntityConfigDictList, "entity_id", entityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval( \ clusterEntityConfigDictList, "entity_id", entityId, "java_home") + jmxPort = system_test_utils.get_data_by_lookup_keyval( \ + clusterEntityConfigDictList, "entity_id", entityId, "jmx_port") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" logger.info("starting console consumer", extra=d) - consumerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + \ - "/console_consumer.log" 
+ consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + consumerLogPathName = consumerLogPath + "/console_consumer.log" testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"]) cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, + "JMX_PORT=" + jmxPort, kafkaRunClassBin + " kafka.consumer.ConsoleConsumer", - commandArgs + " &> " + consumerLogPathName + "'"] + commandArgs + " &> " + consumerLogPathName, + " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] cmdStr = " ".join(cmdList) logger.debug("executing command: [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) + system_test_utils.async_sys_call(cmdStr) + pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'" + logger.debug("executing command: [" + pidCmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) + # keep track of the remote entity pid in a dictionary + for line in subproc.stdout.readlines(): + if line.startswith("pid"): + line = line.rstrip('\n') + logger.debug("found pid line: [" + line + "]", extra=d) + tokens = line.split(':') + testcaseEnv.consumerHostParentPidDict[host] = tokens[1] + def start_producer_performance(systemTestEnv, testcaseEnv): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList + testcaseConfigsList = testcaseEnv.testcaseConfigsList + brokerListStr = "" + # construct "broker-list" for producer + for clusterEntityConfigDict in clusterEntityConfigDictList: + entityRole = clusterEntityConfigDict["role"] + if entityRole == "broker": + hostname = clusterEntityConfigDict["hostname"] + entityId = clusterEntityConfigDict["entity_id"] + port = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "port") + + if len(brokerListStr) == 0: + 
brokerListStr = hostname + ":" + port + else: + brokerListStr = brokerListStr + "," + hostname + ":" + port + + producerConfigList = system_test_utils.get_dict_from_list_of_dicts( \ clusterEntityConfigDictList, "role", "producer_performance") for producerConfig in producerConfigList: @@ -498,26 +532,43 @@ clusterEntityConfigDictList, "entity_id", entityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval( \ clusterEntityConfigDictList, "entity_id", entityId, "java_home") + jmxPort = system_test_utils.get_data_by_lookup_keyval( \ + clusterEntityConfigDictList, "entity_id", entityId, "jmx_port") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" logger.info("starting producer preformance", extra=d) - producerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + \ - "/producer_performance.log" + producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + producerLogPathName = producerLogPath + "/producer_performance.log" testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"]) cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, + "JMX_PORT=" + jmxPort, kafkaRunClassBin + " kafka.perf.ProducerPerformance", - commandArgs + " &> " + producerLogPathName + "'"] + "--broker-list " +brokerListStr, + commandArgs + " &> " + producerLogPathName, + " & echo pid:$! 
> " + producerLogPath + "/entity_" + entityId + "_pid'"] cmdStr = " ".join(cmdList) logger.debug("executing command: [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) + system_test_utils.async_sys_call(cmdStr) + pidCmdStr = "ssh " + host + " 'cat " + producerLogPath + "/entity_" + entityId + "_pid'" + logger.debug("executing command: [" + pidCmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) + # keep track of the remote entity pid in a dictionary + for line in subproc.stdout.readlines(): + if line.startswith("pid"): + line = line.rstrip('\n') + logger.debug("found pid line: [" + line + "]", extra=d) + tokens = line.split(':') + testcaseEnv.producerHostParentPidDict[host] = tokens[1] + + def stop_remote_entity(systemTestEnv, entityId, parentPid): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -530,6 +581,16 @@ system_test_utils.sigkill_remote_process(hostname, pidStack) +def force_stop_remote_entity(systemTestEnv, entityId, parentPid): + clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList + + hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname") + pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid) + + logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d) + system_test_utils.sigkill_remote_process(hostname, pidStack) + + def create_topic(systemTestEnv, testcaseEnv): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -633,9 +694,9 @@ return True except Exception, e: logger.error("leader info not completed:", extra=d) + print leaderDict + traceback.print_exc() validationStatusDict["Validate leader election successful"] = "FAILED" - print leaderDict - print e return False else: validationStatusDict["Validate leader election successful"] = "FAILED" @@ -677,5 +738,20 @@ logger.debug("executing command [" + cmdStr + "]", extra=d) 
system_test_utils.sys_call(cmdStr) - +def force_terminate_running_entity(systemTestEnv): + clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList + username = getpass.getuser() + for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: + hostname = clusterEntityConfigDict["hostname"] + cmdList = ["ssh " + hostname, + "\"ps auxw | grep -v grep | grep -v Bootstrap | grep ^" + username, + "| grep -i 'java\|server\-start\|run\-\|producer\|consumer'", + "| tr -s ' ' | cut -f2 -d ' ' | xargs kill -9" + "\""] + + cmdStr = " ".join(cmdList) + logger.debug("executing command [" + cmdStr + "]", extra=d) + + system_test_utils.sys_call(cmdStr) + + Index: system_test/utils/system_test_utils.py =================================================================== --- system_test/utils/system_test_utils.py (revision 1376321) +++ system_test/utils/system_test_utils.py (working copy) @@ -8,6 +8,7 @@ import json import logging import os +import re import signal import subprocess import sys @@ -202,6 +203,7 @@ sys_call_return_subproc(cmdStr) except: print "WARN - pid:",pid,"not found" + raise def sigkill_remote_process(hostname, pidStack): @@ -215,6 +217,7 @@ sys_call_return_subproc(cmdStr) except: print "WARN - pid:",pid,"not found" + raise def terminate_process(pidStack): @@ -224,6 +227,7 @@ os.kill(int(pid), signal.SIGTERM) except: print "WARN - pid:",pid,"not found" + raise def convert_keyval_to_cmd_args(configFilePathname): @@ -280,18 +284,37 @@ def setup_remote_hosts(systemTestEnv): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList + localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..") + localJavaHome = "" + + subproc = sys_call_return_subproc("which java") + for line in subproc.stdout.readlines(): + if line.startswith("which: no "): + logger.error("No Java binary found in local host", extra=d) + return False + else: + line = line.rstrip('\n') + matchObj = re.match("(.*)\/bin\/java$", line) + 
localJavaHome = matchObj.group(1) + #print localJavaHome + + listIndex = -1 for clusterEntityConfigDict in clusterEntityConfigDictList: + listIndex += 1 + hostname = clusterEntityConfigDict["hostname"] kafkaHome = clusterEntityConfigDict["kafka_home"] javaHome = clusterEntityConfigDict["java_home"] - localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..") - logger.info("local kafka home : [" + localKafkaHome + "]", extra=d) - if kafkaHome != localKafkaHome: - logger.error("kafkaHome [" + kafkaHome + "] must be the same as [" + localKafkaHome + "] in host [" + hostname + "]", extra=d) - logger.error("please update cluster_config.json and run again. Aborting test ...", extra=d) - sys.exit(1) + if hostname == "localhost" and javaHome == "default": + clusterEntityConfigDictList[listIndex]["java_home"] = localJavaHome + if hostname == "localhost" and kafkaHome == "default": + clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome + + kafkaHome = clusterEntityConfigDict["kafka_home"] + javaHome = clusterEntityConfigDict["java_home"] + #logger.info("checking running processes in host [" + hostname + "]", extra=d) #if not remote_host_processes_stopped(hostname): # logger.error("Running processes found in host [" + hostname + "]", extra=d) Index: system_test/utils/testcase_env.py =================================================================== --- system_test/utils/testcase_env.py (revision 1376321) +++ system_test/utils/testcase_env.py (working copy) @@ -17,6 +17,15 @@ # dictionary of entity parent pid entityParentPidDict = {} + # dictionary of entity parent pid for JMX + entityJmxParentPidDict = {} + + # dictionary of hostname : parent pid for consumer + consumerHostParentPidDict = {} + + # dictionary of hostname : parent pid for producer + producerHostParentPidDict = {} + # list of testcase configs testcaseConfigsList = [] Index: system_test/replication_testsuite/config/producer_performance.properties 
=================================================================== --- system_test/replication_testsuite/config/producer_performance.properties (revision 1376321) +++ system_test/replication_testsuite/config/producer_performance.properties (working copy) @@ -1,4 +1,3 @@ -broker-list=localhost:2181 topic=mytest messages=200 message-size=100 Index: system_test/replication_testsuite/replica_basic_test.py =================================================================== --- system_test/replication_testsuite/replica_basic_test.py (revision 1376321) +++ system_test/replication_testsuite/replica_basic_test.py (working copy) @@ -11,6 +11,7 @@ import subprocess import sys import time +import traceback from system_test_env import SystemTestEnv sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR) @@ -105,13 +106,13 @@ # self.testcaseEnv.testCaseLogsDir # self.testcaseEnv.testcaseArgumentsDict + print # display testcase name and arguments self.log_message("Test Case : " + testcaseDirName) for k,v in self.testcaseEnv.testcaseArgumentsDict.items(): self.anonLogger.info(" " + k + " : " + v) self.log_message("Description : " + testcaseDescription) - - + # ================================================================ # # ================================================================ # # Product Specific Testing Code Starts Here: # @@ -188,11 +189,7 @@ if (bounceLeaderFlag.lower() == "true"): if self.testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED": - # no leader available for testing => skip this round - self.log_message("stopping all entities") - for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items(): - kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) - + # skip this testcase continue else: # leader elected => stop leader @@ -202,20 +199,26 @@ leaderPPid = self.testcaseEnv.entityParentPidDict[leaderEntityId] except: self.log_message("leader details unavailable") + raise 
self.log_message("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid) kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, leaderEntityId, leaderPPid) + try: + jmxParentPid = self.testcaseEnv.entityJmxParentPidDict[leaderEntityId] + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, leaderEntityId, jmxParentPid) + except: + self.logger.warn("entity: [" + leaderEntityId + "] - JmxTool ppid not found", extra=self.d) self.testcaseEnv.entityParentPidDict[leaderEntityId] = "" self.logger.info("sleeping for 5s for leader re-election to complete", extra=self.d) time.sleep(5) # starting producer - self.log_message("starting producer") + self.log_message("starting producer in the background") kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv) - self.anonLogger.info("sleeping for 5s") - time.sleep(5) + self.anonLogger.info("sleeping for 30s") + time.sleep(30) # starting previously terminated broker if (bounceLeaderFlag.lower() == "true" and not self.testcaseEnv.entityParentPidDict[leaderEntityId]): @@ -229,14 +232,11 @@ time.sleep(5) # starting consumer - self.log_message("starting consumer") + self.log_message("starting consumer in the background") kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv) - - # this testcase is completed - so stopping all entities - self.log_message("stopping all entities") - for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items(): - kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) - + self.anonLogger.info("sleeping for 20s") + time.sleep(20) + # validate the data matched self.log_message("validating data matched") result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv) @@ -249,8 +249,28 @@ except Exception as e: self.log_message("Exception caught : ") print e + traceback.print_exc() + + finally: self.log_message("stopping all entities") for entityId, 
parentPid in self.testcaseEnv.entityParentPidDict.items(): - kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, parentPid) + try: + jmxParentPid = self.testcaseEnv.entityJmxParentPidDict[entityId] + kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, jmxParentPid) + except: + self.logger.warn("entity: [" + entityId + "] - JmxTool ppid not found", extra=self.d) - + for hostname, consumerPPid in self.testcaseEnv.consumerHostParentPidDict.items(): + self.logger.debug("hostname : " + hostname, extra=self.d) + self.logger.debug("consumerPPid : " + consumerPPid, extra=self.d) + consumerEntityId = system_test_utils.get_data_by_lookup_keyval(self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id") + kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, consumerEntityId, consumerPPid) + + for hostname, producerPPid in self.testcaseEnv.producerHostParentPidDict.items(): + self.logger.debug("hostname : " + hostname, extra=self.d) + self.logger.debug("producerPPid : " + producerPPid, extra=self.d) + producerEntityId = system_test_utils.get_data_by_lookup_keyval(self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id") + kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, producerEntityId, producerPPid) + + Index: system_test/replication_testsuite/testcase_1/testcase_1_properties.json =================================================================== --- system_test/replication_testsuite/testcase_1/testcase_1_properties.json (revision 1376321) +++ system_test/replication_testsuite/testcase_1/testcase_1_properties.json (working copy) @@ -47,7 +47,6 @@ "compression-codec": "1", "message-size": "500", "message": "500", - "broker-list": "localhost:9091,localhost:9092,localhost:9093", "log_filename": "producer_performance.log", "config_filename":
"producer_performance.properties" }, Index: system_test/README.txt =================================================================== --- system_test/README.txt (revision 1376321) +++ system_test/README.txt (working copy) @@ -1,10 +1,8 @@ # ========================== # Known Issues: # ========================== -1. The "broker-list" in system_test/replication_testsuite/testcase_1/testcase_1_properties.json needs to be manually updated to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.) -2. Sometimes the running processes may not be terminated properly by the script. +1. This test framework currently doesn't support MacOS due to different "ps" argument options from Linux. The correct ps execution is required to terminate the background running processes properly. - # ========================== # Overview # ========================== Index: system_test/cluster_config.json =================================================================== --- system_test/cluster_config.json (revision 1376321) +++ system_test/cluster_config.json (working copy) @@ -4,48 +4,48 @@ "entity_id": "0", "hostname": "localhost", "role": "zookeeper", - "kafka_home": "/home/nnarkhed/Projects/kafka-440", - "java_home": "/export/apps/jdk/JDK-1_6_0_27", + "kafka_home": "default", + "java_home": "default", "jmx_port": "9990" }, { "entity_id": "1", "hostname": "localhost", "role": "broker", - "kafka_home": "/home/nnarkhed/Projects/kafka-440", - "java_home": "/export/apps/jdk/JDK-1_6_0_27", + "kafka_home": "default", + "java_home": "default", "jmx_port": "9991" }, { "entity_id": "2", "hostname": "localhost", "role": "broker", - "kafka_home": "/home/nnarkhed/Projects/kafka-440", - "java_home": "/export/apps/jdk/JDK-1_6_0_27", + "kafka_home": "default", + "java_home": "default", "jmx_port": "9992" }, { "entity_id": "3", "hostname": "localhost", "role": "broker", - "kafka_home": "/home/nnarkhed/Projects/kafka-440", - "java_home": 
"/export/apps/jdk/JDK-1_6_0_27", + "kafka_home": "default", + "java_home": "default", "jmx_port": "9993" }, { "entity_id": "4", "hostname": "localhost", "role": "producer_performance", - "kafka_home": "/home/nnarkhed/Projects/kafka-440", - "java_home": "/export/apps/jdk/JDK-1_6_0_27", + "kafka_home": "default", + "java_home": "default", "jmx_port": "9994" }, { "entity_id": "5", "hostname": "localhost", "role": "console_consumer", - "kafka_home": "/home/nnarkhed/Projects/kafka-440", - "java_home": "/export/apps/jdk/JDK-1_6_0_27", + "kafka_home": "default", + "java_home": "default", "jmx_port": "9995" } ]