Index: system_test/reg_test/suite_replication/config/producer_performance.properties =================================================================== --- system_test/reg_test/suite_replication/config/producer_performance.properties (revision 0) +++ system_test/reg_test/suite_replication/config/producer_performance.properties (revision 0) @@ -0,0 +1,7 @@ +brokerinfo=zk.connect=localhost:2181 +topic=mytest +messages=200 +message-size=100 +threads=5 +initial-message-id=0 +compression-codec=0 Index: system_test/reg_test/suite_replication/config/server.properties =================================================================== --- system_test/reg_test/suite_replication/config/server.properties (revision 0) +++ system_test/reg_test/suite_replication/config/server.properties (revision 0) @@ -0,0 +1,120 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +brokerid=0 + +# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned +# from InetAddress.getLocalHost(). 
If there are multiple interfaces getLocalHost +# may not be what you want. +#hostname= + + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9091 + +# The number of threads handling network requests +network.threads=2 + +# The number of threads doing disk I/O +io.threads=2 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +max.socket.request.bytes=104857600 + + +############################# Log Basics ############################# + +# The directory under which to store log files +log.dir=/tmp/kafka_server_logs + +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. +num.partitions=5 + +# Overrides for the default given by num.partitions on a per-topic basis +#topic.partition.count.map=topic1:3, topic2:4 + +############################# Log Flush Policy ############################# + +# The following configurations control the flush of data to disk. This is the most +# important performance knob in kafka. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. +# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). +# 3. Throughput: The flush is generally the most expensive operation. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
+ +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +log.default.flush.interval.ms=1000 + +# Per-topic overrides for log.default.flush.interval.ms +#topic.flush.intervals.ms=topic1:1000, topic2:3000 + +# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. +log.default.flush.scheduler.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.size. +#log.retention.size=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +#log.file.size=536870912 +#log.file.size=102400 +log.file.size=128 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.cleanup.interval.mins=1 + +############################# Zookeeper ############################# + +# Enable connecting to zookeeper +enable.zookeeper=true + +# Zk connection string (see zk docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. 
+zk.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 Index: system_test/reg_test/suite_replication/config/consumer.properties =================================================================== --- system_test/reg_test/suite_replication/config/consumer.properties (revision 0) +++ system_test/reg_test/suite_replication/config/consumer.properties (revision 0) @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.consumer.ConsumerConfig for more details + +# zk connection string +# comma separated host:port pairs, each corresponding to a zk +# server. e.g. 
"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" +zk.connect=127.0.0.1:2181 + +# timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 + +#consumer group id +groupid=test-consumer-group + +#consumer timeout +#consumer.timeout.ms=5000 Index: system_test/reg_test/suite_replication/config/console_consumer.properties =================================================================== --- system_test/reg_test/suite_replication/config/console_consumer.properties (revision 0) +++ system_test/reg_test/suite_replication/config/console_consumer.properties (revision 0) @@ -0,0 +1,4 @@ +zookeeper=ela4-app0996.prod:2181 +topic=test_1 +from-beginning +consumer-timeout-ms=10000 Index: system_test/reg_test/suite_replication/config/log4j.properties =================================================================== --- system_test/reg_test/suite_replication/config/log4j.properties (revision 0) +++ system_test/reg_test/suite_replication/config/log4j.properties (revision 0) @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +#log4j.appender.fileAppender=org.apache.log4j.FileAppender +#log4j.appender.fileAppender.File=kafka-request.log +#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout +#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n + + +# Turn on all our debugging info +#log4j.logger.kafka=INFO +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG + +# to print message checksum from ProducerPerformance +log4j.logger.kafka.perf=DEBUG +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG + +# (accidental duplicate of the DEBUG logger settings above - commented out) +#log4j.logger.kafka.perf=DEBUG +#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG + Index: system_test/reg_test/suite_replication/config/producer.properties =================================================================== --- system_test/reg_test/suite_replication/config/producer.properties (revision 0) +++ system_test/reg_test/suite_replication/config/producer.properties (revision 0) @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.producer.ProducerConfig for more details + +############################# Producer Basics ############################# + +# need to set either broker.list or zk.connect + +# configure brokers statically +# format: brokerid1:host1:port1,brokerid2:host2:port2 ... +#broker.list=0:localhost:9092 + +# discover brokers from ZK +zk.connect=localhost:2181 + +# zookeeper session timeout; default is 6000 +#zk.sessiontimeout.ms= + +# the max time that the client waits to establish a connection to zookeeper; default is 6000 +#zk.connectiontimeout.ms + +# name of the partitioner class for partitioning events; default partition spreads data randomly +#partitioner.class= + +# specifies whether the messages are sent asynchronously (async) or synchronously (sync) +producer.type=sync + +# specify the compression codec for all data generated: 0: no compression, 1: gzip +compression.codec=0 + +# message encoder +serializer.class=kafka.serializer.StringEncoder + +# allow topic level compression +#compressed.topics= + +# max message size; messages larger than that size are discarded; default is 1000000 +#max.message.size= + + +############################# Async Producer ############################# +# maximum time, in milliseconds, for buffering data on the producer queue +#queue.time= + +# the maximum size of the blocking queue for buffering on the producer +#queue.size= + +# Timeout for event enqueue: +# 0: events will be enqueued immediately or dropped if the queue is full +# -ve: enqueue will block indefinitely if the queue is full +# +ve: enqueue will block up to this many milliseconds if the queue is full +#queue.enqueueTimeout.ms= + +# the number of messages batched at the producer +#batch.size= + +# the callback handler for one or multiple events +#callback.handler= + +# properties required to initialize the callback handler +#callback.handler.props= + +# the 
handler for events +#event.handler= + +# properties required to initialize the event handler +#event.handler.props= + Index: system_test/reg_test/suite_replication/config/zookeeper.properties =================================================================== --- system_test/reg_test/suite_replication/config/zookeeper.properties (revision 0) +++ system_test/reg_test/suite_replication/config/zookeeper.properties (revision 0) @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir=/tmp/zookeeper +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 Index: system_test/reg_test/suite_replication/__init__.py =================================================================== --- system_test/reg_test/suite_replication/__init__.py (revision 0) +++ system_test/reg_test/suite_replication/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: system_test/reg_test/suite_replication/bin/__init__.py =================================================================== --- system_test/reg_test/suite_replication/bin/__init__.py (revision 0) +++ system_test/reg_test/suite_replication/bin/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: system_test/reg_test/suite_replication/bin/replica_basic_test.py =================================================================== --- system_test/reg_test/suite_replication/bin/replica_basic_test.py (revision 0) +++ system_test/reg_test/suite_replication/bin/replica_basic_test.py (revision 0) @@ -0,0 +1,218 @@ +#!/usr/bin/env python + +# =================================== +# file: replica_basic_test.py +# =================================== + +import inspect +import os +import subprocess +import sys +import time + +from reg_test_env import RegTestEnv +sys.path.append(RegTestEnv.regTestLibDir) +import reg_test_helper +import li_reg_test_helper +from reg_test_helper import RtLogging + +class ReplicaBasicTest(): + testModuleAbsPathname = os.path.realpath(__file__) + testSuiteAbsPathname = os.path.abspath(os.path.dirname(testModuleAbsPathname) + "/..") + isLeaderLogPattern = "completed the leader state transition" + + def __init__(self, clusterConfigsList): + self.clCfgList = clusterConfigsList + self.rtLogging = RtLogging("ReplicaBasicTest") + self.rtLogger = self.rtLogging.getRtLogger() + + def runTest(self, regTestResultsList): + # get all test cases under this test suite + tcPathnamesList = 
reg_test_helper.getDirPathsWithPrefix( + ReplicaBasicTest.testSuiteAbsPathname, RegTestEnv.testCasePrefix) + tcPathnamesList.sort() + + kafkaRunClassBin = ReplicaBasicTest.testSuiteAbsPathname + "/bin/kafka-run-class.sh" + + # ============================================================= + # launch each testcase one by one: testcase_1, testcase_2, ... + # ============================================================= + for tcPathname in tcPathnamesList: + + tcResultsDict = {} + tcResultsDict["test_class_name"] = self.__class__.__name__ + tcResultsDict["test_case_name"] = "" + testcaseResultsDict = {} + tcResultsDict["test_case_results"] = testcaseResultsDict + regTestResultsList.append(tcResultsDict) + + if not os.path.exists(tcPathname + "/config"): + os.mkdir(tcPathname + "/config") + + if not os.path.exists(tcPathname + "/logs"): + os.mkdir(tcPathname + "/logs") + + # dictionary holding parent pids for corresponding entityId + entityPPidDict = {} + + # zk.connect str needed by producer + zkConnectStr = "" + + # parsing testcase properties json file + tcPropJsonPathname = reg_test_helper.getTestcasePropJsonPathname(tcPathname) + tcConfigsList = reg_test_helper.get_json_list_data(tcPropJsonPathname) + tcDirname = os.path.basename(tcPathname) + tcResultsDict["test_case_name"] = tcDirname + + # initialize the corresponding config / log file pathname into a dictionary + entityPropsDict = li_reg_test_helper.init_entity_props(self.clCfgList, tcConfigsList, tcPathname) + print + + # get the dictionary that contains the testcase arguments and description + tcDicts = reg_test_helper.get_json_dict_data(tcPropJsonPathname) + + # get testcase args + tcArgDict = tcDicts["testcase_args"] + bounceLeaderFlag = tcArgDict["bounce_leader"] + + # get the testcase description + tcDescription = "" + for k,v in tcDicts.items(): + if ( k == "description" ): tcDescription = v + + # display testcase name and arguments + 
self.rtLogger.info("======================================================") + self.rtLogger.info("Test Case : " + tcDirname) + self.rtLogger.info("======================================================") + for k,v in tcArgDict.items(): + self.rtLogger.info(" " + k + " : " + v) + self.rtLogger.info("======================================================") + print + self.rtLogger.info("Description : " + tcDescription + "\n") + + # cleanup logs at rmt hosts + li_reg_test_helper.cleanup_data_at_rmt_hosts(self.clCfgList, tcConfigsList, self.rtLogger) + + # generate properties files for zookeeper, kafka, producer, consumer + li_reg_test_helper.generate_overriden_props_files( ReplicaBasicTest.testSuiteAbsPathname, \ + tcPathname, self.clCfgList, tcConfigsList, self.rtLogger) + + # =================== + # FIXME: + # =================== + # if cluster is not local, copy properties files to remote machines + + time.sleep(5) + + # ========================== + # starting zookeepers + # ========================== + zkCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(self.clCfgList, "role", "zookeeper") + for zkCfg in zkCfgList: + zkConnectStr = li_reg_test_helper.start_zookeeper(self.clCfgList, tcConfigsList, zkCfg, \ + zkConnectStr, kafkaRunClassBin, tcPathname, entityPPidDict, self.rtLogger) + time.sleep(2) + + # ========================== + # start brokers + # ========================== + brokerCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(self.clCfgList, "role", "broker") + for brokerCfg in brokerCfgList: + li_reg_test_helper.start_broker(self.clCfgList, tcConfigsList, brokerCfg, \ + kafkaRunClassBin, tcPathname, entityPPidDict, self.rtLogger) + time.sleep(2) + + # ========================== + # create topic + # ========================== + firstZkCfg = zkCfgList[0] + firstZkHost = firstZkCfg["hostname"] + li_reg_test_helper.create_topic(self.clCfgList, tcConfigsList, zkConnectStr, tcPathname, \ + firstZkHost, tcArgDict, self.rtLogger) + print + 
time.sleep(5) + + # ========================== + # get leader + # ========================== + leaderDict = li_reg_test_helper.get_leader_from_broker_logs( + self.clCfgList, tcConfigsList, tcPathname + "/logs", \ + ReplicaBasicTest.isLeaderLogPattern, self.rtLogger) + + validateLeaderElectionResult = "" + + if ( len(leaderDict) > 0 ): + leaderBrokerId = leaderDict["brokerid"] + entityIdList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + tcConfigsList, "brokerid", leaderBrokerId, "entity_id") + leaderEntityId = entityIdList[0] + leaderPid = entityPPidDict[leaderEntityId] + hostname = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + self.clCfgList, "entity_id", leaderEntityId, "hostname") + self.rtLogger.info("found leader in entity [" + leaderEntityId + "] with brokerid [" + \ + leaderBrokerId + "] for partition [" + leaderDict["partition"] + "]") + + validateLeaderElectionResult = "PASSED" + else: + validateLeaderElectionResult = "FAILED" + testcaseResultsDict["Validate leader election successful"] = validateLeaderElectionResult + continue # skip this round as there is no leader available for testing + + testcaseResultsDict["Validate leader election successful"] = validateLeaderElectionResult + + time.sleep(2) + + # ========================== + # if bounce_leader is true + # => stop broker + # ========================== + if (bounceLeaderFlag.lower() == "true"): + self.rtLogger.info("stopping leader in entity [" + leaderEntityId + "] with brokerid [" + leaderBrokerId + "]") + reg_test_helper.stop_entity(leaderPid) + + self.rtLogger.info("sleeping for 10s") + time.sleep(10) + + # ========================== + # start producer_performance + # ========================== + li_reg_test_helper.start_producer_performance(self.clCfgList, tcConfigsList, \ + entityPropsDict["producer_cfg_pathname"], kafkaRunClassBin, \ + entityPropsDict["producer_log_pathname"], self.rtLogger) + time.sleep(2) + + # ========================== + # start console_consumer + # 
========================== + li_reg_test_helper.start_console_consumer(self.clCfgList, tcConfigsList, \ + entityPropsDict["consumer_cfg_pathname"], kafkaRunClassBin, \ + entityPropsDict["consumer_log_pathname"], self.rtLogger) + + # FIXME: + # ========================== + # collect logs from remote hosts + # ========================== + # if all hosts are local, no need to call this + # li_reg_test_helper.collect_logs_from_rmt_hosts(self.clCfgList, tcPathname, self.rtLogger) + + # ========================== + # validate test results + # ========================== + dataMatched = li_reg_test_helper.validate_data_matched(tcPathname + "/logs", entityPropsDict, self.rtLogger) + dataMatchedTestResult = "" + if ( dataMatched ): + dataMatchedTestResult = "PASSED" + else: + dataMatchedTestResult = "FAILED" + self.rtLogger.info("## Validation for data matched: " + dataMatchedTestResult) + testcaseResultsDict["Validate for data matched"] = dataMatchedTestResult + print + + # ========================== + # stop all running processes + # ========================== + reg_test_helper.stop_entity_group(entityPPidDict, self.clCfgList, "broker") + reg_test_helper.stop_entity_group(entityPPidDict, self.clCfgList, "zookeeper") + li_reg_test_helper.stop_rmt_java_proc(self.clCfgList, self.rtLogger) + time.sleep(5) + Index: system_test/reg_test/suite_replication/bin/kafka-run-class.sh =================================================================== --- system_test/reg_test/suite_replication/bin/kafka-run-class.sh (revision 0) +++ system_test/reg_test/suite_replication/bin/kafka-run-class.sh (revision 0) @@ -0,0 +1,67 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ $# -lt 1 ]; +then + echo "USAGE: $0 classname [opts]" + exit 1 +fi + +base_dir=$(dirname $0)/.. +kafka_inst_dir=${base_dir}/../../.. + +for file in $kafka_inst_dir/project/boot/scala-2.8.0/lib/*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + +for file in $kafka_inst_dir/core/target/scala_2.8.0/*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + +for file in $kafka_inst_dir/core/lib/*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + +for file in $kafka_inst_dir/perf/target/scala_2.8.0/kafka*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + +for file in $kafka_inst_dir/core/lib_managed/scala_2.8.0/compile/*.jar; +do + if [ ${file##*/} != "sbt-launch.jar" ]; then + CLASSPATH=$CLASSPATH:$file + fi +done +if [ -z "$KAFKA_JMX_OPTS" ]; then + KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false " +fi +if [ -z "$KAFKA_OPTS" ]; then + KAFKA_OPTS="-Xmx512M -server -Dlog4j.configuration=file:$base_dir/config/log4j.properties" +fi +if [ $JMX_PORT ]; then + KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT " +fi +if [ -z "$JAVA_HOME" ]; then + JAVA="java" +else + JAVA="$JAVA_HOME/bin/java" +fi + +$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@ Property changes on: system_test/reg_test/suite_replication/bin/kafka-run-class.sh ___________________________________________________________________ Added: 
svn:executable + * Index: system_test/reg_test/suite_replication/testcase_1/testcase_1_properties.json =================================================================== --- system_test/reg_test/suite_replication/testcase_1/testcase_1_properties.json (revision 0) +++ system_test/reg_test/suite_replication/testcase_1/testcase_1_properties.json (revision 0) @@ -0,0 +1,62 @@ +{ + "description": "Basic test to produce and consume messages to a single topic partition. This test sends messages to n replicas and at the end verifies the log size and contents as well as using a consumer to verify no message loss. Optionally, the test bounces the leader periodically to introduce failures during the message replication.", + "testcase_args": { + "bounce_leader": "false", + "replica_factor": "3", + "num_partition": "2" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "brokerid": "1", + "log.file.size": "10240", + "log.dir": "/tmp/kafka_server_1_logs", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "brokerid": "2", + "log.file.size": "10240", + "log.dir": "/tmp/kafka_server_2_logs", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "brokerid": "3", + "log.file.size": "10240", + "log.dir": "/tmp/kafka_server_3_logs", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "topic": "test_1", + "threads": "5", + "compression-codec": "1", + "message-size": "500", + "message": "500", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance.properties" + }, + { + "entity_id": "5", + "topic": "test_1", + 
"groupid": "mytestgroup", + "consumer-timeout-ms": "10000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer.properties" + } + ] +} Index: system_test/reg_test/suite_replication/testcase_2/testcase_2_properties.json =================================================================== --- system_test/reg_test/suite_replication/testcase_2/testcase_2_properties.json (revision 0) +++ system_test/reg_test/suite_replication/testcase_2/testcase_2_properties.json (revision 0) @@ -0,0 +1,62 @@ +{ + "description": "Basic test to produce and consume messages to a single topic partition. This test sends messages to n replicas and at the end verifies the log size and contents as well as using a consumer to verify no message loss. However, the leader will be terminated before producing messages to test the behavior of leader failure.", + "testcase_args": { + "bounce_leader": "true", + "replica_factor": "3", + "num_partition": "2" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "brokerid": "1", + "log.file.size": "10240", + "log.dir": "/tmp/kafka_server_1_logs", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "brokerid": "2", + "log.file.size": "10240", + "log.dir": "/tmp/kafka_server_2_logs", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "brokerid": "3", + "log.file.size": "10240", + "log.dir": "/tmp/kafka_server_3_logs", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "topic": "test_1", + "threads": "5", + "compression-codec": "1", + "message-size": "500", + "message": "500", + 
"log_filename": "producer_performance.log", + "config_filename": "producer_performance.properties" + }, + { + "entity_id": "5", + "topic": "test_1", + "groupid": "mytestgroup", + "consumer-timeout-ms": "10000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer.properties" + } + ] +} Index: system_test/reg_test/__init__.py =================================================================== --- system_test/reg_test/__init__.py (revision 0) +++ system_test/reg_test/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: system_test/reg_test/lib/reg_test_helper.py =================================================================== --- system_test/reg_test/lib/reg_test_helper.py (revision 0) +++ system_test/reg_test/lib/reg_test_helper.py (revision 0) @@ -0,0 +1,234 @@ +#!/usr/bin/env python + +# =================================== +# file: reg_test_helper.py +# =================================== + +import inspect +import json +import logging +import os +import signal +import subprocess +import sys + +from reg_test_env import RegTestEnv +dlm = RegTestEnv.delimiter + +class RtLogging(): + def __init__(self, className): + self.className = className + + def getRtLogger(self): + logger = logging.getLogger(self.className) + logger.setLevel(logging.INFO) + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + #formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s (%(name)s)') + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + ch.setFormatter(formatter) + logger.addHandler(ch) + return logger + + +def getDictFromListOfDictsByKeyVal(listOfDicts, lookupKey, lookupVal): + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'} + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'} + # + # Usage: + # + # 1. 
getDataFromListOfDictsByKeyVal(self.clusterConfigsList, "entity_id", "0", "role") + # returns: + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'} + # + # 2. getDataFromListOfDictsByKeyVal(self.clusterConfigsList, None, None, "role") + # returns: + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'} + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'} + + retList = [] + if ( lookupVal is None or lookupKey is None ): + for d in listOfDicts: + for k,v in d.items(): + if ( k == fieldToRetrieve ): # match with fieldToRetrieve ONLY + retList.append( d ) + else: + for d in listOfDicts: + for k,v in d.items(): + if ( k == lookupKey and v == lookupVal ): # match with lookupKey and lookupVal + retList.append( d ) + + return retList + + +def getDataFromListOfDictsByKeyVal(listOfDicts, lookupKey, lookupVal, fieldToRetrieve): + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'} + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'} + # Usage: + # 1. getDataFromListOfDictsByKeyVal(self.clusterConfigsList, "entity_id", "0", "role") + # => returns ['zookeeper'] + # 2. 
getDataFromListOfDictsByKeyVal(self.clusterConfigsList, None, None, "role") + # => returns ['zookeeper', 'broker'] + + retList = [] + if ( lookupVal is None or lookupKey is None ): + for d in listOfDicts: + for k,v in d.items(): + if ( k == fieldToRetrieve ): # match with fieldToRetrieve ONLY + retList.append( d[fieldToRetrieve] ) + else: + for d in listOfDicts: + for k,v in d.items(): + if ( k == lookupKey and v == lookupVal ): # match with lookupKey and lookupVal + retList.append( d[fieldToRetrieve] ) + + return retList + + +def getDirPathsWithPrefix(fullPath, dirNamePrefix): + dirsList = [] + for dirName in os.listdir(fullPath): + if not os.path.isfile(fullPath + "/" + dirName) and dirName.startswith(dirNamePrefix): + dirsList.append(os.path.abspath(fullPath + "/" + dirName)) + return dirsList + + +def getTestcasePropJsonPathname(tcPathname): + tcDirname = os.path.basename(tcPathname) + return tcPathname + "/" + tcDirname + "_properties.json" + + +def sys_call(cmd_str): + output = "" + p = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in p.stdout.readlines(): + output += line + return output + + +def sys_call_return_subproc(cmd_str): + p = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + return p + + +def async_sys_call(cmd_str): + subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + +def copy_file(source, destination): + inlines = open(source, "r") + fo = open(destination, "w") + for line in inlines: + print line, + fo.write(line) + fo.close() + + +def convert_keyval_to_cmd_args(cfgFilePathname): + cmdArg = "" + inlines = open(cfgFilePathname, "r").readlines() + for inline in inlines: + line = inline.rstrip() + tokens = line.split('=', 1) + + if (len(tokens) == 2): + cmdArg = cmdArg + " --" + tokens[0] + " " + tokens[1] + elif (len(tokens) == 1): + cmdArg = cmdArg + " --" + tokens[0] + else: + print "ERROR: unexpected arguments list", line + + 
return cmdArg + + +def getChildProcesses(pid): + pidStack = [] + currentPid = pid + parentPid = "" + pidStack.append(pid) + + while ( len(currentPid) > 0 ): + psCommand = subprocess.Popen("ps -o pid --ppid %s --noheaders" % currentPid, shell=True, stdout=subprocess.PIPE) + psOutput = psCommand.stdout.read() + #retcode = ps_command.wait() + #assert retcode == 0, "ps command returned %d" % retcode + outputLine = psOutput.rstrip('\n') + childPid = outputLine.lstrip() + + if ( len(childPid) > 0 ): + pidStack.append(childPid) + currentPid = childPid + else: + break + + return pidStack + + +def terminateProc(pidStack): + while ( len(pidStack) > 0 ): + pid = pidStack.pop() + try: + os.kill(int(pid), signal.SIGTERM) + except: + print "WARN - pid:",pid,"not found" + + +def stop_entity(pPid): + pidStack = getChildProcesses(pPid) + terminateProc(pidStack) + + +def stop_entity_group(entityPPidList, clCfgList, role): + entityIdList = getDataFromListOfDictsByKeyVal(clCfgList, "role", role, "entity_id") + for entityId in entityIdList: + ppid = entityPPidList[entityId] + pidStack = getChildProcesses(ppid) + terminateProc(pidStack) + + +# FIXME: +def get_json_data(infile, field): + json_file_str = open(infile, "r").read() + json_data = json.loads(json_file_str) + + if ( type(json_data) == list ): + print "list type" + elif ( type(json_data) == dict ): + print "dict type" + else: + print "other type" + + +def get_json_list_data(infile): + json_file_str = open(infile, "r").read() + json_data = json.loads(json_file_str) + data_list = [] + + for key,settings in json_data.items(): + if type(settings) == list: + for setting in settings: + if type(setting) == dict: + kv_dict = {} + for k,v in setting.items(): + kv_dict[k] = v + data_list.append(kv_dict) + + return data_list + +def get_json_dict_data(infile): + json_file_str = open(infile, "r").read() + json_data = json.loads(json_file_str) + data_dict = {} + + for key,val in json_data.items(): + if ( type(val) != list ): + data_dict[key] 
= val + + return data_dict + +def copy_remote_logs_to_local(clCfgList, tcCfgList, tcPathname, role): + print "FIXME: copy_remote_logs_to_local" + cmdStr = "scp " + host + ":" + tcPathname + "/logs/" + logFilename + " " + tcPathname + "/logs" + #sys_call(cmdStr) + + Index: system_test/reg_test/lib/__init__.py =================================================================== --- system_test/reg_test/lib/__init__.py (revision 0) +++ system_test/reg_test/lib/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: system_test/reg_test/lib/li_reg_test_helper.py =================================================================== --- system_test/reg_test/lib/li_reg_test_helper.py (revision 0) +++ system_test/reg_test/lib/li_reg_test_helper.py (revision 0) @@ -0,0 +1,387 @@ +#!/usr/bin/env python + +# =================================== +# file: li_reg_test_helper.py +# =================================== + +import datetime +import inspect +import json +import logging +import os +import re +import subprocess +import sys +import time + +import reg_test_helper + +from reg_test_env import RegTestEnv +from reg_test_helper import RtLogging +from datetime import datetime +from time import mktime + + +def init_entity_props(clCfgList, tcCfgList, tcPathname): + entityPropsDict = {} + + # consumer config / log files location + consEntityIdList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + clCfgList, "role", "console_consumer", "entity_id") + consLogList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + tcCfgList, "entity_id", consEntityIdList[0], "log_filename") + consLogPathname = tcPathname + "/logs/" + consLogList[0] + consCfgList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + tcCfgList, "entity_id", consEntityIdList[0], "config_filename") + consCfgPathname = tcPathname + "/config/" + consCfgList[0] + + # producer config / log files location + prodEntityIdList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + clCfgList, "role", "producer_performance", "entity_id") 
+ prodLogList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + tcCfgList, "entity_id", prodEntityIdList[0], "log_filename") + prodLogPathname = tcPathname + "/logs/" + prodLogList[0] + prodCfgList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + tcCfgList, "entity_id", prodEntityIdList[0], "config_filename") + prodCfgPathname = tcPathname + "/config/" + prodCfgList[0] + + entityPropsDict["consumer_log_pathname"] = consLogPathname + entityPropsDict["consumer_cfg_pathname"] = consCfgPathname + + entityPropsDict["producer_log_pathname"] = prodLogPathname + entityPropsDict["producer_cfg_pathname"] = prodCfgPathname + + return entityPropsDict + + +def get_message_id(logPathname): + logLines = open(logPathname, "r").readlines() + messageIdList = [] + + for line in logLines: + if not "MessageID" in line: + continue + else: + matchObj = re.match('.*MessageID:(.*?):', line) + messageIdList.append( matchObj.group(1) ) + + return messageIdList + + +def validate_data_matched(tcLogsDir, entityPropsDict, rtLogger): + consumerLogPathname = entityPropsDict["consumer_log_pathname"] + producerLogPathname = entityPropsDict["producer_log_pathname"] + msgIdMissingInConsumerLog = "msg_id_missing_in_consumer.log" + + prodMidList = get_message_id(producerLogPathname) + consMidList = get_message_id(consumerLogPathname) + prodMidSet = set(prodMidList) + consMidSet = set(consMidList) + + missingMidInConsumer = prodMidSet - consMidSet + + outfile = open(tcLogsDir + "/" + msgIdMissingInConsumerLog, "w") + for i in missingMidInConsumer: + outfile.write(i + "\n") + outfile.close() + + if ( len(missingMidInConsumer) == 0 ): + return True + + rtLogger.info("See " + tcLogsDir + "/" + msgIdMissingInConsumerLog + " for missing MessageID") + return False + + +def collect_logs_from_rmt_hosts(clCfgList, tcPathname, rtLogger): + for clCfg in clCfgList: + hostname = clCfg["hostname"] + role = clCfg["role"] + entityId = clCfg["entity_id"] + + rtLogger.info("collecting logs from host: " + hostname 
+ " role: " +role) + + cmdStr = "scp " + hostname + ":" + tcPathname + "/logs/* " + tcPathname + "/logs" + #rtLogger.info("executing command: [" + cmdStr + "]") + #reg_test_helper.sys_call(cmdStr) + + +def cleanup_data_at_rmt_hosts(clCfgList, tcCfgList, rtLogger): + for clCfg in clCfgList: + hostname = clCfg["hostname"] + role = clCfg["role"] + entityId = clCfg["entity_id"] + + rtLogger.info("cleaning up log dir on host: " + hostname + " role: " +role) + + if ( role == 'zookeeper' ): + zkDataDirList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "dataDir") + cmdStr = "ssh " + hostname + " \"rm -r " + zkDataDirList[0] + "/*\"" + #rtLogger.info("executing command: [" + cmdStr + "]") + reg_test_helper.sys_call(cmdStr) + elif ( role == 'broker' ): + brokerDataDirList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "log.dir") + cmdStr = "ssh " + hostname + " \"rm -r " + brokerDataDirList[0] + "/*\"" + #rtLogger.info("executing command: [" + cmdStr + "]") + reg_test_helper.sys_call(cmdStr) + + +def stop_rmt_java_proc(clusterCfgList, rtLogger): + for clusterCfg in clusterCfgList: + hostname = clusterCfg["hostname"] + userid = os.getlogin() + rtLogger.info("stopping entity id [" + clusterCfg["entity_id"] + "] by [" + userid + "] in host: " + hostname) + + cmdStr = "ps auxw | grep " + userid + " | grep java | grep -v grep | grep -i kafka " + \ + " | tr -s ' ' | cut -f2 -d ' ' | xargs kill -9" + + #rtLogger.info("executing : " + cmdStr) + subproc = reg_test_helper.sys_call_return_subproc(cmdStr) + #for line in subproc.stdout.readlines(): + # print line + +def copy_file_with_dict_values(srcFile, destFile, dictObj): + infile = open(srcFile, "r") + inlines = infile.readlines() + infile.close() + + # print " => testcase config line : ", tcCfg + outfile = open(destFile, 'w') + + for line in inlines: + for key in dictObj.keys(): + if (line.startswith(key + "=")): + #sys.stdout.write(" ## from : [" + line + "]") 
+ line = key + "=" + dictObj[key] + "\n" + #sys.stdout.write(" ## to : [" + line + "]") + outfile.write(line) + outfile.close() + + +def generate_overriden_props_files(testsuitePathname, tcPathname, clusterConfigsList, tcConfigsList, rtLogger): + rtLogger.info("calling generate_properties_files") + + cfgTemplatePathname = os.path.abspath(testsuitePathname + "/config") + cfgDestPathname = os.path.abspath(tcPathname + "/config") + rtLogger.info("config template (source) pathname : " + cfgTemplatePathname) + rtLogger.info("testcase config (dest) pathname : " + cfgDestPathname) + + # loop through all zookeepers (if more than 1) to retrieve host and clientPort + # to construct a zk.connect str for broker in the form of: + # zk.connect=:,: + zkConnectStr = "" + zkDictList = reg_test_helper.getDictFromListOfDictsByKeyVal(clusterConfigsList, "role", "zookeeper") + for zkDict in zkDictList: + entityID = zkDict["entity_id"] + hostname = zkDict["hostname"] + clientPortList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcConfigsList, "entity_id", entityID, "clientPort") + clientPort = clientPortList[0] + + if ( zkConnectStr.__len__() == 0 ): + zkConnectStr = hostname + ":" + clientPort + else: + zkConnectStr = zkConnectStr + "," + hostname + ":" + clientPort + print + + # for each entity in the cluster config + for clusterCfg in clusterConfigsList: + cl_entity_id = clusterCfg["entity_id"] + + for tcCfg in tcConfigsList: + if (tcCfg["entity_id"] == cl_entity_id): + + # copy the associated .properties template, update values, write to testcase_/config + + if ( clusterCfg["role"] == "broker" ): + tcCfg["zk.connect"] = zkConnectStr + copy_file_with_dict_values(cfgTemplatePathname + "/server.properties", \ + cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg) + + elif ( clusterCfg["role"] == "zookeeper"): + copy_file_with_dict_values(cfgTemplatePathname + "/zookeeper.properties", \ + cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg) + + elif ( clusterCfg["role"] == 
"producer_performance"): + tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr + copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties", \ + cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg) + + elif ( clusterCfg["role"] == "console_consumer"): + tcCfg["zookeeper"] = zkConnectStr + copy_file_with_dict_values(cfgTemplatePathname + "/console_consumer.properties", \ + cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg) + else: + print " => ", tcCfg + print "UNHANDLED key" + +def get_isleader_log_line(): + print "FIXME: get_isleader_log_line" + cmdStr = "ssh " + host + " \"grep -i -h 'is leader' " + tcPathname + "/logs/" + brokerLogFile + " sort | tail -1" + subproc = reg_test_helper.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + print line + +def copy_remote_logs_to_local(): + print "FIXME: copy_remote_logs_to_local" + cmdStr = "" + +# return the entity_id of the leader and the unix ts +def get_leader_from_broker_logs(clCfgList, tcCfgList, logPathname, isLeaderLogPattern, rtLogger): + rePattern = "\[(.*?)\] INFO Broker (.*?) completed the leader state transition for topic (.*?) partition (.*?) 
\(.*" + leaderDict = {} + + brokerCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(clCfgList, "role", "broker") + for brokerCfg in brokerCfgList: + host = brokerCfg["hostname"] + entityId = brokerCfg["entity_id"] + bkrLogList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "log_filename") + cmdStr = "ssh " + host + " \"grep -i -h '" + isLeaderLogPattern + "' " + \ + logPathname + "/" + bkrLogList[0] + "\" 2> /dev/null" + + #rtLogger.info("executing cmd: [" + cmdStr + "]") + subproc = reg_test_helper.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + if isLeaderLogPattern in line: + #rtLogger.info("found the broker log line => " + line) + matchObj = re.match(rePattern, line) + datetimeStr = matchObj.group(1) + datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f") + unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond + #print "{0:.3f}".format(unixTs) + leaderDict["timestamp"] = unixTs + leaderDict["brokerid"] = matchObj.group(2) + leaderDict["topic"] = matchObj.group(3) + leaderDict["partition"] = matchObj.group(4) + else: + rtLogger.error("unable to find string pattern [" + isLeaderLogPattern + "]") + + return leaderDict + +def start_producer_performance(clCfgList, tcCfgList, prodCfgPathname, kafkaRunClassBin, prodLogPathname, rtLogger): + prodPerfCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(clCfgList, "role", "producer_performance") + for prodPerfCfg in prodPerfCfgList: + host = prodPerfCfg["hostname"] + entityId = prodPerfCfg["entity_id"] + kafkaHomeList = \ + reg_test_helper.getDataFromListOfDictsByKeyVal(clCfgList, "entity_id", entityId, "kafka_home") + kafkaHome = kafkaHomeList[0] + tcProdPerfCfg = reg_test_helper.getDictFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId) + + rtLogger.info("starting producer performance") + + commandArgs = reg_test_helper.convert_keyval_to_cmd_args(prodCfgPathname) + prodPerfCmdStr = \ + "ssh " + host + 
" \"" + kafkaRunClassBin + " kafka.perf.ProducerPerformance " + \ + commandArgs + " &> " + prodLogPathname + "\"" + + #rtLogger.info("executing command: [" + prodPerfCmdStr + "]") + reg_test_helper.sys_call(prodPerfCmdStr) + + +def start_console_consumer(clCfgList, tcCfgList, consCfgPathname, kafkaRunClassBin, consLogPathname, rtLogger): + consumerCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(clCfgList, "role", "console_consumer") + for consumerCfg in consumerCfgList: + host = consumerCfg["hostname"] + entityId = consumerCfg["entity_id"] + kafkaHomeList = \ + reg_test_helper.getDataFromListOfDictsByKeyVal(clCfgList, "entity_id", entityId, "kafka_home") + kafkaHome = kafkaHomeList[0] + tcConsumerCfg = reg_test_helper.getDictFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId) + + rtLogger.info("starting console consumer") + + commandArgs = reg_test_helper.convert_keyval_to_cmd_args(consCfgPathname) + consumerCmdStr = \ + "ssh " + host + " \"" + kafkaRunClassBin + " kafka.consumer.ConsoleConsumer " + \ + commandArgs + " &> " + consLogPathname + "\"" + + #rtLogger.info("executing command: [" + consumerCmdStr + "]") + reg_test_helper.sys_call(consumerCmdStr) + + +def start_broker(clCfgList, tcCfgList, brokerCfg, kafkaRunClassBin, tcPathname, entityPPidDict, rtLogger): + host = brokerCfg["hostname"] + entityId = brokerCfg["entity_id"] + brokeridList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "brokerid") + kafkaHomeList = reg_test_helper.getDataFromListOfDictsByKeyVal(clCfgList, "entity_id", entityId, "kafka_home") + + rtLogger.info("starting broker in host [" + host + "] with entity id [" + entityId + "]") + + # broker config / log files location + brokerLogList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "log_filename") + brokerLogPathname = tcPathname + "/logs/" + brokerLogList[0] + brokerCfgFileList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", 
entityId, "config_filename") + brokerCfgPathname = tcPathname + "/config/" + brokerCfgFileList[0] + + brokerCmdStr = "ssh " + host + " '" + kafkaRunClassBin + " kafka.Kafka " + brokerCfgPathname + \ + " &> " + brokerLogPathname + " & echo pid:$! > " + tcPathname + "/logs/entity_" + entityId + "_pid'" + + #rtLogger.info("executing command: [" + brokerCmdStr + "]") + reg_test_helper.async_sys_call(brokerCmdStr) + time.sleep(2) + + pidCmdStr = "ssh " + host + " 'cat " + tcPathname + "/logs/entity_" + entityId + "_pid'" + #rtLogger.info("executing command: [" + pidCmdStr + "]") + subproc = reg_test_helper.sys_call_return_subproc(pidCmdStr) + + for line in subproc.stdout.readlines(): + if line.startswith("pid"): + line = line.rstrip('\n') + tokens = line.split(':') + entityPPidDict[entityId] = tokens[1] + + +def start_zookeeper(clCfgList, tcCfgList, zkCfg, zkConnectStr, kafkaRunClassBin, tcPathname, entityPPidDict, rtLogger): + host = zkCfg["hostname"] + entityId = zkCfg["entity_id"] + clientPortList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "clientPort") + kafkaHomeList = reg_test_helper.getDataFromListOfDictsByKeyVal(clCfgList, "entity_id", entityId, "kafka_home") + + rtLogger.info("starting zookeeper in host [" + host + "] on client port [" + clientPortList[0] + "]") + + # construct zk.connect str + if ( len(zkConnectStr) > 0 ): + zkConnectStr = zkConnectStr + "," + host + ":" + clientPortList[0] + else: + zkConnectStr = host + ":" + clientPortList[0] + + # zk config / log files location + zkLogList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "log_filename") + zkLogPathname = tcPathname + "/logs/" + zkLogList[0] + zkCfgFileList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "config_filename") + zkCfgPathname = tcPathname + "/config/" + zkCfgFileList[0] + + zkCmdStr = "ssh " + host + " '" + kafkaHomeList[0] + "/bin/zookeeper-server-start.sh " + \ + 
zkCfgPathname + " &> " + zkLogPathname + " & echo pid:$! > " + tcPathname + \ + "/logs/entity_" + entityId + "_pid'" + #rtLogger.info("executing command: [" + zkCmdStr + "]") + reg_test_helper.async_sys_call(zkCmdStr) + time.sleep(2) + + pidCmdStr = "ssh " + host + " 'cat " + tcPathname + "/logs/entity_" + entityId + "_pid'" + #rtLogger.info("executing command: [" + pidCmdStr + "]") + subproc = reg_test_helper.sys_call_return_subproc(pidCmdStr) + + for line in subproc.stdout.readlines(): + if line.startswith("pid"): + line = line.rstrip('\n') + tokens = line.split(':') + entityPPidDict[entityId] = tokens[1] + + return zkConnectStr + + +def create_topic(clCfgList, tcCfgList, zkConnectStr, tcPathname, zkHost, tcArgDict, rtLogger): + prodPerfCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(clCfgList, "role", "producer_performance") + prodPerfCfgDict = reg_test_helper.getDictFromListOfDictsByKeyVal(tcCfgList, "entity_id", prodPerfCfgList[0]["entity_id"]) + prodTopicList = prodPerfCfgDict[0]["topic"].split(',') + createTopicBin = RegTestEnv.regTestBaseDir + "/../../bin/kafka-create-topic.sh" + + for topic in prodTopicList: + rtLogger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]") + cmdStr = "ssh " + zkHost + " \"" + createTopicBin + " --topic " + topic + " --zookeeper " + zkConnectStr + \ + " --replica " + tcArgDict["replica_factor"] + " --partition " + tcArgDict["num_partition"] + " &> " + \ + tcPathname + "/logs/create_topic.log" + "\"" + #rtLogger.info("executing command: [" + cmdStr + "]") + subproc = reg_test_helper.sys_call_return_subproc(cmdStr) + + Index: system_test/reg_test/bin/__init__.py =================================================================== --- system_test/reg_test/bin/__init__.py (revision 0) +++ system_test/reg_test/bin/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: system_test/reg_test/bin/reg_test_env.py =================================================================== --- 
system_test/reg_test/bin/reg_test_env.py (revision 0) +++ system_test/reg_test/bin/reg_test_env.py (revision 0) @@ -0,0 +1,44 @@ +#!/usr/bin/env python + +# file: reg_test_env.py + +"""Define regression test framework environments""" + +import inspect +import json +import os +import subprocess +import sys + +class RegTestEnv(): + """Define a class""" + + cwdFullPath = os.getcwd() + thisScriptFullPathname = os.path.realpath(__file__) + thisScriptFullPath = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]))) + + regTestBaseDir = thisScriptFullPath + "/.." + regTestBinDir = thisScriptFullPath + "/../bin" + regTestLibDir = thisScriptFullPath + "/../lib" + + delimiter = "~" + testSuitePrefix = "suite_" + testCasePrefix = "testcase_" + testModuleExt = ".py" + clusterCfgFilename = "cluster_config.json" + + regTestResultsList = [] + + def __init__(self): + "Create an object with this regression test session environment" + self.regTestBaseDir = os.getcwd() + + def __str__(self): + return '(regTestBaseDir: ' + RegTestEnv.regTestBaseDir + \ + ', regTestBinDir: ' + RegTestEnv.regTestBinDir + \ + ', regTestLibDir: ' + RegTestEnv.regTestLibDir + ')' + + def printEnv(self): + print "regTestBaseDir : ", RegTestEnv.regTestBaseDir + print "regTestBinDir : ", RegTestEnv.regTestBinDir + print "regTestLibDir : ", RegTestEnv.regTestLibDir Index: system_test/reg_test/bin/reg_test_driver.py =================================================================== --- system_test/reg_test/bin/reg_test_driver.py (revision 0) +++ system_test/reg_test/bin/reg_test_driver.py (revision 0) @@ -0,0 +1,113 @@ +#!/usr/bin/env python + +# =================================== +# file: run_reg_test.py +# =================================== + +import logging +import os +import sys + +from reg_test_env import RegTestEnv +sys.path.append(RegTestEnv.regTestLibDir) +import reg_test_helper +from reg_test_helper import RtLogging + +rtLogging = RtLogging("RegTest") +rtLogger = rtLogging.getRtLogger() + 
+class RegTest(RegTestEnv): + def __init__(self, testSuiteName, testModuleName, testSuiteClassName, clusterConfigsList, regTestResultsList): + self.name = testSuiteName + sys.path.append(RegTestEnv.regTestBaseDir + "/" + testSuiteName + "/bin") + + # dynamically loading a module and starting the test class + mod = __import__(testModuleName) + theClass = getattr(mod, testSuiteClassName) + instance = theClass(clusterConfigsList) + instance.runTest(regTestResultsList) + +def main(): + suiteClassDictList = [] + clusterPropJsonPathname = os.path.abspath(RegTestEnv.regTestBaseDir + "/" + RegTestEnv.clusterCfgFilename) + clusterConfigsList = reg_test_helper.get_json_list_data(clusterPropJsonPathname) + print + rtLogger.info("===============================================================") + rtLogger.info("#### SYSTEM REGRESSION TEST FRAMEWORK IN PYTHON ####") + rtLogger.info("===============================================================") + print + rtLogger.info("looking for test modules ...") + + # find all test suites in Reg Test Base Dir + for suiteName in os.listdir(RegTestEnv.regTestBaseDir): + + # make sure this is a valid testsuite directory + if os.path.isdir(suiteName) and suiteName.startswith(RegTestEnv.testSuitePrefix): + + # found a test suite + rtLogger.info("found a test suite : " + suiteName) + + # put all test modules and corresponding test class name into a list + testModBaseDir = RegTestEnv.regTestBaseDir + "/" + suiteName + "/bin" + + # go through all test module file + for moduleFileName in os.listdir(testModBaseDir): + + # make sure it is a valid test module + if moduleFileName.endswith(RegTestEnv.testModuleExt) and not moduleFileName.startswith("__"): + + # found a test module file + rtLogger.info("found a test module file : " + moduleFileName) + + testModPathName = testModBaseDir + "/" + moduleFileName + testModClassName = reg_test_helper.sys_call("grep ^class " + testModPathName + \ + " | sed 's/^class //g' | sed 's/():.*//g'") + 
testModClassName = testModClassName.rstrip("\n") + + # collect the test suite class data + suiteClassDict = {} + suiteClassDict["suite"] = suiteName + extLenToRemove = RegTestEnv.testModuleExt.__len__() * -1 + suiteClassDict["module"] = moduleFileName[:extLenToRemove] + suiteClassDict["class"] = testModClassName + suiteClassDictList.append(suiteClassDict) + print + + # FIXME: + # copy new testing build to remote hosts + + # loop through suiteClassDictList and start the test class one by one + for suiteClassDict in suiteClassDictList: + + suiteName = suiteClassDict["suite"] + modName = suiteClassDict["module"] + className = suiteClassDict["class"] + + rtLogger.info("===============================================================") + rtLogger.info("Running Test for : ") + rtLogger.info(" suite : " + suiteName) + rtLogger.info(" module : " + modName) + rtLogger.info(" class : " + className) + rtLogger.info("===============================================================") + + # launch the test module class + rt = RegTest(suiteName, modName, className, clusterConfigsList, RegTestEnv.regTestResultsList) + print + + # report test results + rtLogger.info("==============================================") + rtLogger.info(" TEST REPORTS") + rtLogger.info("==============================================") + for testResult in RegTestEnv.regTestResultsList: + rtLogger.info("test class : " + testResult["test_class_name"]) + rtLogger.info("test case : " + testResult["test_case_name"]) + testCaseResults = testResult["test_case_results"] + for vldItem, vldResult in testCaseResults.items(): + rtLogger.info(" " + vldItem + " : " + vldResult) + print +# ========================= +# main entry point +# ========================= +main() + + Index: system_test/reg_test/README.txt =================================================================== --- system_test/reg_test/README.txt (revision 0) +++ system_test/reg_test/README.txt (revision 0) @@ -0,0 +1,80 @@ +# ========================== 
+# README.txt for reg_test +# ========================== + + +# ========================== +# Overview +# ========================== + +"reg_test" is a regression test framework intended for the automation of system / integration testing of data platform software such as Kafka. The test framework is implemented in Python which is a popular scripting language with well supported features. + +The framework has the following levels: + +1. The first level is generic and does not depend on any product specific details. + location: system_test/reg_test/bin + a. reg_test_driver.py - It implements the main class RegTest as an entry point. + b. reg_test_env.py - It implements the class RegTestEnv which defines the testing environment of a test session such as the base directory and environment variables specific to the local machine. + +2. The second level defines a suite of testing such as Kafka's replication (including basic testing, failure testing, ... etc) + location: system_test/reg_test/*. + + * Please note the test framework will look for a specific prefix of the directories under reg_test to determine what test suites are available. The prefix of can be defined in RegTestEnv class (reg_test_env.py) + + a. bin/replica_basic_test.py - This is a test module file. It implements the test logic for basic replication testing as follows: + + i. start zookeepers + ii. start brokers + iii. create kafka topics + iv. lookup the brokerid as a leader + v. terminate the leader (if defined in the testcase config json file) + vi. start producer to send n messages + vii. start consumer to receive messages + viii. validate if there is data loss + + b. bin/kafka-run-class.sh - This is needed to run the producer and consumer with customized log4j settings (located at reg_test//config/log4j.properties) + + c. 
config/ - This config directory provides templates for all properties files needed for zookeeper, brokers, producer and consumer (any changes in the files under this directory would be reflected or overwritten by the settings under testcase_/testcase__properties.json) + + d. testcase_** - The testcase directory contains the testcase argument definition file: testcase_1_properties.json. This file defines the specific configurations for the testcase such as the followings (eg. producer related): + i. no. of producer threads + ii. no. of messages to produce + iii. zkconnect string + + When this test case is being run, the test framework will copy and update the template properties files to testcase_/config. The logs of various components will be saved in testcase_/logs + + ** Please note the test framework will look for a specific prefix of the directories under reg_test// to determine what test cases are available. The prefix of can be defined in RegTestEnv class (reg_test_env.py) + +# ========================== +# Quick Start +# ========================== + +* Please note that the following commands should be executed after downloading the kafka source code to build all the required binaries: + 1. / $ ./sbt update + 2. / $ ./sbt package + + Now you are ready to follow the steps below. + 1. Update reg_test/cluster_config.json for "kafka_home" & "java_home" specific to your environment + 2. To run the test, go to /system_test/reg_test and run the following command: + $ python -B bin/reg_test_driver.py + + +# ========================== +# Adding Test Case +# ========================== + +To create a new test suite called "suite_broker", please do the followings: + + 1. Copy and paste reg_test/suite_replication => reg_test/suite_broker + 2. Rename reg_test/suite_broker/bin/replica_basic_test.py => reg_test/suite_broker/bin/broker_basic_test.py + 3. 
Edit reg_test/suite_broker/bin/broker_basic_test.py and update all ReplicaBasicTest related class name to BrokerBasicTest (for example) + 4. Follow the flow of reg_test/suite_broker/bin/broker_basic_test.py and modify the necessary test logic accordingly. + + +To create a new test case under "suite_replication", please do the followings: + + 1. Copy and paste reg_test/suite_replication/testcase_2 => reg_test/suite_replication/testcase_3 + 2. Rename reg_test/suite_replication/testcase_3/testcase_2_properties.json => reg_test/suite_replication/testcase_3/testcase_3_properties.json + 3. Update reg_test/suite_replication/testcase_3/testcase_3_properties.json with the corresponding settings for testcase 3. + + Index: system_test/reg_test/cluster_config.json =================================================================== --- system_test/reg_test/cluster_config.json (revision 0) +++ system_test/reg_test/cluster_config.json (revision 0) @@ -0,0 +1,46 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "kafka_home": "/home/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + }, + { + "entity_id": "1", + "hostname": "localhost", + "role": "broker", + "kafka_home": "/home/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + }, + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "kafka_home": "/home/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + }, + { + "entity_id": "3", + "hostname": "localhost", + "role": "broker", + "kafka_home": "/home/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "producer_performance", + "kafka_home": "/home/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + }, + { + "entity_id": "5", + "hostname": "localhost", + "role": "console_consumer", + "kafka_home": "/home/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + } + ] +}