Index: reg_test/suite_replication/config/producer_performance.properties =================================================================== --- reg_test/suite_replication/config/producer_performance.properties (revision 0) +++ reg_test/suite_replication/config/producer_performance.properties (revision 0) @@ -0,0 +1,7 @@ +brokerinfo=zk.connect=localhost:2181 +topic=mytest +messages=200 +message-size=100 +thread=5 +initial-message-id=0 +compression-codec=0 Index: reg_test/suite_replication/config/server.properties =================================================================== --- reg_test/suite_replication/config/server.properties (revision 0) +++ reg_test/suite_replication/config/server.properties (revision 0) @@ -0,0 +1,120 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +brokerid=0 + +# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned +# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost +# may not be what you want. 
+#hostname= + + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9091 + +# The number of threads handling network requests +network.threads=2 + +# The number of threads doing disk I/O +io.threads=2 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +max.socket.request.bytes=104857600 + + +############################# Log Basics ############################# + +# The directory under which to store log files +log.dir=/tmp/kafka_server_logs + +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. +num.partitions=5 + +# Overrides for for the default given by num.partitions on a per-topic basis +#topic.partition.count.map=topic1:3, topic2:4 + +############################# Log Flush Policy ############################# + +# The following configurations control the flush of data to disk. This is the most +# important performance knob in kafka. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. +# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). +# 3. Throughput: The flush is generally the most expensive operation. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
+ +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +log.default.flush.interval.ms=1000 + +# Per-topic overrides for log.default.flush.interval.ms +#topic.flush.intervals.ms=topic1:1000, topic2:3000 + +# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. +log.default.flush.scheduler.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.size. +#log.retention.size=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +#log.file.size=536870912 +#log.file.size=102400 +log.file.size=128 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.cleanup.interval.mins=1 + +############################# Zookeeper ############################# + +# Enable connecting to zookeeper +enable.zookeeper=true + +# Zk connection string (see zk docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. 
+zk.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 Index: reg_test/suite_replication/config/consumer.properties =================================================================== --- reg_test/suite_replication/config/consumer.properties (revision 0) +++ reg_test/suite_replication/config/consumer.properties (revision 0) @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.consumer.ConsumerConfig for more details + +# zk connection string +# comma separated host:port pairs, each corresponding to a zk +# server. e.g. 
"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" +zk.connect=127.0.0.1:2181 + +# timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 + +#consumer group id +groupid=test-consumer-group + +#consumer timeout +#consumer.timeout.ms=5000 Index: reg_test/suite_replication/config/console_consumer.properties =================================================================== --- reg_test/suite_replication/config/console_consumer.properties (revision 0) +++ reg_test/suite_replication/config/console_consumer.properties (revision 0) @@ -0,0 +1,4 @@ +zookeeper=ela4-app0996.prod:2181 +topic=test_1 +from-beginning +consumer-timeout-ms=10000 Index: reg_test/suite_replication/config/log4j.properties =================================================================== --- reg_test/suite_replication/config/log4j.properties (revision 0) +++ reg_test/suite_replication/config/log4j.properties (revision 0) @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +#log4j.appender.fileAppender=org.apache.log4j.FileAppender +#log4j.appender.fileAppender.File=kafka-request.log +#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout +#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n + + +# Turn on all our debugging info +#log4j.logger.kafka=INFO +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG + +# to print message checksum from ProducerPerformance +log4j.logger.kafka.perf=DEBUG +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG + +# (duplicate of the block above; disabled) +#log4j.logger.kafka.perf=DEBUG +#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG + Index: reg_test/suite_replication/config/producer.properties =================================================================== --- reg_test/suite_replication/config/producer.properties (revision 0) +++ reg_test/suite_replication/config/producer.properties (revision 0) @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# see kafka.producer.ProducerConfig for more details + +############################# Producer Basics ############################# + +# need to set either broker.list or zk.connect + +# configure brokers statically +# format: brokerid1:host1:port1,brokerid2:host2:port2 ... +#broker.list=0:localhost:9092 + +# discover brokers from ZK +zk.connect=localhost:2181 + +# zookeeper session timeout; default is 6000 +#zk.sessiontimeout.ms= + +# the max time that the client waits to establish a connection to zookeeper; default is 6000 +#zk.connectiontimeout.ms + +# name of the partitioner class for partitioning events; default partition spreads data randomly +#partitioner.class= + +# specifies whether the messages are sent asynchronously (async) or synchronously (sync) +producer.type=sync + +# specify the compression codec for all data generated: 0: no compression, 1: gzip +compression.codec=0 + +# message encoder +serializer.class=kafka.serializer.StringEncoder + +# allow topic level compression +#compressed.topics= + +# max message size; messages larger than that size are discarded; default is 1000000 +#max.message.size= + + +############################# Async Producer ############################# +# maximum time, in milliseconds, for buffering data on the producer queue +#queue.time= + +# the maximum size of the blocking queue for buffering on the producer +#queue.size= + +# Timeout for event enqueue: +# 0: events will be enqueued immediately or dropped if the queue is full +# -ve: enqueue will block indefinitely if the queue is full +# +ve: enqueue will block up to this many milliseconds if the queue is full +#queue.enqueueTimeout.ms= + +# the number of messages batched at the producer +#batch.size= + +# the callback handler for one or multiple events +#callback.handler= + +# properties required to initialize the callback handler +#callback.handler.props= + +# the handler for events +#event.handler= + +# properties required to initialize the event handler 
+#event.handler.props= + Index: reg_test/suite_replication/config/zookeeper.properties =================================================================== --- reg_test/suite_replication/config/zookeeper.properties (revision 0) +++ reg_test/suite_replication/config/zookeeper.properties (revision 0) @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir=/tmp/zookeeper +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 Index: reg_test/suite_replication/__init__.py =================================================================== --- reg_test/suite_replication/__init__.py (revision 0) +++ reg_test/suite_replication/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: reg_test/suite_replication/bin/__init__.py =================================================================== --- reg_test/suite_replication/bin/__init__.py (revision 0) +++ reg_test/suite_replication/bin/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: reg_test/suite_replication/bin/replica_basic_test.py =================================================================== --- reg_test/suite_replication/bin/replica_basic_test.py (revision 0) +++ reg_test/suite_replication/bin/replica_basic_test.py (revision 0) @@ -0,0 +1,192 @@ +#!/usr/bin/env python + +# =================================== +# file: replica_basic_test.py +# =================================== + +import inspect +import os +import subprocess +import sys +import time + +from reg_test_env import RegTestEnv +sys.path.append(RegTestEnv.regTestLibDir) +import reg_test_helper +import li_reg_test_helper +from reg_test_helper import RtLogging + +class ReplicaBasicTest(): + testModuleAbsPathname = os.path.realpath(__file__) + testSuiteAbsPathname = os.path.abspath(os.path.dirname(testModuleAbsPathname) + "/..") + isLeaderLogPattern = "completed the leader state transition" + + def __init__(self, clusterConfigsList): + self.clCfgList = clusterConfigsList + self.rtLogging = RtLogging("ReplicaBasicTest") + self.rtLogger = self.rtLogging.getRtLogger() + + def runTest(self): + # get all test cases under this test suite + tcPathnamesList = reg_test_helper.getDirPathsWithPrefix( + ReplicaBasicTest.testSuiteAbsPathname, RegTestEnv.testCasePrefix) + tcPathnamesList.sort() + + 
kafkaRunClassBin = ReplicaBasicTest.testSuiteAbsPathname + "/bin/kafka-run-class.sh" + + # ============================================================= + # launch each testcase one by one: testcase_1, testcase_2, ... + # ============================================================= + for tcPathname in tcPathnamesList: + + if not os.path.exists(tcPathname + "/config"): + os.mkdir(tcPathname + "/config") + + if not os.path.exists(tcPathname + "/logs"): + os.mkdir(tcPathname + "/logs") + + # dictionary holding parent pids for corresponding entityId + entityPPidDict = {} + + # zk.connect str needed by producer + zkConnectStr = "" + + # parsing testcase properties json file + tcPropJsonPathname = reg_test_helper.getTestcasePropJsonPathname(tcPathname) + tcConfigsList = reg_test_helper.get_json_list_data(tcPropJsonPathname) + tcDirname = os.path.basename(tcPathname) + + # initialize the corresponding config / log file pathname into a dictionary + entityPropsDict = li_reg_test_helper.init_entity_props(self.clCfgList, tcConfigsList, tcPathname) + print + + # get the dictionary that contains the testcase arguments and description + tcDicts = reg_test_helper.get_json_dict_data(tcPropJsonPathname) + + # get testcase args + tcArgDict = tcDicts["testcase_args"] + bounceLeaderFlag = tcArgDict["bounce_leader"] + + # get the testcase description + tcDescription = "" + for k,v in tcDicts.items(): + if ( k == "description" ): tcDescription = v + + # display testcase name and arguments + self.rtLogger.info("======================================================") + self.rtLogger.info("Test Case : " + tcDirname) + self.rtLogger.info("======================================================") + for k,v in tcArgDict.items(): + self.rtLogger.info(" " + k + " : " + v) + self.rtLogger.info("======================================================") + print + self.rtLogger.info("Description : " + tcDescription + "\n") + + # cleanup logs at rmt hosts + 
li_reg_test_helper.cleanup_data_at_rmt_hosts(self.clCfgList, tcConfigsList, self.rtLogger) + + # generate properties files for zookeeper, kafka, producer, consumer + li_reg_test_helper.generate_overriden_props_files( ReplicaBasicTest.testSuiteAbsPathname, \ + tcPathname, self.clCfgList, tcConfigsList, self.rtLogger) + + # =================== + # FIXME: + # =================== + # if cluster is not local, copy properties files to remote machines + + time.sleep(5) + + # ========================== + # starting zookeepers + # ========================== + zkCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(self.clCfgList, "role", "zookeeper") + for zkCfg in zkCfgList: + zkConnectStr = li_reg_test_helper.start_zookeeper(self.clCfgList, tcConfigsList, zkCfg, \ + zkConnectStr, kafkaRunClassBin, tcPathname, entityPPidDict, self.rtLogger) + time.sleep(2) + + # ========================== + # start brokers + # ========================== + brokerCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(self.clCfgList, "role", "broker") + for brokerCfg in brokerCfgList: + li_reg_test_helper.start_broker(self.clCfgList, tcConfigsList, brokerCfg, \ + kafkaRunClassBin, tcPathname, entityPPidDict, self.rtLogger) + time.sleep(2) + + # ========================== + # create topic + # ========================== + li_reg_test_helper.create_topic(self.clCfgList, tcConfigsList, zkConnectStr, tcPathname, self.rtLogger) + print + time.sleep(5) + + # ========================== + # get leader + # ========================== + leaderDict = li_reg_test_helper.get_leader_from_broker_logs( + self.clCfgList, tcConfigsList, tcPathname + "/logs", \ + ReplicaBasicTest.isLeaderLogPattern, self.rtLogger) + + leaderBrokerId = leaderDict["brokerid"] + entityIdList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + tcConfigsList, "brokerid", leaderBrokerId, "entity_id") + leaderEntityId = entityIdList[0] + leaderPid = entityPPidDict[leaderEntityId] + hostname = 
reg_test_helper.getDataFromListOfDictsByKeyVal( \ + self.clCfgList, "entity_id", leaderEntityId, "hostname") + self.rtLogger.info("found leader in entity [" + leaderEntityId + "] with brokerid [" + \ + leaderBrokerId + "] for partition [" + leaderDict["partition"] + "]") + time.sleep(2) + + # ========================== + # if bounce_leader is true + # => stop broker + # ========================== + if (bounceLeaderFlag.lower() == "true"): + self.rtLogger.info("stopping leader in entity [" + leaderEntityId + "] with brokerid [" + leaderBrokerId + "]") + reg_test_helper.stop_entity(leaderPid) + + self.rtLogger.info("sleeping for 10s") + time.sleep(10) + + # ========================== + # start producer_performance + # ========================== + li_reg_test_helper.start_producer_performance(self.clCfgList, tcConfigsList, \ + entityPropsDict["producer_cfg_pathname"], kafkaRunClassBin, \ + entityPropsDict["producer_log_pathname"], self.rtLogger) + time.sleep(2) + + # ========================== + # start console_consumer + # ========================== + li_reg_test_helper.start_console_consumer(self.clCfgList, tcConfigsList, \ + entityPropsDict["consumer_cfg_pathname"], kafkaRunClassBin, \ + entityPropsDict["consumer_log_pathname"], self.rtLogger) + + # FIXME: + # ========================== + # collect logs from remote hosts + # ========================== + # if all hosts are local, no need to call this + # li_reg_test_helper.collect_logs_from_rmt_hosts(self.clCfgList, tcPathname, self.rtLogger) + + # ========================== + # validate test results + # ========================== + dataMatched = li_reg_test_helper.validate_data_matched(tcPathname + "/logs", entityPropsDict, self.rtLogger) + if ( dataMatched ): + self.rtLogger.info("## Validation for data matched: PASSED") + else: + self.rtLogger.info("## Validation for data matched: FAILED") + print + + # ========================== + # stop all running processes + # ========================== + 
reg_test_helper.stop_entity_group(entityPPidDict, self.clCfgList, "broker") + reg_test_helper.stop_entity_group(entityPPidDict, self.clCfgList, "zookeeper") + li_reg_test_helper.stop_rmt_java_proc(self.clCfgList, self.rtLogger) + time.sleep(5) + Index: reg_test/suite_replication/bin/kafka-run-class.sh =================================================================== --- reg_test/suite_replication/bin/kafka-run-class.sh (revision 0) +++ reg_test/suite_replication/bin/kafka-run-class.sh (revision 0) @@ -0,0 +1,67 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ $# -lt 1 ]; +then + echo "USAGE: $0 classname [opts]" + exit 1 +fi + +base_dir=$(dirname $0)/.. +kafka_inst_dir=${base_dir}/../.. 
+ +for file in $kafka_inst_dir/project/boot/scala-2.8.0/lib/*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + +for file in $kafka_inst_dir/core/target/scala_2.8.0/*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + +for file in $kafka_inst_dir/core/lib/*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + +for file in $kafka_inst_dir/perf/target/scala_2.8.0/kafka*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + +for file in $kafka_inst_dir/core/lib_managed/scala_2.8.0/compile/*.jar; +do + if [ ${file##*/} != "sbt-launch.jar" ]; then + CLASSPATH=$CLASSPATH:$file + fi +done +if [ -z "$KAFKA_JMX_OPTS" ]; then + KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false " +fi +if [ -z "$KAFKA_OPTS" ]; then + KAFKA_OPTS="-Xmx512M -server -Dlog4j.configuration=file:$base_dir/config/log4j.properties" +fi +if [ $JMX_PORT ]; then + KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT " +fi +if [ -z "$JAVA_HOME" ]; then + JAVA="java" +else + JAVA="$JAVA_HOME/bin/java" +fi + +$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@ Property changes on: reg_test/suite_replication/bin/kafka-run-class.sh ___________________________________________________________________ Added: svn:executable + * Index: reg_test/suite_replication/testcase_1/testcase_1_properties.json =================================================================== --- reg_test/suite_replication/testcase_1/testcase_1_properties.json (revision 0) +++ reg_test/suite_replication/testcase_1/testcase_1_properties.json (revision 0) @@ -0,0 +1,63 @@ +{ + "description": "Basic test to produce and consume messages to a single topic partition. This test sends messages to n replicas and at the end verifies the log size and contents as well as use a consumer to verify no message loss. 
Optionally, the test bounces the leader periodically to introduce failures during the message replication.", + "testcase_args": { + "bounce_leader": "false", + "server_to_bounce": "source,mirror_maker,target", + "replica_factor": "3", + "partition": "2" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/jfung/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "brokerid": "1", + "log.file.size": "10240", + "log.dir": "/tmp/jfung/kafka_server_1_logs", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "brokerid": "2", + "log.file.size": "10240", + "log.dir": "/tmp/jfung/kafka_server_2_logs", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "brokerid": "3", + "log.file.size": "10240", + "log.dir": "/tmp/jfung/kafka_server_3_logs", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "topic": "test_1", + "threads": "5", + "compression-codec": "1", + "message-size": "500", + "message": "500", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance.properties" + }, + { + "entity_id": "5", + "topic": "test_1", + "groupid": "mytestgroup", + "consumer-timeout-ms": "10000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer.properties" + } + ] +} Index: reg_test/suite_replication/testcase_2/testcase_2_properties.json =================================================================== --- reg_test/suite_replication/testcase_2/testcase_2_properties.json (revision 0) +++ reg_test/suite_replication/testcase_2/testcase_2_properties.json (revision 0) @@ -0,0 +1,63 @@ +{ + "description": "Similar to ReplicaBasicTest.testcase_1 
but terminate the leader before producing messages", + "testcase_args": { + "bounce_leader": "true", + "server_to_bounce": "source,mirror_maker,target", + "replica_factor": "3", + "partition": "2" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/jfung/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "brokerid": "1", + "log.file.size": "10240", + "log.dir": "/tmp/jfung/kafka_server_1_logs", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "brokerid": "2", + "log.file.size": "10240", + "log.dir": "/tmp/jfung/kafka_server_2_logs", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "brokerid": "3", + "log.file.size": "10240", + "log.dir": "/tmp/jfung/kafka_server_3_logs", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "topic": "test_1", + "threads": "5", + "compression-codec": "1", + "message-size": "500", + "message": "500", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance.properties" + }, + { + "entity_id": "5", + "topic": "test_1", + "groupid": "mytestgroup", + "consumer-timeout-ms": "10000", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer.properties" + } + ] +} Index: reg_test/__init__.py =================================================================== --- reg_test/__init__.py (revision 0) +++ reg_test/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: reg_test/lib/reg_test_helper.py =================================================================== --- reg_test/lib/reg_test_helper.py (revision 0) +++ reg_test/lib/reg_test_helper.py (revision 0) @@ -0,0 +1,234 @@ +#!/usr/bin/env python + +# 
=================================== +# file: reg_test_helper.py +# =================================== + +import inspect +import json +import logging +import os +import signal +import subprocess +import sys + +from reg_test_env import RegTestEnv +dlm = RegTestEnv.delimiter + +class RtLogging(): + def __init__(self, className): + self.className = className + + def getRtLogger(self): + logger = logging.getLogger(self.className) + logger.setLevel(logging.INFO) + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + #formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s (%(name)s)') + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + ch.setFormatter(formatter) + logger.addHandler(ch) + return logger + + +def getDictFromListOfDictsByKeyVal(listOfDicts, lookupKey, lookupVal): + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'} + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'} + # + # Usage: + # + # 1. getDataFromListOfDictsByKeyVal(self.clusterConfigsList, "entity_id", "0", "role") + # returns: + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'} + # + # 2. 
getDataFromListOfDictsByKeyVal(self.clusterConfigsList, None, None, "role") + # returns: + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'} + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'} + + retList = [] + if ( lookupVal is None or lookupKey is None ): + # no key/value filter: return every dict once (previous code referenced + # the undefined name 'fieldToRetrieve' here, raising NameError) + for d in listOfDicts: + retList.append( d ) + else: + for d in listOfDicts: + for k,v in d.items(): + if ( k == lookupKey and v == lookupVal ): # match with lookupKey and lookupVal + retList.append( d ) + + return retList + + +def getDataFromListOfDictsByKeyVal(listOfDicts, lookupKey, lookupVal, fieldToRetrieve): + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'} + # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'} + # Usage: + # 1. getDataFromListOfDictsByKeyVal(self.clusterConfigsList, "entity_id", "0", "role") + # => returns ['zookeeper'] + # 2. 
getDataFromListOfDictsByKeyVal(self.clusterConfigsList, None, None, "role") + # => returns ['zookeeper', 'broker'] + + retList = [] + if ( lookupVal is None or lookupKey is None ): + for d in listOfDicts: + for k,v in d.items(): + if ( k == fieldToRetrieve ): # match with fieldToRetrieve ONLY + retList.append( d[fieldToRetrieve] ) + else: + for d in listOfDicts: + for k,v in d.items(): + if ( k == lookupKey and v == lookupVal ): # match with lookupKey and lookupVal + retList.append( d[fieldToRetrieve] ) + + return retList + + +def getDirPathsWithPrefix(fullPath, dirNamePrefix): + dirsList = [] + for dirName in os.listdir(fullPath): + if not os.path.isfile(dirName) and dirName.startswith(dirNamePrefix): + dirsList.append(os.path.abspath(fullPath + "/" + dirName)) + return dirsList + + +def getTestcasePropJsonPathname(tcPathname): + tcDirname = os.path.basename(tcPathname) + return tcPathname + "/" + tcDirname + "_properties.json" + + +def sys_call(cmd_str): + output = "" + p = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in p.stdout.readlines(): + output += line + return output + + +def sys_call_return_subproc(cmd_str): + p = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + return p + + +def async_sys_call(cmd_str): + subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + +def copy_file(source, destination): + inlines = open(source, "r") + fo = open(destination, "w") + for line in inlines: + print line, + fo.write(line) + fo.close() + + +def convert_keyval_to_cmd_args(cfgFilePathname): + cmdArg = "" + inlines = open(cfgFilePathname, "r").readlines() + for inline in inlines: + line = inline.rstrip() + tokens = line.split('=', 1) + + if (len(tokens) == 2): + cmdArg = cmdArg + " --" + tokens[0] + " " + tokens[1] + elif (len(tokens) == 1): + cmdArg = cmdArg + " --" + tokens[0] + else: + print "ERROR: unexpected arguments list", line + + 
return cmdArg + + +def getChildProcesses(pid): + pidStack = [] + currentPid = pid + parentPid = "" + pidStack.append(pid) + + while ( len(currentPid) > 0 ): + psCommand = subprocess.Popen("ps -o pid --ppid %s --noheaders" % currentPid, shell=True, stdout=subprocess.PIPE) + psOutput = psCommand.stdout.read() + #retcode = ps_command.wait() + #assert retcode == 0, "ps command returned %d" % retcode + outputLine = psOutput.rstrip('\n') + childPid = outputLine.lstrip() + + if ( len(childPid) > 0 ): + pidStack.append(childPid) + currentPid = childPid + else: + break + + return pidStack + + +def terminateProc(pidStack): + while ( len(pidStack) > 0 ): + pid = pidStack.pop() + try: + os.kill(int(pid), signal.SIGTERM) + except: + print "WARN - pid:",pid,"not found" + + +def stop_entity(pPid): + pidStack = getChildProcesses(pPid) + terminateProc(pidStack) + + +def stop_entity_group(entityPPidList, clCfgList, role): + entityIdList = getDataFromListOfDictsByKeyVal(clCfgList, "role", role, "entity_id") + for entityId in entityIdList: + ppid = entityPPidList[entityId] + pidStack = getChildProcesses(ppid) + terminateProc(pidStack) + + +# FIXME: +def get_json_data(infile, field): + json_file_str = open(infile, "r").read() + json_data = json.loads(json_file_str) + + if ( type(json_data) == list ): + print "list type" + elif ( type(json_data) == dict ): + print "dict type" + else: + print "other type" + + +def get_json_list_data(infile): + json_file_str = open(infile, "r").read() + json_data = json.loads(json_file_str) + data_list = [] + + for key,settings in json_data.items(): + if type(settings) == list: + for setting in settings: + if type(setting) == dict: + kv_dict = {} + for k,v in setting.items(): + kv_dict[k] = v + data_list.append(kv_dict) + + return data_list + +def get_json_dict_data(infile): + json_file_str = open(infile, "r").read() + json_data = json.loads(json_file_str) + data_dict = {} + + for key,val in json_data.items(): + if ( type(val) != list ): + data_dict[key] 
= val + + return data_dict + +def copy_remote_logs_to_local(clCfgList, tcCfgList, tcPathname, role): + print "FIXME: copy_remote_logs_to_local" + cmdStr = "scp " + host + ":" + tcPathname + "/logs/" + logFilename + " " + tcPathname + "/logs" + #sys_call(cmdStr) + + Property changes on: reg_test/lib/reg_test_helper.py ___________________________________________________________________ Added: svn:executable + * Index: reg_test/lib/__init__.py =================================================================== --- reg_test/lib/__init__.py (revision 0) +++ reg_test/lib/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: reg_test/lib/li_reg_test_helper.py =================================================================== --- reg_test/lib/li_reg_test_helper.py (revision 0) +++ reg_test/lib/li_reg_test_helper.py (revision 0) @@ -0,0 +1,383 @@ +#!/usr/bin/env python + +# =================================== +# file: li_reg_test_helper.py +# =================================== + +import datetime +import inspect +import json +import logging +import os +import re +import subprocess +import sys +import time + +import reg_test_helper + +from reg_test_env import RegTestEnv +from reg_test_helper import RtLogging +from datetime import datetime +from time import mktime + + +def init_entity_props(clCfgList, tcCfgList, tcPathname): + entityPropsDict = {} + + # consumer config / log files location + consEntityIdList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + clCfgList, "role", "console_consumer", "entity_id") + consLogList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + tcCfgList, "entity_id", consEntityIdList[0], "log_filename") + consLogPathname = tcPathname + "/logs/" + consLogList[0] + consCfgList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + tcCfgList, "entity_id", consEntityIdList[0], "config_filename") + consCfgPathname = tcPathname + "/config/" + consCfgList[0] + + # producer config / log files location + prodEntityIdList = 
reg_test_helper.getDataFromListOfDictsByKeyVal( \ + clCfgList, "role", "producer_performance", "entity_id") + prodLogList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + tcCfgList, "entity_id", prodEntityIdList[0], "log_filename") + prodLogPathname = tcPathname + "/logs/" + prodLogList[0] + prodCfgList = reg_test_helper.getDataFromListOfDictsByKeyVal( \ + tcCfgList, "entity_id", prodEntityIdList[0], "config_filename") + prodCfgPathname = tcPathname + "/config/" + prodCfgList[0] + + entityPropsDict["consumer_log_pathname"] = consLogPathname + entityPropsDict["consumer_cfg_pathname"] = consCfgPathname + + entityPropsDict["producer_log_pathname"] = prodLogPathname + entityPropsDict["producer_cfg_pathname"] = prodCfgPathname + + return entityPropsDict + + +def get_message_id(logPathname): + logLines = open(logPathname, "r").readlines() + messageIdList = [] + + for line in logLines: + if not "MessageID" in line: + continue + else: + matchObj = re.match('.*MessageID:(.*?):', line) + messageIdList.append( matchObj.group(1) ) + + return messageIdList + + +def validate_data_matched(tcLogsDir, entityPropsDict, rtLogger): + consumerLogPathname = entityPropsDict["consumer_log_pathname"] + producerLogPathname = entityPropsDict["producer_log_pathname"] + msgIdMissingInConsumerLog = "msg_id_missing_in_consumer.log" + + prodMidList = get_message_id(producerLogPathname) + consMidList = get_message_id(consumerLogPathname) + prodMidSet = set(prodMidList) + consMidSet = set(consMidList) + + missingMidInConsumer = prodMidSet - consMidSet + + outfile = open(tcLogsDir + "/" + msgIdMissingInConsumerLog, "w") + for i in missingMidInConsumer: + outfile.write(i + "\n") + outfile.close() + + if ( len(missingMidInConsumer) == 0 ): + return True + + rtLogger.info("See " + tcLogsDir + "/" + msgIdMissingInConsumerLog + " for missing MessageID") + return False + + +def collect_logs_from_rmt_hosts(clCfgList, tcPathname, rtLogger): + for clCfg in clCfgList: + hostname = clCfg["hostname"] + 
role = clCfg["role"] + entityId = clCfg["entity_id"] + + rtLogger.info("collecting logs from host: " + hostname + " role: " +role) + + cmdStr = "scp " + hostname + ":" + tcPathname + "/logs/* " + tcPathname + "/logs" + #rtLogger.info("executing command: [" + cmdStr + "]") + #reg_test_helper.sys_call(cmdStr) + + +def cleanup_data_at_rmt_hosts(clCfgList, tcCfgList, rtLogger): + for clCfg in clCfgList: + hostname = clCfg["hostname"] + role = clCfg["role"] + entityId = clCfg["entity_id"] + + rtLogger.info("cleaning up log dir on host: " + hostname + " role: " +role) + + if ( role == 'zookeeper' ): + zkDataDirList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "dataDir") + cmdStr = "ssh " + hostname + " \"rm -r " + zkDataDirList[0] + "/*\"" + #rtLogger.info("executing command: [" + cmdStr + "]") + reg_test_helper.sys_call(cmdStr) + elif ( role == 'broker' ): + brokerDataDirList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "log.dir") + cmdStr = "ssh " + hostname + " \"rm -r " + brokerDataDirList[0] + "/*\"" + rtLogger.info("executing command: [" + cmdStr + "]") + reg_test_helper.sys_call(cmdStr) + + +def stop_rmt_java_proc(clusterCfgList, rtLogger): + for clusterCfg in clusterCfgList: + hostname = clusterCfg["hostname"] + userid = os.getlogin() + rtLogger.info("stopping entity id [" + clusterCfg["entity_id"] + "] by [" + userid + "] in host: " + hostname) + + cmdStr = "ps auxw | grep " + userid + " | grep java | grep -v grep | grep -i kafka " + \ + " | tr -s ' ' | cut -f2 -d ' ' | xargs kill -9" + + #rtLogger.info("executing : " + cmdStr) + subproc = reg_test_helper.sys_call_return_subproc(cmdStr) + #for line in subproc.stdout.readlines(): + # print line + +def copy_file_with_dict_values(srcFile, destFile, dictObj): + infile = open(srcFile, "r") + inlines = infile.readlines() + infile.close() + + # print " => testcase config line : ", tcCfg + outfile = open(destFile, 'w') + + for line in inlines: + 
for key in dictObj.keys(): + if (line.startswith(key + "=")): + #sys.stdout.write(" ## from : [" + line + "]") + line = key + "=" + dictObj[key] + "\n" + #sys.stdout.write(" ## to : [" + line + "]") + outfile.write(line) + outfile.close() + + +def generate_overriden_props_files(testsuitePathname, tcPathname, clusterConfigsList, tcConfigsList, rtLogger): + rtLogger.info("calling generate_properties_files") + + cfgTemplatePathname = os.path.abspath(testsuitePathname + "/config") + cfgDestPathname = os.path.abspath(tcPathname + "/config") + rtLogger.info("config template (source) pathname : " + cfgTemplatePathname) + rtLogger.info("testcase config (dest) pathname : " + cfgDestPathname) + + # loop through all zookeepers (if more than 1) to retrieve host and clientPort + # to construct a zk.connect str for broker in the form of: + # zk.connect=:,: + zkConnectStr = "" + zkDictList = reg_test_helper.getDictFromListOfDictsByKeyVal(clusterConfigsList, "role", "zookeeper") + for zkDict in zkDictList: + entityID = zkDict["entity_id"] + hostname = zkDict["hostname"] + clientPortList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcConfigsList, "entity_id", entityID, "clientPort") + clientPort = clientPortList[0] + + if ( zkConnectStr.__len__() == 0 ): + zkConnectStr = hostname + ":" + clientPort + else: + zkConnectStr = zkConnectStr + "," + hostname + ":" + clientPort + print + + # for each entity in the cluster config + for clusterCfg in clusterConfigsList: + cl_entity_id = clusterCfg["entity_id"] + + for tcCfg in tcConfigsList: + if (tcCfg["entity_id"] == cl_entity_id): + + # copy the associated .properties template, update values, write to testcase_/config + + if ( clusterCfg["role"] == "broker" ): + tcCfg["zk.connect"] = zkConnectStr + copy_file_with_dict_values(cfgTemplatePathname + "/server.properties", \ + cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg) + + elif ( clusterCfg["role"] == "zookeeper"): + copy_file_with_dict_values(cfgTemplatePathname + 
"/zookeeper.properties", \ + cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg) + + elif ( clusterCfg["role"] == "producer_performance"): + tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr + copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties", \ + cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg) + + elif ( clusterCfg["role"] == "console_consumer"): + tcCfg["zookeeper"] = zkConnectStr + copy_file_with_dict_values(cfgTemplatePathname + "/console_consumer.properties", \ + cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg) + else: + print " => ", tcCfg + print "UNHANDLED key" + +def get_isleader_log_line(): + print "FIXME: get_isleader_log_line" + cmdStr = "ssh " + host + " \"grep -i -h 'is leader' " + tcPathname + "/logs/" + brokerLogFile + " sort | tail -1" + subproc = reg_test_helper.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + print line + +def copy_remote_logs_to_local(): + print "FIXME: copy_remote_logs_to_local" + cmdStr = "" + +# return the entity_id of the leader and the unix ts +def get_leader_from_broker_logs(clCfgList, tcCfgList, logPathname, isLeaderLogPattern, rtLogger): + rePattern = "\[(.*?)\] INFO Broker (.*?) completed the leader state transition for topic (.*?) partition (.*?) 
\(.*" + leaderDict = {} + + brokerCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(clCfgList, "role", "broker") + for brokerCfg in brokerCfgList: + host = brokerCfg["hostname"] + entityId = brokerCfg["entity_id"] + bkrLogList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "log_filename") + cmdStr = "ssh " + host + " \"grep -i -h '" + isLeaderLogPattern + "' " + \ + logPathname + "/" + bkrLogList[0] + "\" 2> /dev/null" + + #rtLogger.info("executing cmd: [" + cmdStr + "]") + subproc = reg_test_helper.sys_call_return_subproc(cmdStr) + for line in subproc.stdout.readlines(): + #rtLogger.info("found the broker log line => " + line) + matchObj = re.match(rePattern, line) + datetimeStr = matchObj.group(1) + datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f") + unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond + #print "{0:.3f}".format(unixTs) + leaderDict["timestamp"] = unixTs + leaderDict["brokerid"] = matchObj.group(2) + leaderDict["topic"] = matchObj.group(3) + leaderDict["partition"] = matchObj.group(4) + + return leaderDict + +def start_producer_performance(clCfgList, tcCfgList, prodCfgPathname, kafkaRunClassBin, prodLogPathname, rtLogger): + prodPerfCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(clCfgList, "role", "producer_performance") + for prodPerfCfg in prodPerfCfgList: + host = prodPerfCfg["hostname"] + entityId = prodPerfCfg["entity_id"] + kafkaHomeList = \ + reg_test_helper.getDataFromListOfDictsByKeyVal(clCfgList, "entity_id", entityId, "kafka_home") + kafkaHome = kafkaHomeList[0] + tcProdPerfCfg = reg_test_helper.getDictFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId) + + rtLogger.info("starting producer preformance") + + commandArgs = reg_test_helper.convert_keyval_to_cmd_args(prodCfgPathname) + prodPerfCmdStr = \ + "ssh " + host + " \"" + kafkaRunClassBin + " kafka.perf.ProducerPerformance " + \ + commandArgs + " &> " + prodLogPathname + "\"" + + 
#rtLogger.info("executing command: [" + prodPerfCmdStr + "]") + reg_test_helper.sys_call(prodPerfCmdStr) + + +def start_console_consumer(clCfgList, tcCfgList, consCfgPathname, kafkaRunClassBin, consLogPathname, rtLogger): + consumerCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(clCfgList, "role", "console_consumer") + for consumerCfg in consumerCfgList: + host = consumerCfg["hostname"] + entityId = consumerCfg["entity_id"] + kafkaHomeList = \ + reg_test_helper.getDataFromListOfDictsByKeyVal(clCfgList, "entity_id", entityId, "kafka_home") + kafkaHome = kafkaHomeList[0] + tcConsumerCfg = reg_test_helper.getDictFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId) + + rtLogger.info("starting console consumer") + + commandArgs = reg_test_helper.convert_keyval_to_cmd_args(consCfgPathname) + consumerCmdStr = \ + "ssh " + host + " \"" + kafkaRunClassBin + " kafka.consumer.ConsoleConsumer " + \ + commandArgs + " &> " + consLogPathname + "\"" + + #rtLogger.info("executing command: [" + consumerCmdStr + "]") + reg_test_helper.sys_call(consumerCmdStr) + + +def start_broker(clCfgList, tcCfgList, brokerCfg, kafkaRunClassBin, tcPathname, entityPPidDict, rtLogger): + host = brokerCfg["hostname"] + entityId = brokerCfg["entity_id"] + brokeridList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "brokerid") + kafkaHomeList = reg_test_helper.getDataFromListOfDictsByKeyVal(clCfgList, "entity_id", entityId, "kafka_home") + + rtLogger.info("starting broker in host [" + host + "] with entity id [" + entityId + "]") + + # broker config / log files location + brokerLogList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "log_filename") + brokerLogPathname = tcPathname + "/logs/" + brokerLogList[0] + brokerCfgFileList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "config_filename") + brokerCfgPathname = tcPathname + "/config/" + brokerCfgFileList[0] + + brokerCmdStr = 
"ssh " + host + " '" + kafkaRunClassBin + " kafka.Kafka " + brokerCfgPathname + \ + " &> " + brokerLogPathname + " & echo pid:$! > " + tcPathname + "/logs/entity_" + entityId + "_pid'" + + #rtLogger.info("executing command: [" + brokerCmdStr + "]") + reg_test_helper.async_sys_call(brokerCmdStr) + time.sleep(2) + + pidCmdStr = "ssh " + host + " 'cat " + tcPathname + "/logs/entity_" + entityId + "_pid'" + #rtLogger.info("executing command: [" + pidCmdStr + "]") + subproc = reg_test_helper.sys_call_return_subproc(pidCmdStr) + + for line in subproc.stdout.readlines(): + if line.startswith("pid"): + line = line.rstrip('\n') + tokens = line.split(':') + entityPPidDict[entityId] = tokens[1] + + +def start_zookeeper(clCfgList, tcCfgList, zkCfg, zkConnectStr, kafkaRunClassBin, tcPathname, entityPPidDict, rtLogger): + host = zkCfg["hostname"] + entityId = zkCfg["entity_id"] + clientPortList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "clientPort") + kafkaHomeList = reg_test_helper.getDataFromListOfDictsByKeyVal(clCfgList, "entity_id", entityId, "kafka_home") + + rtLogger.info("starting zookeeper in host [" + host + "] on client port [" + clientPortList[0] + "]") + + # construct zk.connect str + if ( len(zkConnectStr) > 0 ): + zkConnectStr = zkConnectStr + "," + host + ":" + clientPortList[0] + else: + zkConnectStr = host + ":" + clientPortList[0] + + # zk config / log files location + zkLogList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "log_filename") + zkLogPathname = tcPathname + "/logs/" + zkLogList[0] + zkCfgFileList = reg_test_helper.getDataFromListOfDictsByKeyVal(tcCfgList, "entity_id", entityId, "config_filename") + zkCfgPathname = tcPathname + "/config/" + zkCfgFileList[0] + + zkCmdStr = "ssh " + host + " '" + kafkaHomeList[0] + "/bin/zookeeper-server-start.sh " + \ + zkCfgPathname + " &> " + zkLogPathname + " & echo pid:$! 
> " + tcPathname + \ + "/logs/entity_" + entityId + "_pid'" + #rtLogger.info("executing command: [" + zkCmdStr + "]") + reg_test_helper.async_sys_call(zkCmdStr) + time.sleep(2) + + pidCmdStr = "ssh " + host + " 'cat " + tcPathname + "/logs/entity_" + entityId + "_pid'" + #rtLogger.info("executing command: [" + pidCmdStr + "]") + subproc = reg_test_helper.sys_call_return_subproc(pidCmdStr) + + for line in subproc.stdout.readlines(): + if line.startswith("pid"): + line = line.rstrip('\n') + tokens = line.split(':') + entityPPidDict[entityId] = tokens[1] + + return zkConnectStr + + +def create_topic(clCfgList, tcCfgList, zkConnectStr, tcPathname, rtLogger): + prodPerfCfgList = reg_test_helper.getDictFromListOfDictsByKeyVal(clCfgList, "role", "producer_performance") + prodPerfCfgDict = reg_test_helper.getDictFromListOfDictsByKeyVal(tcCfgList, "entity_id", prodPerfCfgList[0]["entity_id"]) + prodTopicList = prodPerfCfgDict[0]["topic"].split(',') + createTopicBin = RegTestEnv.regTestBaseDir + "/../bin/kafka-create-topic.sh" + + for topic in prodTopicList: + rtLogger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]") + cmdStr = "ssh localhost \"" + createTopicBin + " --topic " + topic + " --zookeeper " + zkConnectStr + \ + " --replica 3 --partition 2 &> " + tcPathname + "/logs/create_topic.log" + "\"" + #rtLogger.info("executing command: [" + cmdStr + "]") + subproc = reg_test_helper.sys_call_return_subproc(cmdStr) + + Property changes on: reg_test/lib/li_reg_test_helper.py ___________________________________________________________________ Added: svn:executable + * Index: reg_test/bin/__init__.py =================================================================== --- reg_test/bin/__init__.py (revision 0) +++ reg_test/bin/__init__.py (revision 0) @@ -0,0 +1 @@ + Index: reg_test/bin/reg_test_env.py =================================================================== --- reg_test/bin/reg_test_env.py (revision 0) +++ reg_test/bin/reg_test_env.py (revision 0) @@ 
-0,0 +1,42 @@ +#!/usr/bin/env python + +# file: reg_test_env.py + +"""Define regression test framework environments""" + +import inspect +import json +import os +import subprocess +import sys + +class RegTestEnv(): + """Define a class""" + + cwdFullPath = os.getcwd() + thisScriptFullPathname = os.path.realpath(__file__) + thisScriptFullPath = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]))) + + regTestBaseDir = thisScriptFullPath + "/.." + regTestBinDir = thisScriptFullPath + "/../bin" + regTestLibDir = thisScriptFullPath + "/../lib" + + delimiter = "~" + testSuitePrefix = "suite_" + testCasePrefix = "testcase_" + testModuleExt = ".py" + clusterCfgFilename = "cluster_config.json" + + def __init__(self): + "Create an object with this regression test session environment" + self.regTestBaseDir = os.getcwd() + + def __str__(self): + return '(regTestBaseDir: ' + RegTestEnv.regTestBaseDir + \ + ', regTestBinDir: ' + RegTestEnv.regTestBinDir + \ + ', regTestLibDir: ' + RegTestEnv.regTestLibDir + ')' + + def printEnv(self): + print "regTestBaseDir : ", RegTestEnv.regTestBaseDir + print "regTestBinDir : ", RegTestEnv.regTestBinDir + print "regTestLibDir : ", RegTestEnv.regTestLibDir Property changes on: reg_test/bin/reg_test_env.py ___________________________________________________________________ Added: svn:executable + * Index: reg_test/bin/reg_test_driver.py =================================================================== --- reg_test/bin/reg_test_driver.py (revision 0) +++ reg_test/bin/reg_test_driver.py (revision 0) @@ -0,0 +1,104 @@ +#!/usr/bin/env python + +# =================================== +# file: run_reg_test.py +# =================================== + +import logging +import os +import sys + +from reg_test_env import RegTestEnv +sys.path.append(RegTestEnv.regTestLibDir) +import reg_test_helper +from reg_test_helper import RtLogging + +rtLogging = RtLogging("RegTest") +rtLogger = rtLogging.getRtLogger() + +class RegTest(RegTestEnv): + def 
__init__(self, testSuiteName, testModuleName, testSuiteClassName, clusterConfigsList): + self.name = testSuiteName + sys.path.append(RegTestEnv.regTestBaseDir + "/" + testSuiteName + "/bin") + + # dynamically loading a module and starting the test class + mod = __import__(testModuleName) + theClass = getattr(mod, testSuiteClassName) + instance = theClass(clusterConfigsList) + instance.runTest() + +def main(): + suiteClassDictList = [] + clusterPropJsonPathname = os.path.abspath(RegTestEnv.regTestBaseDir + "/" + RegTestEnv.clusterCfgFilename) + clusterConfigsList = reg_test_helper.get_json_list_data(clusterPropJsonPathname) + print + rtLogger.info("===============================================================") + rtLogger.info("#### SYSTEM REGRESSION TEST FRAMEWORK IN PYTHON ####") + rtLogger.info("===============================================================") + print + rtLogger.info("looking for test modules ...") + + # find all test suites in Reg Test Base Dir + for suiteName in os.listdir(RegTestEnv.regTestBaseDir): + + # make sure this is a valid testsuite directory + if os.path.isdir(suiteName) and suiteName.startswith(RegTestEnv.testSuitePrefix): + + # found a test suite + rtLogger.info("found a test suite : " + suiteName) + + # put all test modules and corresponding test class name into a list + testModBaseDir = RegTestEnv.regTestBaseDir + "/" + suiteName + "/bin" + + # go through all test module file + for moduleFileName in os.listdir(testModBaseDir): + + # make sure it is a valid test module + if moduleFileName.endswith(RegTestEnv.testModuleExt) and not moduleFileName.startswith("__"): + + # found a test module file + rtLogger.info("found a test module file : " + moduleFileName) + + testModPathName = testModBaseDir + "/" + moduleFileName + testModClassName = reg_test_helper.sys_call("grep ^class " + testModPathName + \ + " | sed 's/^class //g' | sed 's/():.*//g'") + testModClassName = testModClassName.rstrip("\n") + + # collect the test suite class 
data + suiteClassDict = {} + suiteClassDict["suite"] = suiteName + extLenToRemove = RegTestEnv.testModuleExt.__len__() * -1 + suiteClassDict["module"] = moduleFileName[:extLenToRemove] + suiteClassDict["class"] = testModClassName + suiteClassDictList.append(suiteClassDict) + print + + # FIXME: + # copy new testing build to remote hosts + + # loop through suiteClassDictList and start the test class one by one + for suiteClassDict in suiteClassDictList: + + suiteName = suiteClassDict["suite"] + modName = suiteClassDict["module"] + className = suiteClassDict["class"] + + rtLogger.info("===============================================================") + rtLogger.info("Running Test for : ") + rtLogger.info(" suite : " + suiteName) + rtLogger.info(" module : " + modName) + rtLogger.info(" class : " + className) + rtLogger.info("===============================================================") + + # launch the test module class + rt = RegTest(suiteName, modName, className, clusterConfigsList) + print + + print + +# ========================= +# main entry point +# ========================= +main() + + Property changes on: reg_test/bin/reg_test_driver.py ___________________________________________________________________ Added: svn:executable + * Index: reg_test/cluster_config.json =================================================================== --- reg_test/cluster_config.json (revision 0) +++ reg_test/cluster_config.json (revision 0) @@ -0,0 +1,46 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "kafka_home": "/home/jfung/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + }, + { + "entity_id": "1", + "hostname": "localhost", + "role": "broker", + "kafka_home": "/home/jfung/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + }, + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "kafka_home": "/home/jfung/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + }, + { + 
"entity_id": "3", + "hostname": "localhost", + "role": "broker", + "kafka_home": "/home/jfung/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "producer_performance", + "kafka_home": "/home/jfung/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + }, + { + "entity_id": "5", + "hostname": "localhost", + "role": "console_consumer", + "kafka_home": "/home/jfung/kafka_0.8", + "java_home": "/export/apps/jdk/JDK-1_6_0_27" + } + ] +}