diff --git a/system_test/mirror_maker_testsuite/mirror_maker_test.py b/system_test/mirror_maker_testsuite/mirror_maker_test.py index 48b0d25..70db79b 100644 --- a/system_test/mirror_maker_testsuite/mirror_maker_test.py +++ b/system_test/mirror_maker_testsuite/mirror_maker_test.py @@ -76,6 +76,8 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils): self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX) testCasePathNameList.sort() + replicationUtils = ReplicationUtils(self) + # ============================================================= # launch each testcase one by one: testcase_1, testcase_2, ... # ============================================================= @@ -283,7 +285,7 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils): # ============================================= self.log_message("validating data matched") #kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv) - kafka_system_test_utils.validate_simple_consumer_data_matched(self.systemTestEnv, self.testcaseEnv) + kafka_system_test_utils.validate_simple_consumer_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv, "target") diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py index 3fc47d9..a5764ec 100644 --- a/system_test/replication_testsuite/replica_basic_test.py +++ b/system_test/replication_testsuite/replica_basic_test.py @@ -77,6 +77,8 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX) testCasePathNameList.sort() + replicationUtils = ReplicationUtils(self) + # ============================================================= # launch each testcase one by one: testcase_1, testcase_2, ... # ============================================================= @@ -424,15 +426,15 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): if logRetentionTest.lower() == "true": kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv) - kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv) + kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) elif consumerMultiTopicsMode.lower() == "true": #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv) else: kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv) - #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) - kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv) - + kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) + kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) + # ============================================= # draw graphs # ============================================= diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 9411405..11f8653 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -1189,7 +1189,7 @@ def get_message_checksum(logPathName): return messageChecksumList -def validate_data_matched(systemTestEnv, testcaseEnv): +def validate_data_matched(systemTestEnv, testcaseEnv, replicationUtils): validationStatusDict = testcaseEnv.validationStatusDict clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -1241,14 +1241,21 @@ def validate_data_matched(systemTestEnv, testcaseEnv): validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet)) validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet)) + missingPercentage = len(missingMsgIdInConsumer) * 100.00 / len(producerMsgIdSet) + logger.info("Data Loss Threshold % : " + str(replicationUtils.ackOneDataLossThresholdPercent), extra=d) + logger.warn("Data Loss Current % : " + str(missingPercentage), extra=d) + if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ): validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" elif (acks == "1"): - missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet) - print "#### missing Percent : ", missingPercentage - if missingPercentage <= 1: + if missingPercentage <= replicationUtils.ackOneDataLossThresholdPercent: validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" - logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d) + logger.warn("Test case (Acks = 1) passes with less than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ + + "% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d) + else: + validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" + logger.error("Test case (Acks = 1) failed with more than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ + + "% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d) else: validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d) @@ -1823,7 +1830,7 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None partitionId += 1 replicaIndex += 1 -def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv): +def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv, replicationUtils): validationStatusDict = testcaseEnv.validationStatusDict clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -1865,6 +1872,12 @@ def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv): consumerLogPathName = consumerLogPath + "/" + logFile consumerMsgIdList = get_message_id(consumerLogPathName) consumerMsgIdSet = set(consumerMsgIdList) + + if len(consumerMsgIdSet) != len(consumerMsgIdList): + # there are duplicates received by simple consumer + mismatchCount += 1 + logger.error("Duplicate messages detected in " + consumerLogPathName) + missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") + \ @@ -1878,21 +1891,23 @@ def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv): logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d) validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet)) + missingPercentage = len(missingMsgIdInConsumer) * 100.00 / len(producerMsgIdSet) + logger.info("Data Loss Threshold % : " + str(replicationUtils.ackOneDataLossThresholdPercent), extra=d) + logger.warn("Data Loss Current % : " + str(missingPercentage), extra=d) + if acks == "-1" and len(missingMsgIdInConsumer) > 0: mismatchCount += 1 elif acks == "1" and len(missingMsgIdInConsumer) > 0: - missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet) - logger.debug("missing percentage [" + str(missingPercentage) + "]", extra=d) - if missingPercentage <= 1: - logger.warn("Test case (acks == 1) passes with < 1% data loss : [" + \ - str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d) + if missingPercentage <= replicationUtils.ackOneDataLossThresholdPercent: + logger.warn("Test case (Acks = 1) passes with less than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ + + "% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d) else: mismatchCount += 1 - if mismatchCount == 0: - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" - else: - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" + if mismatchCount == 0: + validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" + else: + validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" def get_controller_attributes(systemTestEnv, testcaseEnv): @@ -2108,7 +2123,7 @@ def validate_simple_consumer_data_matched_across_replicas(systemTestEnv, testcas validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED" -def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv): +def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv, replicationUtils): validationStatusDict = testcaseEnv.validationStatusDict clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -2160,14 +2175,17 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet)) validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet)) + missingPercentage = len(missingMsgIdInConsumer) * 100.00 / len(producerMsgIdSet) + logger.info("Data Loss Threshold % : " + str(replicationUtils.ackOneDataLossThresholdPercent), extra=d) + logger.warn("Data Loss Current % : " + str(missingPercentage), extra=d) + if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ): validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" elif (acks == "1"): - missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet) - print "#### missing Percent : ", missingPercentage - if missingPercentage <= 1: + if missingPercentage <= replicationUtils.ackOneDataLossThresholdPercent: validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" - logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d) + logger.warn("Test case (Acks = 1) passes with less than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ + + "% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d) else: validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d) diff --git a/system_test/utils/replication_utils.py b/system_test/utils/replication_utils.py index 3e8efad..cfd80b2 100644 --- a/system_test/utils/replication_utils.py +++ b/system_test/utils/replication_utils.py @@ -65,3 +65,6 @@ class ReplicationUtils(object): self.controllerAttributesDict["REGX_CONTROLLER_STARTUP_PATTERN"] = "\[(.*?)\] .* \[Controller (.*?)\]: " + \ self.controllerAttributesDict["CONTROLLER_STARTUP_COMPLETE_MSG"] + # Data Loss Percentage Threshold in Ack = 1 cases + self.ackOneDataLossThresholdPercent = 5.0 +