Index: system_test/single_host_multi_brokers/config/server_source_1.properties =================================================================== --- system_test/single_host_multi_brokers/config/server_source_1.properties (revision 0) +++ system_test/single_host_multi_brokers/config/server_source_1.properties (revision 0) @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +brokerid=0 + +# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned +# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost +# may not be what you want. +#hostname= + + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9091 + +# The number of threads handling network requests +network.threads=2 + +# The number of threads doing disk I/O +io.threads=2 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +max.socket.request.bytes=104857600 + + +############################# Log Basics ############################# + +# The directory under which to store log files +log.dir=/tmp/kafka_source_1_logs + +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. +num.partitions=1 + +# Overrides for for the default given by num.partitions on a per-topic basis +#topic.partition.count.map=topic1:3, topic2:4 + +############################# Log Flush Policy ############################# + +# The following configurations control the flush of data to disk. This is the most +# important performance knob in kafka. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. +# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). +# 3. Throughput: The flush is generally the most expensive operation. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
+ +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +log.default.flush.interval.ms=1000 + +# Per-topic overrides for log.default.flush.interval.ms +#topic.flush.intervals.ms=topic1:1000, topic2:3000 + +# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. +log.default.flush.scheduler.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.size. +#log.retention.size=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.file.size=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.cleanup.interval.mins=1 + +############################# Zookeeper ############################# + +# Enable connecting to zookeeper +enable.zookeeper=true + +# Zk connection string (see zk docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zk.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 Index: system_test/single_host_multi_brokers/config/server_source_2.properties =================================================================== --- system_test/single_host_multi_brokers/config/server_source_2.properties (revision 0) +++ system_test/single_host_multi_brokers/config/server_source_2.properties (revision 0) @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +brokerid=1 + +# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned +# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost +# may not be what you want. 
+#hostname= + + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9092 + +# The number of threads handling network requests +network.threads=2 + +# The number of threads doing disk I/O +io.threads=2 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +max.socket.request.bytes=104857600 + + +############################# Log Basics ############################# + +# The directory under which to store log files +log.dir=/tmp/kafka_source_2_logs + +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. +num.partitions=1 + +# Overrides for for the default given by num.partitions on a per-topic basis +#topic.partition.count.map=topic1:3, topic2:4 + +############################# Log Flush Policy ############################# + +# The following configurations control the flush of data to disk. This is the most +# important performance knob in kafka. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. +# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). +# 3. Throughput: The flush is generally the most expensive operation. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +log.default.flush.interval.ms=1000 + +# Per-topic overrides for log.default.flush.interval.ms +#topic.flush.intervals.ms=topic1:1000, topic2:3000 + +# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. +log.default.flush.scheduler.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.size. +#log.retention.size=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.file.size=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.cleanup.interval.mins=1 + +############################# Zookeeper ############################# + +# Enable connecting to zookeeper +enable.zookeeper=true + +# Zk connection string (see zk docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". 
+# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zk.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 Index: system_test/single_host_multi_brokers/config/server_source_3.properties =================================================================== --- system_test/single_host_multi_brokers/config/server_source_3.properties (revision 0) +++ system_test/single_host_multi_brokers/config/server_source_3.properties (revision 0) @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +brokerid=2 + +# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned +# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost +# may not be what you want. +#hostname= + + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9093 + +# The number of threads handling network requests +network.threads=2 + +# The number of threads doing disk I/O +io.threads=2 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +max.socket.request.bytes=104857600 + + +############################# Log Basics ############################# + +# The directory under which to store log files +log.dir=/tmp/kafka_source_3_logs + +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. +num.partitions=1 + +# Overrides for for the default given by num.partitions on a per-topic basis +#topic.partition.count.map=topic1:3, topic2:4 + +############################# Log Flush Policy ############################# + +# The following configurations control the flush of data to disk. This is the most +# important performance knob in kafka. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. +# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). +# 3. Throughput: The flush is generally the most expensive operation. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
+ +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +log.default.flush.interval.ms=1000 + +# Per-topic overrides for log.default.flush.interval.ms +#topic.flush.intervals.ms=topic1:1000, topic2:3000 + +# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. +log.default.flush.scheduler.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.size. +#log.retention.size=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.file.size=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.cleanup.interval.mins=1 + +############################# Zookeeper ############################# + +# Enable connecting to zookeeper +enable.zookeeper=true + +# Zk connection string (see zk docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zk.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 Index: system_test/single_host_multi_brokers/config/consumer.properties =================================================================== --- system_test/single_host_multi_brokers/config/consumer.properties (revision 0) +++ system_test/single_host_multi_brokers/config/consumer.properties (revision 0) @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.consumer.ConsumerConfig for more details + +# zk connection string +# comma separated host:port pairs, each corresponding to a zk +# server. e.g. 
"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" +zk.connect=127.0.0.1:2181 + +# timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 + +#consumer group id +groupid=test-consumer-group + +#consumer timeout +#consumer.timeout.ms=5000 Index: system_test/single_host_multi_brokers/config/log4j.properties =================================================================== --- system_test/single_host_multi_brokers/config/log4j.properties (revision 0) +++ system_test/single_host_multi_brokers/config/log4j.properties (revision 0) @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +#log4j.appender.fileAppender=org.apache.log4j.FileAppender +#log4j.appender.fileAppender.File=kafka-request.log +#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout +#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n + + +# Turn on all our debugging info +#log4j.logger.kafka=INFO +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG + +# to print message checksum from ProducerPerformance +log4j.logger.kafka.perf=DEBUG +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG + +# to print message checksum from ProducerPerformance +log4j.logger.kafka.perf=DEBUG +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG + Index: system_test/single_host_multi_brokers/config/producer.properties =================================================================== --- system_test/single_host_multi_brokers/config/producer.properties (revision 0) +++ system_test/single_host_multi_brokers/config/producer.properties (revision 0) @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# see kafka.producer.ProducerConfig for more details + +############################# Producer Basics ############################# + +# need to set either broker.list or zk.connect + +# configure brokers statically +# format: brokerid1:host1:port1,brokerid2:host2:port2 ... +#broker.list=0:localhost:9092 + +# discover brokers from ZK +zk.connect=localhost:2181 + +# zookeeper session timeout; default is 6000 +#zk.sessiontimeout.ms= + +# the max time that the client waits to establish a connection to zookeeper; default is 6000 +#zk.connectiontimeout.ms + +# name of the partitioner class for partitioning events; default partition spreads data randomly +#partitioner.class= + +# specifies whether the messages are sent asynchronously (async) or synchronously (sync) +producer.type=sync + +# specify the compression codec for all data generated: 0: no compression, 1: gzip +compression.codec=0 + +# message encoder +serializer.class=kafka.serializer.StringEncoder + +# allow topic level compression +#compressed.topics= + +# max message size; messages larger than that size are discarded; default is 1000000 +#max.message.size= + + +############################# Async Producer ############################# +# maximum time, in milliseconds, for buffering data on the producer queue +#queue.time= + +# the maximum size of the blocking queue for buffering on the producer +#queue.size= + +# Timeout for event enqueue: +# 0: events will be enqueued immediately or dropped if the queue is full +# -ve: enqueue will block indefinitely if the queue is full +# +ve: enqueue will block up to this many milliseconds if the queue is full +#queue.enqueueTimeout.ms= + +# the number of messages batched at the producer +#batch.size= + +# the callback handler for one or multiple events +#callback.handler= + +# properties required to initialize the callback handler +#callback.handler.props= + +# the handler for events +#event.handler= + +# properties required to initialize the event handler +#event.handler.props= + Index: system_test/single_host_multi_brokers/config/zookeeper_source.properties =================================================================== --- system_test/single_host_multi_brokers/config/zookeeper_source.properties (revision 0) +++ system_test/single_host_multi_brokers/config/zookeeper_source.properties (revision 0) @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir=/tmp/zookeeper_source +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 Index: system_test/single_host_multi_brokers/bin/run-test.sh =================================================================== --- system_test/single_host_multi_brokers/bin/run-test.sh (revision 0) +++ system_test/single_host_multi_brokers/bin/run-test.sh (revision 0) @@ -0,0 +1,406 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ================================================================= +# run-test.sh +# =========== +# +# ================================================================= + + +readonly base_dir=$(dirname $0)/.. +readonly config_dir=$base_dir/config +readonly test_start_time="$(date +%s)" + +readonly zk_source_prop_pathname=${config_dir}/zookeeper_source.properties +readonly zk_source_log4j_log_pathname=${base_dir}/zookeeper_source.log +readonly zk_target_prop_pathname=${config_dir}/zookeeper_target.properties +readonly zk_target_log4j_log_pathname=${base_dir}/zookeeper_target.log + +readonly producer_prop_pathname=${config_dir}/producer.properties +readonly consumer_prop_pathname=${config_dir}/consumer.properties + +readonly producer_perf_output_log_pathname=${base_dir}/producer_perf_output.log +readonly producer_perf_crc_log_pathname=${base_dir}/producer_perf_crc.log +readonly producer_perf_crc_sorted_log_pathname=${base_dir}/producer_perf_crc_sorted.log +readonly producer_perf_crc_sorted_uniq_log_pathname=${base_dir}/producer_perf_crc_sorted_uniq.log + +readonly console_consumer_log_pathname=${base_dir}/console_consumer.log +readonly console_consumer_crc_log_pathname=${base_dir}/console_consumer_crc.log +readonly console_consumer_crc_sorted_log_pathname=${base_dir}/console_consumer_crc_sorted.log +readonly console_consumer_crc_sorted_uniq_log_pathname=${base_dir}/console_consumer_crc_sorted_uniq.log + +readonly this_test_stderr_output_log_pathname=${base_dir}/this_test_stderr_output.log + +readonly num_kafka_source_server=3 # requires same no. of property files such as: + # $config_dir/server_source_{1..n}.properties +readonly num_kafka_target_server=0 # requires same no. 
of property files such as: + # $config_dir/server_target_{1..n}.properties +readonly test_topic=mytest_1 +readonly replica_factor=3 # should be less than or equal to num_kafka_source_server + +# arrays for properties of kafka source brokers +kafak_source_data_log_dirs= +kafka_source_log4j_log_pathnames= +kafka_source_prop_pathnames= +kafka_source_server_brokerids= +kafka_source_server_sock_ports= +kafka_source_server_first_data_file_sizes= +kafka_source_server_first_data_file_checksums= + +# arrays for properties of kafka target brokers +kafak_target_data_log_dirs= +kafka_target_log4j_log_pathnames= +kafka_target_prop_pathnames= +kafka_target_server_brokerids= +kafka_target_server_sock_ports= + +zk_source_port= +zk_source_data_log_dir= +zk_target_port= +zk_target_data_log_dir= + +pid_zk_source= +pid_zk_target= +kafka_source_pids= + +initialize() { + info "initializing ..." + + zk_source_port=`grep clientPort ${zk_source_prop_pathname} | cut -f2 -d '='` + zk_source_data_log_dir=`grep dataDir ${zk_source_prop_pathname} | cut -f2 -d '='` + + #zk_target_port=`grep clientPort ${zk_target_prop_pathname} | cut -f2 -d '='` + #zk_target_data_log_dir=`grep dataDir ${zk_target_prop_pathname} | cut -f2 -d '='` + + for ((i=1; i<=$num_kafka_source_server; i++)) + do + kafka_source_log4j_log_pathnames[${i}]=$base_dir/kafka_source_${i}.log + kafka_source_prop_pathnames[${i}]=${config_dir}/server_source_${i}.properties + + kafka_source_data_log_dirs[${i}]=`grep ^log.dir ${kafka_source_prop_pathnames[${i}]} | cut -f2 -d '='` + kafka_source_server_brokerids[${i}]=`grep ^brokerid= ${kafka_source_prop_pathnames[${i}]} | cut -f2 -d '='` + kafka_source_server_sock_ports[${i}]=`grep ^port= ${kafka_source_prop_pathnames[${i}]} | cut -f2 -d '='` + + info "kafka source $i data dir : ${kafka_source_data_log_dirs[$i]}" + info "kafka source $i log4j log : ${kafka_source_log4j_log_pathnames[$i]}" + info "kafka source $i prop file : ${kafka_source_prop_pathnames[$i]}" + info "kafka source $i brokerid : ${kafka_source_server_brokerids[$i]}" + info "kafka source $i socket : ${kafka_source_server_sock_ports[$i]}" + done + + for ((i=1; i<=$num_kafka_target_server; i++)) + do + kafka_target_log4j_log_pathnames[${i}]=$base_dir/kafka_target_${i}.log + kafka_target_prop_pathnames[${i}]=${config_dir}/server_target_${i}.properties + + kafka_target_data_log_dirs[${i}]=`grep ^log.dir ${kafka_target_prop_pathnames[${i}]} | cut -f2 -d '='` + kafka_target_server_brokerids[${i}]=`grep ^brokerid= ${kafka_target_prop_pathnames[${i}]} | cut -f2 -d '='` + kafka_target_server_sock_ports[${i}]=`grep ^port= ${kafka_target_prop_pathnames[${i}]} | cut -f2 -d '='` + + info "kafka target $i data dir : ${kafka_target_data_log_dirs[$i]}" + info "kafka target $i log4j log : ${kafka_target_log4j_log_pathnames[$i]}" + info "kafka target $i prop file : ${kafka_target_prop_pathnames[$i]}" + info "kafka target $i brokerid : ${kafka_target_server_brokerids[$i]}" + info "kafka target $i socket : ${kafka_target_server_sock_ports[$i]}" + done + + info "zookeeper source port : $zk_source_port" + info "zookeeper source data dir : $zk_source_data_log_dir" + echo +} + +info() { + echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*" +} + +info_no_newline() { + echo -e -n "$(date +"%Y-%m-%d %H:%M:%S") $*" +} + +cleanup() { + info "cleaning up kafka source server log/data dir" + for ((i=1; i<=$num_kafka_source_server; i++)) + do + rm -rf ${kafka_source_data_log_dirs[$i]} + rm -f ${kafka_source_log4j_log_pathnames[$i]} + done + + info "cleaning up kafka target server log/data 
dir" + for ((i=1; i<=$num_kafka_target_server; i++)) + do + rm -rf ${kafka_target_data_log_dirs[$i]} + rm -f ${kafka_target_log4j_log_pathnames[$i]} + done + + rm -rf $zk_source_data_log_dir + rm -f $zk_source_log4j_log_pathname + rm -f $this_test_stderr_output_log_pathname + + rm -f $producer_perf_output_log_pathname + rm -f $producer_perf_crc_log_pathname + rm -f $producer_perf_crc_sorted_log_pathname + rm -f $producer_perf_crc_sorted_uniq_log_pathname + + rm -f $console_consumer_log_pathname + rm -f $console_consumer_crc_log_pathname + rm -f $console_consumer_crc_sorted_log_pathname + rm -f $console_consumer_crc_sorted_uniq_log_pathname +} + +# ========================================= +# get_random_range - return a random number +# between the lower & upper bounds +# usage: +# get_random_range $lower $upper +# random_no=$? +# ========================================= +get_random_range() { + lo=$1 + up=$2 + range=$(($up - $lo + 1)) + + return $(($(($RANDOM % range)) + $lo)) +} + +get_leader_brokerid() { + log_line=`grep -i -h 'is leader' ${base_dir}/kafka_source_*.log | sort | tail -1` + info "found the log line: $log_line" + broker_id=`echo $log_line | cut -f5 -d ' '` + + return $broker_id +} + +start_zk() { + info "starting zookeeper source" + $base_dir/../../bin/zookeeper-server-start.sh $zk_source_prop_pathname \ + 2>&1 > ${zk_source_log4j_log_pathname} & + pid_zk_source=$! + + #info "starting zookeeper target" + #$base_dir/../../bin/zookeeper-server-start.sh $zk_target_prop_pathname \ + # 2>&1 > ${zk_target_log4j_log_pathname} & + #pid_zk_target=$! +} + +stop_source_server() { + s_idx=$1 + + info "stopping source server: $s_idx" + + if [ "x${kafka_source_pids[${s_idx}]}" != "x" ]; then + kill_child_processes 0 ${kafka_source_pids[${s_idx}]}; + fi + + kafka_source_pids[${s_idx}]= +} + +start_source_server() { + s_idx=$1 + + info "starting kafka source server" + $base_dir/bin/kafka-run-class.sh kafka.Kafka ${kafka_source_prop_pathnames[$s_idx]} \ + 2>&1 >> ${kafka_source_log4j_log_pathnames[$s_idx]} & + kafka_source_pids[${s_idx}]=$! 
+ info " -> kafka_source_pids[$s_idx]: ${kafka_source_pids[$s_idx]}" +} + +start_source_servers_cluster() { + info "starting source cluster" + + for ((i=1; i<=$num_kafka_source_server; i++)) + do + start_source_server $i + done +} + +start_producer_perf() { + this_topic=$1 + zk_conn_str=$2 + no_msg_to_produce=$3 + + info "starting producer performance" + + ${base_dir}/bin/kafka-run-class.sh kafka.perf.ProducerPerformance \ + --brokerinfo "zk.connect=${zk_conn_str}" \ + --topic ${this_topic} \ + --messages $no_msg_to_produce \ + --vary-message-size \ + --message-size 100 \ + --threads 1 \ + --async \ + 2>&1 >> $producer_perf_output_log_pathname +} + +start_console_consumer() { + this_consumer_topic=$1 + this_zk_conn_str=$2 + + info "starting console consumer" + $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ + --zookeeper $this_zk_conn_str \ + --topic $this_consumer_topic \ + --formatter 'kafka.consumer.ConsoleConsumer$ChecksumMessageFormatter' \ + --consumer-timeout-ms 10000 \ + 2>&1 >> $console_consumer_log_pathname & +} + +kill_child_processes() { + isTopmost=$1 + curPid=$2 + childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}') + + for childPid in $childPids + do + kill_child_processes 0 $childPid + done + if [ $isTopmost -eq 0 ]; then + kill -15 $curPid 2> /dev/null + fi +} + +shutdown_servers() { + + info "shutting down source servers" + for ((i=1; i<=$num_kafka_source_server; i++)) + do + if [ "x${kafka_source_pids[$i]}" != "x" ]; then + kill_child_processes 0 ${kafka_source_pids[$i]}; + fi + done + + info "shutting down zookeeper servers" + if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi + if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi +} + +create_topic() { + this_topic_to_create=$1 + this_zk_conn_str=$2 + this_replica_factor=$3 + + info "creating topic [$this_topic_to_create] on [$this_zk_conn_str]" + $base_dir/../../bin/kafka-create-topic.sh --topic $this_topic_to_create \ + --zookeeper $this_zk_conn_str --replica $this_replica_factor +} + +validate_results() { + + echo + echo "========================================================" + + for ((i=1; i<=$num_kafka_source_server; i++)) + do + first_data_file_dir=${kafka_source_data_log_dirs[$i]}/${test_topic}-0 + first_data_file=`ls ${first_data_file_dir} | head -1` + first_data_file_pathname=${first_data_file_dir}/$first_data_file + kafka_source_server_first_data_file_sizes[$i]=`stat -c%s ${first_data_file_pathname}` + kafka_source_server_first_data_file_checksums[$i]=`cksum ${first_data_file_pathname} | awk '{print $1}'` + echo "broker[$i] data file: ${first_data_file_pathname} : [${kafka_source_server_first_data_file_sizes[$i]}]" + echo " ==> crc ${kafka_source_server_first_data_file_checksums[$i]}" + done + + grep ^checksum $console_consumer_log_pathname | tr -d ' ' | cut -f2 -d ':' > $console_consumer_crc_log_pathname + grep ^checksum $producer_perf_output_log_pathname | tr -d ' ' | cut -f2 -d ':' > $producer_perf_crc_log_pathname + + sort $console_consumer_crc_log_pathname > $console_consumer_crc_sorted_log_pathname + sort $producer_perf_crc_log_pathname > $producer_perf_crc_sorted_log_pathname + + sort -u $console_consumer_crc_sorted_log_pathname > $console_consumer_crc_sorted_uniq_log_pathname + sort -u $producer_perf_crc_sorted_log_pathname > $producer_perf_crc_sorted_uniq_log_pathname + + msg_count_from_console_consumer=`cat $console_consumer_crc_log_pathname | wc -l | tr -d ' '` + 
uniq_msg_count_from_console_consumer=`cat $console_consumer_crc_sorted_uniq_log_pathname | wc -l | tr -d ' '` + + msg_count_from_producer_perf=`cat $producer_perf_crc_log_pathname | wc -l | tr -d ' '` + uniq_msg_count_from_producer_perf=`cat $producer_perf_crc_sorted_uniq_log_pathname | wc -l | tr -d ' '` + + echo + echo "no. of messages published : $msg_count_from_producer_perf" + echo "producer unique msg published : $uniq_msg_count_from_producer_perf" + echo "console consumer msg rec'd : $msg_count_from_console_consumer" + echo "console consumer unique msg rec'd : $uniq_msg_count_from_console_consumer" + echo "========================================================" + +} + + +start_test() { + info "=======================================" + info "#### Kafka Replicas System Test ####" + info "=======================================" + echo + + initialize + + cleanup + sleep 10 + + start_zk + sleep 5 + + start_source_servers_cluster + sleep 5 + + create_topic $test_topic localhost:$zk_source_port $replica_factor 2> $this_test_stderr_output_log_pathname + + info "sleeping for 10s" + sleep 10 + + for ((i=1; i<=$num_kafka_source_server; i++)) + do + get_leader_brokerid + ldr_bkr_id=$? + info "leader broker id: $ldr_bkr_id" + + svr_idx=$(($ldr_bkr_id + 1)) + + # ========================================================== + # If KAFKA-350 is fixed, uncomment the following 3 lines to + # STOP the source server for failure test + # ========================================================== + #stop_source_server $svr_idx + #info "sleeping for 10s" + #sleep 10 + + start_console_consumer $test_topic localhost:$zk_source_port + sleep 5 + + start_producer_perf $test_topic localhost:$zk_source_port 200 + info "sleeping for 60s" + sleep 60 + echo + + # ========================================================== + # If KAFKA-350 is fixed, uncomment the following 3 lines to + # START the source server for failure test + # ========================================================== + #start_source_server $svr_idx + #info "sleeping for 30s" + #sleep 30 + done + + sleep 10 + validate_results + + shutdown_servers +} + +# ================================================= +# Main Test +# ================================================= + +start_test Property changes on: system_test/single_host_multi_brokers/bin/run-test.sh ___________________________________________________________________ Added: svn:executable + * Index: system_test/single_host_multi_brokers/bin/kafka-run-class.sh =================================================================== --- system_test/single_host_multi_brokers/bin/kafka-run-class.sh (revision 0) +++ system_test/single_host_multi_brokers/bin/kafka-run-class.sh (revision 0) @@ -0,0 +1,67 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+if [ $# -lt 1 ];
+then
+  echo "USAGE: $0 classname [opts]"
+  exit 1
+fi
+
+base_dir=$(dirname $0)/..
+kafka_inst_dir=${base_dir}/../..
+
+for file in $kafka_inst_dir/project/boot/scala-2.8.0/lib/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $kafka_inst_dir/core/target/scala_2.8.0/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $kafka_inst_dir/core/lib/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $kafka_inst_dir/perf/target/scala_2.8.0/kafka*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $kafka_inst_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
+do
+  if [ ${file##*/} != "sbt-launch.jar" ]; then
+    CLASSPATH=$CLASSPATH:$file
+  fi
+done
+if [ -z "$KAFKA_JMX_OPTS" ]; then
+  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
+fi
+if [ -z "$KAFKA_OPTS" ]; then
+  KAFKA_OPTS="-Xmx512M -server -Dlog4j.configuration=file:$base_dir/config/log4j.properties"
+fi
+if [ $JMX_PORT ]; then
+  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
+fi
+if [ -z "$JAVA_HOME" ]; then
+  JAVA="java"
+else
+  JAVA="$JAVA_HOME/bin/java"
+fi
+
+$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH "$@"
Property changes on: system_test/single_host_multi_brokers/bin/kafka-run-class.sh
___________________________________________________________________
Added: svn:executable
   + *
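
Usage note: a minimal sketch of how this test is typically invoked once the patch is applied at the top of a Kafka checkout. It assumes the Kafka jars have already been built, since bin/kafka-run-class.sh above resolves them from project/boot/scala-2.8.0, core/target/scala_2.8.0 and core/lib_managed/scala_2.8.0; the exact build command depends on the checkout and is not part of this patch.

    cd system_test/single_host_multi_brokers
    bin/run-test.sh

run-test.sh cleans up the previous /tmp data and log directories, starts the source ZooKeeper and the three source brokers, creates the replicated test topic, drives kafka.perf.ProducerPerformance and kafka.consumer.ConsoleConsumer, and finally prints each broker's first data-file size and checksum together with the produced/consumed message counts from validate_results. The intermediate CRC logs (producer_perf_crc*.log, console_consumer_crc*.log) are left under system_test/single_host_multi_brokers/ for inspection after the run.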