diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index de02e47..887fe05 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -792,14 +792,20 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): except: pass - # 4. group - groupOption = "" - try: + # 4. consumer config + consumerProperties = {} + consumerProperties["consumer.timeout.ms"] = timeoutMs + try: groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id") - groupOption = "--group " + groupOption + consumerProperties["group.id"] = groupOption except: pass + props_file_path=write_consumer_properties(consumerProperties) + scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/" + logger.debug("executing command [" + scpCmdStr + "]", extra=d) + system_test_utils.sys_call(scpCmdStr) + if len(formatterOption) > 0: formatterOption = " --formatter " + formatterOption + " " @@ -818,9 +824,8 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): kafkaHome + "/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer", "--zookeeper " + zkConnectStr, "--topic " + topic, - "--consumer-timeout-ms " + timeoutMs, + "--consumer.config /tmp/consumer.properties", "--csv-reporter-enabled", - groupOption, formatterOption, "--from-beginning", " >> " + logPathName + "/" + logFile + " & echo pid:$! > ", @@ -925,13 +930,20 @@ def start_console_consumer(systemTestEnv, testcaseEnv): logger.error("Invalid cluster name : " + clusterName, extra=d) sys.exit(1) + consumerProperties = {} + consumerProperties["consumer.timeout.ms"] = timeoutMs + props_file_path=write_consumer_properties(consumerProperties) + scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" + logger.debug("executing command [" + scpCmdStr + "]", extra=d) + system_test_utils.sys_call(scpCmdStr) + cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, kafkaRunClassBin + " kafka.consumer.ConsoleConsumer", "--zookeeper " + zkConnectStr, "--topic " + topic, - "--consumer-timeout-ms " + timeoutMs, + "--consumer.config /tmp/consumer.properties", "--csv-reporter-enabled", #"--metrics-dir " + metricsDir, formatterOption, @@ -2484,4 +2496,12 @@ def get_leader_attributes(systemTestEnv, testcaseEnv): return leaderDict +def write_consumer_properties(consumerProperties): + import tempfile + props_file_path = tempfile.gettempdir() + "/consumer.properties" + consumer_props_file=open(props_file_path,"w") + for key,value in consumerProperties.iteritems(): + consumer_props_file.write(key+"="+value+"\n") + consumer_props_file.close() + return props_file_path