Index: system_test/replication_testsuite/testcase_9052/testcase_9052_properties.json =================================================================== --- system_test/replication_testsuite/testcase_9052/testcase_9052_properties.json (revision 0) +++ system_test/replication_testsuite/testcase_9052/testcase_9052_properties.json (revision 0) @@ -0,0 +1,82 @@ +{ + "description": {"01":"Replication Test with 100 topics: Base Test", + "02":"Produce and consume messages to 100 topics - 2 partitions.", + "03":"This test sends messages to 3 replicas", + "04":"At the end it verifies the log size and contents", + "05":"Use a consumer to verify no message loss.", + "06":"Producer dimensions : mode:sync, acks:-1, comp:0", + "07":"Log segment size : 10240" + }, + "testcase_args": { + "broker_type": "leader", + "bounce_broker": "false", + "replica_factor": "3", + "num_partition": "2", + "num_iteration": "1", + "producer_multi_topics_mode": "true", + "consumer_multi_topics_mode": "true", + "auto_create_topic": "true", + "sleep_seconds_between_producer_calls": "5", + "message_producing_free_time_sec": "15", + "num_messages_to_produce_per_producer_call": "50" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "brokerid": "1", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_1_logs", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "brokerid": "2", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_2_logs", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "brokerid": "3", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_3_logs", + "log_filename": "kafka_server_9093.log", + 
"config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020", + "threads": "5", + "compression-codec": "0", + "message-size": "500", + "message": "500", + "request-num-acks": "-1", + "producer-retry-backoff-ms": "3500", + "producer-num-retries": "5", + "async":"false", + "log_filename": "producer_performance_4.log", + "config_filename": "producer_performance_4.properties" + }, + { + "entity_id": "5", + "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020", + "groupid": "mytestgroup", + "consumer-timeout-ms": "60000", + "zookeeper": "localhost:2188", + "log_filename": "console_consumer_5.log", + "config_filename": "console_consumer_5.properties" + } + ] +} Index: system_test/migration_tool_testsuite/migration_tool_test.py =================================================================== --- system_test/migration_tool_testsuite/migration_tool_test.py (revision 1417061) +++ system_test/migration_tool_testsuite/migration_tool_test.py (working copy) @@ -123,21 +123,21 @@ # initialize signal handler signal.signal(signal.SIGINT, self.signal_handler) - # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase - # for collecting logs from remote machines - kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv) - # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file: # system_test/_testsuite/testcase_/testcase__properties.json self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data( self.testcaseEnv.testcasePropJsonPathName) + # clean up data directories specified in zookeeper.properties and kafka_server_.properties + kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # create "LOCAL" log directories for metrics, dashboards for 
each entity under this testcase + # for collecting logs from remote machines + kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv) + # TestcaseEnv - initialize producer & consumer config / log file pathnames kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv) - # clean up data directories specified in zookeeper.properties and kafka_server_.properties - kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv) - # generate remote hosts log/config dirs if not exist kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv) Index: system_test/mirror_maker_testsuite/mirror_maker_test.py =================================================================== --- system_test/mirror_maker_testsuite/mirror_maker_test.py (revision 1417061) +++ system_test/mirror_maker_testsuite/mirror_maker_test.py (working copy) @@ -123,22 +123,21 @@ # initialize signal handler signal.signal(signal.SIGINT, self.signal_handler) - - # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase - # for collecting logs from remote machines - kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv) - # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file: # system_test/_testsuite/testcase_/testcase__properties.json self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data( self.testcaseEnv.testcasePropJsonPathName) - - # TestcaseEnv - initialize producer & consumer config / log file pathnames - kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv) - + # clean up data directories specified in zookeeper.properties and kafka_server_.properties kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv) + # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase + 
# for collecting logs from remote machines + kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv) + + # TestcaseEnv - initialize producer & consumer config / log file pathnames + kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv) + # generate remote hosts log/config dirs if not exist kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv) Index: system_test/replication_testsuite/replica_basic_test.py =================================================================== --- system_test/replication_testsuite/replica_basic_test.py (revision 1417061) +++ system_test/replication_testsuite/replica_basic_test.py (working copy) @@ -123,12 +123,18 @@ logRetentionTest = self.testcaseEnv.testcaseArgumentsDict["log_retention_test"] except: pass + consumerMultiTopicsMode = "false" try: consumerMultiTopicsMode = self.testcaseEnv.testcaseArgumentsDict["consumer_multi_topics_mode"] except: pass + autoCreateTopic = "false" + try: + autoCreateTopic = self.testcaseEnv.testcaseArgumentsDict["auto_create_topic"] + except: + pass # initialize self.testcaseEnv with user-defined environment variables (product specific) self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = "" @@ -139,22 +145,21 @@ # initialize signal handler signal.signal(signal.SIGINT, self.signal_handler) - # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase - # for collecting logs from remote machines - kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv) - # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file: # system_test/_testsuite/testcase_/testcase__properties.json self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data( self.testcaseEnv.testcasePropJsonPathName) - - # TestcaseEnv - initialize producer & consumer config / log file pathnames - 
kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv) - # clean up data directories specified in zookeeper.properties and kafka_server_.properties kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv) + # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase + # for collecting logs from remote machines + kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv) + + # TestcaseEnv - initialize producer & consumer config / log file pathnames + kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv) + # generate remote hosts log/config dirs if not exist kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv) @@ -180,10 +185,11 @@ self.anonLogger.info("sleeping for 5s") time.sleep(5) - self.log_message("creating topics") - kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv) - self.anonLogger.info("sleeping for 5s") - time.sleep(5) + if autoCreateTopic.lower() == "false": + self.log_message("creating topics") + kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 5s") + time.sleep(5) # ============================================= # start ConsoleConsumer if this is a Log Retention test Index: system_test/utils/kafka_system_test_utils.py =================================================================== --- system_test/utils/kafka_system_test_utils.py (revision 1417061) +++ system_test/utils/kafka_system_test_utils.py (working copy) @@ -131,6 +131,7 @@ hostname = clusterEntityConfigDict["hostname"] entity_id = clusterEntityConfigDict["entity_id"] role = clusterEntityConfigDict["role"] + kafkaHome = clusterEntityConfigDict["kafka_home"] logger.debug("entity_id : " + entity_id, extra=d) logger.debug("hostname : " + hostname, extra=d) @@ -139,12 +140,19 @@ configPathName = 
get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config") metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics") logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "default") + rmtLogPathName = logPathName + rmtMetricsPathName = metricsPathName + if hostname != "localhost": + rmtConfigPathName = replace_kafka_home(configPathName, kafkaHome) + rmtMetricsPathName = replace_kafka_home(metricsPathName, kafkaHome) + rmtLogPathName = replace_kafka_home(logPathName, kafkaHome) + # ============================== # collect entity log file # ============================== cmdList = ["scp", - hostname + ":" + logPathName + "/*", + hostname + ":" + rmtLogPathName + "/*", logPathName] cmdStr = " ".join(cmdList) logger.debug("executing command [" + cmdStr + "]", extra=d) @@ -154,7 +162,7 @@ # collect entity metrics file # ============================== cmdList = ["scp", - hostname + ":" + metricsPathName + "/*", + hostname + ":" + rmtMetricsPathName + "/*", metricsPathName] cmdStr = " ".join(cmdList) logger.debug("executing command [" + cmdStr + "]", extra=d) @@ -192,8 +200,13 @@ # collect dashboards file # ============================== dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards") + rmtDashboardsPathName = dashboardsPathName + + if hostname != "localhost": + rmtDashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome) + cmdList = ["scp", - hostname + ":" + dashboardsPathName + "/*", + hostname + ":" + rmtDashboardsPathName + "/*", dashboardsPathName] cmdStr = " ".join(cmdList) logger.debug("executing command [" + cmdStr + "]", extra=d) @@ -207,6 +220,7 @@ hostname = clusterEntityConfigDict["hostname"] entity_id = clusterEntityConfigDict["entity_id"] role = clusterEntityConfigDict["role"] + kafkaHome = clusterEntityConfigDict["kafka_home"] logger.debug("entity_id : " + entity_id, extra=d) logger.debug("hostname : " + 
hostname, extra=d) @@ -216,6 +230,11 @@ metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics") dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards") + if hostname != "localhost": + configPathName = replace_kafka_home(configPathName, kafkaHome) + metricsPathName = replace_kafka_home(metricsPathName, kafkaHome) + dashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome) + cmdList = ["ssh " + hostname, "'mkdir -p", configPathName, @@ -443,9 +462,14 @@ for clusterEntityConfigDict in clusterEntityConfigDictList: hostname = clusterEntityConfigDict["hostname"] - testcasePathName = testcaseEnv.testCaseBaseDir + kafkaHome = clusterEntityConfigDict["kafka_home"] + localTestcasePathName = testcaseEnv.testCaseBaseDir + remoteTestcasePathName = localTestcasePathName - cmdStr = "scp " + testcasePathName + "/config/* " + hostname + ":" + testcasePathName + "/config" + if hostname != "localhost": + remoteTestcasePathName = replace_kafka_home(localTestcasePathName, kafkaHome) + + cmdStr = "scp " + localTestcasePathName + "/config/* " + hostname + ":" + remoteTestcasePathName + "/config" logger.debug("executing command [" + cmdStr + "]", extra=d) system_test_utils.sys_call(cmdStr) @@ -586,10 +610,16 @@ hostname = system_test_utils.get_data_by_lookup_keyval( \ clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname") + kafkaHome = system_test_utils.get_data_by_lookup_keyval( \ + clusterEntityConfigDictList, "entity_id", brokerEntityId, "kafka_home") logFile = system_test_utils.get_data_by_lookup_keyval( \ testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename") logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default") + + if hostname != "localhost": + logPathName = replace_kafka_home(logPathName, kafkaHome) + cmdStrList = ["ssh " + hostname, "\"grep -i -h '" + 
leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ", logPathName + "/" + logFile + " | ", @@ -659,6 +689,10 @@ configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "config") logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "default") + if hostname != "localhost": + configPathName = replace_kafka_home(configPathName, kafkaHome) + logPathName = replace_kafka_home(logPathName, kafkaHome) + if role == "zookeeper": cmdList = ["ssh " + hostname, "'JAVA_HOME=" + javaHome, @@ -728,10 +762,15 @@ jmxPort = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "jmx_port") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" - logger.info("starting console consumer", extra=d) - consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + metricsDir = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "metrics") + + if host != "localhost": + consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome) + #metricsDir = replace_kafka_home(metricsDir, kafkaHome) + consumerLogPathName = consumerLogPath + "/console_consumer.log" testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName @@ -741,7 +780,6 @@ topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic") timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "consumer-timeout-ms") - formatterOption = "" try: formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "formatter") @@ -768,7 +806,7 @@ "--topic " + topic, "--consumer-timeout-ms " + timeoutMs, "--csv-reporter-enabled", - "--metrics-dir " + get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "metrics"), + 
#"--metrics-dir " + metricsDir, formatterOption, "--from-beginning ", " >> " + consumerLogPathName, @@ -860,7 +898,13 @@ logger.info("starting producer preformance", extra=d) - producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + metricsDir = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "metrics") + + if host != "localhost": + producerLogPath = replace_kafka_home(producerLogPath, kafkaHome) + metricsDir = replace_kafka_home(metricsDir, kafkaHome) + producerLogPathName = producerLogPath + "/producer_performance.log" testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName @@ -898,7 +942,7 @@ "--producer-retry-backoff-ms " + retryBackoffMs, "--producer-num-retries " + numOfRetries, "--csv-reporter-enabled", - "--metrics-dir " + get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "metrics"), + "--metrics-dir " + metricsDir, boolArgumentsStr, " >> " + producerLogPathName, " & echo pid:$! 
> " + producerLogPath + "/entity_" + entityId + "_pid'"] @@ -1016,6 +1060,11 @@ else: raise Exception("Empty zkConnectStr found") + testcaseBaseDir = testcaseEnv.testCaseBaseDir + + if zkHost != "localhost": + testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome) + for topic in topicsList: logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) cmdList = ["ssh " + zkHost, @@ -1025,7 +1074,7 @@ " --zookeeper " + zkConnectStr, " --replica " + testcaseEnv.testcaseArgumentsDict["replica_factor"], " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ", - testcaseEnv.testCaseBaseDir + "/logs/create_source_cluster_topic.log'"] + testcaseBaseDir + "/logs/create_source_cluster_topic.log'"] cmdStr = " ".join(cmdList) logger.debug("executing command: [" + cmdStr + "]", extra=d) @@ -1159,17 +1208,38 @@ clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList testcaseConfigsList = testcaseEnv.testcaseConfigsList + testCaseBaseDir = testcaseEnv.testCaseBaseDir + # clean up the following directories in localhost + # system_test//testcase_xxxx/config + # system_test//testcase_xxxx/dashboards + # system_test//testcase_xxxx/logs + logger.info("cleaning up test case dir: [" + testCaseBaseDir + "]", extra=d) + + if "system_test" not in testCaseBaseDir: + logger.warn("refusing to clean up suspicious test case dir [" + testCaseBaseDir + "]", extra=d) + logger.warn("check config file: system_test/cluster_config.properties", extra=d) + logger.warn("aborting test...", extra=d) + sys.exit(1) + else: + system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/config/*") + system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/dashboards/*") + system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/logs/*") + for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: hostname = clusterEntityConfigDict["hostname"] entityId = clusterEntityConfigDict["entity_id"] role = clusterEntityConfigDict["role"] kafkaHome = 
clusterEntityConfigDict["kafka_home"] - testCaseBaseDir = testcaseEnv.testCaseBaseDir cmdStr = "" dataDir = "" + if hostname == "localhost": + remoteTestCaseBaseDir = testCaseBaseDir + else: + remoteTestCaseBaseDir = replace_kafka_home(testCaseBaseDir, kafkaHome) + logger.info("cleaning up data dir on host: [" + hostname + "]", extra=d) if role == 'zookeeper': @@ -1199,26 +1269,31 @@ # ============================ if system_test_utils.remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"): # so kafkaHome is a real kafka installation - cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.log' | xargs rm 2> /dev/null\"" + cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.log' | xargs rm 2> /dev/null\"" logger.debug("executing command [" + cmdStr + "]", extra=d) system_test_utils.sys_call(cmdStr) - cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*_pid' | xargs rm 2> /dev/null\"" + cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*_pid' | xargs rm 2> /dev/null\"" logger.debug("executing command [" + cmdStr + "]", extra=d) system_test_utils.sys_call(cmdStr) - cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.csv' | xargs rm 2> /dev/null\"" + cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.csv' | xargs rm 2> /dev/null\"" logger.debug("executing command [" + cmdStr + "]", extra=d) system_test_utils.sys_call(cmdStr) - cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.svg' | xargs rm 2> /dev/null\"" + cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.svg' | xargs rm 2> /dev/null\"" logger.debug("executing command [" + cmdStr + "]", extra=d) system_test_utils.sys_call(cmdStr) - cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.html' | xargs rm 2> /dev/null\"" + cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.html' | xargs 
rm 2> /dev/null\"" logger.debug("executing command [" + cmdStr + "]", extra=d) system_test_utils.sys_call(cmdStr) +def replace_kafka_home(systemTestSubDirPath, kafkaHome): + matchObj = re.match(".*(\/system_test\/.*)$", systemTestSubDirPath) + relativeSubDirPath = matchObj.group(1) + return kafkaHome + relativeSubDirPath + def get_entity_log_directory(testCaseBaseDir, entity_id, role): return testCaseBaseDir + "/logs/" + role + "-" + entity_id @@ -1602,6 +1677,9 @@ kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + if host != "localhost": + consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome) + # testcase configurations: testcaseList = testcaseEnv.testcaseConfigsList topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")