diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index fb4a9c0..3129eb9 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -933,6 +933,9 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk jmxPort = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" + # first keep track of its pid + testcaseEnv.producerHostParentPidDict[entityId] = os.getpid() + # get optional testcase arguments numTopicsForAutoGenString = -1 try: @@ -1090,6 +1093,9 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk testcaseEnv.lock.release() time.sleep(1) + # finally remove itself from the tracking pids + del testcaseEnv.producerHostParentPidDict[entityId] + def stop_remote_entity(systemTestEnv, entityId, parentPid, signalType="SIGTERM"): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -1481,9 +1487,32 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv): entityConfigs = systemTestEnv.clusterEntityConfigDictList - for hostname, producerPPid in testcaseEnv.producerHostParentPidDict.items(): - producerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id") - stop_remote_entity(systemTestEnv, producerEntityId, producerPPid) + # If there are any alive local threads that keep starting remote producer performance, we need to kill them; + # note we do not need to stop remote processes since they will terminate themselves eventually. + if len(testcaseEnv.producerHostParentPidDict) != 0: + # ============================================= + # tell producer to stop + # ============================================= + testcaseEnv.lock.acquire() + testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True + time.sleep(1) + testcaseEnv.lock.release() + + # ============================================= + # wait for producer thread's update of + # "backgroundProducerStopped" to be "True" + # ============================================= + while 1: + testcaseEnv.lock.acquire() + logger.info("status of backgroundProducerStopped : [" + \ + str(testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=d) + if testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]: + time.sleep(1) + logger.info("all producer threads completed", extra=d) + break + testcaseEnv.lock.release() + + testcaseEnv.producerHostParentPidDict.clear() for hostname, consumerPPid in testcaseEnv.consumerHostParentPidDict.items(): consumerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id")