diff --git a/system_test/README.txt b/system_test/README.txt
index 0e469e3..f9972d1 100644
--- a/system_test/README.txt
+++ b/system_test/README.txt
@@ -48,7 +48,7 @@ The framework has the following levels:
 
 # ==========================
 * Please note that the following commands should be executed after downloading the kafka source code to build all the required binaries:
-  1. / $ ./gradlew jar
+  1. / $ ./sbt update package
 
 Now you are ready to follow the steps below.
 1. Update system_test/cluster_config.json for "kafka_home" & "java_home" specific to your environment
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index de02e47..d026407 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -117,7 +117,7 @@ def generate_testcase_log_dirs(systemTestEnv, testcaseEnv):
         # create the role directory under dashboards
         dashboardsRoleDir = dashboardsPathName + "/" + role
         if not os.path.exists(dashboardsRoleDir) : os.makedirs(dashboardsRoleDir)
-        
+
 
 def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
     anonLogger.info("================================================")
@@ -212,7 +212,7 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
             logger.debug("executing command [" + cmdStr + "]", extra=d)
             system_test_utils.sys_call(cmdStr)
-        
+
 
 def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv):
     testCaseBaseDir = testcaseEnv.testCaseBaseDir
@@ -432,9 +432,9 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv
                 sys.exit(1)
 
             addedCSVConfig = {}
-            addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics") 
-            addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
-            addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" 
+            addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics")
+            addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5"
+            addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter"
             addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true"
 
             if brokerVersion == "0.7":
@@ -466,7 +466,7 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv
                 tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
             copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties",
                 cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None)
-        
+
 
         else:
             logger.debug("UNHANDLED role " + clusterCfg["role"], extra=d)
@@ -495,7 +495,7 @@ def scp_file_to_remote_host(clusterEntityConfigDictList, testcaseEnv):
 def start_zookeepers(systemTestEnv, testcaseEnv):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
-    zkEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+    zkEntityIdList = system_test_utils.get_data_from_list_of_dicts(
         clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
 
     for zkEntityId in zkEntityIdList:
@@ -534,7 +534,7 @@ def start_zookeepers(systemTestEnv, testcaseEnv):
 def start_brokers(systemTestEnv, testcaseEnv):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
-    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
         clusterEntityConfigDictList, "role", "broker", "entity_id")
 
     for brokerEntityId in brokerEntityIdList:
@@ -558,7 +558,7 @@ def start_mirror_makers(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
         start_entity_in_background(systemTestEnv, testcaseEnv, onlyThisEntityId)
     else:
         clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-        brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+        brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
             clusterEntityConfigDictList, "role", "mirror_maker", "entity_id")
 
         for brokerEntityId in brokerEntityIdList:
@@ -571,17 +571,17 @@ def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDic
 
     # keep track of broker related data in this dict such as broker id,
     # entity id and timestamp and return it to the caller function
-    shutdownBrokerDict = {} 
+    shutdownBrokerDict = {}
 
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
         clusterEntityConfigDictList, "role", "broker", "entity_id")
 
     for brokerEntityId in brokerEntityIdList:
-        hostname = system_test_utils.get_data_by_lookup_keyval( 
+        hostname = system_test_utils.get_data_by_lookup_keyval(
            clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
-        logFile = system_test_utils.get_data_by_lookup_keyval( 
+        logFile = system_test_utils.get_data_by_lookup_keyval(
            testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
 
         logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
@@ -629,7 +629,7 @@ def get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict
 
     # keep track of leader related data in this dict such as broker id,
     # entity id and timestamp and return it to the caller function
-    leaderDict = {} 
+    leaderDict = {}
 
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
     brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
@@ -754,7 +754,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                        "--whitelist=\".*\" >> ",
                        logPathName + "/" + logFile + " & echo pid:$! > ",
                        logPathName + "/entity_" + entityId + "_pid'"]
-        else: 
+        else:
            cmdList = ["ssh " + hostname,
                       "'JAVA_HOME=" + javaHome,
                       "JMX_PORT=" + jmxPort,
@@ -793,13 +793,19 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
            pass
 
        # 4. group
-        groupOption = ""
+        consumerProperties = {}
+        consumerProperties["consumer.timeout.ms"] = timeoutMs
         try:
            groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id")
-            groupOption = "--group " + groupOption
+            consumerProperties["group.id"] = groupOption
         except:
            pass
 
+        props_file_path = write_consumer_properties(consumerProperties)
+        scpCmdStr = "scp " + props_file_path + " " + hostname + ":/tmp/"
+        logger.debug("executing command [" + scpCmdStr + "]", extra=d)
+        system_test_utils.sys_call(scpCmdStr)
+
         if len(formatterOption) > 0:
            formatterOption = " --formatter " + formatterOption + " "
 
@@ -818,7 +824,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                       kafkaHome + "/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer",
                       "--zookeeper " + zkConnectStr,
                       "--topic " + topic,
-                       "--consumer-timeout-ms " + timeoutMs,
+                       "--consumer.config /tmp/consumer.properties",
                       "--csv-reporter-enabled",
                       groupOption,
                       formatterOption,
@@ -861,9 +867,9 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
     for consumerConfig in consumerConfigList:
         host = consumerConfig["hostname"]
         entityId = consumerConfig["entity_id"]
-        jmxPort = consumerConfig["jmx_port"] 
+        jmxPort = consumerConfig["jmx_port"]
         role = consumerConfig["role"]
-        clusterName = consumerConfig["cluster_name"] 
+        clusterName = consumerConfig["cluster_name"]
         kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home")
         javaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home")
         jmxPort = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "jmx_port")
@@ -925,13 +931,20 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
            logger.error("Invalid cluster name : " + clusterName, extra=d)
            sys.exit(1)
 
+        consumerProperties = {}
+        consumerProperties["consumer.timeout.ms"] = timeoutMs
+        props_file_path = write_consumer_properties(consumerProperties)
+        scpCmdStr = "scp " + props_file_path + " " + host + ":/tmp/"
+        logger.debug("executing command [" + scpCmdStr + "]", extra=d)
+        system_test_utils.sys_call(scpCmdStr)
+
         cmdList = ["ssh " + host,
                   "'JAVA_HOME=" + javaHome,
                   "JMX_PORT=" + jmxPort,
                   kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
                   "--zookeeper " + zkConnectStr,
                   "--topic " + topic,
-                   "--consumer-timeout-ms " + timeoutMs,
+                   "--consumer.config /tmp/consumer.properties",
                   "--csv-reporter-enabled",
                   #"--metrics-dir " + metricsDir,
                   formatterOption,
@@ -974,8 +987,8 @@ def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client):
     for producerConfig in producerConfigList:
         host = producerConfig["hostname"]
         entityId = producerConfig["entity_id"]
-        jmxPort = producerConfig["jmx_port"] 
-        role = producerConfig["role"] 
+        jmxPort = producerConfig["jmx_port"]
+        role = producerConfig["role"]
 
         thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, kafka07Client))
         logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
@@ -1017,7 +1030,7 @@ def generate_topics_string(topicPrefix, numOfTopics):
 def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client):
     host = producerConfig["hostname"]
     entityId = producerConfig["entity_id"]
-    jmxPort = producerConfig["jmx_port"] 
+    jmxPort = producerConfig["jmx_port"]
     role = producerConfig["role"]
     clusterName = producerConfig["cluster_name"]
     kafkaHome = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "kafka_home")
@@ -1255,7 +1268,7 @@ def create_topic_for_producer_performance(systemTestEnv, testcaseEnv):
         testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome)
 
         for topic in topicsList:
-            logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) 
+            logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d)
            cmdList = ["ssh " + zkHost,
                       "'JAVA_HOME=" + javaHome,
                       createTopicBin,
@@ -1264,7 +1277,7 @@ def create_topic_for_producer_performance(systemTestEnv, testcaseEnv):
                       " --replication-factor " + testcaseEnv.testcaseArgumentsDict["replica_factor"],
                       " --partitions " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
                       testcaseBaseDir + "/logs/create_source_cluster_topic.log'"]
-            
+
            cmdStr = " ".join(cmdList)
            logger.debug("executing command: [" + cmdStr + "]", extra=d)
            subproc = system_test_utils.sys_call_return_subproc(cmdStr)
@@ -1556,7 +1569,7 @@ def ps_grep_terminate_running_entity(systemTestEnv):
            cmdStr = " ".join(cmdList)
            logger.debug("executing command [" + cmdStr + "]", extra=d)
 
-            system_test_utils.sys_call(cmdStr) 
+            system_test_utils.sys_call(cmdStr)
 
 def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttributesDict):
     leaderEntityId = None
@@ -1613,7 +1626,7 @@ def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttribu
     if shutdownTimestamp > 0:
         leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownTimestamp)
         logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
-    
+
     return leaderReElectionLatency
 
@@ -1649,7 +1662,7 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
                break
         logger.debug("calling testcaseEnv.lock.release()", extra=d)
         testcaseEnv.lock.release()
-    
+
     testcaseEnv.producerHostParentPidDict.clear()
 
     for hostname, consumerPPid in testcaseEnv.consumerHostParentPidDict.items():
@@ -1684,8 +1697,8 @@ def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
         if onlyThisEntityId is None or entityId == onlyThisEntityId:
            host = migrationToolConfig["hostname"]
-            jmxPort = migrationToolConfig["jmx_port"] 
-            role = migrationToolConfig["role"] 
+            jmxPort = migrationToolConfig["jmx_port"]
+            role = migrationToolConfig["role"]
            kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "kafka_home")
            javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "java_home")
            jmxPort = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "jmx_port")
@@ -1751,7 +1764,7 @@ def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv):
         producerEntityId = prodPerfCfg["entity_id"]
         topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
 
-        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
            clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
 
         matchingConsumerEntityId = None
@@ -1765,7 +1778,7 @@ def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv):
         if matchingConsumerEntityId is None:
            break
 
-        msgChecksumMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( 
+        msgChecksumMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname(
            testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \
            + "/msg_checksum_missing_in_consumer.log"
"producer_performance", producerEntityId, "default") @@ -1850,7 +1863,7 @@ def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName # |- 00000000000000000020.log # |- . . . - # loop through all topicPartition directories such as : test_1-0, test_1-1, ... + # loop through all topicPartition directories such as : test_1-0, test_1-1, ... for topicPartition in os.listdir(localLogSegmentPath): # found a topic-partition directory if os.path.isdir(localLogSegmentPath + "/" + topicPartition): @@ -1903,7 +1916,7 @@ def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName # 'test_2-0' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'], # 'test_2-1' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'] # } - + for brokerTopicPartitionKey, md5Checksum in brokerLogCksumDict.items(): tokens = brokerTopicPartitionKey.split(":") brokerKey = tokens[0] @@ -1929,7 +1942,7 @@ def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName logger.debug("merged log segment checksum in " + topicPartition + " matched", extra=d) else: logger.error("unexpected error in " + topicPartition, extra=d) - + if failureCount == 0: validationStatusDict["Validate for merged log segment checksum in cluster [" + clusterName + "]"] = "PASSED" else: @@ -1942,8 +1955,8 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None for consumerConfig in consumerConfigList: host = consumerConfig["hostname"] entityId = consumerConfig["entity_id"] - jmxPort = consumerConfig["jmx_port"] - clusterName = consumerConfig["cluster_name"] + jmxPort = consumerConfig["jmx_port"] + clusterName = consumerConfig["cluster_name"] kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" @@ -2007,16 +2020,16 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None "--no-wait-at-logend ", " > " + outputFilePathName, " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] - + cmdStr = " ".join(cmdList) - + logger.debug("executing command: [" + cmdStr + "]", extra=d) subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr) # dummy for-loop to wait until the process is completed for line in subproc_1.stdout.readlines(): - pass + pass time.sleep(1) - + partitionId += 1 replicaIndex += 1 @@ -2025,7 +2038,7 @@ def get_controller_attributes(systemTestEnv, testcaseEnv): logger.info("Querying Zookeeper for Controller info ...", extra=d) # keep track of controller data in this dict such as broker id & entity id - controllerDict = {} + controllerDict = {} clusterConfigsList = systemTestEnv.clusterEntityConfigDictList tcConfigsList = testcaseEnv.testcaseConfigsList @@ -2080,7 +2093,7 @@ def getMinCommonStartingOffset(systemTestEnv, testcaseEnv, clusterName="source") logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default") localLogSegmentPath = logPathName + "/" + remoteLogSegmentDir - # loop through all topicPartition directories such as : test_1-0, test_1-1, ... + # loop through all topicPartition directories such as : test_1-0, test_1-1, ... 
         for topicPartition in sorted(os.listdir(localLogSegmentPath)):
            # found a topic-partition directory
            if os.path.isdir(localLogSegmentPath + "/" + topicPartition):
@@ -2119,7 +2132,7 @@ def getMinCommonStartingOffset(systemTestEnv, testcaseEnv, clusterName="source")
     #     u'3:test_2-0': '0',
     #     u'3:test_2-1': '0'}
 
-    # loop through brokerLogStartOffsetDict to get the min common starting offset for each topic-partition 
+    # loop through brokerLogStartOffsetDict to get the min common starting offset for each topic-partition
     for brokerTopicPartition in sorted(brokerLogStartOffsetDict.iterkeys()):
         topicPartition = brokerTopicPartition.split(':')[1]
 
@@ -2434,7 +2447,7 @@ def get_leader_attributes(systemTestEnv, testcaseEnv):
     logger.info("Querying Zookeeper for leader info ...", extra=d)
 
     # keep track of leader data in this dict such as broker id & entity id
-    leaderDict = {} 
+    leaderDict = {}
 
     clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
     tcConfigsList = testcaseEnv.testcaseConfigsList
@@ -2483,5 +2496,11 @@ def get_leader_attributes(systemTestEnv, testcaseEnv):
     print leaderDict
     return leaderDict
 
-
-
+def write_consumer_properties(consumerProperties):
+    import tempfile
+    props_file_path = tempfile.gettempdir() + "/consumer.properties"
+    consumer_props_file = open(props_file_path, "w")
+    for key, value in consumerProperties.iteritems():
+        consumer_props_file.write(key + "=" + value + "\n")
+    consumer_props_file.close()
+    return props_file_path
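
For reference, the consumer.properties hand-off this patch introduces (write the properties file locally, scp it to /tmp on the consumer host, then launch kafka.consumer.ConsoleConsumer with --consumer.config instead of the old --consumer-timeout-ms flag) can be exercised on its own with the sketch below. This is only an illustrative standalone script, not part of the patch: the host name, timeout, and group id are placeholder values, whereas in the framework they come from cluster_config.json and the testcase config dictionaries.

# Standalone sketch of the consumer.properties hand-off (placeholder values).
import os
import subprocess
import tempfile

def write_consumer_properties(consumer_properties):
    # Serialize the dict as key=value lines into <tmpdir>/consumer.properties.
    props_file_path = os.path.join(tempfile.gettempdir(), "consumer.properties")
    props_file = open(props_file_path, "w")
    for key, value in consumer_properties.items():
        props_file.write(key + "=" + value + "\n")
    props_file.close()
    return props_file_path

if __name__ == "__main__":
    host = "localhost"                      # placeholder consumer host
    consumer_properties = {
        "consumer.timeout.ms": "10000",     # placeholder timeout
        "group.id": "mytestgroup",          # placeholder group id
    }

    props_file_path = write_consumer_properties(consumer_properties)

    # Copy the file to /tmp on the consumer host (note the space before the host).
    subprocess.call("scp " + props_file_path + " " + host + ":/tmp/", shell=True)

    # The consumer is then started with --consumer.config instead of
    # --consumer-timeout-ms, e.g.:
    #   bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
    #       --zookeeper localhost:2181 --topic test_1 \
    #       --consumer.config /tmp/consumer.properties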