Index: system_test/replication_testsuite/replica_basic_test.py
===================================================================
--- system_test/replication_testsuite/replica_basic_test.py	(revision 1410492)
+++ system_test/replication_testsuite/replica_basic_test.py	(working copy)
@@ -123,7 +123,13 @@
             logRetentionTest = self.testcaseEnv.testcaseArgumentsDict["log_retention_test"]
         except:
             pass
 
+        consumerMultiTopicsMode = "false"
+        try:
+            consumerMultiTopicsMode = self.testcaseEnv.testcaseArgumentsDict["consumer_multi_topics_mode"]
+        except:
+            pass
+
         # initialize self.testcaseEnv with user-defined environment variables (product specific)
         self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
         self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = False
@@ -363,8 +369,8 @@
             # =============================================
             minStartingOffsetDict = None
             if logRetentionTest.lower() == "true":
-                self.anonLogger.info("sleeping for 10s before collecting logs")
-                time.sleep(10)
+                self.anonLogger.info("sleeping for 60s to make sure log truncation is completed")
+                time.sleep(60)
                 kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
                 minStartingOffsetDict = kafka_system_test_utils.getMinCommonStartingOffset(self.systemTestEnv, self.testcaseEnv)
 
@@ -374,10 +380,19 @@
             # =============================================
             # starting debug consumer
             # =============================================
-            self.log_message("starting debug consumers in the background")
-            kafka_system_test_utils.start_simple_consumer(self.systemTestEnv, self.testcaseEnv, minStartingOffsetDict)
-            self.anonLogger.info("sleeping for 10s")
-            time.sleep(10)
+            if consumerMultiTopicsMode.lower() == "false":
+                self.log_message("starting debug consumers in the background")
+                kafka_system_test_utils.start_simple_consumer(self.systemTestEnv, self.testcaseEnv, minStartingOffsetDict)
+                self.anonLogger.info("sleeping for 10s")
+                time.sleep(10)
+
+            # =============================================
+            # starting console consumer
+            # =============================================
+            if logRetentionTest.lower() == "false":
+                self.log_message("starting consumer in the background")
+                kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
+                time.sleep(1)
 
             # =============================================
             # this testcase is completed - stop all entities
@@ -396,7 +411,7 @@
             # collect logs from remote hosts
             # =============================================
             kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
-
+
             # =============================================
             # validate the data matched and checksum
             # =============================================
@@ -405,10 +420,13 @@
             if logRetentionTest.lower() == "true":
                 kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
                 kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
+            elif consumerMultiTopicsMode.lower() == "true":
+                kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+                kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv)
             else:
-                #kafka_system_test_utils.validate_simple_consumer_data_matched(self.systemTestEnv, self.testcaseEnv)
                 kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
                 kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+                kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
 
             # =============================================
             # draw graphs
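The new consumerMultiTopicsMode flag above is read with the suite's try/except-with-default idiom for optional testcase arguments. A minimal sketch of that same pattern, using a hypothetical helper name (get_testcase_arg is not part of the suite):

# Sketch only: the patch reads optional flags such as consumer_multi_topics_mode
# via try/except with a string default. An equivalent standalone helper:
def get_testcase_arg(args_dict, key, default="false"):
    try:
        return args_dict[key]
    except KeyError:
        return default

args = {"log_retention_test": "true"}
assert get_testcase_arg(args, "consumer_multi_topics_mode") == "false"
assert get_testcase_arg(args, "log_retention_test") == "true"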
Index: system_test/replication_testsuite/testcase_9051/cluster_config.json
===================================================================
--- system_test/replication_testsuite/testcase_9051/cluster_config.json	(revision 0)
+++ system_test/replication_testsuite/testcase_9051/cluster_config.json	(revision 0)
@@ -0,0 +1,107 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9900"
+        },
+
+
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9901"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9902"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9903"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9904"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9905"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9906"
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9907"
+        },
+        {
+            "entity_id": "8",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9908"
+        },
+
+
+        {
+            "entity_id": "9",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9909"
+        },
+        {
+            "entity_id": "10",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9910"
+        }
+    ]
+}
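The eight broker entries above differ only in entity_id and jmx_port. For reference, a minimal sketch (the layout mirrors this cluster_config.json; the generator itself is illustrative, not part of the patch) that produces the repeated entries instead of hand-editing them:

import json

# Sketch: generate the repetitive broker entries of cluster_config.json.
# The entity_id range (1-8) and jmx_port base (9900) are taken from the
# config above; everything else here is an assumption for illustration.
def make_cluster_config(num_brokers=8):
    common = {"hostname": "localhost", "cluster_name": "source",
              "kafka_home": "default", "java_home": "default"}
    entities = [dict(common, entity_id="0", role="zookeeper", jmx_port="9900")]
    for i in range(1, num_brokers + 1):
        entities.append(dict(common, entity_id=str(i), role="broker",
                             jmx_port=str(9900 + i)))
    return {"cluster_config": entities}

print(json.dumps(make_cluster_config(), indent=4))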
+ "10":"Log segment size : 1048576" + }, + "testcase_args": { + "broker_type": "leader", + "bounce_broker": "false", + "replica_factor": "8", + "num_partition": "2", + "num_iteration": "1", + "producer_multi_topics_mode": "true", + "consumer_multi_topics_mode": "true", + "sleep_seconds_between_producer_calls": "5", + "message_producing_free_time_sec": "15", + "num_messages_to_produce_per_producer_call": "50" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "brokerid": "1", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_1_logs", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "brokerid": "2", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_2_logs", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "brokerid": "3", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_3_logs", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "port": "9094", + "brokerid": "4", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_4_logs", + "log_filename": "kafka_server_9094.log", + "config_filename": "kafka_server_9094.properties" + }, + { + "entity_id": "5", + "port": "9095", + "brokerid": "5", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_5_logs", + "log_filename": "kafka_server_9095.log", + "config_filename": "kafka_server_9095.properties" + }, + { + "entity_id": "6", + "port": "9096", + "brokerid": "6", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_6_logs", + "log_filename": "kafka_server_9096.log", + "config_filename": "kafka_server_9096.properties" + }, + { + "entity_id": "7", + "port": "9097", + "brokerid": "7", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_7_logs", + "log_filename": "kafka_server_9097.log", + "config_filename": "kafka_server_9097.properties" + }, + { + "entity_id": "8", + "port": "9098", + "brokerid": "8", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_8_logs", + "log_filename": "kafka_server_9098.log", + "config_filename": "kafka_server_9098.properties" + }, + { + "entity_id": "9", + "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020", + "threads": "5", + "compression-codec": "0", + "message-size": "500", + "message": "500", + "request-num-acks": "-1", + "producer-retry-backoff-ms": "100", + "producer-num-retries": "5", + "async":"false", + "log_filename": "producer_performance_9.log", + "config_filename": "producer_performance_9.properties" + }, + { + "entity_id": "10", + "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020", + "groupid": "mytestgroup", + "consumer-timeout-ms": "60000", + "zookeeper": "localhost:2188", + "log_filename": "console_consumer_10.log", + "config_filename": "console_consumer_10.properties" + } + ] +} Index: system_test/testcase_to_run.json =================================================================== --- system_test/testcase_to_run.json (revision 1410492) +++ system_test/testcase_to_run.json (working copy) @@ -1,5 +1,5 @@ { "ReplicaBasicTest" : [ - "testcase_1" + "testcase_9051" ] 
Index: system_test/utils/kafka_system_test_utils.py
===================================================================
--- system_test/utils/kafka_system_test_utils.py	(revision 1410492)
+++ system_test/utils/kafka_system_test_utils.py	(working copy)
@@ -996,47 +996,43 @@
 
     prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
     for prodPerfCfg in prodPerfCfgList:
-        topic       = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfg["entity_id"], "topic")
+        topicsStr   = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfg["entity_id"], "topic")
         zkEntityId  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
         zkHost      = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname")
         kafkaHome   = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
         javaHome    = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
         createTopicBin = kafkaHome + "/bin/kafka-create-topic.sh"
 
-        logger.debug("zkEntityId : " + zkEntityId, extra=d)
+        logger.debug("zkEntityId : " + zkEntityId, extra=d)
         logger.debug("createTopicBin : " + createTopicBin, extra=d)
 
+        zkConnectStr = ""
+        topicsList   = topicsStr.split(',')
+
         if len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) > 0:
-            logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + "]", extra=d)
+            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+        elif len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0:
+            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
+        else:
+            raise Exception("Empty zkConnectStr found")
+
+        for topic in topicsList:
+            logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d)
             cmdList = ["ssh " + zkHost,
                        "'JAVA_HOME=" + javaHome,
                        createTopicBin,
                        " --topic "     + topic,
-                       " --zookeeper " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
+                       " --zookeeper " + zkConnectStr,
                        " --replica "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
                        " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
                        testcaseEnv.testCaseBaseDir + "/logs/create_source_cluster_topic.log'"]
-
+
            cmdStr = " ".join(cmdList)
            logger.debug("executing command: [" + cmdStr + "]", extra=d)
            subproc = system_test_utils.sys_call_return_subproc(cmdStr)
 
-        if len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0:
-            logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + "]", extra=d)
-            cmdList = ["ssh " + zkHost,
-                       "'JAVA_HOME=" + javaHome,
-                       createTopicBin,
-                       " --topic "     + topic,
-                       " --zookeeper " + testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"],
-                       " --replica "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
-                       " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
-                       testcaseEnv.testCaseBaseDir + "/logs/create_target_cluster_topic.log'"]
-            cmdStr = " ".join(cmdList)
-            logger.debug("executing command: [" + cmdStr + "]", extra=d)
-            subproc = system_test_utils.sys_call_return_subproc(cmdStr)
-
-def get_message_id(logPathName):
+def get_message_id(logPathName, topic=""):
     logLines = open(logPathName, "r").readlines()
     messageIdList = []
 
@@ -1044,8 +1040,12 @@
         if not "MessageID" in line:
            continue
         else:
-            matchObj = re.match('.*MessageID:(.*?):', line)
-            messageIdList.append( matchObj.group(1) )
+            matchObj = re.match('.*Topic:(.*?):.*:MessageID:(.*?):', line)
+            if len(topic) == 0:
+                messageIdList.append( matchObj.group(2) )
+            else:
+                if topic == matchObj.group(1):
+                    messageIdList.append( matchObj.group(2) )
 
     return messageIdList
 
@@ -1949,14 +1949,75 @@
         logger.info("topic " + topic + " : no. of brokers with non-zero msg count : " + str(nonZeroMsgCounter), extra=d)
         logger.info("topic " + topic + " : non-zero brokers msg count : " + str(nonZeroMsgValue), extra=d)
 
-        if mismatchCounter == 0:
+        if mismatchCounter == 0 and nonZeroMsgCounter > 0:
             validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "PASSED"
         else:
             validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED"
 
+def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv):
+    validationStatusDict        = testcaseEnv.validationStatusDict
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
+    consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
+
+    for prodPerfCfg in prodPerfCfgList:
+        producerEntityId = prodPerfCfg["entity_id"]
+        topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+        acks     = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
+
+        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
+
+        matchingConsumerEntityId = None
+        for consumerEntityId in consumerEntityIdList:
+            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            if consumerTopic in topicStr:
+                matchingConsumerEntityId = consumerEntityId
+                break
+
+        if matchingConsumerEntityId is None:
+            break
+
+        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
+        producerLogPathName = producerLogPath + "/producer_performance.log"
+
+        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
+        consumerLogPathName = consumerLogPath + "/console_consumer.log"
+
+        topicList = topicStr.split(',')
+        for topic in topicList:
+            msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \
+                                                testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \
+                                                + "/msg_id_missing_in_consumer_" + topic + ".log"
+            producerMsgIdList = get_message_id(producerLogPathName, topic)
+            consumerMsgIdList = get_message_id(consumerLogPathName, topic)
+            producerMsgIdSet  = set(producerMsgIdList)
+            consumerMsgIdSet  = set(consumerMsgIdList)
+
+            missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
+
+            outfile = open(msgIdMissingInConsumerLogPathName, "w")
+            for id in missingMsgIdInConsumer:
+                outfile.write(id + "\n")
+            outfile.close()
+
+            logger.info("no. of unique messages on topic [" + topic + "] sent from publisher  : " + str(len(producerMsgIdSet)), extra=d)
+            logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgIdSet)), extra=d)
+            validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
+            validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet))
+
+            if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
+                validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
+            elif (acks == "1"):
+                missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
+                print "#### missing Percent : ", missingPercentage
+                if missingPercentage <= 1:
+                    validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
+                    logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
+            else:
+                validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
+                logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)
+
+
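For reference, the per-topic validation added above reduces to a regex scan of the producer and consumer logs followed by a set difference. A condensed, self-contained sketch of the same logic (the "Topic:<t>:...:MessageID:<id>:" line format is inferred from the get_message_id regex; the sample log lines are made up):

import re

# Sketch of the validation above: extract per-topic MessageIDs from log
# lines and compare the producer set against the consumer set.
MSG_RE = re.compile(r'.*Topic:(.*?):.*:MessageID:(.*?):')

def message_ids(lines, topic=""):
    ids = set()
    for line in lines:
        m = MSG_RE.match(line)
        if m and (not topic or m.group(1) == topic):
            ids.add(m.group(2))
    return ids

producer_log = ["Topic:t001:ThreadID:0:MessageID:0000001:payload",
                "Topic:t001:ThreadID:0:MessageID:0000002:payload"]
consumer_log = ["Topic:t001:ThreadID:0:MessageID:0000001:payload"]

produced = message_ids(producer_log, "t001")
consumed = message_ids(consumer_log, "t001")
missing  = produced - consumed
# As in the patch: with acks=1 up to 1% loss is tolerated; with acks=-1
# (the testcase_9051 setting) no loss is allowed.
missing_pct = len(missing) * 100.0 / len(produced) if produced else 0.0
print("missing: %d (%.1f%%)" % (len(missing), missing_pct))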