From ae357faa6448a98261df9f3ac85df75d043ada65 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 19 Mar 2015 15:47:31 -0700 Subject: [PATCH 1/2] dummy --- .../replication_testsuite/replica_basic_test.py | 18 +++++++++++++++--- system_test/utils/kafka_system_test_utils.py | 21 +++++++++++++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py index 16a24a4..a0b9e3b 100644 --- a/system_test/replication_testsuite/replica_basic_test.py +++ b/system_test/replication_testsuite/replica_basic_test.py @@ -125,17 +125,27 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): logRetentionTest = self.testcaseEnv.testcaseArgumentsDict["log_retention_test"] except: pass + consumerMultiTopicsMode = "false" try: consumerMultiTopicsMode = self.testcaseEnv.testcaseArgumentsDict["consumer_multi_topics_mode"] except: pass + autoCreateTopic = "false" try: autoCreateTopic = self.testcaseEnv.testcaseArgumentsDict["auto_create_topic"] except: pass + consumerTest = "false" + try: + consumerTest = self.testcaseEnv.testcaseArgumentsDict["consumer_test"] + except: + pass + + startConsumerEarly = logRetentionTest.lower() == "true" or consumerTest.lower() == "true" + # initialize self.testcaseEnv with user-defined environment variables (product specific) self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = "" @@ -193,9 +203,9 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): time.sleep(5) # ============================================= - # start ConsoleConsumer if this is a Log Retention test + # start console consumer early # ============================================= - if logRetentionTest.lower() == "true": + if startConsumerEarly: self.log_message("starting consumer in the background") kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv) time.sleep(1) @@ -397,7 +407,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): # ============================================= # starting console consumer # ============================================= - if logRetentionTest.lower() == "false": + if not startConsumerEarly: self.log_message("starting consumer in the background") kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv) time.sleep(10) @@ -427,6 +437,8 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): if logRetentionTest.lower() == "true": kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) + elif consumerTest.lower() == "true": + kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) elif consumerMultiTopicsMode.lower() == "true": kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer( self.systemTestEnv, self.testcaseEnv, replicationUtils) diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 41d511c..7f1dc9d 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -861,6 +861,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): def start_console_consumer(systemTestEnv, testcaseEnv): clusterList = systemTestEnv.clusterEntityConfigDictList + testcaseConfigsList = testcaseEnv.testcaseConfigsList consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "console_consumer") for consumerConfig in consumerConfigList: @@ -872,6 +873,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv): kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home") jmxPort = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "jmx_port") + useNewConsumer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "new-consumer") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" logger.info("starting console consumer", extra=d) @@ -930,13 +932,31 @@ def start_console_consumer(systemTestEnv, testcaseEnv): logger.error("Invalid cluster name : " + clusterName, extra=d) sys.exit(1) + # set consumer properties consumerProperties = {} consumerProperties["consumer.timeout.ms"] = timeoutMs + + try: + groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id") + consumerProperties["group.id"] = groupOption + except: + pass + + try: + serversOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "bootstrap.servers") + consumerProperties["bootstrap.servers"] = serversOption + except: + pass + props_file_path=write_consumer_properties(consumerProperties) scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" logger.debug("executing command [" + scpCmdStr + "]", extra=d) system_test_utils.sys_call(scpCmdStr) + boolArgumentsStr = "" + if useNewConsumer.lower() == "true": + boolArgumentsStr = boolArgumentsStr + " --new-consumer" + cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, @@ -948,6 +968,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv): #"--metrics-dir " + metricsDir, formatterOption, "--from-beginning ", + boolArgumentsStr, " >> " + consumerLogPathName, " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] -- 1.7.12.4 From 0c5ba1185e2c97d63b824207d8734ac0405d5c77 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 19 Mar 2015 16:38:01 -0700 Subject: [PATCH 2/2] add new system test files --- .../testcase_6001/cluster_config.json | 58 ++++++++++++++ .../testcase_6001/testcase_6001_properties.json | 87 +++++++++++++++++++++ .../testcase_6002/cluster_config.json | 58 ++++++++++++++ .../testcase_6002/testcase_6002_properties.json | 87 +++++++++++++++++++++ .../testcase_6003/cluster_config.json | 58 ++++++++++++++ .../testcase_6003/testcase_6003_properties.json | 87 +++++++++++++++++++++ .../testcase_6004/cluster_config.json | 58 ++++++++++++++ .../testcase_6004/testcase_6004_properties.json | 91 ++++++++++++++++++++++ 8 files changed, 584 insertions(+) create mode 100644 system_test/replication_testsuite/testcase_6001/cluster_config.json create mode 100644 system_test/replication_testsuite/testcase_6001/testcase_6001_properties.json create mode 100644 system_test/replication_testsuite/testcase_6002/cluster_config.json create mode 100644 system_test/replication_testsuite/testcase_6002/testcase_6002_properties.json create mode 100644 system_test/replication_testsuite/testcase_6003/cluster_config.json create mode 100644 system_test/replication_testsuite/testcase_6003/testcase_6003_properties.json create mode 100644 system_test/replication_testsuite/testcase_6004/cluster_config.json create mode 100644 system_test/replication_testsuite/testcase_6004/testcase_6004_properties.json diff --git a/system_test/replication_testsuite/testcase_6001/cluster_config.json b/system_test/replication_testsuite/testcase_6001/cluster_config.json new file mode 100644 index 0000000..8ed896b --- /dev/null +++ b/system_test/replication_testsuite/testcase_6001/cluster_config.json @@ -0,0 +1,58 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9990" + }, + { + "entity_id": "1", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9991" + }, + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9992" + }, + { + "entity_id": "3", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9993" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "producer_performance", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9997" + }, + { + "entity_id": "5", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9998" + } + ] +} diff --git a/system_test/replication_testsuite/testcase_6001/testcase_6001_properties.json b/system_test/replication_testsuite/testcase_6001/testcase_6001_properties.json new file mode 100644 index 0000000..4b4c9bd --- /dev/null +++ b/system_test/replication_testsuite/testcase_6001/testcase_6001_properties.json @@ -0,0 +1,87 @@ +{ + "description": {"01":"Replication Basic : 1. mode => async; 2. acks => 1; 3. comp => 0", + "02":"Produce and consume messages to a single topic - single partition.", + "03":"This test sends messages to 3 replicas", + "04":"At the end it verifies the log size and contents", + "05":"Use a consumer to verify no message loss.", + "06":"Producer dimensions : mode:async, acks:1, comp:0", + "07":"Log segment size : 20480" + }, + "testcase_args": { + "broker_type": "leader", + "bounce_broker": "false", + "replica_factor": "3", + "num_partition": "1", + "num_iteration": "1", + "sleep_seconds_between_producer_calls": "1", + "message_producing_free_time_sec": "15", + "num_messages_to_produce_per_producer_call": "50", + "consumer_test": "true" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "broker.id": "1", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_1_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "broker.id": "2", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_2_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "broker.id": "3", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_3_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "new-producer":"true", + "topic": "test_1", + "threads": "5", + "compression-codec": "0", + "message-size": "500", + "message": "100", + "request-num-acks": "1", + "producer-retry-backoff-ms": "300", + "sync":"false", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance.properties" + }, + { + "entity_id": "5", + "topic": "test_1", + "new-consumer":"true", + "group.id": "mytestgroup", + "consumer-timeout-ms": "10000", + "bootstrap.servers": "localhost:9991,localhost:9992,localhost:9993", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer.properties" + } + ] +} diff --git a/system_test/replication_testsuite/testcase_6002/cluster_config.json b/system_test/replication_testsuite/testcase_6002/cluster_config.json new file mode 100644 index 0000000..8ed896b --- /dev/null +++ b/system_test/replication_testsuite/testcase_6002/cluster_config.json @@ -0,0 +1,58 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9990" + }, + { + "entity_id": "1", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9991" + }, + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9992" + }, + { + "entity_id": "3", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9993" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "producer_performance", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9997" + }, + { + "entity_id": "5", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9998" + } + ] +} diff --git a/system_test/replication_testsuite/testcase_6002/testcase_6002_properties.json b/system_test/replication_testsuite/testcase_6002/testcase_6002_properties.json new file mode 100644 index 0000000..63198f4 --- /dev/null +++ b/system_test/replication_testsuite/testcase_6002/testcase_6002_properties.json @@ -0,0 +1,87 @@ +{ + "description": {"01":"Replication Basic : 1. mode => async; 2. acks => 1; 3. comp => 1", + "02":"Produce and consume messages to a single topic - single partition.", + "03":"This test sends messages to 3 replicas", + "04":"At the end it verifies the log size and contents", + "05":"Use a consumer to verify no message loss.", + "06":"Producer dimensions : mode:async, acks:1, comp:1", + "07":"Log segment size : 20480" + }, + "testcase_args": { + "broker_type": "leader", + "bounce_broker": "false", + "replica_factor": "3", + "num_partition": "1", + "num_iteration": "1", + "sleep_seconds_between_producer_calls": "1", + "message_producing_free_time_sec": "15", + "num_messages_to_produce_per_producer_call": "50", + "consumer_test": "true" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "broker.id": "1", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_1_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "broker.id": "2", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_2_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "broker.id": "3", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_3_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "new-producer":"true", + "topic": "test_1", + "threads": "5", + "compression-codec": "1", + "message-size": "500", + "message": "100", + "request-num-acks": "1", + "producer-retry-backoff-ms": "300", + "sync":"false", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance.properties" + }, + { + "entity_id": "5", + "topic": "test_1", + "new-consumer":"true", + "group.id": "mytestgroup", + "consumer-timeout-ms": "10000", + "bootstrap.servers": "localhost:9991,localhost:9992,localhost:9993", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer.properties" + } + ] +} diff --git a/system_test/replication_testsuite/testcase_6003/cluster_config.json b/system_test/replication_testsuite/testcase_6003/cluster_config.json new file mode 100644 index 0000000..8ed896b --- /dev/null +++ b/system_test/replication_testsuite/testcase_6003/cluster_config.json @@ -0,0 +1,58 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9990" + }, + { + "entity_id": "1", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9991" + }, + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9992" + }, + { + "entity_id": "3", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9993" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "producer_performance", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9997" + }, + { + "entity_id": "5", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9998" + } + ] +} diff --git a/system_test/replication_testsuite/testcase_6003/testcase_6003_properties.json b/system_test/replication_testsuite/testcase_6003/testcase_6003_properties.json new file mode 100644 index 0000000..8bcea42 --- /dev/null +++ b/system_test/replication_testsuite/testcase_6003/testcase_6003_properties.json @@ -0,0 +1,87 @@ +{ + "description": {"01":"Replication Basic : 1. mode => async; 2. acks => 1; 3. comp => 1", + "02":"Produce and consume messages to a single topic - three partitions.", + "03":"This test sends messages to 3 replicas", + "04":"At the end it verifies the log size and contents", + "05":"Use a consumer to verify no message loss.", + "06":"Producer dimensions : mode:async, acks:1, comp:1", + "07":"Log segment size : 20480" + }, + "testcase_args": { + "broker_type": "leader", + "bounce_broker": "false", + "replica_factor": "3", + "num_partition": "3", + "num_iteration": "1", + "sleep_seconds_between_producer_calls": "1", + "message_producing_free_time_sec": "15", + "num_messages_to_produce_per_producer_call": "50", + "consumer_test": "true" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "broker.id": "1", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_1_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "broker.id": "2", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_2_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "broker.id": "3", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_3_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "new-producer":"true", + "topic": "test_1", + "threads": "5", + "compression-codec": "1", + "message-size": "500", + "message": "100", + "request-num-acks": "1", + "producer-retry-backoff-ms": "300", + "sync":"false", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance.properties" + }, + { + "entity_id": "5", + "topic": "test_1", + "new-consumer":"true", + "group.id": "mytestgroup", + "consumer-timeout-ms": "10000", + "bootstrap.servers": "localhost:9991,localhost:9992,localhost:9993", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer.properties" + } + ] +} diff --git a/system_test/replication_testsuite/testcase_6004/cluster_config.json b/system_test/replication_testsuite/testcase_6004/cluster_config.json new file mode 100644 index 0000000..8ed896b --- /dev/null +++ b/system_test/replication_testsuite/testcase_6004/cluster_config.json @@ -0,0 +1,58 @@ +{ + "cluster_config": [ + { + "entity_id": "0", + "hostname": "localhost", + "role": "zookeeper", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9990" + }, + { + "entity_id": "1", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9991" + }, + { + "entity_id": "2", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9992" + }, + { + "entity_id": "3", + "hostname": "localhost", + "role": "broker", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9993" + }, + { + "entity_id": "4", + "hostname": "localhost", + "role": "producer_performance", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9997" + }, + { + "entity_id": "5", + "hostname": "localhost", + "role": "console_consumer", + "cluster_name": "source", + "kafka_home": "default", + "java_home": "default", + "jmx_port": "9998" + } + ] +} diff --git a/system_test/replication_testsuite/testcase_6004/testcase_6004_properties.json b/system_test/replication_testsuite/testcase_6004/testcase_6004_properties.json new file mode 100644 index 0000000..4dc00e8 --- /dev/null +++ b/system_test/replication_testsuite/testcase_6004/testcase_6004_properties.json @@ -0,0 +1,91 @@ +{ + "description": {"01":"Leader Failure in Replication : Base Test", + "02":"Produce and consume messages to a single topic - three partitions.", + "03":"This test sends messages to 3 replicas", + "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)", + "05":"Restart the terminated broker", + "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully", + "07":"At the end it verifies the log size and contents", + "08":"Use a consumer to verify no message loss.", + "09":"Producer dimensions : mode:sync, acks:11, comp:1", + "10":"Log segment size : 20480" + }, + + "testcase_args": { + "broker_type": "leader", + "bounce_broker": "true", + "replica_factor": "3", + "num_partition": "3", + "num_iteration": "1", + "sleep_seconds_between_producer_calls": "1", + "message_producing_free_time_sec": "15", + "num_messages_to_produce_per_producer_call": "50", + "consumer_test": "true" + }, + "entities": [ + { + "entity_id": "0", + "clientPort": "2188", + "dataDir": "/tmp/zookeeper_0", + "log_filename": "zookeeper_2188.log", + "config_filename": "zookeeper_2188.properties" + }, + { + "entity_id": "1", + "port": "9091", + "broker.id": "1", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_1_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9091.log", + "config_filename": "kafka_server_9091.properties" + }, + { + "entity_id": "2", + "port": "9092", + "broker.id": "2", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_2_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9092.log", + "config_filename": "kafka_server_9092.properties" + }, + { + "entity_id": "3", + "port": "9093", + "broker.id": "3", + "log.segment.bytes": "20480", + "log.dir": "/tmp/kafka_server_3_logs", + "default.replication.factor": "3", + "num.partitions": "1", + "log_filename": "kafka_server_9093.log", + "config_filename": "kafka_server_9093.properties" + }, + { + "entity_id": "4", + "new-producer":"true", + "topic": "test_1", + "threads": "5", + "compression-codec": "1", + "message-size": "500", + "message": "100", + "request-num-acks": "1", + "producer-retry-backoff-ms": "300", + "sync":"false", + "log_filename": "producer_performance.log", + "config_filename": "producer_performance.properties" + }, + { + "entity_id": "5", + "topic": "test_1", + "new-consumer":"true", + "group.id": "mytestgroup", + "consumer-timeout-ms": "10000", + "bootstrap.servers": "localhost:9991,localhost:9992,localhost:9993", + "log_filename": "console_consumer.log", + "config_filename": "console_consumer.properties" + } + ] +} -- 1.7.12.4