diff --git a/system_test/mirror_maker_testsuite/mirror_maker_test.py b/system_test/mirror_maker_testsuite/mirror_maker_test.py index 4dc3cdf..1db66fe 100644 --- a/system_test/mirror_maker_testsuite/mirror_maker_test.py +++ b/system_test/mirror_maker_testsuite/mirror_maker_test.py @@ -165,18 +165,27 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils): self.anonLogger.info("sleeping for 5s") time.sleep(5) + self.log_message("creating topics") + kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 5s") + time.sleep(5) + self.log_message("starting mirror makers") kafka_system_test_utils.start_mirror_makers(self.systemTestEnv, self.testcaseEnv) self.anonLogger.info("sleeping for 10s") time.sleep(10) - #self.log_message("creating topics") - #kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv) - #self.anonLogger.info("sleeping for 5s") - #time.sleep(5) - - + ''' + # ============================================= + # starting consumer + # ============================================= + self.log_message("starting consumer in the background") + kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 5s") + time.sleep(5) + ''' + # ============================================= # starting producer # ============================================= @@ -256,6 +265,7 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils): self.anonLogger.info("sleeping for 15s") time.sleep(15) + self.anonLogger.info("terminate Mirror Maker") cmdStr = "ps auxw | grep Mirror | grep -v grep | tr -s ' ' | cut -f2 -d ' ' | xargs kill -15" subproc = system_test_utils.sys_call_return_subproc(cmdStr) @@ -264,15 +274,20 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils): self.anonLogger.info("#### ["+line+"]") self.anonLogger.info("sleeping for 15s") time.sleep(15) - + # ============================================= # starting consumer # ============================================= - self.log_message("starting consumer in the background") - kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv) - self.anonLogger.info("sleeping for 10s") - time.sleep(10) - + #self.log_message("starting consumer in the background") + #kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv) + #self.anonLogger.info("sleeping for 5s") + #time.sleep(5) + + self.log_message("starting debug consumers in the background") + kafka_system_test_utils.start_simple_consumer_keyed_message(self.systemTestEnv, self.testcaseEnv, "target") + self.anonLogger.info("sleeping for 5s") + time.sleep(5) + # ============================================= # this testcase is completed - stop all entities # ============================================= @@ -295,7 +310,8 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils): # validate the data matched and checksum # ============================================= self.log_message("validating data matched") - kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) + #kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) + kafka_system_test_utils.validate_keyed_message_id_matched(self.systemTestEnv, self.testcaseEnv) kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv, "source") kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv, "target") diff --git a/system_test/mirror_maker_testsuite/testcase_5007/cluster_config.json b/system_test/mirror_maker_testsuite/testcase_5007/cluster_config.json new file mode 100644 index 0000000..108612d --- /dev/null +++ b/system_test/mirror_maker_testsuite/testcase_5007/cluster_config.json @@ -0,0 +1,152 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9100" + }, + + + { + "entity_id": "1", + "hostname": "localhost", + "role": "zookeeper", + "cluster_name":"target", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9101" + }, + + + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9102" + }, + { + "entity_id": "3", + "hostname": "localhost", + "role": "broker", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9103" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "broker", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9104" + }, + + + { + "entity_id": "5", + "hostname": "localhost", + "role": "broker", + "cluster_name":"target", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9105" + }, + { + "entity_id": "6", + "hostname": "localhost", + "role": "broker", + "cluster_name":"target", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9106" + }, + { + "entity_id": "7", + "hostname": "localhost", + "role": "broker", + "cluster_name":"target", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9107" + }, + { + "entity_id": "8", + "hostname": "localhost", + "role": "broker", + "cluster_name":"target", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9108" + }, + + + { + "entity_id": "9", + "hostname": "localhost", + "role": "producer_performance", + "cluster_name":"source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9109" + }, + + + { + "entity_id": "10", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name":"target", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9110" + }, + { + "entity_id": "11", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name":"target", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9111" + }, + { + "entity_id": "12", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name":"target", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9112" + }, + { + "entity_id": "13", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name":"target", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9113" + }, + + + { + "entity_id": "14", + "hostname": "localhost", + "role": "mirror_maker", + "cluster_name":"target", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9114" + } + + ] +} diff --git a/system_test/mirror_maker_testsuite/testcase_5007/testcase_5007_properties.json b/system_test/mirror_maker_testsuite/testcase_5007/testcase_5007_properties.json new file mode 100644 index 0000000..30955a8 --- /dev/null +++ b/system_test/mirror_maker_testsuite/testcase_5007/testcase_5007_properties.json @@ -0,0 +1,181 @@ +{ + "description": {"01":"To Test : 'Replication with Mirror Maker'", + "02":"Set up 2 clusters such as : SOURCE => MirrorMaker => TARGET", + "03":"Set up 1-node Zk cluster for both SOURCE & TARGET", + "04":"Produce and consume messages to a single topic - single partition.", + "05":"This test sends messages to 2 replicas", + "06":"At the end it verifies the log size and contents", + "07":"Use a consumer to verify no message loss in TARGET cluster.", + "08":"Producer dimensions : mode:sync, acks:-1, comp:0", + "09":"Log segment size : 10240" + }, + "testcase_args": { + "bounce_leader": "false", + "bounce_mirror_maker": "false", + "replica_factor": "2", + "num_partition": "4", + "num_iteration": "1", + "sleep_seconds_between_producer_calls": "2", + "message_producing_free_time_sec": "2", + "num_messages_to_produce_per_producer_call": "10" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2108", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_0.log", + "config_filename": "zookeeper_0.properties" + }, + + + { + "entity_id": "1", + "clientPort": "2128", + "dataDir": "/tmp/zookeeper_1", + "log_filename": "zookeeper_1.log", + "config_filename": "zookeeper_1.properties" + }, + + + { + "entity_id": "2", + "port": "9091", + "broker.id": "1", + "log.segment.bytes": "10240000", + "log.dir": "/tmp/kafka_server_2_logs", + "default.replication.factor": "2", + "num.partitions": "4", + "log_filename": "kafka_server_2.log", + "config_filename": "kafka_server_2.properties" + }, + { + "entity_id": "3", + "port": "9092", + "broker.id": "2", + "log.segment.bytes": "10240000", + "log.dir": "/tmp/kafka_server_3_logs", + "default.replication.factor": "2", + "num.partitions": "4", + "log_filename": "kafka_server_3.log", + "config_filename": "kafka_server_3.properties" + }, + { + "entity_id": "4", + "port": "9093", + "broker.id": "3", + "log.segment.bytes": "10240000", + "log.dir": "/tmp/kafka_server_4_logs", + "default.replication.factor": "2", + "num.partitions": "4", + "log_filename": "kafka_server_4.log", + "config_filename": "kafka_server_4.properties" + }, + + + { + "entity_id": "5", + "port": "9094", + "broker.id": "4", + "log.segment.bytes": "10240000", + "log.dir": "/tmp/kafka_server_5_logs", + "default.replication.factor": "2", + "num.partitions": "4", + "log_filename": "kafka_server_5.log", + "config_filename": "kafka_server_5.properties" + }, + { + "entity_id": "6", + "port": "9095", + "broker.id": "5", + "log.segment.bytes": "10240000", + "log.dir": "/tmp/kafka_server_6_logs", + "default.replication.factor": "2", + "num.partitions": "4", + "log_filename": "kafka_server_6.log", + "config_filename": "kafka_server_6.properties" + }, + { + "entity_id": "7", + "port": "9096", + "broker.id": "6", + "log.segment.bytes": "10240000", + "log.dir": "/tmp/kafka_server_7_logs", + "default.replication.factor": "2", + "num.partitions": "4", + "log_filename": "kafka_server_7.log", + "config_filename": "kafka_server_7.properties" + }, + { + "entity_id": "8", + "port": "9097", + "broker.id": "7", + "log.segment.bytes": "10240000", + "log.dir": "/tmp/kafka_server_8_logs", + "default.replication.factor": "2", + "num.partitions": "4", + "log_filename": "kafka_server_8.log", + "config_filename": "kafka_server_8.properties" + }, + + + { + "entity_id": "9", + "topic": "test_1", + "threads": "1", + "compression-codec": "0", + "message-size": "500", + "message": "10", + "request-num-acks": "-1", + "sync":"true", + "producer-num-retries":"5", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance_9.properties" + }, + + + { + "entity_id": "10", + "topic": "test_1", + "group": "mytestgroup", + "consumer-timeout-ms": "30000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer_10.properties" + }, + { + "entity_id": "11", + "topic": "test_1", + "group": "mytestgroup", + "consumer-timeout-ms": "30000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer_11.properties" + }, + { + "entity_id": "12", + "topic": "test_1", + "group": "mytestgroup", + "consumer-timeout-ms": "30000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer_12.properties" + }, + { + "entity_id": "13", + "topic": "test_1", + "group": "mytestgroup", + "consumer-timeout-ms": "30000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer_13.properties" + }, + + + { + "entity_id": "14", + "log_filename": "mirror_maker_14.log", + "mirror_num_producers": "2", + "mirror_num_streams": "3", + "mirror_consumer_config_filename": "mirror_consumer_14.properties", + "mirror_producer_config_filename": "mirror_producer_14.properties" + } + + ] +} diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py index 40c1157..17414ad 100644 --- a/system_test/replication_testsuite/replica_basic_test.py +++ b/system_test/replication_testsuite/replica_basic_test.py @@ -231,7 +231,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): # ============================================== if brokerType == "leader" or brokerType == "follower": self.log_message("looking up leader") - leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv, self.leaderAttributesDict) + leaderDict = kafka_system_test_utils.get_leader_attributes(self.systemTestEnv, self.testcaseEnv) # ========================== # leaderDict looks like this: @@ -285,10 +285,10 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): kafka_system_test_utils.validate_leader_election_successful(self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict) # trigger leader re-election by stopping leader to get re-election latency - reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict) - latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"] - self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms" - self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000)) + #reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict) + #latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"] + #self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms" + #self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000)) elif brokerType == "follower": # stopping Follower @@ -330,19 +330,19 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): # while loop # update Leader Election Latency MIN/MAX to testcaseEnv.validationStatusDict - self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None - try: - self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \ - min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"]) - except: - pass - - self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None - try: - self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \ - max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"]) - except: - pass + #self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None + #try: + # self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \ + # min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"]) + #except: + # pass + # + #self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None + #try: + # self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \ + # max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"]) + #except: + # pass # ============================================= # tell producer to stop diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index ae393bc..24c6fd1 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -700,6 +700,10 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): "mirror_consumer_config_filename") mmProducerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "mirror_producer_config_filename") + mmNumProducers = "" + mmNumStreams = "" + mmNumProducers = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "mirror_num_producers") + mmNumStreams = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "mirror_num_streams") logger.info("starting " + role + " in host [" + hostname + "] on client port [" + clientPort + "]", extra=d) @@ -722,20 +726,26 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): elif role == "broker": cmdList = ["ssh " + hostname, "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, + "JMX_PORT=" + jmxPort, kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka", configPathName + "/" + configFile + " >> ", logPathName + "/" + logFile + " & echo pid:$! > ", logPathName + "/entity_" + entityId + "_pid'"] elif role == "mirror_maker": + additionalArgs = "" + if len(mmNumProducers) > 0: + additionalArgs += " --num.producers " + mmNumProducers + " " + if len(mmNumStreams) > 0: + additionalArgs += " --num.streams " + mmNumStreams + " " + cmdList = ["ssh " + hostname, "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, + "JMX_PORT=" + jmxPort, kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker", "--consumer.config " + configPathName + "/" + mmConsumerConfigFile, "--producer.config " + configPathName + "/" + mmProducerConfigFile, - "--whitelist=\".*\" >> ", + "--whitelist=\".*\" " + additionalArgs + " >> ", logPathName + "/" + logFile + " & echo pid:$! > ", logPathName + "/entity_" + entityId + "_pid'"] @@ -825,6 +835,16 @@ def start_console_consumer(systemTestEnv, testcaseEnv): if len(formatterOption) > 0: formatterOption = " --formatter " + formatterOption + " " + # 4. group + groupOption = "" + try: + groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "group") + except: + pass + + if len(groupOption) > 0: + groupOption = " --group " + groupOption + " " + # get zookeeper connect string zkConnectStr = "" if clusterName == "source": @@ -845,6 +865,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv): "--csv-reporter-enabled", #"--metrics-dir " + metricsDir, formatterOption, + groupOption, "--from-beginning ", " >> " + consumerLogPathName, " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] @@ -2211,3 +2232,275 @@ def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"): else: validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "FAILED" +def get_leader_attributes(systemTestEnv, testcaseEnv, clusterName="source", partitionId="0"): + + logger.info("Querying Zookeeper for leader info ...", extra=d) + + # keep track of leader data in this dict such as broker id & entity id + leaderDict = {} + + clusterConfigsList = systemTestEnv.clusterEntityConfigDictList + tcConfigsList = testcaseEnv.testcaseConfigsList + + zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") + firstZkDict = zkDictList[0] + hostname = firstZkDict["hostname"] + zkEntityId = firstZkDict["entity_id"] + clientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort") + kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home") + javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home") + kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" + + # this should have been updated in start_producer_in_thread + producerTopicsString = testcaseEnv.producerTopicsString + topics = producerTopicsString.split(',') + zkQueryStr = "get /brokers/topics/" + topics[0] + "/partitions/" + partitionId + "/state" + brokerid = '' + + zkConnStr = "" + + if clusterName == "source": + zkConnStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + else: + zkConnStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + + cmdStrList = ["ssh " + hostname, + "\"JAVA_HOME=" + javaHome, + kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain", + "-server " + zkConnStr, + zkQueryStr + " 2> /dev/null | tail -1\""] + cmdStr = " ".join(cmdStrList) + logger.debug("executing command [" + cmdStr + "]", extra=d) + + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + logger.debug("zk returned : " + line, extra=d) + if "\"leader\"" in line: + line = line.rstrip('\n') + json_data = json.loads(line) + for key,val in json_data.items(): + if key == 'leader': + brokerid = str(val) + + leaderDict["brokerid"] = brokerid + leaderDict["topic"] = topics[0] + leaderDict["partition"] = partitionId + leaderDict["entity_id"] = system_test_utils.get_data_by_lookup_keyval( + tcConfigsList, "broker.id", brokerid, "entity_id") + leaderDict["hostname"] = system_test_utils.get_data_by_lookup_keyval( + clusterConfigsList, "entity_id", leaderDict["entity_id"], "hostname") + break + + print leaderDict + return leaderDict + +def get_keyed_message_id_dict(logPathName, topic=""): + logLines = open(logPathName, "r").readlines() + keyedMessageIdDict = {} + + # keyedMessageIdDict would look like this: + # { + # "key_1" : [ "0001", "0003", "0005", "0007"], + # "key_2" : [ "0002", "0004", "0006", "0008"] + # } + + for line in logLines: + if not "MessageID" in line: + continue + else: + try: + matchObj = re.match('.*Topic:(.*?):Key:(.*?):.*:MessageID:(.*?):', line) + key = matchObj.group(2) + if key not in keyedMessageIdDict: + #logger.debug("#### 0", extra=d) + keyedMessageIdDict[key] = [] + + #logger.debug("#### 1", extra=d) + if len(topic) == 0: + #logger.debug("#### 2", extra=d) + keyedMessageIdDict[key].append( matchObj.group(3) ) + else: + #logger.debug("#### 3", extra=d) + if topic == matchObj.group(1): + #logger.debug("#### 4", extra=d) + keyedMessageIdDict[key].append( matchObj.group(3) ) + except: + logger.error("logline of MessageID with key : " + line, extra=d) + + #pprint.pprint(keyedMessageIdDict) + + return keyedMessageIdDict + +def validate_producer_consumer_keyed_message_id_matched(producerKeyedMessageIdDict, consumerKeyedMessageIdDict): + logger.debug("#### Inside validate_producer_consumer_keyed_message_id_matched", extra=d) + + diffCount = 0 + + # producerKeyedMessageIdDict : + # { + # "key_1" : [ "0001", "0003", "0005", "0007"], + # "key_2" : [ "0002", "0004", "0006", "0008"] + # } + # + # consumerKeyedMessageIdDict : + # { "key_1" : [ "0001", "0003", "0005", "0007"] } + # or + # { "key_2" : [ "0002", "0004", "0006", "0008"] } + # + + consumerKeys = consumerKeyedMessageIdDict.keys() + + if len(consumerKeys) != 1: + logger.error("Unexpected no. of consumer keys : " + str(len(consumerKeys)), extra=d) + + if len(producerKeyedMessageIdDict) > 0 and len(consumerKeyedMessageIdDict) > 0: + for key in sorted(producerKeyedMessageIdDict.iterkeys()): + try: + consumerKeyMessageIdList = consumerKeyedMessageIdDict[key] + except: + # this key doesn't exist in consumerKeyedMessageIdDict => so skip this key + continue + producerKeyMessageIdList = producerKeyedMessageIdDict[key] + + # get the count of mismatch MessageID between first MessageID list and the other lists + #diffCount = system_test_utils.diff_lists(producerKeyMessageIdList, consumerKeyMessageIdList) + diffCount = system_test_utils.lists_diff_count(producerKeyMessageIdList, consumerKeyMessageIdList) + logger.info("Mismatch count : " + str(diffCount), extra=d) + else: + return -1 + + return diffCount + + +def validate_keyed_message_id_matched(systemTestEnv, testcaseEnv): + logger.debug("#### Inside validate_keyed_message_id_matched", extra=d) + + totalDiffCount = 0 + + validationStatusDict = testcaseEnv.validationStatusDict + clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList + consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( + clusterEntityConfigDictList, "role", "console_consumer", "entity_id") + + prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance") + + for prodPerfCfg in prodPerfCfgList: + producerEntityId = prodPerfCfg["entity_id"] + producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default") + producerLogPathName = producerLogPath + "/producer_performance.log" + + # get topic string from producer "entity" + topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic") + + # the topic string could be multi topics separated by ',' + topicList = topicStr.split(',') + + for topic in topicList: + logger.debug("working on topic : " + topic, extra=d) + + producerKeyedMessageIdDict = get_keyed_message_id_dict(producerLogPathName, topic) + logger.debug("#### producerKeyedMessageIdDict : ", extra=d) + print producerKeyedMessageIdDict + print + + for consumerEntityId in consumerEntityIdList: + consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", consumerEntityId, "default") + consumerLogPathName = consumerLogPath + "/console_consumer.log" + + consumerKeyedMessageIdDict = get_keyed_message_id_dict(consumerLogPathName, topic) + logger.debug("#### consumerKeyedMessageIdDict : ", extra=d) + print consumerKeyedMessageIdDict + + diffCount = validate_producer_consumer_keyed_message_id_matched(producerKeyedMessageIdDict, consumerKeyedMessageIdDict) + if diffCount > 0: + print "#### mismatched consumer keyed message id : ", consumerKeyedMessageIdDict + totalDiffCount += diffCount + print + + if totalDiffCount == 0: + validationStatusDict["Validate keyed message id matched for [" + topic + "]"] = "PASSED" + else: + validationStatusDict["Validate keyed message id matched for [" + topic + "]"] = "FAILED" + + +def start_simple_consumer_keyed_message(systemTestEnv, testcaseEnv, clusterName, minStartingOffsetDict=None): + + clusterConfigList = systemTestEnv.clusterEntityConfigDictList + allBrokerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "broker") + brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(allBrokerConfigList, "cluster_name", clusterName, "entity_id") + consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "console_consumer") + + partitionId = 0 + startingOffset = -2 + + for consumerConfig in consumerConfigList: + host = consumerConfig["hostname"] + entityId = consumerConfig["entity_id"] + jmxPort = consumerConfig["jmx_port"] + clusterName = consumerConfig["cluster_name"] + kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "kafka_home") + javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "java_home") + kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" + consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + + if host != "localhost": + consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome) + + # testcase configurations: + testcaseList = testcaseEnv.testcaseConfigsList + topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic") + numPartitions = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", brokerEntityIdList[0], "num.partitions") + + brokerListStr = "" + if clusterName == "source": + brokerListStr = testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"] + elif clusterName == "target": + brokerListStr = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] + else: + logger.error("Invalid cluster name : " + clusterName, extra=d) + raise Exception("Invalid cluster name : " + clusterName) + + if len(brokerListStr) == 0: + logger.error("Empty broker list str", extra=d) + raise Exception("Empty broker list str") + + if numPartitions is None: + logger.error("Invalid no. of partitions: " + numPartitions, extra=d) + raise Exception("Invalid no. of partitions: " + numPartitions) + else: + numPartitions = int(numPartitions) + + leaderDict = get_leader_attributes(systemTestEnv, testcaseEnv, "target", str(partitionId)) + replicaId = leaderDict["brokerid"] + + logger.info("starting debug consumer for replica id [" + str(replicaId) + "] partition [" + str(partitionId) + "]", extra=d) + + if minStartingOffsetDict is not None: + topicPartition = topic + "-" + str(partitionId) + startingOffset = minStartingOffsetDict[topicPartition] + + outputFilePathName = consumerLogPath + "/console_consumer.log" + + cmdList = ["ssh " + host, + "'JAVA_HOME=" + javaHome, + kafkaRunClassBin + " kafka.tools.SimpleConsumerShell", + "--broker-list " + brokerListStr, + "--topic " + topic, + "--partition " + str(partitionId), + "--replica " + str(replicaId), + "--offset " + str(startingOffset), + "--no-wait-at-logend ", + " > " + outputFilePathName, + " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] + + cmdStr = " ".join(cmdList) + + logger.debug("executing command: [" + cmdStr + "]", extra=d) + subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr) + # dummy for-loop to wait until the process is completed + for line in subproc_1.stdout.readlines(): + pass + time.sleep(1) + + partitionId += 1 +