From 516ccbae3ed62638fcf85ef094e04d9ceca5bcc6 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Fri, 8 Aug 2014 14:50:46 -0700 Subject: [PATCH] KAFKA-1582; System Test should wait for producer to finish --- system_test/utils/kafka_system_test_utils.py | 44 ++++++++++++++-------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 6edd64a..0fedfd5 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -792,19 +792,19 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): except: pass - # 4. consumer config - consumerProperties = {} - consumerProperties["consumer.timeout.ms"] = timeoutMs - try: + # 4. consumer config + consumerProperties = {} + consumerProperties["consumer.timeout.ms"] = timeoutMs + try: groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id") consumerProperties["group.id"] = groupOption except: pass - props_file_path=write_consumer_properties(consumerProperties) - scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/" - logger.debug("executing command [" + scpCmdStr + "]", extra=d) - system_test_utils.sys_call(scpCmdStr) + props_file_path=write_consumer_properties(consumerProperties) + scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/" + logger.debug("executing command [" + scpCmdStr + "]", extra=d) + system_test_utils.sys_call(scpCmdStr) if len(formatterOption) > 0: formatterOption = " --formatter " + formatterOption + " " @@ -930,12 +930,12 @@ def start_console_consumer(systemTestEnv, testcaseEnv): logger.error("Invalid cluster name : " + clusterName, extra=d) sys.exit(1) - consumerProperties = {} - consumerProperties["consumer.timeout.ms"] = timeoutMs - props_file_path=write_consumer_properties(consumerProperties) - scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" - logger.debug("executing command [" + scpCmdStr + "]", extra=d) - system_test_utils.sys_call(scpCmdStr) + consumerProperties = {} + consumerProperties["consumer.timeout.ms"] = timeoutMs + props_file_path=write_consumer_properties(consumerProperties) + scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" + logger.debug("executing command [" + scpCmdStr + "]", extra=d) + system_test_utils.sys_call(scpCmdStr) cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, @@ -1118,9 +1118,10 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk + str(noMsgPerBatch) + "] messages with starting message id : [" + str(initMsgId) + "]", extra=d) cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, + "'echo $! > " + producerLogPath + "/entity_" + entityId + "_pid", + "JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, - "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, + " & KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, kafkaRunClassBin + " kafka.tools.ProducerPerformance", "--broker-list " + brokerListStr, "--initial-message-id " + str(initMsgId), @@ -1135,8 +1136,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "--csv-reporter-enabled", "--metrics-dir " + metricsDir, boolArgumentsStr, - " >> " + producerLogPathName, - " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + " >> " + producerLogPathName + "'"] if kafka07Client: cmdList[:] = [] @@ -1154,9 +1154,10 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk brokerInfoStr = "broker.list=" + brokerInfoStr cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, + "'echo $! > " + producerLogPath + "/entity_" + entityId + "_pid", + "JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, - "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, + " & KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, kafkaRunClassBin + " kafka.tools.ProducerPerformance", "--brokerinfo " + brokerInfoStr, "--initial-message-id " + str(initMsgId), @@ -1166,8 +1167,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "--compression-codec " + compCodec, "--message-size " + messageSize, "--vary-message-size --async", - " >> " + producerLogPathName, - " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + " >> " + producerLogPathName + "'"] cmdStr = " ".join(cmdList) logger.debug("executing command: [" + cmdStr + "]", extra=d) -- 1.7.12.4