Index: system_test/replication_testsuite/replica_basic_test.py
===================================================================
--- system_test/replication_testsuite/replica_basic_test.py (revision 1398132)
+++ system_test/replication_testsuite/replica_basic_test.py (working copy)
@@ -308,6 +308,7 @@
             self.log_message("validating data matched")
             kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
             kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+            kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv, "target")
 
             # =============================================
             # draw graphs
Index: system_test/replication_testsuite/testcase_2/testcase_2_properties.json
===================================================================
--- system_test/replication_testsuite/testcase_2/testcase_2_properties.json (revision 0)
+++ system_test/replication_testsuite/testcase_2/testcase_2_properties.json (revision 0)
@@ -0,0 +1,79 @@
+{
+  "description": {"01":"Multi Leader Failures in Replication : 1. acks => 1",
+                  "02":"Produce and consume messages to a single topic - three partitions.",
+                  "03":"This test sends messages to 3 replicas",
+                  "04":"To trigger leader election: find the leader and terminate it by controlled failure (kill -15)",
+                  "05":"Restart the terminated broker",
+                  "06":"Look up brokers' log4j messages and verify that a leader is re-elected successfully",
+                  "07":"At the end, it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:sync, acks:1, comp:0",
+                  "10":"Log segment size : 102400"
+  },
+  "testcase_args": {
+    "bounce_leader": "true",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "brokerid": "1",
+      "log.file.size": "102400",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "brokerid": "2",
+      "log.file.size": "102400",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "brokerid": "3",
+      "log.file.size": "102400",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "150",
+      "request-num-acks": "1",
+      "async":"false",
+      "log_filename": "producer_performance.log",
+      "config_filename": "producer_performance.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_1",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "zookeeper": "localhost:2188",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer.properties"
+    }
+  ]
+}
Index: system_test/utils/kafka_system_test_utils.py
===================================================================
--- system_test/utils/kafka_system_test_utils.py (revision 1398132)
+++ system_test/utils/kafka_system_test_utils.py (working copy)
@@ -22,6 +22,7 @@
 
 import datetime
 import getpass
+import hashlib
 import inspect
 import json
 import logging
@@ -80,6 +81,8 @@
     # type is either "metrics" or "dashboards" or "default"
     if type == "metrics":
         return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId + "/metrics"
+    elif type == "log_segments" :
+        return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId + "/log_segments"
     elif type == "default" :
         return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId
     elif type == "dashboards":
@@ -110,19 +113,19 @@
         metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
         if not os.path.exists(metricsPathName) :
             os.makedirs(metricsPathName)
-    
+
         # create the role directory under dashboards
         dashboardsRoleDir = dashboardsPathName + "/" + role
         if not os.path.exists(dashboardsRoleDir) :
             os.makedirs(dashboardsRoleDir)
 
-    
 def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
     anonLogger.info("================================================")
     anonLogger.info("collecting logs from remote machines")
     anonLogger.info("================================================")
 
     testCaseBaseDir = testcaseEnv.testCaseBaseDir
+    tcConfigsList   = testcaseEnv.testcaseConfigsList
 
     for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
         hostname = clusterEntityConfigDict["hostname"]
@@ -157,6 +160,20 @@
             logger.debug("executing command [" + cmdStr + "]", extra=d)
             system_test_utils.sys_call(cmdStr)
 
+            # ==============================
+            # collect broker log segment files
+            # ==============================
+            if role == "broker":
+                dataLogPathName = system_test_utils.get_data_by_lookup_keyval(
+                                      testcaseEnv.testcaseConfigsList, "entity_id", entity_id, "log.dir")
+
+                cmdList = ["scp -r",
+                           hostname + ":" + dataLogPathName,
+                           logPathName]
+                cmdStr  = " ".join(cmdList)
+                logger.debug("executing command [" + cmdStr + "]", extra=d)
+                system_test_utils.sys_call(cmdStr)
+
             # ==============================
             # collect dashboards file
             # ==============================
@@ -1221,113 +1238,7 @@
 
     return leaderReElectionLatency
 
-
-def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv):
-    anonLogger.info("================================================")
-    anonLogger.info("validating broker log segment checksums")
-    anonLogger.info("================================================")
-
-    # brokerLogCksumDict
-    #   - a dictionary to keep track of log segment files in each brokers and will look like this:
-    #
-    # {u'broker-1': {'test_1-0/00000000000000000000.kafka': '91500855',
-    #                'test_1-0/00000000000000010255.kafka': '1906285795',
-    #                'test_1-1/00000000000000000000.kafka': '3785861722',
-    #                'test_1-1/00000000000000010322.kafka': '1731628308'},
-    #  u'broker-2': {'test_1-0/00000000000000000000.kafka': '91500855',
-    #                'test_1-0/00000000000000010255.kafka': '1906285795',
-    #                'test_1-1/00000000000000000000.kafka': '3785861722',
-    #                'test_1-1/00000000000000010322.kafka': '1731628308'},
-    #  u'broker-3': {'test_1-0/00000000000000000000.kafka': '91500855',
-    #                'test_1-0/00000000000000010255.kafka': '1906285795',
-    #                'test_1-1/00000000000000000000.kafka': '3431356313'}}
-    brokerLogCksumDict = {}
-
-    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "broker", "entity_id")
-
-    # access all brokers' hosts to get broker id and its corresponding log
-    # segment file checksums and populate brokerLogCksumDict
-    for brokerEntityId in brokerEntityIdList:
-        logCksumDict = {}
-
-        hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
-        logDir   = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log.dir")
-
-        # get the log segment file full path name
-        cmdStr = "ssh " + hostname + " \"find " + logDir + " -name '*.log'\" 2> /dev/null"
-        logger.debug("executing command [" + cmdStr + "]", extra=d)
-        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
-        for line in subproc.stdout.readlines():
-            # Need a key to identify each corresponding log segment file in different brokers:
-            # This can be achieved by using part of the full log segment file path as a key to identify
-            # the individual log segment file checksums. The key can be extracted from the path name
-            # and starting from "topic-partition" such as:
-            #   full log segment path name   : /tmp/kafka_server_1_logs/test_1-0/00000000000000010255.kafka
-            #   part of the path name as key : test_1-0/00000000000000010255.kafka
-            logSegmentPathName = line.rstrip('\n')
-            substrIndex    = logSegmentPathName.index(logDir) + len(logDir + "/")
-            logSegmentFile = logSegmentPathName[substrIndex:]
-
-            # get log segment file checksum
-            cksumCmdStr = "ssh " + hostname + " \"cksum " + logSegmentPathName + " | cut -f1 -d ' '\" 2> /dev/null"
-            subproc2 = system_test_utils.sys_call_return_subproc(cksumCmdStr)
-            for line2 in subproc2.stdout.readlines():
-                checksum = line2.rstrip('\n')
-                # use logSegmentFile as a key to associate with its checksum
-                logCksumDict[logSegmentFile] = checksum
-
-        # associate this logCksumDict with its broker id
-        brokerLogCksumDict["broker-"+brokerEntityId] = logCksumDict
-
-    # use a list of sets for checksums comparison
-    sets = []
-    for brokerId, logCksumDict in brokerLogCksumDict.items():
-        sets.append( set(logCksumDict.items()) )
-
-    # looping through the sets and compare between broker[n] & broker[n+1] ...
-    idx = 0
-    diffItemSet = None
-    while idx < len(sets) - 1:
-        diffItemSet = sets[idx] ^ sets[idx + 1]
-
-        if (len(diffItemSet) > 0):
-            logger.error("Mismatch found : " + str(diffItemSet), extra=d)
-            testcaseEnv.validationStatusDict["Log segment checksum matching across all replicas"] = "FAILED"
-
-            # get the mismatched items key, i.e. the log segment file name
-            diffItemList = list(diffItemSet)
-            diffItemKeys = []
-            for keyvalSet in diffItemList:
-                keyvalList = list(keyvalSet)
-                diffItemKeys.append(keyvalList[0])
-
-            # mismatch found - so print out the whole log segment file checksum
-            # info with the mismatched checksum highlighted
-            for brokerId in sorted(brokerLogCksumDict.iterkeys()):
-                logCksumDict = brokerLogCksumDict[brokerId]
-                print brokerId,":"
-                for logSegmentFile in sorted(logCksumDict.iterkeys()):
-                    checksum = logCksumDict[logSegmentFile]
-                    sys.stdout.write(logSegmentFile + " => " + checksum)
-                    try:
-                        if diffItemKeys.index(logSegmentFile) >= 0:
-                            sys.stdout.write(" <<<< not matching across all replicas")
-                    except:
-                        pass
-                    print
-            print
-            return
-        idx += 1
-
-    # getting here means all log segment checksums matched
-    testcaseEnv.validationStatusDict["Log segment checksum matching across all replicas"] = "PASSED"
-
-    anonLogger.info("log segment files checksum :")
-    print
-    pprint.pprint(brokerLogCksumDict)
-    print
-
 def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
 
     entityConfigs = systemTestEnv.clusterEntityConfigDictList
@@ -1460,4 +1371,129 @@
             logger.info("See " + msgChecksumMissingInConsumerLogPathName + " for missing MessageID", extra=d)
             #return False
 
+def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName="source"):
+    anonLogger.info("================================================")
+    anonLogger.info("validating merged broker log segment checksums")
+    anonLogger.info("================================================")
+
+    brokerLogCksumDict   = {}
+    testCaseBaseDir      = testcaseEnv.testCaseBaseDir
+    tcConfigsList        = testcaseEnv.testcaseConfigsList
+    validationStatusDict = testcaseEnv.validationStatusDict
+    clusterConfigList    = systemTestEnv.clusterEntityConfigDictList
+    #brokerEntityIdList  = system_test_utils.get_data_from_list_of_dicts(clusterConfigList, "role", "broker", "entity_id")
+    allBrokerConfigList  = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "broker")
+    brokerEntityIdList   = system_test_utils.get_data_from_list_of_dicts(allBrokerConfigList, "cluster_name", clusterName, "entity_id")
+
+    # loop through all brokers
+    for brokerEntityId in brokerEntityIdList:
+        logCksumDict = {}
+        # remoteLogSegmentPathName : /tmp/kafka_server_4_logs
+        # => remoteLogSegmentDir   : kafka_server_4_logs
+        remoteLogSegmentPathName = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", brokerEntityId, "log.dir")
+        remoteLogSegmentDir      = os.path.basename(remoteLogSegmentPathName)
+        logPathName              = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
+        localLogSegmentPath      = logPathName + "/" + remoteLogSegmentDir
+
+        # localLogSegmentPath :
+        # .../system_test/mirror_maker_testsuite/testcase_5002/logs/broker-4/kafka_server_4_logs
+        #   |- test_1-0
+        #        |- 00000000000000000000.index
+        #        |- 00000000000000000000.log
+        #        |- 00000000000000000020.index
+        #        |- 00000000000000000020.log
+        #        |- . . .
+        #   |- test_1-1
+        #        |- 00000000000000000000.index
+        #        |- 00000000000000000000.log
+        #        |- 00000000000000000020.index
+        #        |- 00000000000000000020.log
+        #        |- . . .
+
+        # loop through all topicPartition directories such as : test_1-0, test_1-1, ...
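+        # Unlike the per-segment-file cksum comparison in the validate_broker_log_segment_checksum
+        # that this patch removes, the md5 below is fed every *.log file of a topic-partition in
+        # sorted filename order, i.e. it is effectively a checksum of the merged (concatenated)
+        # partition content; presumably this keeps replicas comparable even when their individual
+        # segment files roll at different offsets.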
+        for topicPartition in os.listdir(localLogSegmentPath):
+            # found a topic-partition directory
+            if os.path.isdir(localLogSegmentPath + "/" + topicPartition):
+                # md5 hasher
+                m = hashlib.md5()
+
+                # logSegmentKey is like this : kafka_server_9_logs:test_1-0 (delimited by ':')
+                logSegmentKey = remoteLogSegmentDir + ":" + topicPartition
+
+                # log segment files are located in : localLogSegmentPath + "/" + topicPartition
+                # sort the log segment files under each topic-partition and get the md5 checksum
+                for logFile in sorted(os.listdir(localLogSegmentPath + "/" + topicPartition)):
+                    # only process log files: *.log
+                    if logFile.endswith(".log"):
+                        # read the log segment file as binary
+                        offsetLogSegmentPathName = localLogSegmentPath + "/" + topicPartition + "/" + logFile
+                        fin = file(offsetLogSegmentPathName, 'rb')
+                        # keep reading 64K max at a time
+                        while True:
+                            data = fin.read(65536)
+                            if not data:
+                                fin.close()
+                                break
+                            # update it into the hasher
+                            m.update(data)
+
+                # update the md5 checksum into brokerLogCksumDict with the corresponding key
+                brokerLogCksumDict[logSegmentKey] = m.hexdigest()
+
+    # print it out to the console for reference
+    pprint.pprint(brokerLogCksumDict)
+
+    # brokerLogCksumDict will look like this:
+    # {
+    #  'kafka_server_1_logs:tests_1-0': 'd41d8cd98f00b204e9800998ecf8427e',
+    #  'kafka_server_1_logs:tests_1-1': 'd41d8cd98f00b204e9800998ecf8427e',
+    #  'kafka_server_1_logs:tests_2-0': 'd41d8cd98f00b204e9800998ecf8427e',
+    #  'kafka_server_1_logs:tests_2-1': 'd41d8cd98f00b204e9800998ecf8427e',
+    #  'kafka_server_2_logs:tests_1-0': 'd41d8cd98f00b204e9800998ecf8427e',
+    #  'kafka_server_2_logs:tests_1-1': 'd41d8cd98f00b204e9800998ecf8427e',
+    #  'kafka_server_2_logs:tests_2-0': 'd41d8cd98f00b204e9800998ecf8427e',
+    #  'kafka_server_2_logs:tests_2-1': 'd41d8cd98f00b204e9800998ecf8427e'
+    # }
+
+    checksumDict = {}
+    # organize the checksums according to their topic-partition and checksumDict will look like this:
+    # {
+    #  'test_1-0' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'],
+    #  'test_1-1' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'],
+    #  'test_2-0' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'],
+    #  'test_2-1' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e']
+    # }
+
+    for brokerTopicPartitionKey, md5Checksum in brokerLogCksumDict.items():
+        tokens = brokerTopicPartitionKey.split(":")
+        brokerKey      = tokens[0]
+        topicPartition = tokens[1]
+        if topicPartition in checksumDict:
+            # key already exists
+            checksumDict[topicPartition].append(md5Checksum)
+        else:
+            # new key => create a new list to store checksums
+            checksumDict[topicPartition] = []
+            checksumDict[topicPartition].append(md5Checksum)
+
+    failureCount = 0
+
+    # loop through checksumDict: the checksums should be the same inside each
+    # topic-partition's list; otherwise, a checksum mismatch is detected
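+    # For example, checksumDict['test_1-0'] == ['d41d8cd9...', 'd41d8cd9...'] collapses to a
+    # one-element frozenset and counts as a match below, whereas two distinct md5 values in
+    # the same list would increment failureCount.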
+    for topicPartition, checksumList in checksumDict.items():
+        checksumSet = frozenset(checksumList)
+        if len(checksumSet) > 1:
+            failureCount += 1
+            logger.error("merged log segment checksum in " + topicPartition + " mismatched", extra=d)
+        elif len(checksumSet) == 1:
+            logger.debug("merged log segment checksum in " + topicPartition + " matched", extra=d)
+        else:
+            logger.error("unexpected error in " + topicPartition, extra=d)
+
+    if failureCount == 0:
+        validationStatusDict["Validate for merged log segment checksum in cluster [" + clusterName + "]"] = "PASSED"
+    else:
+        validationStatusDict["Validate for merged log segment checksum in cluster [" + clusterName + "]"] = "FAILED"
+
+
Index: system_test/mirror_maker_testsuite/mirror_maker_test.py
===================================================================
--- system_test/mirror_maker_testsuite/mirror_maker_test.py (revision 1398132)
+++ system_test/mirror_maker_testsuite/mirror_maker_test.py (working copy)
@@ -170,9 +170,6 @@
                 self.anonLogger.info("sleeping for 5s")
                 time.sleep(5)
 
-            #print "#### sleeping for 30 min"
-            #time.sleep(1800)
-
             self.log_message("creating topics")
             kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
             self.anonLogger.info("sleeping for 5s")
@@ -189,7 +186,7 @@
             time.sleep(int(msgProducingFreeTimeSec))
 
             # =============================================
-            # A while-loop to bounce leader as specified
+            # A while-loop to bounce mirror maker as specified
             # by "num_iterations" in testcase_n_properties.json
             # =============================================
             i = 1
@@ -198,47 +195,26 @@
 
                 self.log_message("Iteration " + str(i) + " of " + str(numIterations))
 
-                self.log_message("looking up leader")
-                leaderDict = kafka_system_test_utils.get_leader_elected_log_line(
-                                 self.systemTestEnv, self.testcaseEnv, self.leaderAttributesDict)
-
-                # ==========================
-                # leaderDict looks like this:
-                # ==========================
-                #{'entity_id': u'3',
-                # 'partition': '0',
-                # 'timestamp': 1345050255.8280001,
-                # 'hostname': u'localhost',
-                # 'topic': 'test_1',
-                # 'brokerid': '3'}
-
                 # =============================================
-                # validate to see if leader election is successful
+                # Bounce Mirror Maker if specified in testcase config
                 # =============================================
-                self.log_message("validating leader election")
-                result = kafka_system_test_utils.validate_leader_election_successful(
-                             self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
-
-                # =============================================
-                # trigger leader re-election by stopping leader
-                # to get re-election latency
-                # =============================================
-                bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
-                self.log_message("bounce_leader flag : " + bounceLeaderFlag)
-                if (bounceLeaderFlag.lower() == "true"):
-                    reelectionLatency = kafka_system_test_utils.get_reelection_latency(
-                                            self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
-                    latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
-                    self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
-
-                # =============================================
-                # starting previously terminated broker
-                # =============================================
-                if bounceLeaderFlag.lower() == "true":
-                    self.log_message("starting the previously terminated broker")
-                    stoppedLeaderEntityId = leaderDict["entity_id"]
-                    kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
+                bounceMirrorMaker = self.testcaseEnv.testcaseArgumentsDict["bounce_mirror_maker"]
+                self.log_message("bounce_mirror_maker flag : " + bounceMirrorMaker)
+                if (bounceMirrorMaker.lower() == "true"):
+                    clusterConfigList = self.systemTestEnv.clusterEntityConfigDictList
+                    mirrorMakerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
+                                                  clusterConfigList, "role", "mirror_maker", "entity_id")
+
+                    mirrorMakerPPid = self.testcaseEnv.entityMirrorMakerParentPidDict[mirrorMakerEntityIdList[0]]
+                    self.log_message("stopping mirror maker : " + mirrorMakerPPid)
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, mirrorMakerEntityIdList[0], mirrorMakerPPid)
+                    time.sleep(1)
+
+                    # starting the previously terminated mirror maker
+                    self.log_message("starting the previously terminated mirror maker")
+                    kafka_system_test_utils.start_mirror_makers(self.systemTestEnv, self.testcaseEnv)
+
                 self.anonLogger.info("sleeping for 15s")
                 time.sleep(15)
                 i += 1
@@ -294,13 +270,14 @@
             # collect logs from remote hosts
             # =============================================
             kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
-            
+
             # =============================================
             # validate the data matched and checksum
             # =============================================
             self.log_message("validating data matched")
             kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
             kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+            kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv, "target")
 
             # =============================================
             # draw graphs
@@ -313,7 +290,7 @@
             metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME,
                                          self.testcaseEnv.testCaseDashboardsDir,
                                          self.systemTestEnv.clusterEntityConfigDictList)
-            
+
         except Exception as e:
             self.log_message("Exception while running test {0}".format(e))
             traceback.print_exc()
Index: system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json
===================================================================
--- system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json (revision 1398132)
+++ system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json (working copy)
@@ -11,6 +11,7 @@
   },
   "testcase_args": {
     "bounce_leader": "false",
+    "bounce_mirror_maker": "false",
     "replica_factor": "3",
     "num_partition": "1",
     "num_iteration": "1",
Index: system_test/mirror_maker_testsuite/testcase_5002/testcase_5002_properties.json
===================================================================
--- system_test/mirror_maker_testsuite/testcase_5002/testcase_5002_properties.json (revision 0)
+++ system_test/mirror_maker_testsuite/testcase_5002/testcase_5002_properties.json (revision 0)
@@ -0,0 +1,136 @@
+{
+  "description": {"01":"Replication with Mirror Maker => Bounce Mirror Maker",
+                  "02":"Set up 2 clusters such as : SOURCE => MirrorMaker => TARGET",
+                  "03":"Set up a 2-node ZK cluster for both SOURCE & TARGET",
+                  "04":"Produce and consume messages to a single topic - single partition.",
+                  "05":"This test sends messages to 3 replicas",
+                  "06":"At the end, it verifies the log size and contents",
+                  "07":"Use a consumer to verify no message loss in the TARGET cluster.",
+                  "08":"Producer dimensions : mode:sync, acks:-1, comp:0",
+                  "09":"Log segment size : 10240"
+  },
+  "testcase_args": {
+    "bounce_leader": "false",
+    "bounce_mirror_maker": "true",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "clientPort": "2118",
+      "dataDir": "/tmp/zookeeper_1",
+      "log_filename": "zookeeper_1.log",
+      "config_filename": "zookeeper_1.properties"
+    },
+
+    {
+      "entity_id": "2",
+      "clientPort": "2128",
+      "dataDir": "/tmp/zookeeper_2",
+      "log_filename": "zookeeper_2.log",
+      "config_filename": "zookeeper_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "clientPort": "2138",
+      "dataDir": "/tmp/zookeeper_3",
+      "log_filename": "zookeeper_3.log",
+      "config_filename": "zookeeper_3.properties"
+    },
+
+    {
+      "entity_id": "4",
+      "port": "9091",
+      "brokerid": "1",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "port": "9092",
+      "brokerid": "2",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_5_logs",
+      "log_filename": "kafka_server_5.log",
+      "config_filename": "kafka_server_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "port": "9093",
+      "brokerid": "3",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_6_logs",
+      "log_filename": "kafka_server_6.log",
+      "config_filename": "kafka_server_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "port": "9094",
+      "brokerid": "4",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_7_logs",
+      "log_filename": "kafka_server_7.log",
+      "config_filename": "kafka_server_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "port": "9095",
+      "brokerid": "5",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_8_logs",
+      "log_filename": "kafka_server_8.log",
+      "config_filename": "kafka_server_8.properties"
+    },
+    {
+      "entity_id": "9",
+      "port": "9096",
+      "brokerid": "6",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_9_logs",
+      "log_filename": "kafka_server_9.log",
+      "config_filename": "kafka_server_9.properties"
+    },
+
+    {
+      "entity_id": "10",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "-1",
+      "async":"false",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "11",
+      "topic": "test_1",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_11.log",
+      "config_filename": "console_consumer_11.properties"
+    },
+
+    {
+      "entity_id": "12",
+      "log_filename": "mirror_maker_12.log",
+      "mirror_consumer_config_filename": "mirror_consumer_12.properties",
+      "mirror_producer_config_filename": "mirror_producer_12.properties"
+    }
+  ]
+}
Index: system_test/testcase_to_run.json
===================================================================
--- system_test/testcase_to_run.json (revision 1398132)
+++ system_test/testcase_to_run.json (working copy)
@@ -1,6 +1,5 @@
 {
     "ReplicaBasicTest"   : [
-        "testcase_0001",
-        "testcase_1"
+        "testcase_2"
     ]
 }
Index: system_test/metrics.json
===================================================================
--- system_test/metrics.json (revision 1398132)
+++ system_test/metrics.json (working copy)
@@ -3,115 +3,7 @@
     {
         "role": "broker",
         "graphs": [
-            {
-                "graph_name": "Produce-Request-Rate",
-                "y_label": "requests-per-sec",
-                "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RequestsPerSec",
-                "attributes": "OneMinuteRate"
-            },
-            {
-                "graph_name": "Produce-Request-Time",
-                "y_label": "ns,ns",
-                "bean_name": "kafka.network:type=RequestMetrics,name=Produce-TotalTimeNs",
-                "attributes": "Mean,99thPercentile"
-            },
-            {
-                "graph_name": "Produce-Request-Remote-Time",
-                "y_label": "ns,ns",
-                "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RemoteTimeNs",
-                "attributes": "Mean,99thPercentile"
-            },
-            {
-                "graph_name": "Fetch-Consumer-Request-Rate",
-                "y_label": "requests-per-sec",
-                "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RequestsPerSec",
-                "attributes": "OneMinuteRate"
-            },
-            {
-                "graph_name": "Fetch-Consumer-Request-Time",
-                "y_label": "ns,ns",
-                "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-TotalTimeNs",
-                "attributes": "Mean,99thPercentile"
-            },
-            {
-                "graph_name": "Fetch-Consumer-Request-Remote-Time",
-                "y_label": "ns,ns",
-                "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RemoteTimeNs",
-                "attributes": "Mean,99thPercentile"
-            },
-            {
-                "graph_name": "Fetch-Follower-Request-Rate",
-                "y_label": "requests-per-sec",
-                "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RequestsPerSec",
-                "attributes": "OneMinuteRate"
-            },
-            {
-                "graph_name": "Fetch-Follower-Request-Time",
-                "y_label": "ns,ns",
-                "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-TotalTimeNs",
-                "attributes": "Mean,99thPercentile"
-            },
-            {
-                "graph_name": "Fetch-Follower-Request-Remote-Time",
-                "y_label": "ns,ns",
-                "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RemoteTimeNs",
-                "attributes": "Mean,99thPercentile"
-            },
-            {
-                "graph_name": "ProducePurgatoryExpirationRate",
-                "y_label": "expirations-per-sec",
-                "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=AllExpiresPerSecond",
-                "attributes": "OneMinuteRate"
-            },
-            {
-                "graph_name": "FetchConsumerPurgatoryExpirationRate",
-                "y_label": "expirations-per-sec",
-                "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=ConsumerExpiresPerSecond",
-                "attributes": "OneMinuteRate"
-            },
             {
-                "graph_name": "FetchFollowerPurgatoryExpirationRate",
-                "y_label": "expirations-per-sec",
-                "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=FollowerExpiresPerSecond",
-                "attributes": "OneMinuteRate"
-            },
-            {
-                "graph_name": "ProducePurgatoryQueueSize",
-                "y_label": "size",
-                "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests",
-                "attributes": "Value"
-            },
-            {
-                "graph_name": "FetchPurgatoryQueueSize",
-                "y_label": "size",
-                "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests",
-                "attributes": "Value"
-            },
-            {
-                "graph_name": "ControllerLeaderElectionRateAndTime",
-                "y_label": "elections-per-sec,ms,ms",
-                "bean_name": "kafka.server:type=ControllerStat,name=LeaderElectionRateAndTimeMs",
-                "attributes": "OneMinuteRate,Mean,99thPercentile"
-            },
-            {
-                "graph_name": "LogFlushRateAndTime",
-                "y_label": "flushes-per-sec,ms,ms",
-                "bean_name": "kafka.message:type=LogFlushStats,name=LogFlushRateAndTimeMs",
-                "attributes": "OneMinuteRate,Mean,99thPercentile"
-            },
-            {
-                "graph_name": "AllBytesOutRate",
-                "y_label": "bytes-per-sec",
-                "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesOutPerSec",
-                "attributes": "OneMinuteRate"
-            },
-            {
-                "graph_name": "AllBytesInRate",
-                "y_label": "bytes-per-sec",
-                "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec",
-                "attributes": "OneMinuteRate"
-            },
-            {
                 "graph_name": "AllMessagesInRate",
                 "y_label": "messages-per-sec",
                 "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsMessagesInPerSec",
@@ -123,12 +15,6 @@
         "role": "producer_performance",
         "graphs": [
             {
-                "graph_name": "ProduceRequestRateAndTime",
-                "y_label": "requests-per-sec,ms,ms",
-                "bean_name": "kafka.producer:type=ProducerRequestStat,name=ProduceRequestRateAndTimeMs",
-                "attributes": "OneMinuteRate,Mean,99thPercentile"
-            },
-            {
                 "graph_name": "ProduceRequestSize",
                 "y_label": "bytes,bytes",
                 "bean_name": "kafka.producer:type=ProducerRequestStat,name=ProducerRequestSize",
@@ -140,18 +26,6 @@
         "role": "console_consumer",
         "graphs": [
             {
-                "graph_name": "FetchRequestRateAndTime",
-                "y_label": "requests-per-sec,ms,ms",
-                "bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchRequestRateAndTimeMs",
-                "attributes": "OneMinuteRate,Mean,99thPercentile"
-            },
-            {
-                "graph_name": "FetchResponseSize",
-                "y_label": "bytes,bytes",
-                "bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchResponseSize",
-                "attributes": "Mean,99thPercentile"
-            },
-            {
                 "graph_name": "ConsumedMessageRate",
                 "y_label": "messages-per-sec",
                 "bean_name": "kafka.consumer:type=ConsumerTopicStat,name=AllTopicsMessagesPerSec",