Index: system_test/replication_testsuite/testcase_9051/cluster_config.json
===================================================================
--- system_test/replication_testsuite/testcase_9051/cluster_config.json (revision 1419765)
+++ system_test/replication_testsuite/testcase_9051/cluster_config.json (working copy)
@@ -2,57 +2,57 @@
     "cluster_config": [
         {
             "entity_id": "0",
-            "hostname": "esv4-app18.corp",
+            "hostname": "localhost",
             "role": "zookeeper",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9900"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9990"
         },
         {
             "entity_id": "1",
-            "hostname": "esv4-app18.corp",
+            "hostname": "localhost",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9901"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9991"
        },
         {
             "entity_id": "2",
-            "hostname": "esv4-app19.corp",
+            "hostname": "localhost",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9902"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9992"
         },
         {
             "entity_id": "3",
-            "hostname": "esv4-app20.corp",
+            "hostname": "localhost",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9903"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9993"
         },
         {
             "entity_id": "4",
-            "hostname": "esv4-app18.corp",
+            "hostname": "localhost",
             "role": "producer_performance",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9909"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9997"
        },
         {
             "entity_id": "5",
-            "hostname": "esv4-app18.corp",
+            "hostname": "localhost",
             "role": "console_consumer",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9910"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9998"
         }
     ]
 }
Index: system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
===================================================================
--- system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json (revision 1419765)
+++ system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json (working copy)
@@ -20,6 +20,7 @@
     "consumer_multi_topics_mode": "true",
     "sleep_seconds_between_producer_calls": "5",
     "message_producing_free_time_sec": "15",
+    "num_topics_for_auto_generated_string": "20",
     "num_messages_to_produce_per_producer_call": "50"
   },
   "entities": [
@@ -59,7 +60,7 @@
     },
     {
       "entity_id": "4",
-      "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
+      "topic": "t001",
       "threads": "5",
       "compression-codec": "0",
       "message-size": "500",
@@ -73,7 +74,7 @@
     },
     {
       "entity_id": "5",
-      "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
+      "topic": "t001",
      "groupid": "mytestgroup",
       "consumer-timeout-ms": "60000",
       "zookeeper": "localhost:2188",
Index: system_test/utils/kafka_system_test_utils.py
===================================================================
--- system_test/utils/kafka_system_test_utils.py (revision 1419765)
+++ system_test/utils/kafka_system_test_utils.py (working copy)
@@ -777,9 +777,28 @@
 
     # testcase configurations:
     testcaseList = testcaseEnv.testcaseConfigsList
-    topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")
+
+    # get testcase arguments
+    # 1. topics
+    numTopicsForAutoGenString = -1
+    try:
+        numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"])
+    except:
+        pass
+
+    topic = ""
+    if numTopicsForAutoGenString < 0:
+        topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")
+    else:
+        topic = generate_topics_string("topic", numTopicsForAutoGenString)
+
+    # update this variable and will be used by data validation functions
+    testcaseEnv.consumerTopicsString = topic
+
+    # 2. consumer timeout
     timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "consumer-timeout-ms")
 
+    # 3. consumer formatter
     formatterOption = ""
     try:
         formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "formatter")
@@ -789,6 +808,7 @@
     if len(formatterOption) > 0:
         formatterOption = " --formatter " + formatterOption + " "
 
+    # get zk.connect
     zkConnectStr = ""
     if clusterName == "source":
         zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
@@ -857,6 +877,34 @@
         time.sleep(1)
     testcaseEnv.lock.release()
 
+def generate_topics_string(topicPrefix, numOfTopics):
+    # return a topics string in the following format:
+    # _0001,_0002,...
+    # eg. "topic_0001,topic_0002,...,topic_xxxx"
+
+    topicsStr = ""
+    counter = 1
+    idx = "1"
+    while counter <= numOfTopics:
+        if counter <= 9:
+            idx = "000" + str(counter)
+        elif counter <= 99:
+            idx = "00" + str(counter)
+        elif counter <= 999:
+            idx = "0" + str(counter)
+        elif counter <= 9999:
+            idx = str(counter)
+        else:
+            raise Exception("Error: no. of topics must be under 10000 - current topics count : " + str(counter))
+
+        if len(topicsStr) == 0:
+            topicsStr = topicPrefix + "_" + idx
+        else:
+            topicsStr = topicsStr + "," + topicPrefix + "_" + idx
+
+        counter += 1
+    return topicsStr
+
 def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client):
     host = producerConfig["hostname"]
     entityId = producerConfig["entity_id"]
@@ -868,9 +916,24 @@
     jmxPort = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port")
     kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
 
+    # get optional testcase arguments
+    numTopicsForAutoGenString = -1
+    try:
+        numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"])
+    except:
+        pass
+
     # testcase configurations:
     testcaseConfigsList = testcaseEnv.testcaseConfigsList
-    topic = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic")
+    topic = ""
+    if numTopicsForAutoGenString < 0:
+        topic = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic")
+    else:
+        topic = generate_topics_string("topic", numTopicsForAutoGenString)
+
+    # update this variable and will be used by data validation functions
+    testcaseEnv.producerTopicsString = topic
+
     threads = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "threads")
     compCodec = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "compression-codec")
     messageSize = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-size")
@@ -1125,7 +1188,8 @@
 
     for prodPerfCfg in prodPerfCfgList:
         producerEntityId = prodPerfCfg["entity_id"]
-        topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+        #topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+        topic = testcaseEnv.producerTopicsString
         acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
 
         consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
@@ -1133,7 +1197,8 @@
 
         matchingConsumerEntityId = None
         for consumerEntityId in consumerEntityIdList:
-            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            #consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            consumerTopic = testcaseEnv.consumerTopicsString
             if consumerTopic in topic:
                 matchingConsumerEntityId = consumerEntityId
                 break
@@ -2042,14 +2107,14 @@
 
     for prodPerfCfg in prodPerfCfgList:
         producerEntityId = prodPerfCfg["entity_id"]
-        topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+        topicStr = testcaseEnv.producerTopicsString
         acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
 
         consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
 
         matchingConsumerEntityId = None
         for consumerEntityId in consumerEntityIdList:
-            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            consumerTopic = testcaseEnv.consumerTopicsString
             if consumerTopic in topicStr:
                 matchingConsumerEntityId = consumerEntityId
                 break
Index: system_test/utils/testcase_env.py
===================================================================
--- system_test/utils/testcase_env.py (revision 1419765)
+++ system_test/utils/testcase_env.py (working copy)
@@ -127,6 +127,11 @@
 
         self.numProducerThreadsRunning = 0
 
+        # to be used when validating data match - these variables will be
+        # updated by kafka_system_test_utils.start_producer_in_thread
+        self.producerTopicsString = ""
+        self.consumerTopicsString = ""
+
     def initWithKnownTestCasePathName(self, testCasePathName):
        testcaseDirName = os.path.basename(testCasePathName)
        self.testcaseResultsDict["_test_case_name"] = testcaseDirName
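
Note (illustration only, not part of the patch): a minimal standalone sketch of the topic string that generate_topics_string() builds when num_topics_for_auto_generated_string is set, assuming the four-digit zero padding shown in the helper above. The snippet simply reproduces the expected output format with a string-format expression; it does not import the test framework.

    # reproduce the "<prefix>_NNNN" comma-separated format for a small topic count
    expected = ",".join(["topic_%04d" % i for i in range(1, 4)])
    print expected   # topic_0001,topic_0002,topic_0003

With num_topics_for_auto_generated_string set to "20" (as in testcase_9051_properties.json), both the producer and the console consumer are launched with the same 20-topic string, and the same value is stored in testcaseEnv.producerTopicsString / consumerTopicsString for the data-match validation.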