Index: system_test/replication_testsuite/replica_basic_test.py
===================================================================
--- system_test/replication_testsuite/replica_basic_test.py (revision 1403363)
+++ system_test/replication_testsuite/replica_basic_test.py (working copy)
@@ -137,10 +137,6 @@
 
         # TestcaseEnv - initialize producer & consumer config / log file pathnames
         kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
-
-        # clean up data directories specified in zookeeper.properties and kafka_server_.properties
-        kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
-
         # generate remote hosts log/config dirs if not exist
         kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
 
@@ -163,178 +159,17 @@
 
         self.log_message("starting brokers")
         kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
-        self.anonLogger.info("sleeping for 5s")
-        time.sleep(5)
 
-        self.log_message("creating topics")
-        kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
-        self.anonLogger.info("sleeping for 5s")
-        time.sleep(5)
-
-        # =============================================
-        # starting producer
-        # =============================================
-        self.log_message("starting producer in the background")
-        kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False)
-        msgProducingFreeTimeSec = self.testcaseEnv.testcaseArgumentsDict["message_producing_free_time_sec"]
-        self.anonLogger.info("sleeping for " + msgProducingFreeTimeSec + " sec to produce some messages")
-        time.sleep(int(msgProducingFreeTimeSec))
+        print
+        print "====================================================="
+        print "*** Sleeping for 30 min ..."
+        print "You may now run : /run_simple_consumer.sh"
+        print "====================================================="
+        print
 
-        # =============================================
-        # A while-loop to bounce leader as specified
-        # by "num_iterations" in testcase_n_properties.json
-        # =============================================
-        i = 1
-        numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"])
-        brokerType = self.testcaseEnv.testcaseArgumentsDict["broker_type"]
-        bounceBrokerFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_broker"]
-        while i <= numIterations:
-            self.log_message("Iteration " + str(i) + " of " + str(numIterations))
+        time.sleep(1800)
 
-            if brokerType == "leader" or brokerType == "follower":
-                self.log_message("looking up leader")
-                leaderDict = kafka_system_test_utils.get_leader_elected_log_line(
-                                 self.systemTestEnv, self.testcaseEnv, self.leaderAttributesDict)
-
-                # ==========================
-                # leaderDict looks like this:
-                # ==========================
-                #{'entity_id': u'3',
-                # 'partition': '0',
-                # 'timestamp': 1345050255.8280001,
-                # 'hostname': u'localhost',
-                # 'topic': 'test_1',
-                # 'brokerid': '3'}
-
-                if brokerType == "leader":
-                    # validate to see if leader election is successful
-                    self.log_message("validating leader election")
-                    result = kafka_system_test_utils.validate_leader_election_successful( \
-                                 self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
-
-                    # trigger leader re-election by stopping leader to get re-election latency
-                    self.log_message("bounce_broker flag : " + bounceBrokerFlag)
-                    if bounceBrokerFlag.lower() == "true":
-                        reelectionLatency = kafka_system_test_utils.get_reelection_latency(
-                                                self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
-                        latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
-                        self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
-                        self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000))
-
-                        time.sleep(1)
-
-                        # starting previously terminated broker
-                        self.log_message("starting the previously terminated broker")
-                        stoppedLeaderEntityId = leaderDict["entity_id"]
-                        kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
-                else:
-                    try:
-                        # GC Pause simulation
-                        hostname  = leaderDict["hostname"]
-                        pauseTime = self.testcaseEnv.testcaseArgumentsDict["pause_time_in_seconds"]
-                        parentPid = self.testcaseEnv.entityBrokerParentPidDict[leaderDict["entity_id"]]
-                        pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
-                        system_test_utils.simulate_garbage_collection_pause_in_remote_process(hostname, pidStack, pauseTime)
-                    except:
-                        pass
-
-                else: # follower
-                    # a list of all brokers
-                    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
-                                             self.systemTestEnv.clusterEntityConfigDictList, "role", "broker", "entity_id")
-
-                    # we pick the follower from the first broker which is not the leader
-                    firstFollowerEntityId = None
-                    for brokerEntityId in brokerEntityIdList:
-                        if brokerEntityId != leaderDict["entity_id"]:
-                            firstFollowerEntityId = brokerEntityId
-                            break
-
-                    # stopping Follower
-                    self.log_message("bounce_broker flag : " + bounceBrokerFlag)
-                    if bounceBrokerFlag.lower() == "true":
-                        self.log_message("stopping follower with entity id: " + firstFollowerEntityId)
-                        kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, firstFollowerEntityId,
-                            self.testcaseEnv.entityBrokerParentPidDict[firstFollowerEntityId])
-
-                        time.sleep(1)
-
-                        # starting previously terminated broker
-                        self.log_message("starting the previously terminated broker")
-                        stoppedBrokerEntityId = firstFollowerEntityId
-                        kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedBrokerEntityId)
-
-            elif brokerType == "controller":
-                self.log_message("looking up controller")
-                controllerDict = kafka_system_test_utils.get_controller_attributes(self.systemTestEnv, self.testcaseEnv)
-
-                # ==========================
-                # controllerDict looks like this:
-                # ==========================
-                #{'entity_id': u'3',
-                # 'timestamp': 1345050255.8280001,
-                # 'hostname': u'localhost',
-                # 'brokerid': '3'}
-
-                # stopping Controller
-                self.log_message("bounce_broker flag : " + bounceBrokerFlag)
-                if bounceBrokerFlag.lower() == "true":
-                    self.log_message("stopping controller : " + controllerDict["brokerid"])
-                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, controllerDict["entity_id"],
-                        self.testcaseEnv.entityBrokerParentPidDict[controllerDict["entity_id"]])
-
-                    time.sleep(1)
-
-                    # starting previously terminated broker
-                    self.log_message("starting the previously terminated broker")
-                    stoppedBrokerEntityId = controllerDict["entity_id"]
-                    kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedBrokerEntityId)
-
-            self.anonLogger.info("sleeping for 15s")
-            time.sleep(15)
-            i += 1
-        # while loop
-
-        self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None
-        try:
-            self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \
-                min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
-        except:
-            pass
-
-        self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None
-        try:
-            self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \
-                max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
-        except:
-            pass
-
-        # =============================================
-        # tell producer to stop
-        # =============================================
-        self.testcaseEnv.lock.acquire()
-        self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
-        time.sleep(1)
-        self.testcaseEnv.lock.release()
-        time.sleep(1)
-
-        # =============================================
-        # wait for producer thread's update of
-        # "backgroundProducerStopped" to be "True"
-        # =============================================
-        while 1:
-            self.testcaseEnv.lock.acquire()
-            self.logger.info("status of backgroundProducerStopped : [" + \
-                str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d)
-            if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
-                time.sleep(1)
-                self.logger.info("all producer threads completed", extra=self.d)
-                break
-            time.sleep(1)
-            self.testcaseEnv.lock.release()
-            time.sleep(2)
-
-        # =============================================
         # starting consumer
         # =============================================
         self.log_message("starting debug consumers in the background")
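Note: the replica_basic_test.py change above removes the remote data-directory cleanup and replaces topic creation, the background producer, and the broker-bounce loop with a flat 30-minute sleep, so a consumer can be driven by hand against the live cluster. The run_simple_consumer.sh script named in the console banner is not included in this patch; the following is only a sketch of what it could look like, assuming it reuses the kafka.tools.SimpleConsumerShell invocation from validate_data_and_log_segment.sh below (the script name and output file names here are hypothetical):

#!/bin/bash
# Hypothetical run_simple_consumer.sh (not part of this patch).
# Reads every partition of topic test_1 from replica 1 while the test
# driver sleeps, using the same SimpleConsumerShell flags as
# validate_data_and_log_segment.sh.
for p in 0 1 2; do
    bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell \
        --broker-list localhost:9091,localhost:9092,localhost:9093 \
        --topic test_1 --partition $p --replica 1 --no-wait-at-logend \
        > simple_consumer_p${p}.log
done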
Index: system_test/testcase_to_run.json
===================================================================
--- system_test/testcase_to_run.json (revision 1403363)
+++ system_test/testcase_to_run.json (working copy)
@@ -1,6 +1,5 @@
 {
     "ReplicaBasicTest" : [
-        "testcase_0001",
-        "testcase_1"
+        "testcase_0108"
     ]
 }
Index: validate_data_and_log_segment.sh
===================================================================
--- validate_data_and_log_segment.sh (revision 0)
+++ validate_data_and_log_segment.sh (revision 0)
@@ -0,0 +1,102 @@
+#!/bin/bash
+
+# clean up old merged data
+rm -rf /tmp/broker_*_partition_*_merged
+
+sleep 1
+
+# create merged data directories
+mkdir /tmp/broker_1_partition_0_merged
+mkdir /tmp/broker_1_partition_1_merged
+mkdir /tmp/broker_1_partition_2_merged
+
+mkdir /tmp/broker_2_partition_0_merged
+mkdir /tmp/broker_2_partition_1_merged
+mkdir /tmp/broker_2_partition_2_merged
+
+mkdir /tmp/broker_3_partition_0_merged
+mkdir /tmp/broker_3_partition_1_merged
+mkdir /tmp/broker_3_partition_2_merged
+
+
+# read data for all 3 partitions in replica 1
+bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9091,localhost:9092,localhost:9093 --topic test_1 --partition 0 --replica 1 --no-wait-at-logend > replica_data_r1_p0.log
+bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9091,localhost:9092,localhost:9093 --topic test_1 --partition 1 --replica 1 --no-wait-at-logend > replica_data_r1_p1.log
+bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9091,localhost:9092,localhost:9093 --topic test_1 --partition 2 --replica 1 --no-wait-at-logend > replica_data_r1_p2.log
+
+# read data for all 3 partitions in replica 2
+bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9091,localhost:9092,localhost:9093 --topic test_1 --partition 0 --replica 2 --no-wait-at-logend > replica_data_r2_p0.log
+bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9091,localhost:9092,localhost:9093 --topic test_1 --partition 1 --replica 2 --no-wait-at-logend > replica_data_r2_p1.log
+bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9091,localhost:9092,localhost:9093 --topic test_1 --partition 2 --replica 2 --no-wait-at-logend > replica_data_r2_p2.log
+
+# read data for all 3 partitions in replica 3
+bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9091,localhost:9092,localhost:9093 --topic test_1 --partition 0 --replica 3 --no-wait-at-logend > replica_data_r3_p0.log
+bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9091,localhost:9092,localhost:9093 --topic test_1 --partition 1 --replica 3 --no-wait-at-logend > replica_data_r3_p1.log
+bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9091,localhost:9092,localhost:9093 --topic test_1 --partition 2 --replica 3 --no-wait-at-logend > replica_data_r3_p2.log
+echo
+
+echo "Validated by SimpleConsumerShell : "
+
+echo "replica 1 message count:"
+grep MessageID replica_data_r1_p0.log | sed 's/.*MessageID://' | sed 's/:.*//' | sort -u | wc -l
+grep MessageID replica_data_r1_p1.log | sed 's/.*MessageID://' | sed 's/:.*//' | sort -u | wc -l
+grep MessageID replica_data_r1_p2.log | sed 's/.*MessageID://' | sed 's/:.*//' | sort -u | wc -l
+echo
+
+echo "replica 2 message count:"
+grep MessageID replica_data_r2_p0.log | sed 's/.*MessageID://' | sed 's/:.*//' | sort -u | wc -l
+grep MessageID replica_data_r2_p1.log | sed 's/.*MessageID://' | sed 's/:.*//' | sort -u | wc -l
+grep MessageID replica_data_r2_p2.log | sed 's/.*MessageID://' | sed 's/:.*//' | sort -u | wc -l
+echo
+
+echo "replica 3 message count:"
+grep MessageID replica_data_r3_p0.log | sed 's/.*MessageID://' | sed 's/:.*//' | sort -u | wc -l
+grep MessageID replica_data_r3_p1.log | sed 's/.*MessageID://' | sed 's/:.*//' | sort -u | wc -l
+grep MessageID replica_data_r3_p2.log | sed 's/.*MessageID://' | sed 's/:.*//' | sort -u | wc -l
+
+echo
+
+# merge log segment files for broker 1:
+for i in `find /tmp/kafka_server_1_logs/test_1-0/ -name '0*.log' | sort`; do cat $i >> /tmp/broker_1_partition_0_merged/00000000000000000000.log; done;
+for i in `find /tmp/kafka_server_1_logs/test_1-1/ -name '0*.log' | sort`; do cat $i >> /tmp/broker_1_partition_1_merged/00000000000000000000.log; done;
+for i in `find /tmp/kafka_server_1_logs/test_1-2/ -name '0*.log' | sort`; do cat $i >> /tmp/broker_1_partition_2_merged/00000000000000000000.log; done;
+
+# merge log segment files for broker 2:
+for i in `find /tmp/kafka_server_2_logs/test_1-0/ -name '0*.log' | sort`; do cat $i >> /tmp/broker_2_partition_0_merged/00000000000000000000.log; done;
+for i in `find /tmp/kafka_server_2_logs/test_1-1/ -name '0*.log' | sort`; do cat $i >> /tmp/broker_2_partition_1_merged/00000000000000000000.log; done;
+for i in `find /tmp/kafka_server_2_logs/test_1-2/ -name '0*.log' | sort`; do cat $i >> /tmp/broker_2_partition_2_merged/00000000000000000000.log; done;
+
+# merge log segment files for broker 3:
+for i in `find /tmp/kafka_server_3_logs/test_1-0/ -name '0*.log' | sort`; do cat $i >> /tmp/broker_3_partition_0_merged/00000000000000000000.log; done;
+for i in `find /tmp/kafka_server_3_logs/test_1-1/ -name '0*.log' | sort`; do cat $i >> /tmp/broker_3_partition_1_merged/00000000000000000000.log; done;
+for i in `find /tmp/kafka_server_3_logs/test_1-2/ -name '0*.log' | sort`; do cat $i >> /tmp/broker_3_partition_2_merged/00000000000000000000.log; done;
+
+echo "Validated by DumpLogSegments : "
+
+echo -n "broker 1 partition 0 messages count : "
+bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/broker_1_partition_0_merged/00000000000000000000.log | grep "crc: " | wc -l
+echo -n "broker 1 partition 1 messages count : "
+bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/broker_1_partition_1_merged/00000000000000000000.log | grep "crc: " | wc -l
+echo -n "broker 1 partition 2 messages count : "
+bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/broker_1_partition_2_merged/00000000000000000000.log | grep "crc: " | wc -l
+echo
+
+echo -n "broker 2 partition 0 messages count : "
+bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/broker_2_partition_0_merged/00000000000000000000.log | grep "crc: " | wc -l
+echo -n "broker 2 partition 1 messages count : "
+bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/broker_2_partition_1_merged/00000000000000000000.log | grep "crc: " | wc -l
+echo -n "broker 2 partition 2 messages count : "
+bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/broker_2_partition_2_merged/00000000000000000000.log | grep "crc: " | wc -l
+echo
+
+echo -n "broker 3 partition 0 messages count : "
+bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/broker_3_partition_0_merged/00000000000000000000.log | grep "crc: " | wc -l
+echo -n "broker 3 partition 1 messages count : "
+bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/broker_3_partition_1_merged/00000000000000000000.log | grep "crc: " | wc -l
+echo -n "broker 3 partition 2 messages count : "
+bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/broker_3_partition_2_merged/00000000000000000000.log | grep "crc: " | wc -l
+echo
+
+
+

Property changes on: validate_data_and_log_segment.sh
___________________________________________________________________
Added: svn:executable
   + *
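Note: validate_data_and_log_segment.sh is meant to be run from the Kafka root (it invokes bin/kafka-run-class.sh) after the brokers have been fed data. Counting unique MessageIDs per replica only shows that the replicas contain the same number of distinct messages; the sketch below (not part of this patch) goes one step further and checks that the replicas hold the same MessageID set per partition, reusing the replica_data_r<n>_p<m>.log files and the grep/sed pipeline from the script above:

#!/bin/bash
# Hypothetical follow-up check (not part of this patch).
# Compares the unique MessageID sets, not just their counts, of
# replicas 2 and 3 against replica 1, partition by partition.
ids() { grep MessageID "$1" | sed 's/.*MessageID://' | sed 's/:.*//' | sort -u; }

for p in 0 1 2; do
    for r in 2 3; do
        if diff <(ids replica_data_r1_p${p}.log) <(ids replica_data_r${r}_p${p}.log) > /dev/null; then
            echo "partition ${p}: replica ${r} matches replica 1"
        else
            echo "partition ${p}: replica ${r} DIFFERS from replica 1"
        fi
    done
done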