From 56015a57225b5e23721dec4881d208c0519f77aa Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Sun, 10 Aug 2014 21:18:11 -0700 Subject: [PATCH] KAFKA-1582; System Test should wait for producer to finish --- system_test/utils/kafka_system_test_utils.py | 34 +++++++++++++++------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 6edd64a..fcacf0a 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -792,19 +792,19 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): except: pass - # 4. consumer config - consumerProperties = {} - consumerProperties["consumer.timeout.ms"] = timeoutMs - try: + # 4. consumer config + consumerProperties = {} + consumerProperties["consumer.timeout.ms"] = timeoutMs + try: groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id") consumerProperties["group.id"] = groupOption except: pass - props_file_path=write_consumer_properties(consumerProperties) - scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/" - logger.debug("executing command [" + scpCmdStr + "]", extra=d) - system_test_utils.sys_call(scpCmdStr) + props_file_path=write_consumer_properties(consumerProperties) + scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/" + logger.debug("executing command [" + scpCmdStr + "]", extra=d) + system_test_utils.sys_call(scpCmdStr) if len(formatterOption) > 0: formatterOption = " --formatter " + formatterOption + " " @@ -930,12 +930,12 @@ def start_console_consumer(systemTestEnv, testcaseEnv): logger.error("Invalid cluster name : " + clusterName, extra=d) sys.exit(1) - consumerProperties = {} - consumerProperties["consumer.timeout.ms"] = timeoutMs - props_file_path=write_consumer_properties(consumerProperties) - scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" - logger.debug("executing command [" + scpCmdStr + "]", extra=d) - system_test_utils.sys_call(scpCmdStr) + consumerProperties = {} + consumerProperties["consumer.timeout.ms"] = timeoutMs + props_file_path=write_consumer_properties(consumerProperties) + scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" + logger.debug("executing command [" + scpCmdStr + "]", extra=d) + system_test_utils.sys_call(scpCmdStr) cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, @@ -1136,7 +1136,8 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "--metrics-dir " + metricsDir, boolArgumentsStr, " >> " + producerLogPathName, - " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid", + " & wait'"] if kafka07Client: cmdList[:] = [] @@ -1167,7 +1168,8 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "--message-size " + messageSize, "--vary-message-size --async", " >> " + producerLogPathName, - " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid", + " & wait'"] cmdStr = " ".join(cmdList) logger.debug("executing command: [" + cmdStr + "]", extra=d) -- 1.7.12.4