From a4fe8a6c2f66c5bc7e4211c8b52880cdc49d4a1f Mon Sep 17 00:00:00 2001
From: Dong Lin
Date: Tue, 19 Aug 2014 21:05:02 -0700
Subject: [PATCH] KAFKA-1604; System Test for Transaction Management

---
 .../transaction_testsuite/cluster_config.json      |  67 +++++
 .../config/console_consumer.properties             |   2 +
 .../config/producer_performance.properties         |   0
 .../transaction_testsuite/config/server.properties | 145 +++++++++++
 .../config/zookeeper.properties                    |  23 ++
 .../testcase_20001/testcase_20001_properties.json  |  97 ++++++++
 .../transaction_testsuite/transaction_test.py      | 273 +++++++++++++++++++++
 system_test/utils/kafka_system_test_utils.py       |   6 +
 8 files changed, 613 insertions(+)
 create mode 100644 system_test/transaction_testsuite/cluster_config.json
 create mode 100644 system_test/transaction_testsuite/config/console_consumer.properties
 create mode 100644 system_test/transaction_testsuite/config/producer_performance.properties
 create mode 100644 system_test/transaction_testsuite/config/server.properties
 create mode 100644 system_test/transaction_testsuite/config/zookeeper.properties
 create mode 100644 system_test/transaction_testsuite/testcase_20001/testcase_20001_properties.json
 create mode 100644 system_test/transaction_testsuite/transaction_test.py
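Notes on the test flow: testcase_20001 brings up one ZooKeeper node and four
brokers, produces to a five-partition topic with replication factor 3 using the
transactional producer (sync mode, acks=-1), optionally bounces partition
leaders while production is in flight (num_iteration is "0" here, so the bounce
loop is a no-op unless raised), and finally checks the consumer output for lost
or duplicated messages. The new --tx-producer/--messages-per-tx flags ask
ProducerPerformance to group every N messages into one transaction, so a leader
bounce should expose either all of a group or none of it. A sketch of that
grouping follows; begin_tx() and commit_tx() are hypothetical stand-ins, not
the actual KAFKA-1604 client API.

    # Illustrative sketch only: begin_tx()/commit_tx() are hypothetical
    # names for the transactional hooks; the grouping logic is the point.
    def produce_in_transactions(producer, topic, payloads, messages_per_tx):
        """Send payloads in groups of messages_per_tx, one transaction each."""
        for start in range(0, len(payloads), messages_per_tx):
            group = payloads[start:start + messages_per_tx]
            producer.begin_tx()            # open a transaction
            for msg in group:
                producer.send(topic, msg)  # buffered under the open transaction
            producer.commit_tx()           # whole group becomes visible atomically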
diff --git a/system_test/transaction_testsuite/cluster_config.json b/system_test/transaction_testsuite/cluster_config.json
new file mode 100644
index 0000000..dcb38a7
--- /dev/null
+++ b/system_test/transaction_testsuite/cluster_config.json
@@ -0,0 +1,67 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9100"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9101"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9102"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9103"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9104"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9105"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9106"
+        }
+    ]
+}
diff --git a/system_test/transaction_testsuite/config/console_consumer.properties b/system_test/transaction_testsuite/config/console_consumer.properties
new file mode 100644
index 0000000..a2ab8b9
--- /dev/null
+++ b/system_test/transaction_testsuite/config/console_consumer.properties
@@ -0,0 +1,2 @@
+auto.offset.reset=smallest
+auto.commit.interval.ms=1000
diff --git a/system_test/transaction_testsuite/config/producer_performance.properties b/system_test/transaction_testsuite/config/producer_performance.properties
new file mode 100644
index 0000000..e69de29
diff --git a/system_test/transaction_testsuite/config/server.properties b/system_test/transaction_testsuite/config/server.properties
new file mode 100644
index 0000000..91ebdc4
--- /dev/null
+++ b/system_test/transaction_testsuite/config/server.properties
@@ -0,0 +1,145 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9091
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=102400
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=1
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+transaction.topic.num.partitions=2
+transaction.topic.replication.factor=4
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
diff --git a/system_test/transaction_testsuite/config/zookeeper.properties b/system_test/transaction_testsuite/config/zookeeper.properties
new file mode 100644
index 0000000..adff28b
--- /dev/null
+++ b/system_test/transaction_testsuite/config/zookeeper.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+syncLimit=5
+initLimit=10
+tickTime=2000
diff --git a/system_test/transaction_testsuite/testcase_20001/testcase_20001_properties.json b/system_test/transaction_testsuite/testcase_20001/testcase_20001_properties.json
new file mode 100644
index 0000000..c8c320b
--- /dev/null
+++ b/system_test/transaction_testsuite/testcase_20001/testcase_20001_properties.json
@@ -0,0 +1,97 @@
+{
+  "description": {"01":"To Test : 'Transaction management test.'",
+                  "02":"Set up a Zk and Kafka cluster.",
+                  "03":"Produce messages to multiple topics with various partition counts.",
+                  "04":"Start consumers.",
+                  "05":"Bounce the partition leaders.",
+                  "06":"Verify that there are no duplicate messages or lost messages on any consumer group.",
+                  "07":"Producer dimensions : mode:sync, acks:-1, comp:0"
+  },
+  "testcase_args": {
+    "bounce_leaders": "true",
+    "replica_factor": "3",
+    "num_partition": "5",
+    "topic": "topic_test",
+    "num_iteration": "0",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50",
+    "num_topics_for_auto_generated_string":"-1"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_1.log",
+      "config_filename": "kafka_server_1.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_2.log",
+      "config_filename": "kafka_server_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_3.log",
+      "config_filename": "kafka_server_3.properties"
+    },
+    {
+      "entity_id": "4",
+      "port": "9094",
+      "broker.id": "4",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "topic_test",
+      "threads": "1",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "1000",
+      "tx-producer": "true",
+      "message-per-tx": "100",
+      "request-num-acks": "-1",
+      "sync":"true",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "topic_test",
+      "group.id": "group1",
+      "consumer-timeout-ms": "30000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_6.properties"
+    }
+  ]
+}
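Each entity block above overrides keys of the same name in the corresponding
config/*.properties template when the harness generates the per-testcase config
files (see generate_overriden_props_files in transaction_test.py below). A
minimal sketch of that key=value merge, assuming flat properties files;
illustrative only, not the harness code:

    # Hedged sketch of the base-properties + JSON-override merge; the real
    # implementation also handles entity lookup and output file naming.
    def merge_props(base_props_text, overrides):
        merged = {}
        for line in base_props_text.splitlines():
            stripped = line.strip()
            if stripped and not stripped.startswith("#") and "=" in stripped:
                key, _, value = stripped.partition("=")
                merged[key.strip()] = value.strip()
        merged.update(overrides)  # per-entity JSON values win
        return "\n".join("%s=%s" % (k, v) for k, v in sorted(merged.items()))

    # e.g. broker entity 1 overrides the template's broker.id:
    print(merge_props("broker.id=0\nport=9091", {"broker.id": "1"}))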
diff --git a/system_test/transaction_testsuite/transaction_test.py b/system_test/transaction_testsuite/transaction_test.py
new file mode 100644
index 0000000..e1a2d28
--- /dev/null
+++ b/system_test/transaction_testsuite/transaction_test.py
@@ -0,0 +1,273 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#!/usr/bin/env python
+
+# ===================================
+# transaction_test.py
+# ===================================
+
+import os
+import signal
+import sys
+import time
+import traceback
+import random
+
+from system_test_env import SystemTestEnv
+sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
+
+from setup_utils import SetupUtils
+from replication_utils import ReplicationUtils
+import system_test_utils
+from testcase_env import TestcaseEnv
+
+# product specific: Kafka
+import kafka_system_test_utils
+import metrics
+
+class TransactionManagementTest(ReplicationUtils, SetupUtils):
+
+    testModuleAbsPathName = os.path.realpath(__file__)
+    testSuiteAbsPathName  = os.path.abspath(os.path.dirname(testModuleAbsPathName))
+
+    def __init__(self, systemTestEnv):
+
+        # SystemTestEnv - provides cluster level environment settings
+        #     such as entity_id, hostname, kafka_home, java_home which
+        #     are available in a list of dictionary named
+        #     "clusterEntityConfigDictList"
+        self.systemTestEnv = systemTestEnv
+
+        super(TransactionManagementTest, self).__init__(self)
+
+        # dict to pass user-defined attributes to logger argument: "extra"
+        self.d = {'name_of_class': self.__class__.__name__}
+
+    def signal_handler(self, signal, frame):
+        self.log_message("Interrupt detected - User pressed Ctrl+c")
+
+        # perform the necessary cleanup here when user presses Ctrl+c and it may be product specific
+        self.log_message("stopping all entities - please wait ...")
+        kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
+        sys.exit(1)
+
+    def runTest(self):
+
+        # ======================================================================
+        # get all testcase directories under this testsuite
+        # ======================================================================
+        testCasePathNameList = system_test_utils.get_dir_paths_with_prefix(
+            self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
+        testCasePathNameList.sort()
+
+        replicationUtils = ReplicationUtils(self)
+
+        # =============================================================
+        # launch each testcase one by one: testcase_1, testcase_2, ...
+        # =============================================================
+        for testCasePathName in testCasePathNameList:
+
+            skipThisTestCase = False
+
+            try:
+                # ======================================================================
+                # A new instance of TestcaseEnv to keep track of this testcase's env vars
+                # and initialize some env vars as testCasePathName is available now
+                # ======================================================================
+                self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
+                self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
+                self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName)
+                self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"]
+
+                # ======================================================================
+                # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json
+                # ======================================================================
+                testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"]
+
+                if self.systemTestEnv.printTestDescriptionsOnly:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    continue
+                elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName):
+                    self.log_message("Skipping : " + testcaseDirName)
+                    skipThisTestCase = True
+                    continue
+                else:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName)
+
+                # ============================================================================== #
+                # ============================================================================== #
+                #                   Product Specific Testing Code Starts Here:                   #
+                # ============================================================================== #
+                # ============================================================================== #
+
+                # initialize self.testcaseEnv with user-defined environment variables (product specific)
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]    = False
+                self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False
+
+                # initialize signal handler
+                signal.signal(signal.SIGINT, self.signal_handler)
+
+                # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file:
+                #   system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
+                    self.testcaseEnv.testcasePropJsonPathName)
+
+                # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
+                kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+                # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase
+                # for collecting logs from remote machines
+                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
+
+                # TestcaseEnv - initialize producer & consumer config / log file pathnames
+                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
+
+                # generate remote hosts log/config dirs if not exist
+                kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+                # generate properties files for zookeeper, kafka, producer, and consumer:
+                # 1. copy system_test/<suite_name>_testsuite/config/*.properties to
+                #    system_test/<suite_name>_testsuite/testcase_<n>/config/
+                # 2. update all properties files in system_test/<suite_name>_testsuite/testcase_<n>/config
+                #    by overriding the settings specified in:
+                #    system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName,
+                    self.testcaseEnv, self.systemTestEnv)
+
+                # =============================================
+                # preparing all entities to start the test
+                # =============================================
+                self.log_message("starting zookeepers")
+                kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 2s")
+                time.sleep(2)
+
+                self.log_message("starting brokers")
+                kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                self.log_message("creating test topic")
+                kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                # =============================================
+                # starting producer
+                # =============================================
+                self.log_message("starting producer in the background")
+                kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False)
+                msgProducingFreeTimeSec = self.testcaseEnv.testcaseArgumentsDict["message_producing_free_time_sec"]
+                self.anonLogger.info("sleeping for " + msgProducingFreeTimeSec + " sec to produce some messages")
+                time.sleep(int(msgProducingFreeTimeSec))
+
+
+                # =============================================
+                # starting consumer
+                # =============================================
+                self.log_message("starting console consumer")
+                kafka_system_test_utils.start_console_consumers(self.systemTestEnv, self.testcaseEnv)
+
+                # =============================================
+                # A while-loop to bounce leaders as specified
+                # by "num_iteration" in testcase_<n>_properties.json
+                # =============================================
+                i = 1
+                numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"])
+                bouncedEntityDownTimeSec = 10
+                try:
+                    bouncedEntityDownTimeSec = int(self.testcaseEnv.testcaseArgumentsDict["bounced_entity_downtime_sec"])
+                except (KeyError, ValueError):
+                    pass
+
+                num_partition = int(self.testcaseEnv.testcaseArgumentsDict["num_partition"])
+                topic = self.testcaseEnv.testcaseArgumentsDict["topic"]
+                partition = 0
+
+                while i <= numIterations:
+                    #partition = random.randint(0, num_partition-1)
+                    partition = (partition + 1) % num_partition
+                    leader_entity = kafka_system_test_utils.get_leader_for(self.systemTestEnv, self.testcaseEnv, topic, partition)
+                    self.log_message("bounce leaders in iteration " + str(i) + " of " + str(numIterations))
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, leader_entity, self.testcaseEnv.entityBrokerParentPidDict[leader_entity])
+                    self.anonLogger.info("sleeping for 15s")
+                    time.sleep(15)
+                    i += 1
+                # while loop
+
+                # =============================================
+                # tell producer to stop
+                # =============================================
+                self.log_message("stopping background producer")
+                self.testcaseEnv.lock.acquire()
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
+                time.sleep(1)
+                self.testcaseEnv.lock.release()
+                time.sleep(1)
+
+                # =============================================
+                # wait for producer thread's update of
+                # "backgroundProducerStopped" to be "True"
+                # =============================================
+                while 1:
+                    self.testcaseEnv.lock.acquire()
self.logger.info("status of backgroundProducerStopped : [" + \ + str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d) + if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]: + time.sleep(1) + self.logger.info("all producer threads completed", extra=self.d) + break + time.sleep(1) + self.testcaseEnv.lock.release() + time.sleep(2) + + self.anonLogger.info("sleeping for 15s") + time.sleep(15) + + # ============================================= + # this testcase is completed - stop all entities + # ============================================= + self.log_message("stopping all entities") + for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items(): + kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid) + + # make sure all entities are stopped + kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv) + + # ============================================= + # collect logs from remote hosts + # ============================================= + kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv) + + # ============================================= + # validate the data matched and checksum + # ============================================= + self.log_message("validating data matched") + kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv, replicationUtils) + + except Exception as e: + self.log_message("Exception while running test {0}".format(e)) + traceback.print_exc() + + finally: + if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly: + self.log_message("stopping all entities - please wait ...") + kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv) diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 6edd64a..4871e8c 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -1065,8 +1065,10 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk requestNumAcks = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "request-num-acks") syncMode = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "sync") useNewProducer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "new-producer") + useTxProducer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "tx-producer") retryBackoffMs = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-retry-backoff-ms") numOfRetries = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-num-retries") + noMsgPerTx = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-per-tx") # for optional properties in testcase_xxxx_properties.json, # check the length of returned value for those properties: @@ -1105,6 +1107,10 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk boolArgumentsStr = boolArgumentsStr + " --sync" if useNewProducer.lower() == "true": 
boolArgumentsStr = boolArgumentsStr + " --new-producer" + elif useTxProducer.lower() == "true": + boolArgumentsStr = boolArgumentsStr + " --tx-producer" + if len(noMsgPerTx) > 0: + boolArgumentsStr = boolArgumentsStr + " --messages-per-tx " + noMsgPerTx # keep calling producer until signaled to stop by: # testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] -- 1.7.12.4