diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py index 40c1157..17414ad 100644 --- a/system_test/replication_testsuite/replica_basic_test.py +++ b/system_test/replication_testsuite/replica_basic_test.py @@ -231,7 +231,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): # ============================================== if brokerType == "leader" or brokerType == "follower": self.log_message("looking up leader") - leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv, self.leaderAttributesDict) + leaderDict = kafka_system_test_utils.get_leader_attributes(self.systemTestEnv, self.testcaseEnv) # ========================== # leaderDict looks like this: @@ -285,10 +285,10 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): kafka_system_test_utils.validate_leader_election_successful(self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict) # trigger leader re-election by stopping leader to get re-election latency - reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict) - latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"] - self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms" - self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000)) + #reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict) + #latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"] + #self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms" + #self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000)) elif brokerType == "follower": # stopping Follower @@ -330,19 +330,19 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): # while loop # update Leader Election Latency MIN/MAX to testcaseEnv.validationStatusDict - self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None - try: - self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \ - min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"]) - except: - pass - - self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None - try: - self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \ - max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"]) - except: - pass + #self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None + #try: + # self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \ + # min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"]) + #except: + # pass + # + #self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None + #try: + # self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \ + # max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"]) + #except: + # pass # ============================================= # tell producer to stop diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index ae393bc..de16a34 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -2211,3 +2211,60 @@ def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"): else: validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "FAILED" +def get_leader_attributes(systemTestEnv, testcaseEnv): + + logger.info("Querying Zookeeper for leader info ...", extra=d) + + # keep track of leader data in this dict such as broker id & entity id + leaderDict = {} + + clusterConfigsList = systemTestEnv.clusterEntityConfigDictList + tcConfigsList = testcaseEnv.testcaseConfigsList + + zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") + firstZkDict = zkDictList[0] + hostname = firstZkDict["hostname"] + zkEntityId = firstZkDict["entity_id"] + clientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort") + kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home") + javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home") + kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" + + # this should have been updated in start_producer_in_thread + producerTopicsString = testcaseEnv.producerTopicsString + topics = producerTopicsString.split(',') + zkQueryStr = "get /brokers/topics/" + topics[0] + "/partitions/0/state" + brokerid = '' + + cmdStrList = ["ssh " + hostname, + "\"JAVA_HOME=" + javaHome, + kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain", + "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"], + zkQueryStr + " 2> /dev/null | tail -1\""] + cmdStr = " ".join(cmdStrList) + logger.debug("executing command [" + cmdStr + "]", extra=d) + + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + logger.debug("zk returned : " + line, extra=d) + if "\"leader\"" in line: + line = line.rstrip('\n') + json_data = json.loads(line) + for key,val in json_data.items(): + if key == 'leader': + brokerid = str(val) + + leaderDict["brokerid"] = brokerid + leaderDict["topic"] = topics[0] + leaderDict["partition"] = '0' + leaderDict["entity_id"] = system_test_utils.get_data_by_lookup_keyval( + tcConfigsList, "broker.id", brokerid, "entity_id") + leaderDict["hostname"] = system_test_utils.get_data_by_lookup_keyval( + clusterConfigsList, "entity_id", leaderDict["entity_id"], "hostname") + break + + print leaderDict + return leaderDict + + +