diff --git a/system_test/logging.conf b/system_test/logging.conf index e9e9213..afc024d 100644 --- a/system_test/logging.conf +++ b/system_test/logging.conf @@ -35,7 +35,7 @@ propagate=0 # ============================================== [handler_namedConsoleHandler] class=StreamHandler -level=INFO +level=DEBUG formatter=namedFormatter args=[] diff --git a/system_test/testcase_to_run.json b/system_test/testcase_to_run.json index 8252860..f15e19c 100644 --- a/system_test/testcase_to_run.json +++ b/system_test/testcase_to_run.json @@ -1,5 +1,5 @@ { - "ReplicaBasicTest" : [ - "testcase_0001" + "UncleanShutdownTest" : [ + "testcase_9071" ] } diff --git a/system_test/unclean_shutdown_testsuite/__init__.py b/system_test/unclean_shutdown_testsuite/__init__.py new file mode 100644 index 0000000..8d1c8b6 --- /dev/null +++ b/system_test/unclean_shutdown_testsuite/__init__.py @@ -0,0 +1 @@ + diff --git a/system_test/unclean_shutdown_testsuite/config/server.properties b/system_test/unclean_shutdown_testsuite/config/server.properties new file mode 100644 index 0000000..8ef65ba --- /dev/null +++ b/system_test/unclean_shutdown_testsuite/config/server.properties @@ -0,0 +1,139 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +brokerid=0 + +# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned +# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost +# may not be what you want. +#hostname= + + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9091 + +# The number of threads handling network requests +network.threads=2 + +# The number of threads doing disk I/O +io.threads=2 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +max.socket.request.bytes=104857600 + + +############################# Log Basics ############################# + +# The directory under which to store log files +log.dir=/tmp/kafka_server_logs + +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. 
+num.partitions=5 + +# Overrides for the default given by num.partitions on a per-topic basis +#topic.partition.count.map=topic1:3, topic2:4 + +############################# Log Flush Policy ############################# + +# The following configurations control the flush of data to disk. This is the most +# important performance knob in kafka. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. +# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). +# 3. Throughput: The flush is generally the most expensive operation. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +log.default.flush.interval.ms=1000 + +# Per-topic overrides for log.default.flush.interval.ms +#topic.flush.intervals.ms=topic1:1000, topic2:3000 + +# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. +log.default.flush.scheduler.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.size. +#log.retention.size=1073741824 +log.retention.size=-1 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +#log.file.size=536870912 +log.file.size=102400 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.cleanup.interval.mins=1 + +############################# Zookeeper ############################# + +# Enable connecting to zookeeper +enable.zookeeper=true + +# Zk connection string (see zk docs for details). +# This is a comma-separated list of host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes.
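+# For example, a connection string with a chroot (the path here is illustrative): +#zk.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/kafka-root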
+zk.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 + +monitoring.period.secs=1 +max.message.size=1000000 +max.queued.requests=500 +log.roll.hours=168 +log.index.max.size=10485760 +log.index.interval.bytes=4096 +auto.create.topics=true +controller.socket.timeout.ms=30000 +controller.message.queue.size=10 +default.replication.factor=1 +replica.max.lag.time.ms=10000 +replica.max.lag.bytes=4000 +replica.socket.timeout.ms=30000 +replica.socket.buffersize=65536 +replica.fetch.size=1048576 +replica.fetch.wait.time.ms=500 +replica.fetch.min.bytes=4096 +replica.fetchers=1 diff --git a/system_test/unclean_shutdown_testsuite/config/zookeeper.properties b/system_test/unclean_shutdown_testsuite/config/zookeeper.properties new file mode 100644 index 0000000..74cbf90 --- /dev/null +++ b/system_test/unclean_shutdown_testsuite/config/zookeeper.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir=/tmp/zookeeper +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/system_test/unclean_shutdown_testsuite/testcase_9071/cluster_config.json b/system_test/unclean_shutdown_testsuite/testcase_9071/cluster_config.json new file mode 100644 index 0000000..8e3d63b --- /dev/null +++ b/system_test/unclean_shutdown_testsuite/testcase_9071/cluster_config.json @@ -0,0 +1,49 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9990" + }, + { + "entity_id": "1", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9991" + }, + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9992" + }, + { + "entity_id": "3", + "hostname": "localhost", + "role": "producer_performance", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9997" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9998" + } + ] +} diff --git a/system_test/unclean_shutdown_testsuite/testcase_9071/testcase_9071_properties.json b/system_test/unclean_shutdown_testsuite/testcase_9071/testcase_9071_properties.json new file mode 100644 index 0000000..4e02bb5 --- /dev/null +++ b/system_test/unclean_shutdown_testsuite/testcase_9071/testcase_9071_properties.json @@ 
-0,0 +1,84 @@ +{ + "description": {"01":"To Test : 'Unclean Shutdown'", + "02":"Produce and consume messages to 1 topic - 1 partition", + "03":"This test sends messages to 2 replicas", + "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)", + "05":"Restart the terminated broker", + "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully", + "07":"At the end it verifies the log size and contents", + "08":"Use a consumer to verify no message loss.", + "09":"Producer dimensions : mode:async, acks:-1, comp:1", + "10":"Log segment size : 1048576" + }, + "testcase_args": { + "broker_type": "controller", + "bounce_broker": "true", + "replica_factor": "2", + "num_partition": "1", + "num_iteration": "1", + "auto_create_topic": "false", + "sleep_seconds_between_producer_calls": "10", + "message_producing_free_time_sec": "15", + "num_topics_for_auto_generated_string": "-1", + "broker_down_time_in_sec": "5", + "clean_shutdown_num_retries": "10", + "clean_shutdown_retry_interval_ms": "5000", + "num_messages_to_produce_per_producer_call": "5" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + + { + "entity_id": "1", + "port": "9091", + "brokerid": "1", + "num.partitions":"1", + "default.replication.factor": "2", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_1_logs", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "brokerid": "2", + "num.partitions":"1", + "default.replication.factor": "2", + "log.file.size": "1048576", + "log.dir": "/tmp/kafka_server_2_logs", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + + { + "entity_id": "3", + "topic": "t001", + "threads": "1", + "compression-codec": "0", + "message-size": "100", + "message": "5", + "request-num-acks": "-1", + "producer-retry-backoff-ms": "3500", + "producer-num-retries": "3", + "sync":"true", + "log_filename": "producer_performance_5.log", + "config_filename": "producer_performance_5.properties" + }, + { + "entity_id": "4", + "topic": "t001", + "groupid": "mytestgroup", + "consumer-timeout-ms": "60000", + "zookeeper": "localhost:2188", + "log_filename": "console_consumer_6.log", + "config_filename": "console_consumer_6.properties" + } + ] +} diff --git a/system_test/unclean_shutdown_testsuite/unclean_shutdown_test.py b/system_test/unclean_shutdown_testsuite/unclean_shutdown_test.py new file mode 100644 index 0000000..4349e2e --- /dev/null +++ b/system_test/unclean_shutdown_testsuite/unclean_shutdown_test.py @@ -0,0 +1,318 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +#!/usr/bin/env python + +# =================================== +# unclean_shutdown_test.py +# =================================== + +import inspect +import logging +import os +import pprint +import signal +import subprocess +import sys +import time +import traceback + +from system_test_env import SystemTestEnv +sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR) + +from setup_utils import SetupUtils +from replication_utils import ReplicationUtils +import system_test_utils +from testcase_env import TestcaseEnv + +# product specific: Kafka +import kafka_system_test_utils +import metrics + +class UncleanShutdownTest(ReplicationUtils, SetupUtils): + + testModuleAbsPathName = os.path.realpath(__file__) + testSuiteAbsPathName = os.path.abspath(os.path.dirname(testModuleAbsPathName)) + + def __init__(self, systemTestEnv): + + # SystemTestEnv - provides cluster level environment settings + # such as entity_id, hostname, kafka_home, java_home which + # are available in a list of dictionary named + # "clusterEntityConfigDictList" + self.systemTestEnv = systemTestEnv + + super(UncleanShutdownTest, self).__init__(self) + + # dict to pass user-defined attributes to logger argument: "extra" + d = {'name_of_class': self.__class__.__name__} + + def signal_handler(self, signal, frame): + self.log_message("Interrupt detected - User pressed Ctrl+c") + + # perform the necessary cleanup here when user presses Ctrl+c and it may be product specific + self.log_message("stopping all entities - please wait ...") + kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) + sys.exit(1) + + def runTest(self): + + # ====================================================================== + # get all testcase directories under this testsuite + # ====================================================================== + testCasePathNameList = system_test_utils.get_dir_paths_with_prefix( + self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX) + testCasePathNameList.sort() + + # ============================================================= + # launch each testcase one by one: testcase_1, testcase_2, ... 
+ # ============================================================= + for testCasePathName in testCasePathNameList: + + skipThisTestCase = False + + try: + # ====================================================================== + # A new instance of TestcaseEnv to keep track of this testcase's env vars + # and initialize some env vars as testCasePathName is available now + # ====================================================================== + self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self) + self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName + self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName) + self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"] + + # ====================================================================== + # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json + # ====================================================================== + testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"] + + if self.systemTestEnv.printTestDescriptionsOnly: + self.testcaseEnv.printTestCaseDescription(testcaseDirName) + continue + elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName): + self.log_message("Skipping : " + testcaseDirName) + skipThisTestCase = True + continue + else: + self.testcaseEnv.printTestCaseDescription(testcaseDirName) + system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName) + + + # ============================================================================== # + # ============================================================================== # + # Product Specific Testing Code Starts Here: # + # ============================================================================== # + # ============================================================================== # + + # get optional testcase arguments + logRetentionTest = "false" + try: + logRetentionTest = self.testcaseEnv.testcaseArgumentsDict["log_retention_test"] + except: + pass + consumerMultiTopicsMode = "false" + try: + consumerMultiTopicsMode = self.testcaseEnv.testcaseArgumentsDict["consumer_multi_topics_mode"] + except: + pass + autoCreateTopic = "false" + try: + autoCreateTopic = self.testcaseEnv.testcaseArgumentsDict["auto_create_topic"] + except: + pass + + + # initialize self.testcaseEnv with user-defined environment variables (product specific) + self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = "" + self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = False + self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False + self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"] = [] + + # initialize signal handler + signal.signal(signal.SIGINT, self.signal_handler) + + # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file: + # system_test/_testsuite/testcase_/testcase__properties.json + self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data( + self.testcaseEnv.testcasePropJsonPathName) + + # clean up data directories specified in zookeeper.properties and kafka_server_.properties + kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase + # for collecting logs from remote machines + kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv) + + 
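+ # Test plan: steps 1 - 5 below run a fixed sequence: produce a batch + # while both replicas are up, stop broker 2 and produce a second batch, + # stop broker 1, then restart broker 2 first so that the out-of-sync + # replica has to take over as leader (an unclean leader election) + # before broker 1 rejoins.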
# TestcaseEnv - initialize producer & consumer config / log file pathnames + kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv) + + # generate remote hosts log/config dirs if not exist + kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # generate properties files for zookeeper, kafka, producer, consumer: + # 1. copy system_test/_testsuite/config/*.properties to + # system_test/_testsuite/testcase_/config/ + # 2. update all properties files in system_test/_testsuite/testcase_/config + # by overriding the settings specified in: + # system_test/_testsuite/testcase_/testcase__properties.json + kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName, + self.testcaseEnv, self.systemTestEnv) + + # ============================================= + # preparing all entities to start the test + # ============================================= + self.log_message("starting zookeepers") + kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 2s") + time.sleep(2) + + self.log_message("starting brokers") + kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 5s") + time.sleep(5) + + if autoCreateTopic.lower() == "false": + self.log_message("creating topics") + kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv) + self.anonLogger.info("sleeping for 5s") + time.sleep(5) + + + # ============================================= + # Read the testcase arguments given under "testcase_args" in + # testcase_9071_properties.json; this test runs the fixed + # stop/produce/restart sequence in steps 1 - 5 below rather + # than a leader-bounce loop + # ============================================= + numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"]) + brokerType = self.testcaseEnv.testcaseArgumentsDict["broker_type"] + bounceBrokerFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_broker"] + numMsgPerCall = int(self.testcaseEnv.testcaseArgumentsDict["num_messages_to_produce_per_producer_call"]) + + # a list of all broker entity ids + brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( + self.systemTestEnv.clusterEntityConfigDictList, "role", "broker", "entity_id") + + + + # ============================================= + # 1. send 5 messages + # ============================================= + initMessageId = 0 + msgBatch1 = 5 + kafka_system_test_utils.start_producer_one_off(self.systemTestEnv, self.testcaseEnv, str(initMessageId), str(msgBatch1)) + time.sleep(5) + + # ============================================= + # 2a. stop broker 2 + # ============================================= + kafka_system_test_utils.stop_remote_entity( + self.systemTestEnv, brokerEntityIdList[1], self.testcaseEnv.entityBrokerParentPidDict[brokerEntityIdList[1]]) + time.sleep(5) + + # ============================================= + # 2b. send 20 messages + # ============================================= + initMessageId = msgBatch1 + msgBatch2 = 20 + kafka_system_test_utils.start_producer_one_off(self.systemTestEnv, self.testcaseEnv, str(initMessageId), str(msgBatch2)) + time.sleep(10) + + # ============================================= + # 3. stop broker 1 + # ============================================= + kafka_system_test_utils.stop_remote_entity( + self.systemTestEnv, brokerEntityIdList[0], self.testcaseEnv.entityBrokerParentPidDict[brokerEntityIdList[0]]) + time.sleep(5) + + # ============================================= + # 4a.
start broker 2 + # ============================================= + kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, brokerEntityIdList[1]) + time.sleep(5) + + # ============================================= + # 4b. send 5 messages + # ============================================= + initMessageId = msgBatch1 + msgBatch2 + msgBatch3 = 5 + kafka_system_test_utils.start_producer_one_off(self.systemTestEnv, self.testcaseEnv, str(initMessageId), str(msgBatch3)) + time.sleep(5) + + # ============================================= + # 5. start broker 1 + # ============================================= + kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, brokerEntityIdList[0]) + time.sleep(5) + + + + # ============================================= + # starting console consumer + # ============================================= + self.log_message("starting consumer in the background") + kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv) + time.sleep(10) + + # ============================================= + # this testcase is completed - stop all entities + # ============================================= + self.log_message("stopping all entities") + for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + # make sure all entities are stopped + kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv) + + # ============================================= + # collect logs from remote hosts + # ============================================= + kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # ============================================= + # validate the data matched and checksum + # ============================================= + self.log_message("calling validate_data_matched_in_multi_topics_from_single_consumer_producer") + kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv) + + #self.log_message("calling validate_simple_consumer_data_matched_across_replicas") + #kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv) + + self.log_message("calling validate_broker_log_segment_checksum") + kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) + + # ============================================= + # draw graphs + # ============================================= + metrics.draw_all_graphs(self.systemTestEnv.METRICS_PATHNAME, + self.testcaseEnv, + self.systemTestEnv.clusterEntityConfigDictList) + + # build dashboard, one for each role + metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME, + self.testcaseEnv.testCaseDashboardsDir, + self.systemTestEnv.clusterEntityConfigDictList) + except Exception as e: + self.log_message("Exception while running test {0}".format(e)) + traceback.print_exc() + + finally: + if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly: + self.log_message("stopping all entities - please wait ...") + kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) + diff --git 
a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 293ef5a..89ac3a6 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -1747,7 +1747,8 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None # testcase configurations: testcaseList = testcaseEnv.testcaseConfigsList - topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic") + topicsStr = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic") + topicsList = topicsStr.split(',') brokerListStr = "" if clusterName == "source": @@ -1778,39 +1779,38 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None startingOffset = -2 brokerPortList = brokerListStr.split(',') for brokerPort in brokerPortList: - - partitionId = 0 - while (partitionId < numPartitions): - logger.info("starting debug consumer for replica on [" + brokerPort + "] partition [" + str(partitionId) + "]", extra=d) - - if minStartingOffsetDict is not None: - topicPartition = topic + "-" + str(partitionId) - startingOffset = minStartingOffsetDict[topicPartition] - - outputFilePathName = consumerLogPath + "/simple_consumer_" + topic + "-" + str(partitionId) + "_r" + str(replicaIndex) + ".log" - brokerPortLabel = brokerPort.replace(":", "_") - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, - kafkaRunClassBin + " kafka.tools.SimpleConsumerShell", - "--broker-list " + brokerListStr, - "--topic " + topic, - "--partition " + str(partitionId), - "--replica " + str(replicaIndex), - "--offset " + str(startingOffset), - "--no-wait-at-logend ", - " > " + outputFilePathName, - " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] + for topic in topicsList: + partitionId = 0 + while (partitionId < numPartitions): + logger.info("starting debug consumer for replica on [" + brokerPort + "] partition [" + str(partitionId) + "]", extra=d) + + if minStartingOffsetDict is not None: + topicPartition = topic + "-" + str(partitionId) + startingOffset = minStartingOffsetDict[topicPartition] + + outputFilePathName = consumerLogPath + "/simple_consumer_" + topic + "-" + str(partitionId) + "_r" + str(replicaIndex) + ".log" + brokerPortLabel = brokerPort.replace(":", "_") + cmdList = ["ssh " + host, + "'JAVA_HOME=" + javaHome, + kafkaRunClassBin + " kafka.tools.SimpleConsumerShell", + "--broker-list " + brokerListStr, + "--topic " + topic, + "--partition " + str(partitionId), + "--replica " + str(replicaIndex), + "--offset " + str(startingOffset), + "--no-wait-at-logend ", + " > " + outputFilePathName, + " & echo pid:$! 
> " + consumerLogPath + "/entity_" + entityId + "_pid'"] - cmdStr = " ".join(cmdList) + cmdStr = " ".join(cmdList) - logger.debug("executing command: [" + cmdStr + "]", extra=d) - subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr) - # dummy for-loop to wait until the process is completed - for line in subproc_1.stdout.readlines(): - pass - time.sleep(1) - - partitionId += 1 + logger.debug("executing command: [" + cmdStr + "]", extra=d) + subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr) + # dummy for-loop to wait until the process is completed + for line in subproc_1.stdout.readlines(): + pass + time.sleep(1) + partitionId += 1 replicaIndex += 1 def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv): @@ -2031,42 +2031,46 @@ def validate_simple_consumer_data_matched_across_replicas(systemTestEnv, testcas mismatchCounter = 0 for consumerEntityId in consumerEntityIdList: - topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") + topicsStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") + topicsList = topicsStr.split(',') consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", consumerEntityId, "default") - replicaIdxMsgCountDictList = [] - # replicaIdxMsgCountDictList is being used as follows: - # - # the above replica message count will be organized as follows: - # index of the list would map to the partitionId - # each element in the list maps to the replicaIdx-MessageCount - # to validate that : - # 1. there should be "no. of broker" of non-zero message count and they are equal - # 2. there should be "no. of broker - replication factor" of zero count - # [{"1": "750", "2": "750", "3": "0" }, - # {"1": "0" , "2": "750", "3": "750"}] - - j = 0 - while j < int(numPartition): - newDict = {} - replicaIdxMsgCountDictList.append(newDict) - j += 1 - - for logFile in sorted(os.listdir(consumerLogPath)): - - if logFile.startswith("simple_consumer_") and logFile.endswith(".log"): - matchObj = re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile) - partitionId = int(matchObj.group(1)) - replicaIdx = int(matchObj.group(2)) - - consumerLogPathName = consumerLogPath + "/" + logFile - consumerMsgIdList = get_message_id(consumerLogPathName) - consumerMsgIdSet = set(consumerMsgIdList) - - replicaIdxMsgCountDictList[partitionId][replicaIdx] = len(consumerMsgIdSet) - - logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d) - validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet)) + for topic in topicsList: + replicaIdxMsgCountDictList = [] + # replicaIdxMsgCountDictList is being used as follows: + # + # the above replica message count will be organized as follows: + # index of the list would map to the partitionId + # each element in the list maps to the replicaIdx-MessageCount + # to validate that : + # 1. there should be "no. of broker" of non-zero message count and they are equal + # 2. there should be "no. 
of broker - replication factor" of zero count + # [{"1": "750", "2": "750", "3": "0" }, + # {"1": "0" , "2": "750", "3": "750"}] + + j = 0 + while j < int(numPartition): + newDict = {} + replicaIdxMsgCountDictList.append(newDict) + j += 1 + + for logFile in sorted(os.listdir(consumerLogPath)): + + if logFile.startswith("simple_consumer_"+topic) and logFile.endswith(".log"): + logger.debug("file : " + logFile, extra=d) + logger.debug("topic : " + topic, extra=d) + matchObj = re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile) + partitionId = int(matchObj.group(1)) + replicaIdx = int(matchObj.group(2)) + + consumerLogPathName = consumerLogPath + "/" + logFile + consumerMsgIdList = get_message_id(consumerLogPathName) + consumerMsgIdSet = set(consumerMsgIdList) + + replicaIdxMsgCountDictList[partitionId][replicaIdx] = len(consumerMsgIdSet) + + logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d) + validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet)) pprint.pprint(replicaIdxMsgCountDictList) @@ -2163,4 +2167,254 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d) +def get_ISR(systemTestEnv, testcaseEnv): + logger.info("Querying Zookeeper for ISR info ...", extra=d) + clusterConfigsList = systemTestEnv.clusterEntityConfigDictList + tcConfigsList = testcaseEnv.testcaseConfigsList + + zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") + firstZkDict = zkDictList[0] + firstZkHost = firstZkDict["hostname"] + firstZkEntityId = firstZkDict["entity_id"] + clusterName = firstZkDict["cluster_name"] + firstZkClientPort = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", firstZkEntityId, "clientPort") + firstZkKafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", firstZkEntityId, "kafka_home") + firstZkJavaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", firstZkEntityId, "java_home") + firstZkKafkaRunClassBin = firstZkKafkaHome + "/bin/kafka-run-class.sh" + numPartition = testcaseEnv.testcaseArgumentsDict["num_partition"] + + zkConnectStr = "" + if clusterName == "source": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + elif clusterName == "target": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + else: + logger.error("Invalid cluster name : " + clusterName, extra=d) + sys.exit(1) + + topicPartitionISRDict = {} + + producerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "producer_performance") + for producerConfig in producerConfigList: + entityId = producerConfig["entity_id"] + topicsStr = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", entityId, "topic") + topics = topicsStr.split(",") + + for topic in topics: + # reset the partition index for each topic so that every + # topic's partitions are queried, not just the first topic's + partitionIndex = 0 + while partitionIndex < int(numPartition): + cmdStrList = ["ssh " + firstZkHost, + "\"JAVA_HOME=" + firstZkJavaHome, + firstZkKafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain", + "-server " + zkConnectStr, + "'get /brokers/topics/" + topic + "/partitions/" + str(partitionIndex) + "/leaderAndISR' 2> /dev/null | tail -1\""] + + cmdStr = " ".join(cmdStrList) + logger.debug("executing command [" + cmdStr + "]", extra=d) + subproc =
system_test_utils.sys_call_return_subproc(cmdStr) + + for line in subproc.stdout.readlines(): + if "ISR" in line: + matchObj = re.match(".*\"ISR\":\"(.*?)\",.*", line) + isrList = matchObj.group(1) + topicPartitionISRDict[topic + "-" + str(partitionIndex)] = isrList + break + + partitionIndex += 1 + + return topicPartitionISRDict + + +def bounce_broker(systemTestEnv, testcaseEnv, entityId): + # bouncing this broker + logger.info("stopping broker : " + entityId, extra=d) + stop_remote_entity(systemTestEnv, entityId, testcaseEnv.entityBrokerParentPidDict[entityId]) + + # wait here for the specified amount of time + brokerDownTimeInSec = 5 + try: + brokerDownTimeInSec = int(testcaseEnv.testcaseArgumentsDict["broker_down_time_in_sec"]) + except: + pass # take default + logger.info("sleeping for : " + str(brokerDownTimeInSec), extra=d) + time.sleep(brokerDownTimeInSec) + + # starting previously terminated broker + logger.info("starting the previously terminated broker", extra=d) + start_entity_in_background(systemTestEnv, testcaseEnv, entityId) + + +def call_ShutdownBroker_for_clean_shutdown(systemTestEnv, testcaseEnv, shutdownEntityId, controllerEntityId): + logger.info("Sending ShutdownBroker Command for broker [" + shutdownEntityId + "]", extra=d) + + clusterConfigsList = systemTestEnv.clusterEntityConfigDictList + tcConfigsList = testcaseEnv.testcaseConfigsList + validationStatusDict = testcaseEnv.validationStatusDict + + # get the first Zookeeper attributes + zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") + firstZkDict = zkDictList[0] + hostname = firstZkDict["hostname"] + zkEntityId = firstZkDict["entity_id"] + kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home") + javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home") + clusterName = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "cluster_name") + kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" + + # get zk.connect + zkConnectStr = "" + if clusterName == "source": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + elif clusterName == "target": + zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + else: + logger.error("Invalid cluster name : " + clusterName, extra=d) + sys.exit(1) + + # get broker attributes + brokerId = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", shutdownEntityId, "brokerid") + ctlHost = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", controllerEntityId, "hostname") + ctlJmxPort = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", controllerEntityId, "jmx_port") + + # get optional ShutdownBroker command line args from testcase_xxxx_properties.json + cleanShutdownNumRetries = 10 + try: + cleanShutdownNumRetries = int(testcaseEnv.testcaseArgumentsDict["clean_shutdown_num_retries"]) + except: + pass # take default + cleanShutdownRetryIntervalMs = 5000 + try: + cleanShutdownRetryIntervalMs = int(testcaseEnv.testcaseArgumentsDict["clean_shutdown_retry_interval_ms"]) + except: + pass # take default + + # set up the shell command to call ShutdownBroker + cmdStrList = ["ssh " + hostname, + "\"JAVA_HOME=" + javaHome, + kafkaRunClassBin + " kafka.admin.ShutdownBroker", + "--broker " + brokerId, + "--num.retries " + str(cleanShutdownNumRetries), + "--zookeeper " + zkConnectStr, + 
"--retry.interval.ms " + str(cleanShutdownRetryIntervalMs), + "--jmx.url service:jmx:rmi:///jndi/rmi://" + ctlHost + ":" + ctlJmxPort + "/jmxrmi\""] + + # execute the command + cmdStr = " ".join(cmdStrList) + logger.debug("executing command [" + cmdStr + "]", extra=d) + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + rstripped_line = line.rstrip('\n') + #logger.debug("#### => " + rstripped_line, extra=d) + if "INFO Shutdown status: complete" in rstripped_line: + logger.info("found clean shutdown status in log : " + rstripped_line, extra=d) + validationStatusDict["Validate for broker " + shutdownEntityId + " clean shutdown"] = "PASSED" + return + + validationStatusDict["Validate for broker " + shutdownEntityId + " clean shutdown"] = "FAILED" + + +def start_producer_one_off(systemTestEnv, testcaseEnv, initMsgId, noMsgPerBatch="-1"): + + entityConfigList = systemTestEnv.clusterEntityConfigDictList + testcaseConfigsList = testcaseEnv.testcaseConfigsList + producerConfigList = system_test_utils.get_dict_from_list_of_dicts(entityConfigList, "role", "producer_performance") + + for producerConfig in producerConfigList: + clusterName = producerConfig["cluster_name"] + entityId = producerConfig["entity_id"] + host = producerConfig["hostname"] + javaHome = producerConfig["java_home"] + jmxPort = producerConfig["jmx_port"] + kafkaHome = producerConfig["kafka_home"] + kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" + + # get optional testcase arguments + numTopicsForAutoGenString = -1 + try: + numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"]) + except: + pass + + # testcase configurations: + testcaseConfigsList = testcaseEnv.testcaseConfigsList + topic = "" + if numTopicsForAutoGenString < 0: + topic = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic") + else: + topic = generate_topics_string("topic", numTopicsForAutoGenString) + + # update this variable and will be used by data validation functions + testcaseEnv.producerTopicsString = topic + + threads = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "threads") + compCodec = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "compression-codec") + messageSize = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-size") + requestNumAcks = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "request-num-acks") + syncMode = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "sync") + retryBackoffMs = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-retry-backoff-ms") + numOfRetries = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-num-retries") + if noMsgPerBatch == "-1": + noMsgPerBatch = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message") + + # for optional properties in testcase_xxxx_properties.json, + # check the length of returned value for those properties: + if len(retryBackoffMs) == 0: # no setting for "producer-retry-backoff-ms" + retryBackoffMs = "100" # default + if len(numOfRetries) == 0: # no setting for "producer-num-retries" + numOfRetries = "3" # default + + brokerListStr = "" + if clusterName == "source": + brokerListStr = 
testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"] + elif clusterName == "target": + brokerListStr = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] + else: + logger.error("Unknown cluster name: " + clusterName, extra=d) + sys.exit(1) + + logger.info("starting producer preformance", extra=d) + + producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + metricsDir = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "metrics") + + if host != "localhost": + producerLogPath = replace_kafka_home(producerLogPath, kafkaHome) + metricsDir = replace_kafka_home(metricsDir, kafkaHome) + + producerLogPathName = producerLogPath + "/producer_performance.log" + + testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName + + boolArgumentsStr = "" + if syncMode.lower() == "true": + boolArgumentsStr = boolArgumentsStr + " --sync" + + logger.info("#### producing [" + str(noMsgPerBatch) + "] messages with starting message id : [" + str(initMsgId) + "]", extra=d) + + cmdList = ["ssh " + host, + "'JAVA_HOME=" + javaHome, + "JMX_PORT=" + jmxPort, + kafkaRunClassBin + " kafka.perf.ProducerPerformance", + "--broker-list " + brokerListStr, + "--initial-message-id " + str(initMsgId), + "--messages " + noMsgPerBatch, + "--topics " + topic, + "--threads " + threads, + "--compression-codec " + compCodec, + "--message-size " + messageSize, + "--request-num-acks " + requestNumAcks, + "--producer-retry-backoff-ms " + retryBackoffMs, + "--producer-num-retries " + numOfRetries, + "--csv-reporter-enabled", + "--metrics-dir " + metricsDir, + boolArgumentsStr, + " >> " + producerLogPathName, + " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + + cmdStr = " ".join(cmdList) + logger.debug("executing command: [" + cmdStr + "]", extra=d) + + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + pass # dummy loop to wait until producer is completed