diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py index ce29240..40c1157 100644 --- a/system_test/replication_testsuite/replica_basic_test.py +++ b/system_test/replication_testsuite/replica_basic_test.py @@ -433,6 +433,8 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv) kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) + + kafka_system_test_utils.validate_index_log(self.systemTestEnv, self.testcaseEnv) # ============================================= # draw graphs diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 9e58624..dd082f5 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -2129,4 +2129,78 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d) +def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"): + logger.debug("#### Inside validate_index_log", extra=d) + + failureCount = 0 + brokerLogCksumDict = {} + testCaseBaseDir = testcaseEnv.testCaseBaseDir + tcConfigsList = testcaseEnv.testcaseConfigsList + validationStatusDict = testcaseEnv.validationStatusDict + clusterConfigList = systemTestEnv.clusterEntityConfigDictList + allBrokerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "broker") + brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(allBrokerConfigList, "cluster_name", clusterName, "entity_id") + + # loop through all brokers + for brokerEntityId in brokerEntityIdList: + logCksumDict = {} + # remoteLogSegmentPathName : /tmp/kafka_server_4_logs + # => remoteLogSegmentDir : kafka_server_4_logs + remoteLogSegmentPathName = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", brokerEntityId, "log.dir") + remoteLogSegmentDir = os.path.basename(remoteLogSegmentPathName) + logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default") + localLogSegmentPath = logPathName + "/" + remoteLogSegmentDir + kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", brokerEntityId, "kafka_home") + hostname = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", brokerEntityId, "hostname") + kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" + + # localLogSegmentPath : + # .../system_test/mirror_maker_testsuite/testcase_5002/logs/broker-4/kafka_server_4_logs + # |- test_1-0 + # |- 00000000000000000000.index + # |- 00000000000000000000.log + # |- 00000000000000000020.index + # |- 00000000000000000020.log + # |- . . . + # |- test_1-1 + # |- 00000000000000000000.index + # |- 00000000000000000000.log + # |- 00000000000000000020.index + # |- 00000000000000000020.log + # |- . . . + + # loop through all topicPartition directories such as : test_1-0, test_1-1, ... + for topicPartition in os.listdir(localLogSegmentPath): + # found a topic-partition directory + if os.path.isdir(localLogSegmentPath + "/" + topicPartition): + + # log segment files are located in : localLogSegmentPath + "/" + topicPartition + # sort the log segment files under each topic-partition and verify index + for logFile in sorted(os.listdir(localLogSegmentPath + "/" + topicPartition)): + # only process index file: *.index + if logFile.endswith(".index"): + offsetLogSegmentPathName = localLogSegmentPath + "/" + topicPartition + "/" + logFile + cmdStrList = ["ssh " + hostname, + kafkaRunClassBin + " kafka.tools.DumpLogSegments", + " --file " + offsetLogSegmentPathName, + "--verify-index-only 2>&1"] + cmdStr = " ".join(cmdStrList) + + showMismatchedIndexOffset = False + + logger.debug("executing command [" + cmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + line = line.rstrip('\n') + if showMismatchedIndexOffset: + logger.debug("#### [" + line + "]", extra=d) + elif "Mismatches in :" in line: + logger.debug("#### error found [" + line + "]", extra=d) + failureCount += 1 + showMismatchedIndexOffset = True + + if failureCount == 0: + validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "PASSED" + else: + validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "FAILED"