Index: system_test/utils/kafka_system_test_utils.py
===================================================================
--- system_test/utils/kafka_system_test_utils.py (revision 1377726)
+++ system_test/utils/kafka_system_test_utils.py (working copy)
@@ -5,6 +5,7 @@
 # ===================================
 
 import datetime
+import getpass
 import inspect
 import json
 import logging
@@ -112,16 +113,40 @@
 
     configPathName     = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
     metricsPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics")
-    dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
+    logPathName        = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "default")
 
+    # ==============================
+    # collect entity log file
+    # ==============================
     cmdList = ["scp",
+               hostname + ":" + logPathName + "/*",
+               logPathName]
     cmdStr = " ".join(cmdList)
     logger.debug("executing command [" + cmdStr + "]", extra=d)
     system_test_utils.sys_call(cmdStr)
+
+    # ==============================
+    # collect entity metrics file
+    # ==============================
+    cmdList = ["scp",
               hostname + ":" + metricsPathName + "/*",
               metricsPathName]
     cmdStr = " ".join(cmdList)
     logger.debug("executing command [" + cmdStr + "]", extra=d)
     system_test_utils.sys_call(cmdStr)
+
+    # ==============================
+    # collect dashboards file
+    # ==============================
+    dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
+    cmdList = ["scp",
+               hostname + ":" + dashboardsPathName + "/*",
+               dashboardsPathName]
+    cmdStr = " ".join(cmdList)
+    logger.debug("executing command [" + cmdStr + "]", extra=d)
+    system_test_utils.sys_call(cmdStr)
+
-
 def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv):
     testCaseBaseDir = testcaseEnv.testCaseBaseDir
@@ -198,7 +223,8 @@
 
     jmxUrl    = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
     kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
     javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
-    metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", entityId, "metrics")
+    role = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "role")
+    metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
 
     startMetricsCmdList = ["ssh " + jmxHost,
                            "'JAVA_HOME=" + javaHome,
@@ -211,21 +237,29 @@
     startMetricsCommand = " ".join(startMetricsCmdList)
     logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
     system_test_utils.async_sys_call(startMetricsCommand)
+    time.sleep(1)
 
-    pidCmdStr = "ssh " + jmxHost + " 'cat " + metricsPathName + "/entity_pid'"
+    pidCmdStr = "ssh " + jmxHost + " 'cat " + metricsPathName + "/entity_pid 2> /dev/null'"
     logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
     subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
 
-    # keep track of the remote entity pid in a dictionary
+    # keep track of JMX ppid in a dictionary of entity_id to list of JMX ppid
+    # testcaseEnv.entityJmxParentPidDict:
+    #   key: entity_id
+    #   val: list of JMX ppid associated to that entity_id
+    #        { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
     for line in subproc.stdout.readlines():
+        line = line.rstrip('\n')
+        #logger.debug("line: [" + line + "]", extra=d)
         if line.startswith("pid"):
-            line = line.rstrip('\n')
             logger.debug("found pid line: [" + line + "]", extra=d)
             tokens  = line.split(':')
             thisPid = tokens[1]
-            testcaseEnv.entityParentPidDict[thisPid] = thisPid
+            if entityId not in testcaseEnv.entityJmxParentPidDict:
+                testcaseEnv.entityJmxParentPidDict[entityId] = []
+            testcaseEnv.entityJmxParentPidDict[entityId].append(thisPid)
+            #print "#### => ", testcaseEnv.entityJmxParentPidDict
 
-
 def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
     logger.info("calling generate_properties_files", extra=d)
@@ -273,7 +307,7 @@
                 cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
 
         elif ( clusterCfg["role"] == "producer_performance"):
-            tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
+            #tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
             copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties", \
                 cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
 
@@ -322,6 +356,62 @@
         start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
 
 
+def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv):
+
+    logger.info("looking up broker shutdown...", extra=d)
+
+    # keep track of broker related data in this dict such as broker id,
+    # entity id and timestamp and return it to the caller function
+    shutdownBrokerDict = {}
+
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
+                             clusterEntityConfigDictList, "role", "broker", "entity_id")
+
+    for brokerEntityId in brokerEntityIdList:
+
+        hostname = system_test_utils.get_data_by_lookup_keyval( \
+                       clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
+        logFile  = system_test_utils.get_data_by_lookup_keyval( \
+                       testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
+
+        shutdownBrokerDict["entity_id"] = brokerEntityId
+        shutdownBrokerDict["hostname"]  = hostname
+
+        logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
+        cmdStrList = ["ssh " + hostname,
+                      "\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] + "' ",
+                      logPathName + "/" + logFile + " | ",
+                      "sort | tail -1\""]
+        cmdStr = " ".join(cmdStrList)
+
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+        for line in subproc.stdout.readlines():
+
+            line = line.rstrip('\n')
+
+            if testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] in line:
+                logger.info("found the log line : " + line, extra=d)
+                try:
+                    matchObj    = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"], line)
+                    datetimeStr = matchObj.group(1)
+                    datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
+                    unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
+                    #print "{0:.3f}".format(unixTs)
+                    shutdownBrokerDict["timestamp"] = unixTs
+                    shutdownBrokerDict["brokerid"]  = matchObj.group(2)
+                    logger.info("brokerid: [" + shutdownBrokerDict["brokerid"] + "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d)
+                    return shutdownBrokerDict
+                except:
+                    logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
+                    raise
+            #else:
+            #    logger.debug("unmatched line found [" + line + "]", extra=d)
+
+    return shutdownBrokerDict
+
+
 def get_leader_elected_log_line(systemTestEnv, testcaseEnv):
 
     logger.info("looking up leader...", extra=d)
@@ -341,9 +431,6 @@
         logFile  = system_test_utils.get_data_by_lookup_keyval( \
                        testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
 
-        leaderDict["entity_id"] = brokerEntityId
-        leaderDict["hostname"]  = hostname
-
         logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
         cmdStrList = ["ssh " + hostname,
                       "\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
@@ -365,12 +452,21 @@
                     datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
                     unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
                     #print "{0:.3f}".format(unixTs)
-                    leaderDict["timestamp"] = unixTs
-                    leaderDict["brokerid"]  = matchObj.group(2)
-                    leaderDict["topic"]     = matchObj.group(3)
-                    leaderDict["partition"] = matchObj.group(4)
+
+                    # update leaderDict when
+                    # 1. leaderDict has no logline entry
+                    # 2. leaderDict has existing logline entry but found another logline with more recent timestamp
+                    if (len(leaderDict) > 0 and leaderDict["timestamp"] < unixTs) or (len(leaderDict) == 0):
+                        leaderDict["timestamp"] = unixTs
+                        leaderDict["brokerid"]  = matchObj.group(2)
+                        leaderDict["topic"]     = matchObj.group(3)
+                        leaderDict["partition"] = matchObj.group(4)
+                        leaderDict["entity_id"] = brokerEntityId
+                        leaderDict["hostname"]  = hostname
+                    logger.info("brokerid: [" + leaderDict["brokerid"] + "] entity_id: [" + leaderDict["entity_id"] + "]", extra=d)
                 except:
                     logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
+                    raise
                 #else:
                 #    logger.debug("unmatched line found [" + line + "]", extra=d)
 
@@ -402,12 +498,12 @@
     if role == "zookeeper":
         cmdList = ["ssh " + hostname,
                    "'JAVA_HOME=" + javaHome,
+                   "JMX_PORT=" + jmxPort,
                    kafkaHome + "/bin/zookeeper-server-start.sh ",
                    configPathName + "/" + configFile + " &> ",
                    logPathName + "/" + logFile + " & echo pid:$! > ",
                    logPathName + "/entity_" + entityId + "_pid'"]
 
-        # construct zk.connect str and update it to testcaseEnv.userDefinedEnvVarDict.zkConnectStr
         if ( len(testcaseEnv.userDefinedEnvVarDict["zkConnectStr"]) > 0 ):
             testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = \
@@ -420,7 +516,7 @@
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT=" + jmxPort,
                    kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
-                   configPathName + "/" + configFile + " &> ",
+                   configPathName + "/" + configFile + " >> ",
                    logPathName + "/" + logFile + " & echo pid:$! > ",
> ", logPathName + "/entity_" + entityId + "_pid'"] @@ -446,10 +542,9 @@ tokens = line.split(':') testcaseEnv.entityParentPidDict[entityId] = tokens[1] - # if it is a broker, start metric collection - if role == "broker": - start_metrics_collection(hostname, jmxPort, "kafka:type=kafka.SocketServerStats", \ - "AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv) + # start metric collection + start_metrics_collection(hostname, jmxPort, "kafka:type=kafka.SocketServerStats", \ + "AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv) def start_console_consumer(systemTestEnv, testcaseEnv): @@ -464,31 +559,66 @@ clusterEntityConfigDictList, "entity_id", entityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval( \ clusterEntityConfigDictList, "entity_id", entityId, "java_home") + jmxPort = system_test_utils.get_data_by_lookup_keyval( \ + clusterEntityConfigDictList, "entity_id", entityId, "jmx_port") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" logger.info("starting console consumer", extra=d) - consumerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + \ - "/console_consumer.log" + consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + consumerLogPathName = consumerLogPath + "/console_consumer.log" testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"]) cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, + "JMX_PORT=" + jmxPort, kafkaRunClassBin + " kafka.consumer.ConsoleConsumer", - commandArgs + " &> " + consumerLogPathName + "'"] + commandArgs + " &> " + consumerLogPathName, + " & echo pid:$! 
> " + consumerLogPath + "/entity_" + entityId + "_pid'"] cmdStr = " ".join(cmdList) logger.debug("executing command: [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) + system_test_utils.async_sys_call(cmdStr) + pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'" + logger.debug("executing command: [" + pidCmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) + # keep track of the remote entity pid in a dictionary + for line in subproc.stdout.readlines(): + if line.startswith("pid"): + line = line.rstrip('\n') + logger.debug("found pid line: [" + line + "]", extra=d) + tokens = line.split(':') + testcaseEnv.consumerHostParentPidDict[host] = tokens[1] + # start metric collection + start_metrics_collection(host, jmxPort, "kafka:type=kafka.SocketServerStats", \ + "AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv) + + def start_producer_performance(systemTestEnv, testcaseEnv): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList + testcaseConfigsList = testcaseEnv.testcaseConfigsList + brokerListStr = "" + # construct "broker-list" for producer + for clusterEntityConfigDict in clusterEntityConfigDictList: + entityRole = clusterEntityConfigDict["role"] + if entityRole == "broker": + hostname = clusterEntityConfigDict["hostname"] + entityId = clusterEntityConfigDict["entity_id"] + port = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "port") + + if len(brokerListStr) == 0: + brokerListStr = hostname + ":" + port + else: + brokerListStr = brokerListStr + "," + hostname + ":" + port + + producerConfigList = system_test_utils.get_dict_from_list_of_dicts( \ clusterEntityConfigDictList, "role", "producer_performance") for producerConfig in producerConfigList: @@ -498,26 +628,47 @@ clusterEntityConfigDictList, "entity_id", entityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval( \ clusterEntityConfigDictList, "entity_id", entityId, "java_home") + jmxPort = system_test_utils.get_data_by_lookup_keyval( \ + clusterEntityConfigDictList, "entity_id", entityId, "jmx_port") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" logger.info("starting producer preformance", extra=d) - producerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + \ - "/producer_performance.log" + producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + producerLogPathName = producerLogPath + "/producer_performance.log" testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"]) cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, + "JMX_PORT=" + jmxPort, kafkaRunClassBin + " kafka.perf.ProducerPerformance", - commandArgs + " &> " + producerLogPathName + "'"] + "--broker-list " +brokerListStr, + commandArgs + " &> " + producerLogPathName, + " & echo pid:$! 
> " + producerLogPath + "/entity_" + entityId + "_pid'"] cmdStr = " ".join(cmdList) logger.debug("executing command: [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) + system_test_utils.async_sys_call(cmdStr) + pidCmdStr = "ssh " + host + " 'cat " + producerLogPath + "/entity_" + entityId + "_pid'" + logger.debug("executing command: [" + pidCmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) + # keep track of the remote entity pid in a dictionary + for line in subproc.stdout.readlines(): + if line.startswith("pid"): + line = line.rstrip('\n') + logger.debug("found pid line: [" + line + "]", extra=d) + tokens = line.split(':') + testcaseEnv.producerHostParentPidDict[host] = tokens[1] + + # start metric collection + start_metrics_collection(host, jmxPort, "kafka:type=kafka.SocketServerStats", \ + "AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv) + + def stop_remote_entity(systemTestEnv, entityId, parentPid): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -530,23 +681,27 @@ system_test_utils.sigkill_remote_process(hostname, pidStack) +def force_stop_remote_entity(systemTestEnv, entityId, parentPid): + clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList + + hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname") + pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid) + + logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d) + system_test_utils.sigkill_remote_process(hostname, pidStack) + + def create_topic(systemTestEnv, testcaseEnv): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts( \ - clusterEntityConfigDictList, "role", "producer_performance") - prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts( \ - testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"]) + prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance") + prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"]) prodTopicList = prodPerfCfgDict[0]["topic"].split(',') - zkEntityId = system_test_utils.get_data_by_lookup_keyval( \ - clusterEntityConfigDictList, "role", "zookeeper", "entity_id") - zkHost = system_test_utils.get_data_by_lookup_keyval( \ - clusterEntityConfigDictList, "role", "zookeeper", "hostname") - kafkaHome = system_test_utils.get_data_by_lookup_keyval( \ - clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home") - javaHome = system_test_utils.get_data_by_lookup_keyval( \ - clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home") + zkEntityId = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id") + zkHost = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname") + kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home") + javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home") createTopicBin = kafkaHome + "/bin/kafka-create-topic.sh" logger.info("zkEntityId : " + zkEntityId, extra=d) @@ -633,9 +788,9 @@ return True except 
             logger.error("leader info not completed:", extra=d)
+            print leaderDict
+            traceback.print_exc()
             validationStatusDict["Validate leader election successful"] = "FAILED"
-            print leaderDict
-            print e
             return False
     else:
         validationStatusDict["Validate leader election successful"] = "FAILED"
@@ -652,7 +807,8 @@
         hostname  = clusterEntityConfigDict["hostname"]
         entityId  = clusterEntityConfigDict["entity_id"]
         role      = clusterEntityConfigDict["role"]
-        #testcasePathName = testcaseEnv.testcaseBaseDir
+        kafkaHome = clusterEntityConfigDict["kafka_home"]
+        testCaseBaseDir = testcaseEnv.testCaseBaseDir
 
         cmdStr  = ""
         dataDir = ""
@@ -674,8 +830,94 @@
                 logger.warn("aborting test...", extra=d)
                 sys.exit(1)
 
+        # ============================
+        # cleaning data dir
+        # ============================
         logger.debug("executing command [" + cmdStr + "]", extra=d)
         system_test_utils.sys_call(cmdStr)
-
+        # ============================
+        # cleaning log/metrics/svg, ...
+        # ============================
+        if system_test_utils.remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
+            # so kafkaHome is a real kafka installation
+            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.log' | xargs rm 2> /dev/null\""
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
+            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*_pid' | xargs rm 2> /dev/null\""
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
+            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.csv' | xargs rm 2> /dev/null\""
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
+            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.svg' | xargs rm 2> /dev/null\""
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
+            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.html' | xargs rm 2> /dev/null\""
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
+
+def force_terminate_running_entity(systemTestEnv):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    username = getpass.getuser()
+
+    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
+        hostname = clusterEntityConfigDict["hostname"]
+        cmdList  = ["ssh " + hostname,
+                    "\"ps auxw | grep -v grep | grep -v Bootstrap | grep ^" + username,
+                    "| grep -i 'java\|server\-start\|run\-\|producer\|consumer'",
+                    "| tr -s ' ' | cut -f2 -d ' ' | xargs kill -9" + "\""]
+
+        cmdStr = " ".join(cmdList)
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+
+        system_test_utils.sys_call(cmdStr)
+
+
+def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict):
+    leaderEntityId = None
+    leaderBrokerId = None
+    leaderPPid     = None
+    shutdownLeaderTimestamp = None
+
+    if testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
+        # leader election is not successful - something is wrong => so skip this testcase
+        #continue
+        return None
+    else:
+        # leader elected => stop leader
+        try:
+            leaderEntityId = leaderDict["entity_id"]
+            leaderBrokerId = leaderDict["brokerid"]
+            leaderPPid = testcaseEnv.entityParentPidDict[leaderEntityId]
+        except:
+            logger.info("leader details unavailable", extra=d)
+            raise
+
+        logger.info("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid, extra=d)
+        stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid)
+
+        logger.info("sleeping for 5s for leader re-election to complete", extra=d)
+        time.sleep(5)
+
+        # get broker shut down completed timestamp
+        shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv)
+        #print shutdownBrokerDict
+        logger.info("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownBrokerDict["timestamp"])), extra=d)
+
+        logger.info("looking up new leader", extra=d)
+        leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv)
+        #print leaderDict2
+        logger.info("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])), extra=d)
+        leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownBrokerDict["timestamp"])
+        logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
+
+        return leaderReElectionLatency
+
+
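For reference, the re-election latency computed above is just the difference between two log4j timestamps grepped out of the broker logs: the "shut down completed" line and the newest "Completed the leader state transition" line. A minimal standalone sketch of that timestamp conversion, in the same Python 2 style as the framework (the function name and the sample timestamp strings below are illustrative only, not part of the patch):

    import datetime
    import time

    def log4j_ts_to_unix(tsStr):
        # "2012-08-30 17:04:55,123" -> seconds since epoch, keeping sub-second precision
        dtObj = datetime.datetime.strptime(tsStr, "%Y-%m-%d %H:%M:%S,%f")
        return time.mktime(dtObj.timetuple()) + 1e-6 * dtObj.microsecond

    shutdownTs  = log4j_ts_to_unix("2012-08-30 17:04:55,123")   # broker "shut down completed" log line
    reelectedTs = log4j_ts_to_unix("2012-08-30 17:04:57,456")   # new leader election log line
    print "leader re-election latency: %.3f sec" % (reelectedTs - shutdownTs)
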
- logger.error("kafkaHome [" + kafkaHome + "] must be the same as [" + localKafkaHome + "] in host [" + hostname + "]", extra=d) - logger.error("please update cluster_config.json and run again. Aborting test ...", extra=d) - sys.exit(1) + if hostname == "localhost" and javaHome == "default": + clusterEntityConfigDictList[listIndex]["java_home"] = localJavaHome - #logger.info("checking running processes in host [" + hostname + "]", extra=d) - #if not remote_host_processes_stopped(hostname): - # logger.error("Running processes found in host [" + hostname + "]", extra=d) - # return False + if hostname == "localhost" and kafkaHome == "default": + clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome + kafkaHome = clusterEntityConfigDict["kafka_home"] + javaHome = clusterEntityConfigDict["java_home"] + logger.info("checking JAVA_HOME [" + javaHome + "] in host [" + hostname + "]", extra=d) if not remote_host_directory_exists(hostname, javaHome): logger.error("Directory not found: [" + javaHome + "] in host [" + hostname + "]", extra=d) @@ -324,3 +362,22 @@ for line in subproc.stdout.readlines(): dummyVar = 1 + +def remove_kafka_home_dir_at_remote_hosts(hostname, kafkaHome): + + if remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"): + cmdStr = "ssh " + hostname + " 'chmod -R 777 " + kafkaHome + "'" + logger.info("executing command [" + cmdStr + "]", extra=d) + system_test_utils.sys_call(cmdStr) + + cmdStr = "ssh " + hostname + " 'rm -r " + kafkaHome + "'" + logger.info("executing command [" + cmdStr + "]", extra=d) + #system_test_utils.sys_call(cmdStr) + else: + logger.warn("possible destructive command [" + cmdStr + "]", extra=d) + logger.warn("check config file: system_test/cluster_config.properties", extra=d) + logger.warn("aborting test...", extra=d) + sys.exit(1) + + + Index: system_test/utils/testcase_env.py =================================================================== --- system_test/utils/testcase_env.py (revision 1377726) +++ system_test/utils/testcase_env.py (working copy) @@ -14,9 +14,30 @@ # Generic testcase environment # ================================ - # dictionary of entity parent pid + # dictionary of entity_id to ppid for entities such as zookeepers & brokers + # key: entity_id + # val: ppid of zk or broker associated to that entity_id + # { 0: 12345, 1: 12389, ... } entityParentPidDict = {} + # dictionary of entity_id to list of JMX ppid + # key: entity_id + # val: list of JMX ppid associated to that entity_id + # { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... } + entityJmxParentPidDict = {} + + # dictionary of hostname-topic-ppid for consumer + # key: hostname + # val: dict of topic-ppid + # { ela4-app0996: { test1 : 12345 }, ela4-app0996: { test2 : 12389 }, ... } + consumerHostParentPidDict = {} + + # dictionary of hostname-topic-ppid for producer + # key: hostname + # val: dict of topic-ppid + # { ela4-app0996: { test1 : 12345 }, ela4-app0996: { test2 : 12389 }, ... 
Index: system_test/utils/testcase_env.py
===================================================================
--- system_test/utils/testcase_env.py (revision 1377726)
+++ system_test/utils/testcase_env.py (working copy)
@@ -14,9 +14,30 @@
     # Generic testcase environment
     # ================================
 
-    # dictionary of entity parent pid
+    # dictionary of entity_id to ppid for entities such as zookeepers & brokers
+    # key: entity_id
+    # val: ppid of zk or broker associated to that entity_id
+    # { 0: 12345, 1: 12389, ... }
     entityParentPidDict = {}
 
+    # dictionary of entity_id to list of JMX ppid
+    # key: entity_id
+    # val: list of JMX ppid associated to that entity_id
+    # { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
+    entityJmxParentPidDict = {}
+
+    # dictionary of hostname-topic-ppid for consumer
+    # key: hostname
+    # val: dict of topic-ppid
+    # { ela4-app0996: { test1 : 12345 }, ela4-app0996: { test2 : 12389 }, ... }
+    consumerHostParentPidDict = {}
+
+    # dictionary of hostname-topic-ppid for producer
+    # key: hostname
+    # val: dict of topic-ppid
+    # { ela4-app0996: { test1 : 12345 }, ela4-app0996: { test2 : 12389 }, ... }
+    producerHostParentPidDict = {}
+
     # list of testcase configs
     testcaseConfigsList = []
Index: system_test/replication_testsuite/config/producer_performance.properties
===================================================================
--- system_test/replication_testsuite/config/producer_performance.properties (revision 1377726)
+++ system_test/replication_testsuite/config/producer_performance.properties (working copy)
@@ -1,4 +1,3 @@
-broker-list=localhost:2181
 topic=mytest
 messages=200
 message-size=100
Index: system_test/replication_testsuite/replica_basic_test.py
===================================================================
--- system_test/replication_testsuite/replica_basic_test.py (revision 1377726)
+++ system_test/replication_testsuite/replica_basic_test.py (working copy)
@@ -11,6 +11,7 @@
 import subprocess
 import sys
 import time
+import traceback
 
 from   system_test_env import SystemTestEnv
 sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
@@ -26,6 +27,7 @@
     testModuleAbsPathName = os.path.realpath(__file__)
     testSuiteAbsPathName  = os.path.abspath(os.path.dirname(testModuleAbsPathName))
     isLeaderLogPattern    = "Completed the leader state transition"
+    brokerShutDownCompletedPattern = "shut down completed"
 
     def __init__(self, systemTestEnv):
@@ -64,12 +66,16 @@
             self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
 
             # initialize self.testcaseEnv with user-defined environment
-            self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = \
-                ReplicaBasicTest.isLeaderLogPattern
+            self.testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] = ReplicaBasicTest.brokerShutDownCompletedPattern
+            self.testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"] = \
+                "\[(.*?)\] .* \[Kafka Server (.*?)\], " + ReplicaBasicTest.brokerShutDownCompletedPattern
+
+            self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = ReplicaBasicTest.isLeaderLogPattern
             self.testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"]  = \
                 "\[(.*?)\] .* Broker (.*?): " + \
                 self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + \
                 " for topic (.*?) partition (.*?) \(.*"
+            self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
 
             # find testcase properties json file
@@ -82,7 +88,7 @@
             testcaseDirName = os.path.basename(testCasePathName)
             self.testcaseEnv.testcaseResultsDict["test_case_name"] = testcaseDirName
 
-            #### => update testcaseEnv
+            # update testcaseEnv
             self.testcaseEnv.testCaseBaseDir = testCasePathName
             self.testcaseEnv.testCaseLogsDir = self.testcaseEnv.testCaseBaseDir + "/logs"
 
@@ -91,7 +97,6 @@
             for k,v in testcaseNonEntityDataDict.items():
                 if ( k == "description" ):
                     testcaseDescription = v
-            #### => update testcaseEnv
 
             # TestcaseEnv.testcaseArgumentsDict initialized, this dictionary keeps track of the
             # "testcase_args" in the testcase_properties.json such as replica_factor, num_partition, ...
             self.testcaseEnv.testcaseArgumentsDict = testcaseNonEntityDataDict["testcase_args"]
@@ -105,13 +110,13 @@
             #     self.testcaseEnv.testCaseLogsDir
             #     self.testcaseEnv.testcaseArgumentsDict
 
+            print
             # display testcase name and arguments
             self.log_message("Test Case : " + testcaseDirName)
             for k,v in self.testcaseEnv.testcaseArgumentsDict.items():
                 self.anonLogger.info(" " + k + " : " + v)
             self.log_message("Description : " + testcaseDescription)
-
-
+
             # ================================================================ #
             # ================================================================ #
             #            Product Specific Testing Code Starts Here:            #
@@ -134,7 +139,7 @@
 
             # clean up data directories specified in zookeeper.properties and kafka_server_.properties
             kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
-
+
             # generate remote hosts log/config dirs if not exist
             kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
 
@@ -177,67 +182,50 @@
             #     'topic': 'test_1',
             #     'brokerid': '3'}
 
+            # =============================================
             # validate to see if leader election is successful
+            # =============================================
             self.log_message("validating leader election")
             result = kafka_system_test_utils.validate_leader_election_successful( \
                          self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
 
-            # checking to see if leader bouncing is required in this testcase
+            # =============================================
+            # get leader re-election latency
+            # =============================================
             bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
             self.log_message("bounce_leader flag : " + bounceLeaderFlag)
-            if (bounceLeaderFlag.lower() == "true"):
-                if self.testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
-                    # no leader available for testing => skip this round
-                    self.log_message("stopping all entities")
-                    for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
-                        kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
-
-                    continue
-                else:
-                    # leader elected => stop leader
-                    try:
-                        leaderEntityId = leaderDict["entity_id"]
-                        leaderBrokerId = leaderDict["brokerid"]
-                        leaderPPid     = self.testcaseEnv.entityParentPidDict[leaderEntityId]
-                    except:
-                        self.log_message("leader details unavailable")
-
-                    self.log_message("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid)
-
-                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, leaderEntityId, leaderPPid)
-                    self.testcaseEnv.entityParentPidDict[leaderEntityId] = ""
-
-                    self.logger.info("sleeping for 5s for leader re-election to complete", extra=self.d)
-                    time.sleep(5)
-
+            reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict)
+
+            # =============================================
             # starting producer
-            self.log_message("starting producer")
+            # =============================================
+            self.log_message("starting producer in the background")
             kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
-            self.anonLogger.info("sleeping for 5s")
-            time.sleep(5)
-
+            self.anonLogger.info("sleeping for 30s")
+            time.sleep(30)
+
+            # =============================================
             # starting previously terminated broker
-            if (bounceLeaderFlag.lower() == "true" and not self.testcaseEnv.entityParentPidDict[leaderEntityId]):
+            # =============================================
+            if bounceLeaderFlag.lower() == "true":
                 self.log_message("starting the previously terminated broker")
-
-                stoppedLeaderEntityId = leaderDict["entity_id"]
-                kafka_system_test_utils.start_entity_in_background(
-                    self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
-
+                stoppedLeaderEntityId = leaderDict["entity_id"]
+                kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
                 self.anonLogger.info("sleeping for 5s")
                 time.sleep(5)
 
+            # =============================================
             # starting consumer
-            self.log_message("starting consumer")
+            # =============================================
+            self.log_message("starting consumer in the background")
             kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
-
-            # this testcase is completed - so stopping all entities
-            self.log_message("stopping all entities")
-            for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
-                kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
-
+            self.anonLogger.info("sleeping for 20s")
+            time.sleep(20)
+
+            # =============================================
             # validate the data matched
+            # =============================================
             self.log_message("validating data matched")
             result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
 
@@ -249,8 +237,24 @@
         except Exception as e:
             self.log_message("Exception caught : ")
             print e
+            traceback.print_exc()
+
+        finally:
             self.log_message("stopping all entities")
+
             for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
-                kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+                kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, parentPid)
-
+            for entityId, jmxParentPidList in self.testcaseEnv.entityJmxParentPidDict.items():
+                for jmxParentPid in jmxParentPidList:
+                    kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, jmxParentPid)
+
+            for hostname, consumerPPid in self.testcaseEnv.consumerHostParentPidDict.items():
+                consumerEntityId = system_test_utils.get_data_by_lookup_keyval(self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id")
+                kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, consumerEntityId, consumerPPid)
+
+            for hostname, producerPPid in self.testcaseEnv.producerHostParentPidDict.items():
+                producerEntityId = system_test_utils.get_data_by_lookup_keyval(self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id")
+                kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, producerEntityId, producerPPid)
+
+
Index: system_test/replication_testsuite/testcase_1/testcase_1_properties.json
===================================================================
--- system_test/replication_testsuite/testcase_1/testcase_1_properties.json (revision 1377726)
+++ system_test/replication_testsuite/testcase_1/testcase_1_properties.json (working copy)
@@ -3,7 +3,7 @@
     "testcase_args": {
         "bounce_leader": "true",
         "replica_factor": "3",
-        "num_partition": "2"
+        "num_partition": "1"
     },
     "entities": [
         {
@@ -47,7 +47,6 @@
             "compression-codec": "1",
             "message-size": "500",
             "message": "500",
-            "broker-list": "localhost:9091,localhost:9092,localhost:9093",
             "log_filename": "producer_performance.log",
             "config_filename": "producer_performance.properties"
         },
Index: system_test/README.txt
===================================================================
--- system_test/README.txt (revision 1377726)
+++ system_test/README.txt (working copy)
@@ -1,10 +1,8 @@
 # ==========================
 # Known Issues:
 # ==========================
-1. The "broker-list" in system_test/replication_testsuite/testcase_1/testcase_1_properties.json needs to be manually updated to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
-2. Sometimes the running processes may not be terminated properly by the script.
+1. This test framework currently doesn't support MacOS due to different "ps" argument options from Linux. The correct ps execution is required to terminate the background running processes properly.
-
 
 # ==========================
 # Overview
 # ==========================
Index: system_test/cluster_config.json
===================================================================
--- system_test/cluster_config.json (revision 1377726)
+++ system_test/cluster_config.json (working copy)
@@ -4,48 +4,48 @@
             "entity_id": "0",
             "hostname": "localhost",
             "role": "zookeeper",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "kafka_home": "default",
+            "java_home": "default",
             "jmx_port": "9990"
         },
         {
             "entity_id": "1",
             "hostname": "localhost",
             "role": "broker",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "kafka_home": "default",
+            "java_home": "default",
             "jmx_port": "9991"
         },
         {
            "entity_id": "2",
            "hostname": "localhost",
            "role": "broker",
-           "kafka_home": "/home/nnarkhed/Projects/kafka-440",
-           "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+           "kafka_home": "default",
+           "java_home": "default",
            "jmx_port": "9992"
         },
         {
            "entity_id": "3",
            "hostname": "localhost",
            "role": "broker",
-           "kafka_home": "/home/nnarkhed/Projects/kafka-440",
-           "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+           "kafka_home": "default",
+           "java_home": "default",
            "jmx_port": "9993"
        },
        {
            "entity_id": "4",
            "hostname": "localhost",
            "role": "producer_performance",
-           "kafka_home": "/home/nnarkhed/Projects/kafka-440",
-           "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+           "kafka_home": "default",
+           "java_home": "default",
            "jmx_port": "9994"
        },
        {
            "entity_id": "5",
            "hostname": "localhost",
            "role": "console_consumer",
-           "kafka_home": "/home/nnarkhed/Projects/kafka-440",
-           "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+           "kafka_home": "default",
+           "java_home": "default",
            "jmx_port": "9995"
        }
    ]
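With "broker-list" removed from producer_performance.properties and testcase_1_properties.json, the producer's --broker-list argument is now derived from cluster_config.json (role and hostname) plus the per-entity "port" in the testcase properties, as done in start_producer_performance above. A simplified sketch of that construction with inline sample data (the literal hostnames and ports are examples only, not values mandated by the patch):

    clusterConfig = [
        {"entity_id": "1", "hostname": "localhost", "role": "broker"},
        {"entity_id": "2", "hostname": "localhost", "role": "broker"},
    ]
    brokerPorts = {"1": "9091", "2": "9092"}    # from the testcase properties "port" entries

    brokerListStr = ""
    for entity in clusterConfig:
        if entity["role"] == "broker":
            hostPort = entity["hostname"] + ":" + brokerPorts[entity["entity_id"]]
            if len(brokerListStr) == 0:
                brokerListStr = hostPort
            else:
                brokerListStr = brokerListStr + "," + hostPort

    print brokerListStr    # e.g. localhost:9091,localhost:9092 -> passed as --broker-list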