From aa257936691b98ba875ddc18e4c601dd9320c04b Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Mon, 27 Oct 2014 09:05:13 -0700
Subject: [PATCH 1/2] KAFKA-1748 Make remote hosts used by system tests more flexible by allowing username, hostname, and SSH args to be specified.

---
 .../mirror_maker_testsuite/mirror_maker_test.py |   2 +-
 .../replication_testsuite/replica_basic_test.py |   2 +-
 system_test/utils/kafka_system_test_utils.py    | 393 +++++++--------
 system_test/utils/metrics.py                    |  15 +-
 system_test/utils/system_test_utils.py          | 237 ++++++++-----
 5 files changed, 292 insertions(+), 357 deletions(-)

diff --git a/system_test/mirror_maker_testsuite/mirror_maker_test.py b/system_test/mirror_maker_testsuite/mirror_maker_test.py
index c0117c6..c4a2241 100644
--- a/system_test/mirror_maker_testsuite/mirror_maker_test.py
+++ b/system_test/mirror_maker_testsuite/mirror_maker_test.py
@@ -259,7 +259,7 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils):
             time.sleep(15)
             self.anonLogger.info("terminate Mirror Maker")
             cmdStr = "ps auxw | grep Mirror | grep -v grep | tr -s ' ' | cut -f2 -d ' ' | xargs kill -15"
-            subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+            subproc = system_test_utils.async_sys_call(cmdStr)
             for line in subproc.stdout.readlines():
                 line = line.rstrip('\n')
                 self.anonLogger.info("#### ["+line+"]")
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index 660006c..5503055 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -319,7 +319,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                     pauseTime = self.testcaseEnv.testcaseArgumentsDict["pause_time_in_seconds"]
                     parentPid = self.testcaseEnv.entityBrokerParentPidDict[leaderDict["entity_id"]]
-                    pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid)
-                    system_test_utils.simulate_garbage_collection_pause_in_remote_process(hostname, pidStack, pauseTime)
+                    pidStack = system_test_utils.get_remote_child_processes(leaderDict["remote"], parentPid)
+                    system_test_utils.simulate_garbage_collection_pause_in_remote_process(leaderDict["remote"], pidStack, pauseTime)
                 except:
                     pass
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 1093b66..c65650e 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -20,26 +20,21 @@
 #   kafka_system_test_utils.py
 # ===================================
 
-import datetime
 import getpass
 import hashlib
-import inspect
 import json
 import logging
 import os
 import pprint
 import re
-import subprocess
 import sys
 import thread
 import time
 import traceback
+from datetime import datetime
 
 import system_test_utils
-import metrics
 
-from datetime import datetime
-from time import mktime
 
 # ====================================================================
 # Two logging formats are defined in system_test/system_test_runner.py
@@ -128,13 +123,13 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
     tcConfigsList = testcaseEnv.testcaseConfigsList
 
     for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
-        hostname  = clusterEntityConfigDict["hostname"]
+        host      = system_test_utils.get_remote_account(clusterEntityConfigDict)
        entity_id = clusterEntityConfigDict["entity_id"]
        role      = clusterEntityConfigDict["role"]
        kafkaHome = clusterEntityConfigDict["kafka_home"]

        logger.debug("entity_id : " + entity_id, extra=d)
-        logger.debug("hostname  : " + hostname, extra=d)
+        logger.debug("hostname  : " + 
host.hostname, extra=d)
        logger.debug("role      : " + role, extra=d)

        configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
@@ -143,7 +138,7 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
        rmtLogPathName     = logPathName
        rmtMetricsPathName = metricsPathName

-        if hostname != "localhost":
+        if host.hostname != "localhost":
            rmtConfigPathName  = replace_kafka_home(configPathName, kafkaHome)
            rmtMetricsPathName = replace_kafka_home(metricsPathName, kafkaHome)
            rmtLogPathName     = replace_kafka_home(logPathName, kafkaHome)
@@ -151,22 +146,12 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
        # ==============================
        # collect entity log file
        # ==============================
-        cmdList = ["scp",
-                   hostname + ":" + rmtLogPathName + "/*",
-                   logPathName]
-        cmdStr  = " ".join(cmdList)
-        logger.debug("executing command [" + cmdStr + "]", extra=d)
-        system_test_utils.sys_call(cmdStr)
+        system_test_utils.scp_from_remote(host, rmtLogPathName + '/*', logPathName)

        # ==============================
        # collect entity metrics file
        # ==============================
-        cmdList = ["scp",
-                   hostname + ":" + rmtMetricsPathName + "/*",
-                   metricsPathName]
-        cmdStr  = " ".join(cmdList)
-        logger.debug("executing command [" + cmdStr + "]", extra=d)
-        system_test_utils.sys_call(cmdStr)
+        system_test_utils.scp_from_remote(host, rmtMetricsPathName + '/*', metricsPathName)

        # ==============================
        # collect broker log segment file
@@ -174,13 +159,7 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
        if role == "broker":
            dataLogPathName = system_test_utils.get_data_by_lookup_keyval(
                testcaseEnv.testcaseConfigsList, "entity_id", entity_id, "log.dir")
-
-            cmdList = ["scp -r",
-                       hostname + ":" + dataLogPathName,
-                       logPathName]
-            cmdStr  = " ".join(cmdList)
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            system_test_utils.sys_call(cmdStr)
+            system_test_utils.scp_from_remote(host, dataLogPathName, logPathName, recursive=True)

        # ==============================
        # collect ZK log
@@ -188,13 +167,7 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
        if role == "zookeeper":
            dataLogPathName = system_test_utils.get_data_by_lookup_keyval(
                testcaseEnv.testcaseConfigsList, "entity_id", entity_id, "dataDir")
-
-            cmdList = ["scp -r",
-                       hostname + ":" + dataLogPathName,
-                       logPathName]
-            cmdStr  = " ".join(cmdList)
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            system_test_utils.sys_call(cmdStr)
+            system_test_utils.scp_from_remote(host, dataLogPathName, logPathName, recursive=True)

        # ==============================
        # collect dashboards file
        # ==============================
        dashboardsPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
        rmtDashboardsPathName = dashboardsPathName

-        if hostname != "localhost":
-            rmtDashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome)
+        if host.hostname != "localhost":
+            rmtDashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome)

-        cmdList = ["scp",
-                   hostname + ":" + rmtDashboardsPathName + "/*",
-                   dashboardsPathName]
-        cmdStr  = " ".join(cmdList)
-        logger.debug("executing command [" + cmdStr + "]", extra=d)
-        system_test_utils.sys_call(cmdStr)
+        system_test_utils.scp_from_remote(host, rmtDashboardsPathName + '/*', dashboardsPathName)


 def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv):
    testCaseBaseDir = testcaseEnv.testCaseBaseDir

    for 
clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: + host = system_test_utils.get_remote_account(clusterEntityConfigDict) hostname = clusterEntityConfigDict["hostname"] entity_id = clusterEntityConfigDict["entity_id"] role = clusterEntityConfigDict["role"] @@ -235,15 +204,7 @@ def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv): metricsPathName = replace_kafka_home(metricsPathName, kafkaHome) dashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome) - cmdList = ["ssh " + hostname, - "'mkdir -p", - configPathName, - metricsPathName, - dashboardsPathName + "'"] - cmdStr = " ".join(cmdList) - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - + system_test_utils.remote_sys_call(host, " ".join(["mkdir -p", configPathName, metricsPathName, dashboardsPathName])) def init_entity_props(systemTestEnv, testcaseEnv): clusterConfigsList = systemTestEnv.clusterEntityConfigDictList @@ -471,25 +432,22 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv logger.debug("UNHANDLED role " + clusterCfg["role"], extra=d) # scp updated config files to remote hosts - scp_file_to_remote_host(clusterConfigsList, testcaseEnv) + scp_test_case_configs_to_remote_host(clusterConfigsList, testcaseEnv) -def scp_file_to_remote_host(clusterEntityConfigDictList, testcaseEnv): - +def scp_test_case_configs_to_remote_host(clusterEntityConfigDictList, testcaseEnv): testcaseConfigsList = testcaseEnv.testcaseConfigsList for clusterEntityConfigDict in clusterEntityConfigDictList: - hostname = clusterEntityConfigDict["hostname"] + host = system_test_utils.get_remote_account(clusterEntityConfigDict) kafkaHome = clusterEntityConfigDict["kafka_home"] localTestcasePathName = testcaseEnv.testCaseBaseDir remoteTestcasePathName = localTestcasePathName - if hostname != "localhost": + if host.hostname != "localhost": remoteTestcasePathName = replace_kafka_home(localTestcasePathName, kafkaHome) - cmdStr = "scp " + localTestcasePathName + "/config/* " + hostname + ":" + remoteTestcasePathName + "/config" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) + system_test_utils.scp_to_remote(localTestcasePathName + "/config/*", host, remoteTestcasePathName + "/config") def start_zookeepers(systemTestEnv, testcaseEnv): @@ -506,8 +464,7 @@ def start_zookeepers(systemTestEnv, testcaseEnv): testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "clientPort") dataDir = system_test_utils.get_data_by_lookup_keyval( testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "dataDir") - hostname = system_test_utils.get_data_by_lookup_keyval( - clusterEntityConfigDictList, "entity_id", zkEntityId, "hostname") + host = system_test_utils.get_remote_account(clusterEntityConfigDictList, zkEntityId) minusOnePort = str(int(clientPort) - 1) plusOnePort = str(int(clientPort) + 1) @@ -517,16 +474,12 @@ def start_zookeepers(systemTestEnv, testcaseEnv): infile.close() for line in inlines: - if line.startswith("server.") and hostname + ":" + minusOnePort + ":" + plusOnePort in line: + if line.startswith("server.") and host.hostname + ":" + minusOnePort + ":" + plusOnePort in line: # server.1=host1:2187:2189 matchObj = re.match("server\.(.*?)=.*", line) zkServerId = matchObj.group(1) - cmdStr = "ssh " + hostname + " 'mkdir -p " + dataDir + "; echo " + zkServerId + " > " + dataDir + "/myid'" - logger.debug("executing command [" + cmdStr + "]", extra=d) - subproc = 
system_test_utils.sys_call_return_subproc(cmdStr)
-                for line in subproc.stdout.readlines():
-                    pass  # dummy loop to wait until producer is completed
+                system_test_utils.remote_sys_call(host, "mkdir -p " + dataDir + "; echo " + zkServerId + " > " + dataDir + "/myid", output=False)

                time.sleep(2)

        start_entity_in_background(systemTestEnv, testcaseEnv, zkEntityId)
@@ -579,20 +532,17 @@ def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDic

    for brokerEntityId in brokerEntityIdList:

-        hostname = system_test_utils.get_data_by_lookup_keyval(
-            clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
+        host = system_test_utils.get_remote_account(
+            clusterEntityConfigDictList, brokerEntityId)
        logFile = system_test_utils.get_data_by_lookup_keyval(
            testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")

        logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
-        cmdStrList = ["ssh " + hostname,
-                      "\"grep -i -h '" + leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] + "' ",
-                      logPathName + "/" + logFile + " | ",
-                      "sort | tail -1\""]
-        cmdStr = " ".join(cmdStrList)
-
-        logger.debug("executing command [" + cmdStr + "]", extra=d)
-        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+
+        subproc = system_test_utils.remote_async_sys_call(host,
+            " ".join(["grep -i -h '" + leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] + "' ",
+                      logPathName + "/" + logFile + " | ",
+                      "sort | tail -1"]))

        for line in subproc.stdout.readlines():
            line = line.rstrip('\n')
@@ -612,7 +562,7 @@ def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDic
            if (len(shutdownBrokerDict) > 0 and shutdownBrokerDict["timestamp"] < unixTs) or (len(shutdownBrokerDict) == 0):
                shutdownBrokerDict["timestamp"] = unixTs
                shutdownBrokerDict["brokerid"]  = matchObj.group(2)
-                shutdownBrokerDict["hostname"]  = hostname
+                shutdownBrokerDict["hostname"]  = host.hostname
                shutdownBrokerDict["entity_id"] = brokerEntityId
                logger.debug("brokerid: [" + shutdownBrokerDict["brokerid"] + \
                    "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d)
@@ -637,8 +587,7 @@ def get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict

    for brokerEntityId in brokerEntityIdList:

-        hostname = system_test_utils.get_data_by_lookup_keyval( \
-            clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
+        host = system_test_utils.get_remote_account(clusterEntityConfigDictList, brokerEntityId)
        kafkaHome = system_test_utils.get_data_by_lookup_keyval( \
            clusterEntityConfigDictList, "entity_id", brokerEntityId, "kafka_home")
        logFile = system_test_utils.get_data_by_lookup_keyval( \
@@ -646,17 +595,14 @@ def get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict

        logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")

-        if hostname != "localhost":
+        if host.hostname != "localhost":
            logPathName = replace_kafka_home(logPathName, kafkaHome)

-        cmdStrList = ["ssh " + hostname,
-                      "\"grep -i -h '" + leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
-                      logPathName + "/" + logFile + " | ",
-                      "sort | tail -1\""]
-        cmdStr = " ".join(cmdStrList)
-
-        logger.debug("executing command [" + cmdStr + "]", extra=d)
-        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+        subproc = system_test_utils.remote_async_sys_call(host,
+            " ".join([
+                "grep -i -h '" + leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
+                logPathName + "/" + logFile + " | ",
+                "sort | tail -1"]))

        for line in subproc.stdout.readlines():
            line = line.rstrip('\n')
@@ -679,7 +625,7 @@ def get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict
                    leaderDict["topic"]     = matchObj.group(3)
                    leaderDict["partition"] = matchObj.group(4)
                    leaderDict["entity_id"] = brokerEntityId
-                    leaderDict["hostname"]  = hostname
+                    leaderDict["hostname"]  = host.hostname
                    logger.debug("brokerid: [" + leaderDict["brokerid"] + "] entity_id: [" + leaderDict["entity_id"] + "]", extra=d)
                except:
                    logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
@@ -695,7 +641,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    # cluster configurations:
-    hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
+    host      = system_test_utils.get_remote_account(clusterEntityConfigDictList, entityId)
    role      = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "role")
    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
@@ -714,38 +660,35 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
        mmProducerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "mirror_producer_config_filename")

-    logger.info("starting " + role + " in host [" + hostname + "] on client port [" + clientPort + "]", extra=d)
+    logger.info("starting " + role + " in host [" + str(host) + "] on client port [" + clientPort + "]", extra=d)

    configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "config")
    logPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "default")

-    if hostname != "localhost":
+    if host.hostname != "localhost":
        configPathName = replace_kafka_home(configPathName, kafkaHome)
        logPathName    = replace_kafka_home(logPathName, kafkaHome)

    if role == "zookeeper":
-        cmdList = ["ssh " + hostname,
-                   "'JAVA_HOME=" + javaHome,
+        cmdList = ["JAVA_HOME=" + javaHome,
                   "JMX_PORT=" + jmxPort,
                   kafkaHome + "/bin/zookeeper-server-start.sh ",
                   configPathName + "/" + configFile + " &> ",
                   logPathName + "/" + logFile + " & echo pid:$! > ",
-                   logPathName + "/entity_" + entityId + "_pid'"]
+                   logPathName + "/entity_" + entityId + "_pid"]

    elif role == "broker":
-        cmdList = ["ssh " + hostname,
-                   "'JAVA_HOME=" + javaHome,
+        cmdList = ["JAVA_HOME=" + javaHome,
                   "JMX_PORT=" + jmxPort,
                   "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/log4j.properties" % kafkaHome,
                   kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
                   configPathName + "/" + configFile + " >> ",
                   logPathName + "/" + logFile + " & echo pid:$! 
> ", - logPathName + "/entity_" + entityId + "_pid'"] + logPathName + "/entity_" + entityId + "_pid"] elif role == "mirror_maker": if useNewProducer.lower() == "true": - cmdList = ["ssh " + hostname, - "'JAVA_HOME=" + javaHome, + cmdList = ["JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker", "--consumer.config " + configPathName + "/" + mmConsumerConfigFile, @@ -753,17 +696,16 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): "--new.producer", "--whitelist=\".*\" >> ", logPathName + "/" + logFile + " & echo pid:$! > ", - logPathName + "/entity_" + entityId + "_pid'"] + logPathName + "/entity_" + entityId + "_pid"] else: - cmdList = ["ssh " + hostname, - "'JAVA_HOME=" + javaHome, + cmdList = ["JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker", "--consumer.config " + configPathName + "/" + mmConsumerConfigFile, "--producer.config " + configPathName + "/" + mmProducerConfigFile, "--whitelist=\".*\" >> ", logPathName + "/" + logFile + " & echo pid:$! > ", - logPathName + "/entity_" + entityId + "_pid'"] + logPathName + "/entity_" + entityId + "_pid"] elif role == "console_consumer": clusterToConsumeFrom = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "cluster_name") @@ -802,9 +744,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): pass props_file_path=write_consumer_properties(consumerProperties) - scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/" - logger.debug("executing command [" + scpCmdStr + "]", extra=d) - system_test_utils.sys_call(scpCmdStr) + system_test_utils.scp_to_remote(props_file_path, host, "/tmp/") if len(formatterOption) > 0: formatterOption = " --formatter " + formatterOption + " " @@ -818,8 +758,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): else: logger.error("Invalid cluster name : " + clusterName, extra=d) sys.exit(1) - cmdList = ["ssh " + hostname, - "'JAVA_HOME=" + javaHome, + cmdList = ["JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, kafkaHome + "/bin/kafka-run-class.sh kafka.tools.ConsoleConsumer", "--zookeeper " + zkConnectStr, @@ -829,18 +768,16 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): formatterOption, "--from-beginning", " >> " + logPathName + "/" + logFile + " & echo pid:$! 
> ", - logPathName + "/entity_" + entityId + "_pid'"] + logPathName + "/entity_" + entityId + "_pid"] cmdStr = " ".join(cmdList) - - logger.debug("executing command: [" + cmdStr + "]", extra=d) - system_test_utils.async_sys_call(cmdStr) + system_test_utils.remote_async_sys_call(host, cmdStr) logger.info("sleeping for 5 seconds.", extra=d) time.sleep(5) - pidCmdStr = "ssh " + hostname + " 'cat " + logPathName + "/entity_" + entityId + "_pid' 2> /dev/null" - logger.debug("executing command: [" + pidCmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) + + pidCmdStr = "cat " + logPathName + "/entity_" + entityId + "_pid 2> /dev/null" + subproc = system_test_utils.remote_async_sys_call(host, pidCmdStr) # keep track of the remote entity pid in a dictionary for line in subproc.stdout.readlines(): @@ -864,7 +801,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv): consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "console_consumer") for consumerConfig in consumerConfigList: - host = consumerConfig["hostname"] + host = system_test_utils.get_remote_account(consumerConfig) entityId = consumerConfig["entity_id"] jmxPort = consumerConfig["jmx_port"] role = consumerConfig["role"] @@ -879,7 +816,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv): consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") metricsDir = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "metrics"), - if host != "localhost": + if host.hostname != "localhost": consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome) #metricsDir = replace_kafka_home(metricsDir, kafkaHome) @@ -933,12 +870,9 @@ def start_console_consumer(systemTestEnv, testcaseEnv): consumerProperties = {} consumerProperties["consumer.timeout.ms"] = timeoutMs props_file_path=write_consumer_properties(consumerProperties) - scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" - logger.debug("executing command [" + scpCmdStr + "]", extra=d) - system_test_utils.sys_call(scpCmdStr) + system_test_utils.scp_to_remote(props_file_path, host, "/tmp/") - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, + cmdList = ["JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, kafkaRunClassBin + " kafka.tools.ConsoleConsumer", "--zookeeper " + zkConnectStr, @@ -949,16 +883,13 @@ def start_console_consumer(systemTestEnv, testcaseEnv): formatterOption, "--from-beginning ", " >> " + consumerLogPathName, - " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] - + " & echo pid:$! 
> " + consumerLogPath + "/entity_" + entityId + "_pid"] cmdStr = " ".join(cmdList) - logger.debug("executing command: [" + cmdStr + "]", extra=d) - system_test_utils.async_sys_call(cmdStr) + system_test_utils.remote_async_sys_call(host, cmdStr) - pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'" - logger.debug("executing command: [" + pidCmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) + pidCmdStr = "cat " + consumerLogPath + "/entity_" + entityId + "_pid" + subproc = system_test_utils.remote_async_sys_call(host, pidCmdStr) # keep track of the remote entity pid in a dictionary for line in subproc.stdout.readlines(): @@ -966,7 +897,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv): line = line.rstrip('\n') logger.debug("found pid line: [" + line + "]", extra=d) tokens = line.split(':') - testcaseEnv.consumerHostParentPidDict[host] = tokens[1] + testcaseEnv.consumerHostParentPidDict[host.hostname] = tokens[1] def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client): @@ -1027,8 +958,8 @@ def generate_topics_string(topicPrefix, numOfTopics): return topicsStr def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client): - host = producerConfig["hostname"] entityId = producerConfig["entity_id"] + host = system_test_utils.get_remote_account(entityConfigList, entityId) jmxPort = producerConfig["jmx_port"] role = producerConfig["role"] clusterName = producerConfig["cluster_name"] @@ -1089,7 +1020,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") metricsDir = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "metrics") - if host != "localhost": + if host.hostname != "localhost": producerLogPath = replace_kafka_home(producerLogPath, kafkaHome) metricsDir = replace_kafka_home(metricsDir, kafkaHome) @@ -1117,8 +1048,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk logger.info("#### [producer thread] status of stopBackgroundProducer : [False] => producing [" \ + str(noMsgPerBatch) + "] messages with starting message id : [" + str(initMsgId) + "]", extra=d) - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, + cmdList = ["JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, kafkaRunClassBin + " kafka.tools.ProducerPerformance", @@ -1137,7 +1067,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk boolArgumentsStr, " >> " + producerLogPathName, " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid", - " & wait'"] + " & wait"] if kafka07Client: cmdList[:] = [] @@ -1154,8 +1084,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk brokerInfoStr = "broker.list=" + brokerInfoStr - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, + cmdList = ["JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, kafkaRunClassBin + " kafka.tools.ProducerPerformance", @@ -1169,12 +1098,9 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "--vary-message-size --async", " >> " + producerLogPathName, " & echo $! 
> " + producerLogPath + "/entity_" + entityId + "_pid", - " & wait'"] + " & wait"] - cmdStr = " ".join(cmdList) - logger.debug("executing command: [" + cmdStr + "]", extra=d) - - subproc = system_test_utils.sys_call_return_subproc(cmdStr) + subproc = system_test_utils.remote_async_sys_call(host, " ".join(cmdList)) logger.debug("waiting for producer to finish", extra=d) subproc.communicate() logger.debug("producer finished", extra=d) @@ -1213,15 +1139,15 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk def stop_remote_entity(systemTestEnv, entityId, parentPid, signalType="SIGTERM"): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname") - pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid) + host = system_test_utils.get_remote_account(clusterEntityConfigDictList, entityId) + pidStack = system_test_utils.get_remote_child_processes(host, parentPid) - logger.info("terminating (" + signalType + ") process id: " + parentPid + " in host: " + hostname, extra=d) + logger.info("terminating (" + signalType + ") process id: " + parentPid + " in host: " + str(host), extra=d) if signalType.lower() == "sigterm": - system_test_utils.sigterm_remote_process(hostname, pidStack) + system_test_utils.sigterm_remote_process(host, pidStack) elif signalType.lower() == "sigkill": - system_test_utils.sigkill_remote_process(hostname, pidStack) + system_test_utils.sigkill_remote_process(host, pidStack) else: logger.error("Invalid signal type: " + signalType, extra=d) raise Exception("Invalid signal type: " + signalType) @@ -1230,11 +1156,11 @@ def stop_remote_entity(systemTestEnv, entityId, parentPid, signalType="SIGTERM") def force_stop_remote_entity(systemTestEnv, entityId, parentPid): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname") - pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid) + host = system_test_utils.get_remote_account(clusterEntityConfigDictList, entityId) + pidStack = system_test_utils.get_remote_child_processes(host, parentPid) - logger.debug("terminating process id: " + parentPid + " in host: " + hostname, extra=d) - system_test_utils.sigkill_remote_process(hostname, pidStack) + logger.debug("terminating process id: " + parentPid + " in host: " + str(host), extra=d) + system_test_utils.sigkill_remote_process(host, pidStack) def create_topic_for_producer_performance(systemTestEnv, testcaseEnv): @@ -1245,7 +1171,7 @@ def create_topic_for_producer_performance(systemTestEnv, testcaseEnv): for prodPerfCfg in prodPerfCfgList: topicsStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfg["entity_id"], "topic") zkEntityId = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id") - zkHost = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname") + zkHost = system_test_utils.get_remote_account(clusterEntityConfigDictList, zkEntityId) kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home") createTopicBin = 
kafkaHome + "/bin/kafka-topics.sh --create" @@ -1265,23 +1191,20 @@ def create_topic_for_producer_performance(systemTestEnv, testcaseEnv): testcaseBaseDir = testcaseEnv.testCaseBaseDir - if zkHost != "localhost": + if zkHost.hostname != "localhost": testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome) for topic in topicsList: logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) - cmdList = ["ssh " + zkHost, - "'JAVA_HOME=" + javaHome, + cmdList = ["JAVA_HOME=" + javaHome, createTopicBin, " --topic " + topic, " --zookeeper " + zkConnectStr, " --replication-factor " + testcaseEnv.testcaseArgumentsDict["replica_factor"], " --partitions " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ", - testcaseBaseDir + "/logs/create_source_cluster_topic.log'"] + testcaseBaseDir + "/logs/create_source_cluster_topic.log"] - cmdStr = " ".join(cmdList) - logger.debug("executing command: [" + cmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(cmdStr) + subproc = system_test_utils.remote_async_sys_call(zkHost, " ".join(cmdList)) def create_topic(systemTestEnv, testcaseEnv, topic, replication_factor, num_partitions): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -1290,7 +1213,7 @@ def create_topic(systemTestEnv, testcaseEnv, topic, replication_factor, num_part javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home") createTopicBin = kafkaHome + "/bin/kafka-topics.sh --create" zkConnectStr = "" - zkHost = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname") + zkHost = system_test_utils.get_remote_account(clusterEntityConfigDictList, zkEntityId) if len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) > 0: zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] elif len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0: @@ -1303,18 +1226,16 @@ def create_topic(systemTestEnv, testcaseEnv, topic, replication_factor, num_part testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome) logger.debug("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) - cmdList = ["ssh " + zkHost, - "'JAVA_HOME=" + javaHome, + cmdList = ["JAVA_HOME=" + javaHome, createTopicBin, " --topic " + topic, " --zookeeper " + zkConnectStr, " --replication-factor " + str(replication_factor), " --partitions " + str(num_partitions) + " >> ", - testcaseBaseDir + "/logs/create_source_cluster_topic.log'"] + testcaseBaseDir + "/logs/create_source_cluster_topic.log"] cmdStr = " ".join(cmdList) - logger.info("executing command: [" + cmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(cmdStr) + subproc = system_test_utils.remote_async_sys_call(zkHost, " ".join(cmdList)) @@ -1481,29 +1402,29 @@ def cleanup_data_at_remote_hosts(systemTestEnv, testcaseEnv): for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: - hostname = clusterEntityConfigDict["hostname"] + host = system_test_utils.get_remote_account(clusterEntityConfigDict) entityId = clusterEntityConfigDict["entity_id"] role = clusterEntityConfigDict["role"] kafkaHome = clusterEntityConfigDict["kafka_home"] cmdStr = "" dataDir = "" - if hostname == "localhost": + if host.hostname == "localhost": remoteTestCaseBaseDir = testCaseBaseDir else: remoteTestCaseBaseDir = replace_kafka_home(testCaseBaseDir, kafkaHome) - logger.info("cleaning up data dir on host: [" + hostname + "]", 
extra=d) + logger.info("cleaning up data dir on host: [" + str(host) + "]", extra=d) if role == 'zookeeper': dataDir = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "dataDir") elif role == 'broker': dataDir = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log.dir") else: - logger.info("skipping role [" + role + "] on host : [" + hostname + "]", extra=d) + logger.info("skipping role [" + role + "] on host : [" + str(host) + "]", extra=d) continue - cmdStr = "ssh " + hostname + " 'rm -rf " + dataDir + "'" + cmdStr = "rm -rf " + dataDir if not dataDir.startswith("/tmp"): logger.warn("possible destructive command [" + cmdStr + "]", extra=d) @@ -1514,33 +1435,18 @@ def cleanup_data_at_remote_hosts(systemTestEnv, testcaseEnv): # ============================ # cleaning data dir # ============================ - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) + system_test_utils.remote_sys_call(host, cmdStr) # ============================ # cleaning log/metrics/svg, ... # ============================ - if system_test_utils.remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"): + if system_test_utils.remote_host_file_exists(host, kafkaHome + "/bin/kafka-run-class.sh"): # so kafkaHome is a real kafka installation - cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.log' | xargs rm 2> /dev/null\"" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*_pid' | xargs rm 2> /dev/null\"" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.csv' | xargs rm 2> /dev/null\"" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.svg' | xargs rm 2> /dev/null\"" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.html' | xargs rm 2> /dev/null\"" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) + system_test_utils.remote_sys_call(host, "find " + remoteTestCaseBaseDir + " -name '*.log' | xargs rm 2> /dev/null") + system_test_utils.remote_sys_call(host, "find " + remoteTestCaseBaseDir + " -name '*_pid' | xargs rm 2> /dev/null") + system_test_utils.remote_sys_call(host, "find " + remoteTestCaseBaseDir + " -name '*.csv' | xargs rm 2> /dev/null") + system_test_utils.remote_sys_call(host, "find " + remoteTestCaseBaseDir + " -name '*.svg' | xargs rm 2> /dev/null") + system_test_utils.remote_sys_call(host, "find " + remoteTestCaseBaseDir + " -name '*.html' | xargs rm 2> /dev/null") def replace_kafka_home(systemTestSubDirPath, kafkaHome): matchObj = re.match(".*(\/system_test\/.*)$", systemTestSubDirPath) @@ -1558,19 +1464,14 @@ def stop_consumer(): def ps_grep_terminate_running_entity(systemTestEnv): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - username = getpass.getuser() + default_username = getpass.getuser() for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: - hostname = clusterEntityConfigDict["hostname"] - cmdList = ["ssh " + hostname, - "\"ps 
auxw | grep -v grep | grep -v Bootstrap | grep -v vim | grep ^" + username, + host = system_test_utils.get_remote_account(clusterEntityConfigDict) + cmdList = ["ps auxw | grep -v grep | grep -v Bootstrap | grep -v vim | grep ^" + (host.user or default_username), "| grep -i 'java\|server\-start\|run\-\|producer\|consumer\|jmxtool' | grep kafka", - "| tr -s ' ' | cut -f2 -d ' ' | xargs kill -9" + "\""] - - cmdStr = " ".join(cmdList) - logger.debug("executing command [" + cmdStr + "]", extra=d) - - system_test_utils.sys_call(cmdStr) + "| tr -s ' ' | cut -f2 -d ' ' | xargs kill -9"] + system_test_utils.remote_sys_call(host, " ".join(cmdList)) def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttributesDict): leaderEntityId = None @@ -1654,7 +1555,7 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv): while 1: logger.debug("calling testcaseEnv.lock.acquire()", extra=d) testcaseEnv.lock.acquire() - logger.info("status of backgroundProducerStopped : [" + \ + logger.debug("status of backgroundProducerStopped : [" + \ str(testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=d) if testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]: logger.debug("calling testcaseEnv.lock.release()", extra=d) @@ -1697,7 +1598,7 @@ def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None): if onlyThisEntityId is None or entityId == onlyThisEntityId: - host = migrationToolConfig["hostname"] + host = system_test_utils.get_remote_account(clusterConfigList, entityId) jmxPort = migrationToolConfig["jmx_port"] role = migrationToolConfig["role"] kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "kafka_home") @@ -1720,8 +1621,7 @@ def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None): whiteList = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "whitelist") logFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename") - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, + cmdList = ["JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, kafkaRunClassBin + " kafka.tools.KafkaMigrationTool", "--whitelist=" + whiteList, @@ -1732,16 +1632,12 @@ def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None): "--zkclient.01.jar=" + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + zkClientJar, "--kafka.07.jar=" + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + kafka07Jar, " &> " + migrationToolLogPath + "/migrationTool.log", - " & echo pid:$! > " + migrationToolLogPath + "/entity_" + entityId + "_pid'"] - - cmdStr = " ".join(cmdList) - logger.debug("executing command: [" + cmdStr + "]", extra=d) - system_test_utils.async_sys_call(cmdStr) + " & echo pid:$! 
> " + migrationToolLogPath + "/entity_" + entityId + "_pid"] + system_test_utils.remote_async_sys_call(host, " ".join(cmdList)) time.sleep(5) - pidCmdStr = "ssh " + host + " 'cat " + migrationToolLogPath + "/entity_" + entityId + "_pid' 2> /dev/null" - logger.debug("executing command: [" + pidCmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) + pidCmdStr = "cat " + migrationToolLogPath + "/entity_" + entityId + "_pid 2> /dev/null" + subproc = system_test_utils.remote_async_sys_call(host, pidCmdStr) # keep track of the remote entity pid in a dictionary for line in subproc.stdout.readlines(): @@ -1954,8 +1850,8 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None clusterList = systemTestEnv.clusterEntityConfigDictList consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "console_consumer") for consumerConfig in consumerConfigList: - host = consumerConfig["hostname"] entityId = consumerConfig["entity_id"] + host = system_test_utils.get_remote_account(consumerConfig) jmxPort = consumerConfig["jmx_port"] clusterName = consumerConfig["cluster_name"] kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home") @@ -1963,7 +1859,7 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") - if host != "localhost": + if host.hostname != "localhost": consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome) # testcase configurations: @@ -2010,8 +1906,7 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None outputFilePathName = consumerLogPath + "/simple_consumer_" + topic + "-" + str(partitionId) + "_r" + str(replicaIndex) + ".log" brokerPortLabel = brokerPort.replace(":", "_") - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, + cmdList = ["JAVA_HOME=" + javaHome, kafkaRunClassBin + " kafka.tools.SimpleConsumerShell", "--broker-list " + brokerListStr, "--topic " + topic, @@ -2020,16 +1915,9 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None "--offset " + str(startingOffset), "--no-wait-at-logend ", " > " + outputFilePathName, - " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] - - cmdStr = " ".join(cmdList) + " & echo pid:$! 
> " + consumerLogPath + "/entity_" + entityId + "_pid"] - logger.debug("executing command: [" + cmdStr + "]", extra=d) - subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr) - # dummy for-loop to wait until the process is completed - for line in subproc_1.stdout.readlines(): - pass - time.sleep(1) + subproc_1 = system_test_utils.remote_sys_call(host, " ".join(cmdList), output=False) partitionId += 1 replicaIndex += 1 @@ -2046,22 +1934,18 @@ def get_controller_attributes(systemTestEnv, testcaseEnv): zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") firstZkDict = zkDictList[0] - hostname = firstZkDict["hostname"] zkEntityId = firstZkDict["entity_id"] + host = system_test_utils.get_remote_account(tcConfigsList, zkEntityId) clientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort") kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" - cmdStrList = ["ssh " + hostname, - "\"JAVA_HOME=" + javaHome, + cmdStrList = ["JAVA_HOME=" + javaHome, kafkaRunClassBin + " kafka.tools.ZooKeeperMainWrapper ", "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"], - "get /controller 2> /dev/null | tail -1\""] - - cmdStr = " ".join(cmdStrList) - logger.debug("executing command [" + cmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(cmdStr) + "get /controller 2> /dev/null | tail -1"] + subproc = system_test_utils.remote_async_sys_call(host, " ".join(cmdStrList)) for line in subproc.stdout.readlines(): if "brokerid" in line: json_str = line.rstrip('\n') @@ -2353,7 +2237,7 @@ def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"): logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default") localLogSegmentPath = logPathName + "/" + remoteLogSegmentDir kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", brokerEntityId, "kafka_home") - hostname = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", brokerEntityId, "hostname") + host = system_test_utils.get_remote_account(clusterConfigList, brokerEntityId) kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" # localLogSegmentPath : @@ -2382,16 +2266,13 @@ def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"): # only process index file: *.index if logFile.endswith(".index"): offsetLogSegmentPathName = localLogSegmentPath + "/" + topicPartition + "/" + logFile - cmdStrList = ["ssh " + hostname, - kafkaRunClassBin + " kafka.tools.DumpLogSegments", + cmdStrList = [kafkaRunClassBin + " kafka.tools.DumpLogSegments", " --file " + offsetLogSegmentPathName, "--verify-index-only 2>&1"] - cmdStr = " ".join(cmdStrList) showMismatchedIndexOffset = False - logger.debug("executing command [" + cmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(cmdStr) + subproc = system_test_utils.remote_async_sys_call(host, " ".join(cmdStrList)) for line in subproc.stdout.readlines(): line = line.rstrip('\n') if showMismatchedIndexOffset: @@ -2413,8 +2294,8 @@ def get_leader_for(systemTestEnv, testcaseEnv, topic, partition): zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") firstZkDict = zkDictList[0] - 
hostname = firstZkDict["hostname"] zkEntityId = firstZkDict["entity_id"] + host = system_test_utils.get_remote_account(clusterConfigsList, zkEntityId) clientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort") kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home") @@ -2424,14 +2305,11 @@ def get_leader_for(systemTestEnv, testcaseEnv, topic, partition): brokerid = '' leaderEntityId = '' - cmdStrList = ["ssh " + hostname, - "\"JAVA_HOME=" + javaHome, + cmdStrList = ["JAVA_HOME=" + javaHome, kafkaRunClassBin + " kafka.tools.ZooKeeperMainWrapper ", "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"], - zkQueryStr + " 2> /dev/null | tail -1\""] - cmdStr = " ".join(cmdStrList) - logger.info("executing command [" + cmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(cmdStr) + zkQueryStr + " 2> /dev/null | tail -1"] + subproc = system_test_utils.remote_async_sys_call(host, " ".join(cmdStrList)) for line in subproc.stdout.readlines(): if "\"leader\"" in line: line = line.rstrip('\n') @@ -2455,8 +2333,8 @@ def get_leader_attributes(systemTestEnv, testcaseEnv): zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") firstZkDict = zkDictList[0] - hostname = firstZkDict["hostname"] zkEntityId = firstZkDict["entity_id"] + host = system_test_utils.get_remote_account(clusterConfigsList, zkEntityId) clientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort") kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home") @@ -2468,15 +2346,12 @@ def get_leader_attributes(systemTestEnv, testcaseEnv): zkQueryStr = "get /brokers/topics/" + topics[0] + "/partitions/0/state" brokerid = '' - cmdStrList = ["ssh " + hostname, - "\"JAVA_HOME=" + javaHome, + cmdStrList = ["JAVA_HOME=" + javaHome, kafkaRunClassBin + " kafka.tools.ZooKeeperMainWrapper ", "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"], - zkQueryStr + " 2> /dev/null | tail -1\""] - cmdStr = " ".join(cmdStrList) - logger.info("executing command [" + cmdStr + "]", extra=d) + zkQueryStr + " 2> /dev/null | tail -1"] - subproc = system_test_utils.sys_call_return_subproc(cmdStr) + subproc = system_test_utils.remote_async_sys_call(host, " ".join(cmdStrList)) for line in subproc.stdout.readlines(): if "\"leader\"" in line: line = line.rstrip('\n') @@ -2490,6 +2365,8 @@ def get_leader_attributes(systemTestEnv, testcaseEnv): leaderDict["partition"] = '0' leaderDict["entity_id"] = system_test_utils.get_data_by_lookup_keyval( tcConfigsList, "broker.id", brokerid, "entity_id") + leaderDict["remote"] = system_test_utils.get_remote_account(clusterConfigsList, leaderDict["entity_id"]) + # FIXME this is now redundant, it should be removed leaderDict["hostname"] = system_test_utils.get_data_by_lookup_keyval( clusterConfigsList, "entity_id", leaderDict["entity_id"], "hostname") break diff --git a/system_test/utils/metrics.py b/system_test/utils/metrics.py index 3e66348..cc0319f 100644 --- a/system_test/utils/metrics.py +++ b/system_test/utils/metrics.py @@ -248,28 +248,25 @@ def start_metrics_collection(jmxHost, jmxPort, role, 
entityId, systemTestEnv, te
    entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
    dashboardsForRole = get_dashboard_definition(metricsDefinitionFile, role)
    mbeansForRole = get_mbeans_for_role(dashboardsForRole)
-    
+
    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "kafka_home")
    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "java_home")

    for mbean in mbeansForRole:
        outputCsvFile = entityMetricsDir + "/" + mbean + ".csv"
-        startMetricsCmdList = ["ssh " + jmxHost,
-                               "'JAVA_HOME=" + javaHome,
+        startMetricsCmdList = ["JAVA_HOME=" + javaHome,
                               "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
                               "--jmx-url " + jmxUrl,
                               "--object-name " + mbean + " 1> ",
                                outputCsvFile + " & echo pid:$! > ",
-                                entityMetricsDir + "/entity_pid'"]
+                                entityMetricsDir + "/entity_pid"]

        startMetricsCommand = " ".join(startMetricsCmdList)
-        logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
-        system_test_utils.async_sys_call(startMetricsCommand)
+        system_test_utils.remote_async_sys_call(system_test_utils.RemoteAccount(jmxHost), startMetricsCommand)
        time.sleep(1)

-        pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid' 2> /dev/null"
-        logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
-        subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+        pidCmdStr = "cat " + entityMetricsDir + "/entity_pid 2> /dev/null"
+        subproc = system_test_utils.remote_async_sys_call(system_test_utils.RemoteAccount(jmxHost), pidCmdStr)

        # keep track of JMX ppid in a dictionary of entity_id to list of JMX ppid
        # testcaseEnv.entityJmxParentPidDict:
diff --git a/system_test/utils/system_test_utils.py b/system_test/utils/system_test_utils.py
index e8529cd..554fcbd 100644
--- a/system_test/utils/system_test_utils.py
+++ b/system_test/utils/system_test_utils.py
@@ -39,6 +39,90 @@ thisClassName = '(system_test_utils)'
 d = {'name_of_class': thisClassName}


+class RemoteAccount(object):
+    def __init__(self, hostname, user=None, ssh_args=None):
+        self.hostname = hostname
+        self.user = user
+        self.ssh_args = ssh_args
+
+
+    def ssh_command(self, cmd):
+        r = "ssh "
+        if self.user:
+            r += self.user + "@"
+        r += self.hostname + " "
+        if self.ssh_args:
+            r += self.ssh_args + " "
+        r += "'" + cmd.replace("'", "'\\''") + "'"
+        return r
+
+
+    def scp_from_command(self, src, dest, recursive=False):
+        r = "scp "
+        if self.ssh_args:
+            r += self.ssh_args + " "
+        if recursive:
+            r += "-r "
+        if self.user:
+            r += self.user + "@"
+        r += self.hostname + ":" + src + " " + dest
+        return r
+
+
+    def scp_to_command(self, src, dest, recursive=False):
+        r = "scp "
+        if self.ssh_args:
+            r += self.ssh_args + " "
+        if recursive:
+            r += "-r "
+        r += src + " "
+        if self.user:
+            r += self.user + "@"
+        r += self.hostname + ":" + dest
+        return r
+
+
+    def rsync_to_command(self, flags, src_dir, dest_dir):
+        r = "rsync "
+        if self.ssh_args:
+            r += "-e \"ssh " + self.ssh_args + "\" "
+        if flags:
+            r += flags + " "
+        r += src_dir + " "
+        if self.user:
+            r += self.user + "@"
+        r += self.hostname + ":" + dest_dir
+        return r
+
+
+    def __str__(self):
+        r = ""
+        if self.user:
+            r += self.user + "@"
+        r += self.hostname
+        return r
+
+
+def get_remote_account(clusterEntityConfig, entityId=None):
+    '''
+    Gets a RemoteAccount object, either from a single entity config or by searching a list of entity configs for the
+    specified entity ID
+    '''
+
+    if type(clusterEntityConfig) == 
dict:
+        return RemoteAccount(
+            clusterEntityConfig["hostname"],
+            clusterEntityConfig.get("user"),
+            clusterEntityConfig.get("ssh_args")
+        )
+
+    assert(type(clusterEntityConfig) == list)
+    assert(entityId != None)
+    hostname = get_data_by_lookup_keyval(clusterEntityConfig, "entity_id", entityId, "hostname")
+    user     = get_data_by_lookup_keyval(clusterEntityConfig, "entity_id", entityId, "user")
+    ssh_args = get_data_by_lookup_keyval(clusterEntityConfig, "entity_id", entityId, "ssh_args")
+    return RemoteAccount(hostname, user, ssh_args)
+
+
 def get_current_unix_timestamp():
    ts = time.time()
    return "{0:.6f}".format(ts)
@@ -48,25 +132,29 @@ def get_local_hostname():
    return socket.gethostname()


-def sys_call(cmdStr):
-    output = ""
+def sys_call(cmdStr, output=True):
+    result = ""
    #logger.info("executing command [" + cmdStr + "]", extra=d)
    p = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    for line in p.stdout.readlines():
-        output += line
-    return output
+        if output: result += line
+    return result
+
+
+def async_sys_call(cmd_str):
+    return subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)


 def remote_async_sys_call(host, cmd):
-    cmdStr = "ssh " + host + " \"" + cmd + "\""
-    logger.info("executing command [" + cmdStr + "]", extra=d)
-    async_sys_call(cmdStr)
+    remote_cmd = host.ssh_command(cmd)
+    logger.debug("executing command [" + remote_cmd + "]", extra=d)
+    return async_sys_call(remote_cmd)


-def remote_sys_call(host, cmd):
-    cmdStr = "ssh " + host + " \"" + cmd + "\""
-    logger.info("executing command [" + cmdStr + "]", extra=d)
-    sys_call(cmdStr)
+def remote_sys_call(host, cmd, output=True):
+    remote_cmd = host.ssh_command(cmd)
+    logger.debug("executing command [" + remote_cmd + "]", extra=d)
+    return sys_call(remote_cmd, output=output)


 def get_dir_paths_with_prefix(fullPath, dirNamePrefix):
@@ -177,10 +265,11 @@ def get_json_dict_data(infile):
    return data_dict


-def get_remote_child_processes(hostname, pid):
+def get_remote_child_processes(host, pid):
    pidStack = []

-    cmdList = ['''ssh ''' + hostname,
-               ''''pid=''' + pid + '''; prev_pid=""; echo $pid;''',
+    subproc = remote_async_sys_call(host,
+        " ".join([
+               '''pid=''' + pid + '''; prev_pid=""; echo $pid;''',
               '''while [[ "x$pid" != "x" ]];''',
               '''do prev_pid=$pid;''',
               ''' if [ $prev_pid == $pid ]; then''',
               '''  break;''',
               ''' fi;''',
-               '''done' 2> /dev/null''']
-
-    cmdStr = " ".join(cmdList)
-    logger.debug("executing command [" + cmdStr, extra=d)
+               '''done 2> /dev/null''']))

-    subproc = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE)
    for line in subproc.stdout.readlines():
        procId = line.rstrip('\n')
        pidStack.append(procId)
@@ -220,44 +305,36 @@ def get_child_processes(pid):
            break
    return pidStack

-def sigterm_remote_process(hostname, pidStack):
+def sigterm_remote_process(host, pidStack):

    while ( len(pidStack) > 0 ):
        pid = pidStack.pop()
-        cmdStr = "ssh " + hostname + " 'kill -15 " + pid + "'"
-
        try:
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            sys_call_return_subproc(cmdStr)
+            remote_async_sys_call(host, "kill -15 " + pid)
        except:
            print "WARN - pid:",pid,"not found"
            raise

-def sigkill_remote_process(hostname, pidStack):
+def sigkill_remote_process(host, pidStack):

    while ( len(pidStack) > 0 ):
        pid = pidStack.pop()
-        cmdStr = "ssh " + hostname + " 'kill -9 " + pid + "'"
-
        try:
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            sys_call_return_subproc(cmdStr)
+            remote_async_sys_call(host, "kill -9 " + pid)
        except:
            print 
"WARN - pid:",pid,"not found" raise -def simulate_garbage_collection_pause_in_remote_process(hostname, pidStack, pauseTimeInSeconds): +def simulate_garbage_collection_pause_in_remote_process(host, pidStack, pauseTimeInSeconds): pausedPidStack = [] # pause the processes while len(pidStack) > 0: pid = pidStack.pop() pausedPidStack.append(pid) - cmdStr = "ssh " + hostname + " 'kill -SIGSTOP " + pid + "'" try: - logger.debug("executing command [" + cmdStr + "]", extra=d) - sys_call_return_subproc(cmdStr) + remote_async_sys_call(host, "kill -SIGSTOP " + pid) except: print "WARN - pid:",pid,"not found" raise @@ -267,11 +344,9 @@ def simulate_garbage_collection_pause_in_remote_process(hostname, pidStack, paus # resume execution of the processes while len(pausedPidStack) > 0: pid = pausedPidStack.pop() - cmdStr = "ssh " + hostname + " 'kill -SIGCONT " + pid + "'" try: - logger.debug("executing command [" + cmdStr + "]", extra=d) - sys_call_return_subproc(cmdStr) + remote_async_sys_call(host, "kill -SIGCONT " + pid) except: print "WARN - pid:",pid,"not found" raise @@ -301,20 +376,8 @@ def convert_keyval_to_cmd_args(configFilePathname): print "ERROR: unexpected arguments list", line return cmdArg - -def async_sys_call(cmd_str): - subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - - -def sys_call_return_subproc(cmd_str): - p = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - return p - - -def remote_host_file_exists(hostname, pathname): - cmdStr = "ssh " + hostname + " 'ls " + pathname + "'" - logger.debug("executing command: [" + cmdStr + "]", extra=d) - subproc = sys_call_return_subproc(cmdStr) +def remote_host_file_exists(host, pathname): + subproc = remote_async_sys_call(host, "ls " + pathname) for line in subproc.stdout.readlines(): if "No such file or directory" in line: @@ -322,10 +385,8 @@ def remote_host_file_exists(hostname, pathname): return True -def remote_host_directory_exists(hostname, path): - cmdStr = "ssh " + hostname + " 'ls -d " + path + "'" - logger.debug("executing command: [" + cmdStr + "]", extra=d) - subproc = sys_call_return_subproc(cmdStr) +def remote_host_directory_exists(host, path): + subproc = remote_async_sys_call(host, "ls -d " + path) for line in subproc.stdout.readlines(): if "No such file or directory" in line: @@ -333,12 +394,9 @@ def remote_host_directory_exists(hostname, path): return True -def remote_host_processes_stopped(hostname): - cmdStr = "ssh " + hostname + \ - " \"ps auxw | grep -v grep | grep -v Bootstrap | grep -i 'java\|run\-\|producer\|consumer\|jmxtool\|kafka' | wc -l\" 2> /dev/null" - - logger.info("executing command: [" + cmdStr + "]", extra=d) - subproc = sys_call_return_subproc(cmdStr) +def remote_host_processes_stopped(host): + subproc = remote_async_sys_call(host, + "bash -c \"ps auxw | grep -v grep | grep -v Bootstrap | grep -i 'java\|run\-\|producer\|consumer\|jmxtool\|kafka' | wc -l\" 2> /dev/null") for line in subproc.stdout.readlines(): line = line.rstrip('\n') @@ -367,7 +425,7 @@ def setup_remote_hosts(systemTestEnv): if localJavaHome is not None: localJavaBin = localJavaHome + '/bin/java' else: - subproc = sys_call_return_subproc("which java") + subproc = async_sys_call("which java") for line in subproc.stdout.readlines(): if line.startswith("which: no "): logger.error("No Java binary found in local host", extra=d) @@ -382,60 +440,64 @@ def setup_remote_hosts(systemTestEnv): for clusterEntityConfigDict in clusterEntityConfigDictList: listIndex += 1 - 
hostname = clusterEntityConfigDict["hostname"] + host = get_remote_account(clusterEntityConfigDict) kafkaHome = clusterEntityConfigDict["kafka_home"] javaHome = clusterEntityConfigDict["java_home"] - if hostname == "localhost" and javaHome == "default": + if host.hostname == "localhost" and javaHome == "default": clusterEntityConfigDictList[listIndex]["java_home"] = localJavaHome - if hostname == "localhost" and kafkaHome == "default": + if host.hostname == "localhost" and kafkaHome == "default": clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome - if hostname == "localhost" and kafkaHome == "system_test/migration_tool_testsuite/0.7": + if host.hostname == "localhost" and kafkaHome == "system_test/migration_tool_testsuite/0.7": clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome + "/system_test/migration_tool_testsuite/0.7" kafkaHome = clusterEntityConfigDict["kafka_home"] javaHome = clusterEntityConfigDict["java_home"] - logger.debug("checking java binary [" + localJavaBin + "] in host [" + hostname + "]", extra=d) - if not remote_host_directory_exists(hostname, javaHome): - logger.error("Directory not found: [" + javaHome + "] in host [" + hostname + "]", extra=d) + logger.debug("checking java binary [" + localJavaBin + "] in host [" + str(host) + "]", extra=d) + if not remote_host_directory_exists(host, javaHome): + logger.error("Directory not found: [" + javaHome + "] in host [" + str(host) + "]", extra=d) return False - logger.debug("checking directory [" + kafkaHome + "] in host [" + hostname + "]", extra=d) - if not remote_host_directory_exists(hostname, kafkaHome): - logger.info("Directory not found: [" + kafkaHome + "] in host [" + hostname + "]", extra=d) - if hostname == "localhost": + logger.debug("checking directory [" + kafkaHome + "] in host [" + str(host) + "]", extra=d) + if not remote_host_directory_exists(host, kafkaHome): + logger.info("Directory not found: [" + kafkaHome + "] in host [" + str(host) + "]", extra=d) + if host.hostname == "localhost": return False else: localKafkaSourcePath = systemTestEnv.SYSTEM_TEST_BASE_DIR + "/.." 
- logger.debug("copying local copy of [" + localKafkaSourcePath + "] to " + hostname + ":" + kafkaHome, extra=d) - copy_source_to_remote_hosts(hostname, localKafkaSourcePath, kafkaHome) + logger.debug("copying local copy of [" + localKafkaSourcePath + "] to " + str(host) + ":" + kafkaHome, extra=d) + copy_source_to_remote_hosts(host, localKafkaSourcePath, kafkaHome) return True -def copy_source_to_remote_hosts(hostname, sourceDir, destDir): - - cmdStr = "rsync -avz --delete-before " + sourceDir + "/ " + hostname + ":" + destDir +def copy_source_to_remote_hosts(host, sourceDir, destDir): + cmdStr = host.rsync_to_command("-avz --delete-before", sourceDir + "/", destDir) logger.info("executing command [" + cmdStr + "]", extra=d) - subproc = sys_call_return_subproc(cmdStr) + subproc = async_sys_call(cmdStr) for line in subproc.stdout.readlines(): - dummyVar = 1 + pass +def scp_from_remote(host, remote_path, local_path, recursive=False): + cmd = host.scp_from_command(remote_path, local_path, recursive=recursive) + logger.debug("executing command [" + cmd + "]", extra=d) + sys_call(cmd) -def remove_kafka_home_dir_at_remote_hosts(hostname, kafkaHome): - if remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"): - cmdStr = "ssh " + hostname + " 'chmod -R 777 " + kafkaHome + "'" - logger.info("executing command [" + cmdStr + "]", extra=d) - sys_call(cmdStr) +def scp_to_remote(local_path, host, remote_path, recursive=False): + cmd = host.scp_to_command(local_path, remote_path, recursive=recursive) + logger.debug("executing command [" + cmd + "]", extra=d) + sys_call(cmd) - cmdStr = "ssh " + hostname + " 'rm -rf " + kafkaHome + "'" - logger.info("executing command [" + cmdStr + "]", extra=d) - #sys_call(cmdStr) + +def remove_kafka_home_dir_at_remote_hosts(host, kafkaHome): + if remote_host_file_exists(host, kafkaHome + "/bin/kafka-run-class.sh"): + remote_sys_call(host, "chmod -R 777 " + kafkaHome) + #remote_sys_call(host, "rm -rf " + kafkaHome) else: - logger.warn("possible destructive command [" + cmdStr + "]", extra=d) + logger.warn("possible destructive command [" + "rm -rf " + kafkaHome + "]", extra=d) logger.warn("check config file: system_test/cluster_config.properties", extra=d) logger.warn("aborting test...", extra=d) sys.exit(1) @@ -635,4 +697,3 @@ def diff_lists(a, b): logger.debug("#### only in seq 2 : " + item, extra=d) return mismatchCount - -- 2.1.2 From b4f0a50afaaa1f64dca5ab299784a7ec4a975cf1 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 28 Oct 2014 10:55:09 -0700 Subject: [PATCH 2/2] KAFKA-1748 Make cluster resource declaration separate from test role specification and support loading in three ways: a zero-config default on localhost, a JSON file, and loading from Vagrant's ssh-config command. 
--- system_test/cluster.json | 12 ++ .../mirror_maker_testsuite/mirror_maker_test.py | 9 +- .../offset_management_test.py | 9 +- .../replication_testsuite/replica_basic_test.py | 8 +- system_test/system_test_env.py | 6 +- system_test/system_test_runner.py | 20 +- system_test/utils/cluster.py | 235 +++++++++++++++++++++ system_test/utils/kafka_system_test_utils.py | 22 +- system_test/utils/metrics.py | 5 +- system_test/utils/setup_utils.py | 12 +- system_test/utils/system_test_utils.py | 158 +++++--------- 11 files changed, 366 insertions(+), 130 deletions(-) create mode 100644 system_test/cluster.json create mode 100644 system_test/utils/cluster.py diff --git a/system_test/cluster.json b/system_test/cluster.json new file mode 100644 index 0000000..c1c9f72 --- /dev/null +++ b/system_test/cluster.json @@ -0,0 +1,12 @@ +{ + "reuse_nodes": true, + "nodes": [ + { + "hostname": "localhost", + "user": null, + "ssh_args": null, + "java_home": null, + "kafka_home": null + } + ] +} \ No newline at end of file diff --git a/system_test/mirror_maker_testsuite/mirror_maker_test.py b/system_test/mirror_maker_testsuite/mirror_maker_test.py index c4a2241..8c05167 100644 --- a/system_test/mirror_maker_testsuite/mirror_maker_test.py +++ b/system_test/mirror_maker_testsuite/mirror_maker_test.py @@ -109,7 +109,12 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils): continue else: self.testcaseEnv.printTestCaseDescription(testcaseDirName) - system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName) + system_test_utils.configure_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName) + + self.log_message("allocating cluster resources...") + system_test_utils.allocate_cluster_nodes(self.systemTestEnv) + + self.setup_remote_hosts(self.systemTestEnv) # ============================================================================== # # ============================================================================== # @@ -321,3 +326,5 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils): self.log_message("stopping all entities - please wait ...") kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) + self.log_message("freeing cluster resources...") + system_test_utils.free_cluster_nodes(self.systemTestEnv) diff --git a/system_test/offset_management_testsuite/offset_management_test.py b/system_test/offset_management_testsuite/offset_management_test.py index 12b5cd2..2b7f846 100644 --- a/system_test/offset_management_testsuite/offset_management_test.py +++ b/system_test/offset_management_testsuite/offset_management_test.py @@ -106,7 +106,12 @@ class OffsetManagementTest(ReplicationUtils, SetupUtils): continue else: self.testcaseEnv.printTestCaseDescription(testcaseDirName) - system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName) + system_test_utils.configure_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName) + + self.log_message("allocating cluster resources...") + system_test_utils.allocate_cluster_nodes(self.systemTestEnv) + + self.setup_remote_hosts(self.systemTestEnv) # ============================================================================== # # ============================================================================== # @@ -296,3 +301,5 @@ class OffsetManagementTest(ReplicationUtils, SetupUtils): self.log_message("stopping all entities - please wait ...") 
kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) + self.log_message("freeing cluster resources...") + system_test_utils.free_cluster_nodes(self.systemTestEnv) diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py index 5503055..8616249 100644 --- a/system_test/replication_testsuite/replica_basic_test.py +++ b/system_test/replication_testsuite/replica_basic_test.py @@ -110,8 +110,12 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): continue else: self.testcaseEnv.printTestCaseDescription(testcaseDirName) - system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName) + system_test_utils.configure_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName) + self.log_message("allocating cluster resources...") + system_test_utils.allocate_cluster_nodes(self.systemTestEnv) + + self.setup_remote_hosts(self.systemTestEnv) # ============================================================================== # # ============================================================================== # @@ -457,3 +461,5 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): self.log_message("stopping all entities - please wait ...") kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) + self.log_message("freeing cluster resources...") + system_test_utils.free_cluster_nodes(self.systemTestEnv) \ No newline at end of file diff --git a/system_test/system_test_env.py b/system_test/system_test_env.py index c24d3e8..ec8a2b4 100644 --- a/system_test/system_test_env.py +++ b/system_test/system_test_env.py @@ -26,6 +26,7 @@ import os import sys from utils import system_test_utils +from utils.cluster import Cluster class SystemTestEnv(): @@ -61,9 +62,12 @@ class SystemTestEnv(): printTestDescriptionsOnly = False doNotValidateRemoteHost = False - def __init__(self): + def __init__(self, clusterType): "Create an object with this system test session environment" + # Load/connect the cluster resource description + self.cluster = Cluster.create(clusterType, self) + # load the system level cluster config system_test_utils.load_cluster_config(self.CLUSTER_CONFIG_PATHNAME, self.clusterEntityConfigDictList) diff --git a/system_test/system_test_runner.py b/system_test/system_test_runner.py index ee7aa25..93d7597 100644 --- a/system_test/system_test_runner.py +++ b/system_test/system_test_runner.py @@ -57,6 +57,12 @@ def main(): aLogger = logging.getLogger('anonymousLogger') optionParser = OptionParser() + optionParser.add_option("-c", "--cluster", + dest="clusterType", + default="localhost", + action="store", + type="string", + help="The source of cluster resources (localhost, json, vagrant)") optionParser.add_option("-p", "--print-test-descriptions-only", dest="printTestDescriptionsOnly", default=False, @@ -81,23 +87,13 @@ def main(): # SystemTestEnv is a class to provide all environement settings for this session # such as the SYSTEM_TEST_BASE_DIR, SYSTEM_TEST_UTIL_DIR, ... 
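+    # The value of --cluster selects which Cluster implementation in
+    # utils/cluster.py will provision node resources for this session.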
- systemTestEnv = SystemTestEnv() + systemTestEnv = SystemTestEnv(options.clusterType) if options.printTestDescriptionsOnly: systemTestEnv.printTestDescriptionsOnly = True if options.doNotValidateRemoteHost: systemTestEnv.doNotValidateRemoteHost = True - if not systemTestEnv.printTestDescriptionsOnly: - if not systemTestEnv.doNotValidateRemoteHost: - if not system_test_utils.setup_remote_hosts(systemTestEnv): - nLogger.error("Remote hosts sanity check failed. Aborting test ...", extra=d) - print - sys.exit(1) - else: - nLogger.info("SKIPPING : checking remote machines", extra=d) - print - # get all defined names within a module: definedItemList = dir(SystemTestEnv) aLogger.debug("=================================================") @@ -119,7 +115,7 @@ def main(): testModulePathName = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + dirName) if not systemTestEnv.printTestDescriptionsOnly: - system_test_utils.setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testModulePathName) + system_test_utils.configure_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testModulePathName) # go through all test modules file in this testsuite for moduleFileName in os.listdir(testModulePathName): diff --git a/system_test/utils/cluster.py b/system_test/utils/cluster.py new file mode 100644 index 0000000..2515e43 --- /dev/null +++ b/system_test/utils/cluster.py @@ -0,0 +1,235 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#!/usr/bin/env python + +# =================================== +# cluster.py +# =================================== + +import collections, json, os.path, subprocess + +class RemoteAccount(object): + def __init__(self, hostname, user=None, ssh_args=None, java_home="default", kafka_home="default"): + self.hostname = hostname + self.user = user + self.ssh_args = ssh_args + self.java_home = java_home + self.kafka_home = kafka_home + + @property + def local(self): + "Returns true if this 'remote' account is actually local. This is only a heuristic, but should work for simple local testing." 
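+        # Heuristic: an explicit user or ssh_args is treated as a real remote
+        # connection, even when the hostname is "localhost".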
+        return self.hostname == "localhost" and self.user is None and self.ssh_args is None
+
+    def ssh_command(self, cmd):
+        r = "ssh "
+        if self.user:
+            r += self.user + "@"
+        r += self.hostname + " "
+        if self.ssh_args:
+            r += self.ssh_args + " "
+        r += "'" + cmd.replace("'", "'\\''") + "'"
+        return r
+
+
+    def scp_from_command(self, src, dest, recursive=False):
+        r = "scp "
+        if self.ssh_args:
+            r += self.ssh_args + " "
+        if recursive:
+            r += "-r "
+        if self.user:
+            r += self.user + "@"
+        r += self.hostname + ":" + src + " " + dest
+        return r
+
+
+    def scp_to_command(self, src, dest, recursive=False):
+        r = "scp "
+        if self.ssh_args:
+            r += self.ssh_args + " "
+        if recursive:
+            r += "-r "
+        r += src + " "
+        if self.user:
+            r += self.user + "@"
+        r += self.hostname + ":" + dest
+        return r
+
+
+    def rsync_to_command(self, flags, src_dir, dest_dir):
+        r = "rsync "
+        if self.ssh_args:
+            r += "-e \"ssh " + self.ssh_args + "\" "
+        if flags:
+            r += flags + " "
+        r += src_dir + " "
+        if self.user:
+            r += self.user + "@"
+        r += self.hostname + ":" + dest_dir
+        return r
+
+
+    def __str__(self):
+        r = ""
+        if self.user:
+            r += self.user + "@"
+        r += self.hostname
+        return r
+
+
+class ClusterSlot(object):
+    def __init__(self, parent, account, **kwargs):
+        self.parent = parent
+        self.account = account
+        for k,v in kwargs.items():
+            setattr(self, k, v)
+
+    def free(self):
+        self.parent.free(self)
+
+
+class Cluster(object):
+    '''
+    Interface for a cluster -- a collection of nodes with login credentials. This interface doesn't
+    define any mapping of roles/services to nodes. It only interacts with some underlying system that
+    can describe available resources and mediates reservations of those resources. This is intentionally
+    simple right now: the only "resource" is a generic VM and everything is assumed to be
+    approximately homogeneous.
+    '''
+
+    _FACTORY = {}
+
+    @classmethod
+    def create(cls, type, systemTestEnv, *args, **kwargs):
+        return cls._FACTORY[type](systemTestEnv, *args, **kwargs)
+
+    def __init__(self, systemTestEnv):
+        self.systemTestEnv = systemTestEnv
+
+    def request(self, nslots):
+        "Request the specified number of slots, which will be reserved until they are freed by the caller."
+        raise NotImplementedError()
+
+    def free(self, slots):
+        "Free the given slot or list of slots"
+        if isinstance(slots, collections.Iterable):
+            for s in slots:
+                self.free_single(s)
+        else:
+            self.free_single(slots)
+
+    def free_single(self, slot):
+        raise NotImplementedError()
+
+
+class LocalhostCluster(Cluster):
+    '''
+    A "cluster" that runs entirely on localhost using default credentials. This doesn't require any user
+    configuration and is equivalent to the old defaults in cluster_config.json. There are no constraints
+    on the resources available.
+    '''
+
+    def request(self, nslots):
+        return [ClusterSlot(self, RemoteAccount("localhost")) for i in range(nslots)]
+
+    def free_single(self, slot):
+        pass
+
+
+class JsonCluster(Cluster):
+    '''
+    An implementation of Cluster that uses static settings specified in a cluster file.
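+
+    Example cluster.json (hostname, user and paths here are illustrative):
+
+        {"reuse_nodes": false,
+         "nodes": [{"hostname": "worker1", "user": "kafka",
+                    "ssh_args": "-o StrictHostKeyChecking=no",
+                    "java_home": null, "kafka_home": "/opt/kafka"}]}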
+    '''
+
+    def __init__(self, systemTestEnv, cluster_json=None):
+        super(JsonCluster, self).__init__(systemTestEnv)
+        if cluster_json is None:
+            cluster_json_path = os.path.abspath(os.path.join(self.systemTestEnv.SYSTEM_TEST_BASE_DIR, "cluster.json"))
+            cluster_json = json.load(open(cluster_json_path))
+        init_nodes = [RemoteAccount(ninfo["hostname"], ninfo.get("user"), ninfo.get("ssh_args"),
+                                    ninfo.get("java_home", "default"), ninfo.get("kafka_home", "default"))
+                      for ninfo in cluster_json["nodes"]]
+        self.available_nodes = collections.deque(init_nodes)
+        self.reuse_nodes = cluster_json.get("reuse_nodes", False)
+        self.in_use_nodes = set()
+        self.id_source = 1
+
+    def request(self, nslots):
+        if nslots > len(self.available_nodes) and not self.reuse_nodes:
+            raise RuntimeError("There aren't enough available nodes to satisfy the resource request.")
+
+        result = []
+        for i in range(nslots):
+            node = self.available_nodes.popleft()
+            result.append(ClusterSlot(self, node, slot_id=self.id_source))
+            self.in_use_nodes.add(self.id_source)
+            self.id_source += 1
+            if self.reuse_nodes:
+                self.available_nodes.append(node)
+        return result
+
+    def free_single(self, slot):
+        assert(slot.slot_id in self.in_use_nodes)
+        self.in_use_nodes.remove(slot.slot_id)
+        if not self.reuse_nodes:
+            self.available_nodes.append(slot.account)
+
+
+class VagrantCluster(JsonCluster):
+    '''
+    An implementation of Cluster that uses a set of VMs created by Vagrant. Because we need hostnames that can be
+    advertised, this assumes that the Vagrant VM's name is a routable hostname on all the hosts.
+    '''
+
+    def __init__(self, systemTestEnv):
+        hostname, username, flags = None, None, ""
+        nodes = []
+        for line in (subprocess.check_output("vagrant ssh-config", shell=True)).split("\n"):
+            line = line.strip()
+            if len(line) == 0:
+                if hostname is not None:
+                    nodes.append({
+                        "hostname": hostname,
+                        "user": username,
+                        "ssh_args": flags,
+                        # java_home is determined automatically; use None to indicate
+                        # that explicitly instead of "default"
+                        "java_home": None,
+                        "kafka_home": "/opt/kafka",
+                    })
+                hostname, username, flags = None, None, ""
+                continue
+            key, val = line.split(None, 1)
+            if key == "Host":
+                hostname = val
+            elif key.lower() == "hostname":
+                # Ignore the HostName value since we use the Vagrant VM name
+                pass
+            elif key == "User":
+                username = val
+            else:
+                flags += "-o '" + line + "' "
+        cluster_json = {
+            "nodes": nodes,
+            "reuse_nodes": True
+        }
+        super(VagrantCluster, self).__init__(systemTestEnv, cluster_json)
+
+Cluster._FACTORY["localhost"] = LocalhostCluster
+Cluster._FACTORY["json"] = JsonCluster
+Cluster._FACTORY["vagrant"] = VagrantCluster
\ No newline at end of file
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index c65650e..bae0314 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -138,7 +138,7 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
     rmtLogPathName = logPathName
     rmtMetricsPathName = metricsPathName
 
-    if host.hostname != "localhost":
+    if not host.local:
         rmtConfigPathName = replace_kafka_home(configPathName, kafkaHome)
         rmtMetricsPathName = replace_kafka_home(metricsPathName, kafkaHome)
         rmtLogPathName = replace_kafka_home(logPathName, kafkaHome)
@@ -175,7 +175,7 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
     dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
     rmtDashboardsPathName =
dashboardsPathName - if host.hostname != "localhost": + if not host.local: rmtDashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome) system_test_utils.scp_from_remote(host, rmtDashboardsPathName, dashboardsPathName) @@ -199,7 +199,7 @@ def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv): metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics") dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards") - if hostname != "localhost": + if not host.local: configPathName = replace_kafka_home(configPathName, kafkaHome) metricsPathName = replace_kafka_home(metricsPathName, kafkaHome) dashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome) @@ -444,7 +444,7 @@ def scp_test_case_configs_to_remote_host(clusterEntityConfigDictList, testcaseEn localTestcasePathName = testcaseEnv.testCaseBaseDir remoteTestcasePathName = localTestcasePathName - if host.hostname != "localhost": + if not host.local: remoteTestcasePathName = replace_kafka_home(localTestcasePathName, kafkaHome) system_test_utils.scp_to_remote(localTestcasePathName + "/config/*", host, remoteTestcasePathName + "/config") @@ -595,7 +595,7 @@ def get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default") - if host.hostname != "localhost": + if not host.local: logPathName = replace_kafka_home(logPathName, kafkaHome) subproc = system_test_utils.remote_async_sys_call(host, @@ -665,7 +665,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "config") logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "default") - if host.hostname != "localhost": + if not host.local: configPathName = replace_kafka_home(configPathName, kafkaHome) logPathName = replace_kafka_home(logPathName, kafkaHome) @@ -816,7 +816,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv): consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") metricsDir = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "metrics"), - if host.hostname != "localhost": + if not host.local: consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome) #metricsDir = replace_kafka_home(metricsDir, kafkaHome) @@ -1020,7 +1020,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") metricsDir = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "metrics") - if host.hostname != "localhost": + if not host.local: producerLogPath = replace_kafka_home(producerLogPath, kafkaHome) metricsDir = replace_kafka_home(metricsDir, kafkaHome) @@ -1191,7 +1191,7 @@ def create_topic_for_producer_performance(systemTestEnv, testcaseEnv): testcaseBaseDir = testcaseEnv.testCaseBaseDir - if zkHost.hostname != "localhost": + if not zkHost.local: testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome) for topic in topicsList: @@ -1409,7 +1409,7 @@ def cleanup_data_at_remote_hosts(systemTestEnv, testcaseEnv): cmdStr = "" dataDir = "" - if host.hostname == "localhost": + if host.local: remoteTestCaseBaseDir = testCaseBaseDir else: remoteTestCaseBaseDir = 
replace_kafka_home(testCaseBaseDir, kafkaHome) @@ -1859,7 +1859,7 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") - if host.hostname != "localhost": + if not host.local: consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome) # testcase configurations: diff --git a/system_test/utils/metrics.py b/system_test/utils/metrics.py index cc0319f..5818dfe 100644 --- a/system_test/utils/metrics.py +++ b/system_test/utils/metrics.py @@ -40,6 +40,7 @@ import numpy from pyh import * import kafka_system_test_utils import system_test_utils +from cluster import RemoteAccount logger = logging.getLogger("namedLogger") thisClassName = '(metrics)' @@ -262,11 +263,11 @@ def start_metrics_collection(jmxHost, jmxPort, role, entityId, systemTestEnv, te entityMetricsDir + "/entity_pid"] startMetricsCommand = " ".join(startMetricsCmdList) - system_test_utils.remote_async_sys_call(system_test_utils.RemoteAccount(jmxHost), startMetricsCommand) + system_test_utils.remote_async_sys_call(RemoteAccount(jmxHost), startMetricsCommand) time.sleep(1) pidCmdStr = "cat " + entityMetricsDir + "/entity_pid 2> /dev/null" - subproc = system_test_utils.remote_async_sys_call(system_test_utils.RemoteAccount(jmxHost), pidCmdStr) + subproc = system_test_utils.remote_async_sys_call(RemoteAccount(jmxHost), pidCmdStr) # keep track of JMX ppid in a dictionary of entity_id to list of JMX ppid # testcaseEnv.entityJmxParentPidDict: diff --git a/system_test/utils/setup_utils.py b/system_test/utils/setup_utils.py index 0e8b7f9..4102633 100644 --- a/system_test/utils/setup_utils.py +++ b/system_test/utils/setup_utils.py @@ -22,7 +22,7 @@ # ================================================================= import logging -import kafka_system_test_utils +import system_test_utils import sys class SetupUtils(object): @@ -45,3 +45,13 @@ class SetupUtils(object): self.anonLogger.info(message) self.anonLogger.info("======================================================") + def setup_remote_hosts(self, systemTestEnv): + if not systemTestEnv.printTestDescriptionsOnly: + if not systemTestEnv.doNotValidateRemoteHost: + if not system_test_utils.setup_remote_hosts(systemTestEnv): + self.logger.error("Remote hosts sanity check failed. 
Aborting test ...", extra=self.d) + print + sys.exit(1) + else: + self.logger.info("SKIPPING : checking remote machines", extra=self.d) + print diff --git a/system_test/utils/system_test_utils.py b/system_test/utils/system_test_utils.py index 554fcbd..1a9e83b 100644 --- a/system_test/utils/system_test_utils.py +++ b/system_test/utils/system_test_utils.py @@ -32,75 +32,32 @@ import socket import subprocess import sys import time +from cluster import RemoteAccount logger = logging.getLogger("namedLogger") aLogger = logging.getLogger("anonymousLogger") thisClassName = '(system_test_utils)' d = {'name_of_class': thisClassName} - -class RemoteAccount(object): - def __init__(self, hostname, user=None, ssh_args=None): - self.hostname = hostname - self.user = user - self.ssh_args = ssh_args - - - def ssh_command(self, cmd): - r = "ssh " - if self.user: - r += self.user + "@" - r += self.hostname + " " - if self.ssh_args: - r += self.ssh_args + " " - r += "'" + cmd.replace("'", "'\\''") + "'" - return r - - - def scp_from_command(self, src, dest, recursive=False): - r = "scp " - if self.ssh_args: - r += self.ssh_args + " " - if recursive: - r += "-r " - if self.user: - r += self.user + "@" - r += self.hostname + ":" + src + " " + dest - return r - - - def scp_to_command(self, src, dest, recursive=False): - r = "scp " - if self.ssh_args: - r += self.ssh_args + " " - if recursive: - r += "-r " - r += src + " " - if self.user: - r += self.user + "@" - r += self.hostname + ":" + dest - return r - - - def rsync_to_command(self, flags, src_dir, dest_dir): - r = "rsync " - if self.ssh_args: - r += "-e \"ssh " + self.ssh_args + "\" " - if flags: - r += flags - r += src_dir - if self.user: - r += self.user + "@" - r += self.hostname + ":" + dest_dir - return r - - - def __str__(self): - r = "" - if self.user: - r += self.user + "@" - r += self.hostname - return r +def allocate_cluster_nodes(systemTestEnv): + slots = systemTestEnv.cluster.request(len(systemTestEnv.clusterEntityConfigDictList)) + + for clusterEntityConfigDict,slot in zip(systemTestEnv.clusterEntityConfigDictList,slots): + clusterEntityConfigDict["cluster_slot"] = slot + clusterEntityConfigDict["account"] = slot.account + # FIXME ideally this wouldn't be needed, but doing so would require a lot of cleanup and careful testing + clusterEntityConfigDict["hostname"] = slot.account.hostname + clusterEntityConfigDict["java_home"] = slot.account.java_home + clusterEntityConfigDict["kafka_home"] = slot.account.kafka_home + +def free_cluster_nodes(systemTestEnv): + for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: + slot = clusterEntityConfigDict.get("cluster_slot") + if slot is None: continue + slot.free() + del clusterEntityConfigDict["cluster_slot"] + del clusterEntityConfigDict["account"] + del clusterEntityConfigDict["hostname"] def get_remote_account(clusterEntityConfig, entityId=None): ''' @@ -109,18 +66,11 @@ def get_remote_account(clusterEntityConfig, entityId=None): ''' if type(clusterEntityConfig) == dict: - return RemoteAccount( - clusterEntityConfig["hostname"], - clusterEntityConfig.get("user"), - clusterEntityConfig.get("ssh_args") - ) + return clusterEntityConfig["account"] assert(type(clusterEntityConfig) == list) assert(entityId != None) - hostname = get_data_by_lookup_keyval(clusterEntityConfig, "entity_id", entityId, "hostname") - user = get_data_by_lookup_keyval(clusterEntityConfig, "entity_id", entityId, "user") - ssh_args = get_data_by_lookup_keyval(clusterEntityConfig, "entity_id", entityId, "ssh_args") - 
return RemoteAccount(hostname, user, ssh_args)
+    return get_data_by_lookup_keyval(clusterEntityConfig, "entity_id", entityId, "account")
 
 
 def get_current_unix_timestamp():
@@ -421,20 +371,24 @@ def setup_remote_hosts(systemTestEnv):
 
     # when configuring "default" java_home, use JAVA_HOME environment variable, if exists
     # otherwise, use the directory with the java binary
+    BIN_JAVA_REGEX = r"(.*)/bin/java$"
     localJavaHome = os.environ.get('JAVA_HOME')
     if localJavaHome is not None:
         localJavaBin = localJavaHome + '/bin/java'
     else:
-        subproc = async_sys_call("which java")
+        subproc = async_sys_call("readlink -f `which java`")
+        subproc.wait()
+        if subproc.returncode != 0:
+            logger.error("No Java binary found in local host", extra=d)
+            return False
         for line in subproc.stdout.readlines():
-            if line.startswith("which: no "):
+            line = line.rstrip('\n')
+            localJavaBin = line
+            matchObj = re.match(BIN_JAVA_REGEX, line)
+            if matchObj is None:
                 logger.error("No Java binary found in local host", extra=d)
                 return False
-            else:
-                line = line.rstrip('\n')
-                localJavaBin = line
-                matchObj = re.match("(.*)\/bin\/java$", line)
-                localJavaHome = matchObj.group(1)
+            localJavaHome = matchObj.group(1)
 
     listIndex = -1
     for clusterEntityConfigDict in clusterEntityConfigDictList:
@@ -442,20 +396,36 @@ def setup_remote_hosts(systemTestEnv):
 
         host = get_remote_account(clusterEntityConfigDict)
         kafkaHome = clusterEntityConfigDict["kafka_home"]
-        javaHome = clusterEntityConfigDict["java_home"]
+        javaHome = clusterEntityConfigDict.get("java_home")
 
-        if host.hostname == "localhost" and javaHome == "default":
+        if host.local and (javaHome is None or javaHome == "default"):
             clusterEntityConfigDictList[listIndex]["java_home"] = localJavaHome
+        elif javaHome is None:
+            logger.debug("finding java_home in host [" + str(host) + "]", extra=d)
+            subproc = remote_async_sys_call(host, "readlink -f `which java`")
+            subproc.wait()
+            if subproc.returncode != 0:
+                logger.error("Couldn't find java binary on host [" + str(host) + "]", extra=d)
+                return False
+            javaBin = subproc.stdout.readlines()[0].strip()
+            matchObj = re.match(BIN_JAVA_REGEX, javaBin)
+            if matchObj is None:
+                logger.error("Unexpected java binary path [" + javaBin + "] on host [" + str(host) + "]", extra=d)
+                return False
+            javaHome = matchObj.group(1)
+            clusterEntityConfigDict["java_home"] = javaHome
+            host = get_remote_account(clusterEntityConfigDict)
 
-        if host.hostname == "localhost" and kafkaHome == "default":
+        if host.local and (kafkaHome is None or kafkaHome == "default"):
             clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome
 
-        if host.hostname == "localhost" and kafkaHome == "system_test/migration_tool_testsuite/0.7":
+        if host.local and kafkaHome == "system_test/migration_tool_testsuite/0.7":
             clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome + "/system_test/migration_tool_testsuite/0.7"
+            host = get_remote_account(clusterEntityConfigDict)
 
         kafkaHome = clusterEntityConfigDict["kafka_home"]
-        javaHome = clusterEntityConfigDict["java_home"]
+        javaHome = clusterEntityConfigDict.get("java_home")
 
-        logger.debug("checking java binary [" + localJavaBin + "] in host [" + str(host) + "]", extra=d)
+        logger.debug("checking java home [" + javaHome + "] in host [" + str(host) + "]", extra=d)
         if not remote_host_directory_exists(host, javaHome):
             logger.error("Directory not found: [" + javaHome + "] in host [" + str(host) + "]", extra=d)
             return False
@@ -463,7 +433,7 @@ def setup_remote_hosts(systemTestEnv):
 
         logger.debug("checking directory [" + kafkaHome + "] in host [" + str(host) +
"]", extra=d) if not remote_host_directory_exists(host, kafkaHome): logger.info("Directory not found: [" + kafkaHome + "] in host [" + str(host) + "]", extra=d) - if host.hostname == "localhost": + if host.local: return False else: localKafkaSourcePath = systemTestEnv.SYSTEM_TEST_BASE_DIR + "/.." @@ -526,7 +496,7 @@ def load_cluster_config(clusterConfigPathName, clusterEntityConfigDictList): for cfg in cfgList: clusterEntityConfigDictList.append(cfg) -def setup_remote_hosts_with_testcase_level_cluster_config(systemTestEnv, testCasePathName): +def configure_remote_hosts_with_testcase_level_cluster_config(systemTestEnv, testCasePathName): # ======================================================================= # starting a new testcase, check for local cluster_config.json # ======================================================================= @@ -571,14 +541,8 @@ def setup_remote_hosts_with_testcase_level_cluster_config(systemTestEnv, testCas # restore the system_test/cluster_config.json systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListInSystemTestLevel) - # set up remote hosts - if not setup_remote_hosts(systemTestEnv): - logger.error("Remote hosts sanity check failed. Aborting test ...", extra=d) - print - sys.exit(1) - print -def setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testModulePathName): +def configure_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testModulePathName): # ======================================================================= # starting a new testsuite, check for local cluster_config.json: # ======================================================================= @@ -614,12 +578,6 @@ def setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testMo # restore the system_test/cluster_config.json systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListInSystemTestLevel) - # set up remote hosts - if not setup_remote_hosts(systemTestEnv): - logger.error("Remote hosts sanity check failed. Aborting test ...", extra=d) - print - sys.exit(1) - print # ================================================= # lists_diff_count -- 2.1.2