diff --git a/system_test/README.txt b/system_test/README.txt index 87937ec..f9972d1 100644 --- a/system_test/README.txt +++ b/system_test/README.txt @@ -54,9 +54,8 @@ The framework has the following levels: 1. Update system_test/cluster_config.json for "kafka_home" & "java_home" specific to your environment 2. Edit system_test/replication_testsuite/testcase_1/testcase_1_properties.json and update "broker-list" to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.) 3. To run the test, go to /system_test and run the following command: - $ python -B system_test_runner.py - 4. To turn on debugging, update system_test/system_test_runner.py and uncomment the following line: - namedLogger.setLevel(logging.DEBUG) + $ python -u -B system_test_runner.py 2>&1 | tee system_test_output.log + 4. To turn on debugging, update system_test/logging.conf by changing the level in the handlers section from INFO to DEBUG. # ========================== # Adding Test Case diff --git a/system_test/migration_tool_testsuite/0.7/config/log4j.properties b/system_test/migration_tool_testsuite/0.7/config/log4j.properties new file mode 100644 index 0000000..baa698b --- /dev/null +++ b/system_test/migration_tool_testsuite/0.7/config/log4j.properties @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kafka.logs.dir=logs + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log +log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log +log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.cleanerAppender.File=log-cleaner.log +log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout 
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log +log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +# Turn on all our debugging info +#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender +#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender +#log4j.logger.kafka.perf=DEBUG, kafkaAppender +#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.kafka=INFO, kafkaAppender + +log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender +log4j.additivity.kafka.network.RequestChannel$=false + +#log4j.logger.kafka.network.Processor=TRACE, requestAppender +#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender +#log4j.additivity.kafka.server.KafkaApis=false +log4j.logger.kafka.request.logger=WARN, requestAppender +log4j.additivity.kafka.request.logger=false + +log4j.logger.kafka.controller=TRACE, controllerAppender +log4j.additivity.kafka.controller=false + +log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender +log4j.additivity.kafka.log.LogCleaner=false + +log4j.logger.state.change.logger=TRACE, stateChangeAppender +log4j.additivity.state.change.logger=false diff --git a/system_test/migration_tool_testsuite/config/migration_producer.properties b/system_test/migration_tool_testsuite/config/migration_producer.properties index 17b5928..7a2265a 100644 --- a/system_test/migration_tool_testsuite/config/migration_producer.properties +++ b/system_test/migration_tool_testsuite/config/migration_producer.properties @@ -37,6 +37,8 @@ metadata.broker.list=localhost:9094,localhost:9095,localhost:9096 # specifies whether 
the messages are sent asynchronously (async) or synchronously (sync) producer.type=sync +retry.backoff.ms=500 + # specify the compression codec for all data generated: 0: no compression, 1: gzip compression.codec=0 diff --git a/system_test/migration_tool_testsuite/migration_tool_test.py b/system_test/migration_tool_testsuite/migration_tool_test.py index 2386a58..9594835 100644 --- a/system_test/migration_tool_testsuite/migration_tool_test.py +++ b/system_test/migration_tool_testsuite/migration_tool_test.py @@ -170,11 +170,6 @@ class MigrationToolTest(ReplicationUtils, SetupUtils): self.anonLogger.info("sleeping for 5s") time.sleep(5) - self.log_message("creating topics") - kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv) - self.anonLogger.info("sleeping for 5s") - time.sleep(5) - # ============================================= # starting producer # ============================================= diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 35f2d1b..d8eb56f 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -734,7 +734,8 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): elif role == "broker": cmdList = ["ssh " + hostname, "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, + "JMX_PORT=" + jmxPort, + "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/log4j.properties" % kafkaHome, kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka", configPathName + "/" + configFile + " >> ", logPathName + "/" + logFile + " & echo pid:$! 
> ", @@ -975,10 +976,12 @@ def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client): role = producerConfig["role"] thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, kafka07Client)) + logger.debug("calling testcaseEnv.lock.acquire()", extra=d) testcaseEnv.lock.acquire() testcaseEnv.numProducerThreadsRunning += 1 logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d) time.sleep(1) + logger.debug("calling testcaseEnv.lock.release()", extra=d) testcaseEnv.lock.release() def generate_topics_string(topicPrefix, numOfTopics): @@ -1119,7 +1122,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "--metrics-dir " + metricsDir, boolArgumentsStr, " >> " + producerLogPathName, - " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"] if kafka07Client: cmdList[:] = [] @@ -1150,17 +1153,17 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "--message-size " + messageSize, "--vary-message-size --async", " >> " + producerLogPathName, - " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + " & echo $! 
> " + producerLogPath + "/entity_" + entityId + "_pid'"] cmdStr = " ".join(cmdList) logger.debug("executing command: [" + cmdStr + "]", extra=d) subproc = system_test_utils.sys_call_return_subproc(cmdStr) - for line in subproc.stdout.readlines(): - pass # dummy loop to wait until producer is completed + wait_for_producer_to_exit(host, producerLogPath + "/entity_" + entityId + "_pid") else: testcaseEnv.numProducerThreadsRunning -= 1 logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d) + logger.debug("calling testcaseEnv.lock.release()", extra=d) testcaseEnv.lock.release() break @@ -1172,20 +1175,40 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk # wait until other producer threads also stops and # let the main testcase know all producers have stopped while 1: + logger.debug("calling testcaseEnv.lock.acquire()", extra=d) testcaseEnv.lock.acquire() time.sleep(1) if testcaseEnv.numProducerThreadsRunning == 0: testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = True + logger.debug("calling testcaseEnv.lock.release()", extra=d) testcaseEnv.lock.release() break else: logger.debug("waiting for TRUE of testcaseEnv.userDefinedEnvVarDict['backgroundProducerStopped']", extra=d) + logger.debug("calling testcaseEnv.lock.release()", extra=d) testcaseEnv.lock.release() time.sleep(1) # finally remove itself from the tracking pids del testcaseEnv.producerHostParentPidDict[entityId] +def wait_for_producer_to_exit(hostname, pidFilePath): + logger.debug("waiting for producer to finish", extra=d) + + cmdStr = "ssh " + hostname + " 'cat " + pidFilePath + "' 2> /dev/null" + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + line = subproc.stdout.readline() + pid = line.rstrip('\n') + + while 1: + cmdStr = "ssh " + hostname + " 'ps -p " + pid + " -o comm=' 2> /dev/null" + subproc = system_test_utils.sys_call_return_subproc(cmdStr) + line = subproc.stdout.readline() + 
procName = line.rstrip('\n') + if len(procName) == 0: + break; + time.sleep(0.05) + def stop_remote_entity(systemTestEnv, entityId, parentPid, signalType="SIGTERM"): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -1617,8 +1640,10 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv): # ============================================= # tell producer to stop # ============================================= + logger.debug("calling testcaseEnv.lock.acquire()", extra=d) testcaseEnv.lock.acquire() testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True + logger.debug("calling testcaseEnv.lock.release()", extra=d) testcaseEnv.lock.release() # ============================================= @@ -1626,13 +1651,16 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv): # "backgroundProducerStopped" to be "True" # ============================================= while 1: + logger.debug("calling testcaseEnv.lock.acquire()", extra=d) testcaseEnv.lock.acquire() logger.info("status of backgroundProducerStopped : [" + \ str(testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=d) if testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]: + logger.debug("calling testcaseEnv.lock.release()", extra=d) testcaseEnv.lock.release() logger.info("all producer threads completed", extra=d) break + logger.debug("calling testcaseEnv.lock.release()", extra=d) testcaseEnv.lock.release() testcaseEnv.producerHostParentPidDict.clear()