Index: system_test/utils/kafka_system_test_utils.py =================================================================== --- system_test/utils/kafka_system_test_utils.py (revision 1383496) +++ system_test/utils/kafka_system_test_utils.py (working copy) @@ -29,6 +29,7 @@ import re import subprocess import sys +import thread import time import traceback @@ -370,7 +371,7 @@ line = line.rstrip('\n') if testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] in line: - logger.info("found the log line : " + line, extra=d) + logger.debug("found the log line : " + line, extra=d) try: matchObj = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"], line) datetimeStr = matchObj.group(1) @@ -379,7 +380,7 @@ #print "{0:.3f}".format(unixTs) shutdownBrokerDict["timestamp"] = unixTs shutdownBrokerDict["brokerid"] = matchObj.group(2) - logger.info("brokerid: [" + shutdownBrokerDict["brokerid"] + "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d) + logger.debug("brokerid: [" + shutdownBrokerDict["brokerid"] + "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d) return shutdownBrokerDict except: logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d) @@ -423,7 +424,7 @@ line = line.rstrip('\n') if testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] in line: - logger.info("found the log line : " + line, extra=d) + logger.debug("found the log line : " + line, extra=d) try: matchObj = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"], line) datetimeStr = matchObj.group(1) @@ -441,7 +442,7 @@ leaderDict["partition"] = matchObj.group(4) leaderDict["entity_id"] = brokerEntityId leaderDict["hostname"] = hostname - logger.info("brokerid: [" + leaderDict["brokerid"] + "] entity_id: [" + leaderDict["entity_id"] + "]", extra=d) + logger.debug("brokerid: [" + leaderDict["brokerid"] + "] entity_id: [" + leaderDict["entity_id"] + "]", extra=d) except: logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d) raise @@ -504,7 +505,7 @@ system_test_utils.async_sys_call(cmdStr) time.sleep(5) - pidCmdStr = "ssh " + hostname + " 'cat " + logPathName + "/entity_" + entityId + "_pid'" + pidCmdStr = "ssh " + hostname + " 'cat " + logPathName + "/entity_" + entityId + "_pid' 2> /dev/null" logger.debug("executing command: [" + pidCmdStr + "]", extra=d) subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) @@ -551,7 +552,7 @@ "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, kafkaRunClassBin + " kafka.consumer.ConsoleConsumer", - commandArgs + " &> " + consumerLogPathName, + commandArgs + " >> " + consumerLogPathName, " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] cmdStr = " ".join(cmdList) @@ -573,84 +574,107 @@ tokens = line.split(':') testcaseEnv.consumerHostParentPidDict[host] = tokens[1] - def start_producer_performance(systemTestEnv, testcaseEnv): - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - testcaseConfigsList = testcaseEnv.testcaseConfigsList + entityConfigList = systemTestEnv.clusterEntityConfigDictList + testcaseConfigsList = testcaseEnv.testcaseConfigsList brokerListStr = "" # construct "broker-list" for producer - for clusterEntityConfigDict in clusterEntityConfigDictList: - entityRole = clusterEntityConfigDict["role"] + for entityConfig in entityConfigList: + entityRole = entityConfig["role"] if entityRole == "broker": - hostname = clusterEntityConfigDict["hostname"] - entityId = clusterEntityConfigDict["entity_id"] + hostname = entityConfig["hostname"] + entityId = entityConfig["entity_id"] port = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "port") if len(brokerListStr) == 0: brokerListStr = hostname + ":" + port else: brokerListStr = brokerListStr + "," + hostname + ":" + port - - producerConfigList = system_test_utils.get_dict_from_list_of_dicts( \ - clusterEntityConfigDictList, "role", "producer_performance") + producerConfigList = system_test_utils.get_dict_from_list_of_dicts(entityConfigList, "role", "producer_performance") for producerConfig in producerConfigList: host = producerConfig["hostname"] entityId = producerConfig["entity_id"] jmxPort = producerConfig["jmx_port"] role = producerConfig["role"] - kafkaHome = system_test_utils.get_data_by_lookup_keyval( \ - clusterEntityConfigDictList, "entity_id", entityId, "kafka_home") - javaHome = system_test_utils.get_data_by_lookup_keyval( \ - clusterEntityConfigDictList, "entity_id", entityId, "java_home") - jmxPort = system_test_utils.get_data_by_lookup_keyval( \ - clusterEntityConfigDictList, "entity_id", entityId, "jmx_port") - kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" - logger.info("starting producer preformance", extra=d) + thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, brokerListStr)) + time.sleep(1) + metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv) - producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") - producerLogPathName = producerLogPath + "/producer_performance.log" +def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, brokerListStr): + host = producerConfig["hostname"] + entityId = producerConfig["entity_id"] + jmxPort = producerConfig["jmx_port"] + role = producerConfig["role"] + kafkaHome = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "kafka_home") + javaHome = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "java_home") + jmxPort = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port") + kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" - testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName + logger.info("starting producer preformance", extra=d) - commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"]) - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, - kafkaRunClassBin + " kafka.perf.ProducerPerformance", - "--broker-list " +brokerListStr, - commandArgs + " &> " + producerLogPathName, - " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + producerLogPathName = producerLogPath + "/producer_performance.log" - cmdStr = " ".join(cmdList) - logger.debug("executing command: [" + cmdStr + "]", extra=d) - system_test_utils.async_sys_call(cmdStr) - time.sleep(1) - metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv) + testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName + commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"]) - pidCmdStr = "ssh " + host + " 'cat " + producerLogPath + "/entity_" + entityId + "_pid'" - logger.debug("executing command: [" + pidCmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) + counter = 0 + noMsgPerBatch = int(testcaseEnv.testcaseArgumentsDict["num_messages_to_produce_per_producer_call"]) + producerSleepSec = int(testcaseEnv.testcaseArgumentsDict["sleep_seconds_between_producer_calls"]) - # keep track of the remote entity pid in a dictionary - for line in subproc.stdout.readlines(): - if line.startswith("pid"): - line = line.rstrip('\n') - logger.debug("found pid line: [" + line + "]", extra=d) - tokens = line.split(':') - testcaseEnv.producerHostParentPidDict[host] = tokens[1] + # keep calling producer until signaled by: + # testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] + while 1: + testcaseEnv.lock.acquire() + if not testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]: + initMsgId = counter * noMsgPerBatch + logger.info("#### [producer thread] status of stopBackgroundProducer : [False] => producing [" + str(noMsgPerBatch) + \ + "] messages with starting message id : [" + str(initMsgId) + "]", extra=d) + cmdList = ["ssh " + host, + "'JAVA_HOME=" + javaHome, + "JMX_PORT=" + jmxPort, + kafkaRunClassBin + " kafka.perf.ProducerPerformance", + "--broker-list " + brokerListStr, + "--initial-message-id " + str(initMsgId), + "--messages " + str(noMsgPerBatch), + commandArgs + " >> " + producerLogPathName, + " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + + cmdStr = " ".join(cmdList) + logger.debug("executing command: [" + cmdStr + "]", extra=d) + + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + pass # dummy loop to wait until producer is completed + else: + testcaseEnv.lock.release() + break + + counter += 1 + testcaseEnv.lock.release() + time.sleep(int(producerSleepSec)) + + # let the main testcase know producer has stopped + testcaseEnv.lock.acquire() + testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = True + time.sleep(1) + testcaseEnv.lock.release() + time.sleep(1) + + def stop_remote_entity(systemTestEnv, entityId, parentPid): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname") pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid) - logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d) + logger.debug("terminating process id: " + parentPid + " in host: " + hostname, extra=d) system_test_utils.sigterm_remote_process(hostname, pidStack) # time.sleep(1) # system_test_utils.sigkill_remote_process(hostname, pidStack) @@ -662,7 +686,7 @@ hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname") pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid) - logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d) + logger.debug("terminating process id: " + parentPid + " in host: " + hostname, extra=d) system_test_utils.sigkill_remote_process(hostname, pidStack) @@ -737,7 +761,9 @@ outfile.close() logger.info("no. of unique messages sent from publisher : " + str(len(producerMsgIdSet)), extra=d) - logger.info("no. of unique messages received by consumer : " + str(len(producerMsgIdSet)), extra=d) + logger.info("no. of unique messages received by consumer : " + str(len(consumerMsgIdSet)), extra=d) + validationStatusDict["Unique messages from producer"] = str(len(producerMsgIdSet)) + validationStatusDict["Unique messages from consumer"] = str(len(consumerMsgIdSet)) if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ): validationStatusDict["Validate for data matched"] = "PASSED" @@ -892,15 +918,15 @@ # get broker shut down completed timestamp shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv) #print shutdownBrokerDict - logger.info("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownBrokerDict["timestamp"])), extra=d) + logger.debug("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownBrokerDict["timestamp"])), extra=d) - logger.info("looking up new leader", extra=d) + logger.debug("looking up new leader", extra=d) leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv) #print leaderDict2 - logger.info("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])), extra=d) + logger.debug("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])), extra=d) leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownBrokerDict["timestamp"]) logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d) - testcaseEnv.validationStatusDict["Leader Election Latency"] = str("{0:.2f}".format(leaderReElectionLatency * 1000)) + " ms" + #testcaseEnv.validationStatusDict["Leader Election Latency"] = str("{0:.2f}".format(leaderReElectionLatency * 1000)) + " ms" return leaderReElectionLatency Index: system_test/utils/testcase_env.py =================================================================== --- system_test/utils/testcase_env.py (revision 1383496) +++ system_test/utils/testcase_env.py (working copy) @@ -23,6 +23,7 @@ import json import os import sys +import thread class TestcaseEnv(): @@ -83,11 +84,11 @@ self.testCaseBaseDir = "" self.testCaseLogsDir = "" self.testCaseDashboardsDir = "" + # ================================ # dictionary to keep track of # user-defined environment variables # ================================ - # LEADER_ELECTION_COMPLETED_MSG = "completed the leader state transition" # REGX_LEADER_ELECTION_PATTERN = "\[(.*?)\] .* Broker (.*?) " + \ # LEADER_ELECTION_COMPLETED_MSG + \ @@ -97,6 +98,9 @@ # consumerConfigPathName = "" # producerLogPathName = "" # producerConfigPathName = "" - self.userDefinedEnvVarDict = {} + # Lock object for producer threads synchronization + self.lock = thread.allocate_lock() + + Index: system_test/replication_testsuite/config/producer_performance.properties =================================================================== --- system_test/replication_testsuite/config/producer_performance.properties (revision 1383496) +++ system_test/replication_testsuite/config/producer_performance.properties (working copy) @@ -1,6 +1,4 @@ topic=mytest -messages=200 message-size=100 thread=5 -initial-message-id=0 compression-codec=0 Index: system_test/replication_testsuite/replica_basic_test.py =================================================================== --- system_test/replication_testsuite/replica_basic_test.py (revision 1383496) +++ system_test/replication_testsuite/replica_basic_test.py (working copy) @@ -82,7 +82,9 @@ self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self) self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName - # initialize self.testcaseEnv with user-defined environment + # ====================================================================== + # initialize self.testcaseEnv with user-defined environment variables + # ====================================================================== self.testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] = ReplicaBasicTest.brokerShutDownCompletedPattern self.testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"] = \ "\[(.*?)\] .* \[Kafka Server (.*?)\], " + ReplicaBasicTest.brokerShutDownCompletedPattern @@ -94,7 +96,9 @@ " for topic (.*?) partition (.*?) \(.*" self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = "" - + self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = False + self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False + # find testcase properties json file testcasePropJsonPathName = system_test_utils.get_testcase_prop_json_pathname(testCasePathName) self.logger.debug("testcasePropJsonPathName : " + testcasePropJsonPathName, extra=self.d) @@ -168,7 +172,7 @@ # by overriding the settings specified in: # system_test/_testsuite/testcase_/testcase__properties.json kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName, self.testcaseEnv, self.systemTestEnv) - + # ============================================= # preparing all entities to start the test # ============================================= @@ -176,63 +180,92 @@ kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv) self.anonLogger.info("sleeping for 2s") time.sleep(2) - + self.log_message("starting brokers") kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv) - self.anonLogger.info("sleeping for 5s") - time.sleep(5) - + self.anonLogger.info("sleeping for 2s") + time.sleep(2) + self.log_message("creating topics") kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv) self.anonLogger.info("sleeping for 5s") time.sleep(5) - - self.log_message("looking up leader") - leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv) - - # ========================== - # leaderDict looks like this: - # ========================== - #{'entity_id': u'3', - # 'partition': '0', - # 'timestamp': 1345050255.8280001, - # 'hostname': u'localhost', - # 'topic': 'test_1', - # 'brokerid': '3'} - + # ============================================= - # validate to see if leader election is successful - # ============================================= - self.log_message("validating leader election") - result = kafka_system_test_utils.validate_leader_election_successful( \ - self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict) - - # ============================================= - # get leader re-election latency - # ============================================= - bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"] - self.log_message("bounce_leader flag : " + bounceLeaderFlag) - if (bounceLeaderFlag.lower() == "true"): - reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict) - - # ============================================= # starting producer # ============================================= self.log_message("starting producer in the background") kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv) - self.anonLogger.info("sleeping for 10s") - time.sleep(10) - - # ============================================= - # starting previously terminated broker - # ============================================= - if bounceLeaderFlag.lower() == "true": - self.log_message("starting the previously terminated broker") - stoppedLeaderEntityId = leaderDict["entity_id"] - kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId) + self.anonLogger.info("sleeping for 5s") + time.sleep(5) + + i = 1 + numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"]) + while i <= numIterations: + + self.log_message("Iteration " + str(i) + " of " + str(numIterations)) + + # looking up leader + leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv) + + # ========================== + # leaderDict looks like this: + # ========================== + #{'entity_id': u'3', + # 'partition': '0', + # 'timestamp': 1345050255.8280001, + # 'hostname': u'localhost', + # 'topic': 'test_1', + # 'brokerid': '3'} + + # ============================================= + # validate to see if leader election is successful + # ============================================= + self.log_message("validating leader election") + result = kafka_system_test_utils.validate_leader_election_successful( \ + self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict) + + # ============================================= + # get leader re-election latency by stopping leader + # ============================================= + bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"] + self.log_message("bounce_leader flag : " + bounceLeaderFlag) + if (bounceLeaderFlag.lower() == "true"): + reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict) + latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"] + self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms" + + # ============================================= + # starting previously terminated broker + # ============================================= + if bounceLeaderFlag.lower() == "true": + self.log_message("starting the previously terminated broker") + stoppedLeaderEntityId = leaderDict["entity_id"] + kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId) + self.anonLogger.info("sleeping for 5s") time.sleep(5) + i += 1 + # while loop + # tell producer to stop + self.testcaseEnv.lock.acquire() + self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True + time.sleep(1) + self.testcaseEnv.lock.release() + time.sleep(1) + + while 1: + self.testcaseEnv.lock.acquire() + self.logger.info("status of backgroundProducerStopped : [" + \ + str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d) + if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]: + time.sleep(1) + break + time.sleep(1) + self.testcaseEnv.lock.release() + time.sleep(2) + # ============================================= # starting consumer # ============================================= @@ -240,12 +273,15 @@ kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv) self.anonLogger.info("sleeping for 10s") time.sleep(10) - + # this testcase is completed - so stopping all entities self.log_message("stopping all entities") for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items(): kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) - + + # make sure all entities are stopped + kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv) + # validate the data matched # ============================================= self.log_message("validating data matched") @@ -268,10 +304,6 @@ self.testcaseEnv.testCaseDashboardsDir, self.systemTestEnv.clusterEntityConfigDictList) - # stop metrics processes - for entity in self.systemTestEnv.clusterEntityConfigDictList: - metrics.stop_metrics_collection(entity['hostname'], entity['jmx_port']) - except Exception as e: self.log_message("Exception while running test {0}".format(e)) traceback.print_exc() @@ -297,5 +329,4 @@ self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id") kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, producerEntityId, producerPPid) - #kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv) Index: system_test/replication_testsuite/testcase_1/testcase_1_properties.json =================================================================== --- system_test/replication_testsuite/testcase_1/testcase_1_properties.json (revision 1383496) +++ system_test/replication_testsuite/testcase_1/testcase_1_properties.json (working copy) @@ -3,7 +3,10 @@ "testcase_args": { "bounce_leader": "true", "replica_factor": "3", - "num_partition": "1" + "num_partition": "2", + "num_iteration": "2", + "sleep_seconds_between_producer_calls": "1", + "num_messages_to_produce_per_producer_call": "50" }, "entities": [ {