diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index fb4a9c0..2965c75 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -1479,6 +1479,25 @@ def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttribu def stop_all_remote_running_processes(systemTestEnv, testcaseEnv): + # background producers need to be notified to stop specifically + testcaseEnv.lock.acquire() + testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True + time.sleep(1) + self.testcaseEnv.lock.release() + time.sleep(1) + + # wait for producer thread's update of + # "backgroundProducerStopped" to be "True" + while 1: + self.testcaseEnv.lock.acquire() + self.logger.info("status of backgroundProducerStopped : [" + \ + str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d) + if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]: + self.logger.info("all producer threads completed", extra=self.d) + break + self.testcaseEnv.lock.release() + time.sleep(2) + entityConfigs = systemTestEnv.clusterEntityConfigDictList for hostname, producerPPid in testcaseEnv.producerHostParentPidDict.items():