Index: system_test/broker_failure/config/server_target1.properties =================================================================== --- system_test/broker_failure/config/server_target1.properties (revision 1356376) +++ system_test/broker_failure/config/server_target1.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/mirror_producer1.properties =================================================================== --- system_test/broker_failure/config/mirror_producer1.properties (revision 1356376) +++ system_test/broker_failure/config/mirror_producer1.properties (working copy) @@ -15,7 +15,8 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9081 +#broker.list=0:localhost:9081 +zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_target2.properties =================================================================== --- system_test/broker_failure/config/server_target2.properties (revision 1356376) +++ system_test/broker_failure/config/server_target2.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/mirror_producer2.properties =================================================================== --- system_test/broker_failure/config/mirror_producer2.properties (revision 1356376) +++ system_test/broker_failure/config/mirror_producer2.properties (working copy) @@ -15,7 +15,8 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9082 +#broker.list=0:localhost:9082 +zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_target3.properties =================================================================== --- system_test/broker_failure/config/server_target3.properties (revision 1356376) +++ system_test/broker_failure/config/server_target3.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/mirror_producer3.properties =================================================================== --- system_test/broker_failure/config/mirror_producer3.properties (revision 1356376) +++ system_test/broker_failure/config/mirror_producer3.properties (working copy) @@ -15,7 +15,8 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9083 +#broker.list=0:localhost:9083 +zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_source1.properties =================================================================== --- system_test/broker_failure/config/server_source1.properties (revision 1356376) +++ system_test/broker_failure/config/server_source1.properties (working copy) @@ -75,7 +75,7 @@ log.default.flush.scheduler.interval.ms=1000 # set sendBufferSize -send.buffer.size=10000 +send.buffer.size=500000 # set receiveBufferSize -receive.buffer.size=10000 +receive.buffer.size=500000 Index: system_test/broker_failure/config/server_source2.properties =================================================================== --- system_test/broker_failure/config/server_source2.properties (revision 1356376) +++ system_test/broker_failure/config/server_source2.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/server_source3.properties =================================================================== --- system_test/broker_failure/config/server_source3.properties (revision 1356376) +++ system_test/broker_failure/config/server_source3.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/server_source4.properties =================================================================== --- system_test/broker_failure/config/server_source4.properties (revision 1356376) +++ system_test/broker_failure/config/server_source4.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/log4j.properties =================================================================== --- system_test/broker_failure/config/log4j.properties (revision 1356376) +++ system_test/broker_failure/config/log4j.properties (working copy) @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=INFO, stdout, kafkaAppender +log4j.rootLogger=INFO, stdout # ==================================== # messages going to kafkaAppender @@ -27,7 +27,7 @@ # ==================================== # (comment out this line to redirect ZK-related messages to kafkaAppender # to allow reading both Kafka and ZK debugging messages in a single file) -#log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender +log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender # ==================================== # stdout @@ -73,6 +73,7 @@ log4j.logger.kafka.consumer=DEBUG log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG +log4j.logger.kafka.tools.ConsumerOffsetChecker=DEBUG # to print message checksum from ProducerPerformance log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG Index: system_test/broker_failure/bin/run-test.sh =================================================================== --- system_test/broker_failure/bin/run-test.sh (revision 1356376) +++ system_test/broker_failure/bin/run-test.sh (working copy) @@ -14,151 +14,134 @@ # See the License for the specific language governing permissions and # limitations under the License. -# ================================================================= +# =========== # run-test.sh # =========== -# -# This script performs broker failure tests with the following -# setup in a single local machine: -# -# 1. A cluster of Kafka source brokers -# 2. A cluster of Kafka mirror brokers with embedded consumers in -# point-to-point mode -# 3. An independent ConsoleConsumer in publish/subcribe mode to -# consume messages from the SOURCE brokers cluster -# 4. An independent ConsoleConsumer in publish/subcribe mode to -# consume messages from the MIRROR brokers cluster -# 5. A producer produces batches of messages to the SOURCE brokers -# 6. One of the Kafka SOURCE or MIRROR brokers in the cluster will -# be randomly terminated and waiting for the consumer to catch up. -# 7. Repeat Step 4 & 5 as many times as specified in the script -# -# Expected results: -# ================== -# There should not be any discrepancies by comparing the unique -# message checksums from the source ConsoleConsumer and the -# mirror ConsoleConsumer. -# -# Notes: -# ================== -# The number of Kafka SOURCE brokers can be increased as follows: -# 1. Update the value of $num_kafka_source_server in this script -# 2. Make sure that there are corresponding number of prop files: -# $base_dir/config/server_source{1..4}.properties -# -# The number of Kafka TARGET brokers can be increased as follows: -# 1. Update the value of $num_kafka_target_server in this script -# 2. Make sure that there are corresponding number of prop files: -# $base_dir/config/server_target{1..3}.properties -# -# Quick Start: -# ================== -# Execute this script as follows: -# /system_test/broker_failure $ bin/run-test.sh -# -# The expected output is given in bin/expected.out. -# -# In the event of failure, by default the brokers and zookeepers -# remain running to make it easier to debug the issue - hit Ctrl-C -# to shut them down. -# ================================================================= + +# ==================================== +# Do not change the followings +# (keep this section at the beginning +# of this script) +# ==================================== +readonly system_test_root=$(dirname $0)/../.. # path of /system_test +readonly common_dir=${system_test_root}/common # common util scripts for system_test +source ${common_dir}/util.sh # include the util script -readonly base_dir=$(dirname $0)/.. -readonly test_start_time="$(date +%s)" +readonly base_dir=$(dirname $0)/.. # the base dir of this test suite +readonly test_start_time="$(date +%s)" # time starting this test +readonly bounce_source_id=1 +readonly bounce_mir_mkr_id=2 +readonly bounce_target_id=3 +readonly log4j_prop_file=$base_dir/config/log4j.properties -readonly num_msg_per_batch=500 -readonly batches_per_iteration=5 -readonly num_iterations=5 +iter=1 # init a counter to keep track of iterations +num_iterations=5 # total no. of iterations to run +svr_to_bounce=0 # servers to bounce: 1-source 2-mirror_maker 3-target + # 12 - source & mirror_maker + # 13 - source & target -readonly zk_source_port=2181 -readonly zk_mirror_port=2182 - -readonly topic_prefix=test -readonly max_topic_id=2 -readonly unbalanced_start_id=2 -readonly consumer_grp=group1 +# ==================================== +# No need to change the following +# configurations in most cases +# ==================================== +readonly zk_source_port=2181 # source zk port +readonly zk_target_port=2182 # target zk port +readonly test_topic=test01 # topic used in this test +readonly consumer_grp=group1 # consumer group readonly source_console_consumer_grp=source -readonly mirror_console_consumer_grp=mirror -readonly message_size=5000 +readonly target_console_consumer_grp=target +readonly message_size=100 +readonly console_consumer_timeout_ms=15000 +readonly num_kafka_source_server=4 # requires same no. of property files such as: + # $base_dir/config/server_source{1..4}.properties +readonly num_kafka_target_server=3 # requires same no. of property files such as: + # $base_dir/config/server_target{1..3}.properties +readonly num_kafka_mirror_maker=3 # any values greater than 0 +readonly wait_time_after_killing_broker=0 # wait after broker is stopped but before starting again +readonly wait_time_after_restarting_broker=10 -# sleep time between each batch of messages published -# from producer - it will be randomly generated -# within the range of sleep_min & sleep_max -readonly sleep_min=3 -readonly sleep_max=3 +# ==================================== +# Change the followings as needed +# ==================================== +num_msg_per_batch=500 # no. of msg produced in each calling of ProducerPerformance +num_producer_threads=5 # no. of producer threads to send msg +producer_sleep_min=5 # min & max sleep time (in sec) between each +producer_sleep_max=5 # batch of messages sent from producer -# requires same no. of property files such as: -# $base_dir/config/server_source{1..4}.properties -readonly num_kafka_source_server=4 - -# requires same no. of property files such as: -# $base_dir/config/server_target{1..3}.properties -readonly num_kafka_target_server=3 - -readonly wait_time_after_killing_broker=0 -readonly wait_time_after_restarting_broker=5 - -readonly producer_4_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093,4:localhost:9094" -readonly producer_3_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093" - -background_producer_pid_1= -background_producer_pid_2= - -no_bouncing=$# - -iter=1 -abort_test=false - +# ==================================== +# zookeeper +# ==================================== pid_zk_source= pid_zk_target= +zk_log4j_log= +# ==================================== +# kafka source +# ==================================== kafka_source_pids= kafka_source_prop_files= kafka_source_log_files= +kafka_topic_creation_log_file=$base_dir/kafka_topic_creation.log +kafka_log4j_log= +# ==================================== +# kafka target +# ==================================== kafka_target_pids= kafka_target_prop_files= kafka_target_log_files= + +# ==================================== +# mirror maker +# ==================================== +kafka_mirror_maker_pids= +kafka_mirror_maker_log_files= +consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties mirror_producer_prop_files= +# ==================================== +# console consumer source +# ==================================== console_consumer_source_pid= -console_consumer_mirror_pid= - console_consumer_source_log=$base_dir/console_consumer_source.log -console_consumer_mirror_log=$base_dir/console_consumer_mirror.log -producer_performance_log=$base_dir/producer_performance.log +console_consumer_source_mid_log=$base_dir/console_consumer_source_mid.log +console_consumer_source_mid_sorted_log=$base_dir/console_consumer_source_mid_sorted.log +console_consumer_source_mid_sorted_uniq_log=$base_dir/console_consumer_source_mid_sorted_uniq.log -console_consumer_source_crc_log=$base_dir/console_consumer_source_crc.log -console_consumer_source_crc_sorted_log=$base_dir/console_consumer_source_crc_sorted.log -console_consumer_source_crc_sorted_uniq_log=$base_dir/console_consumer_source_crc_sorted_uniq.log +# ==================================== +# console consumer target +# ==================================== +console_consumer_target_pid= +console_consumer_target_log=$base_dir/console_consumer_target.log +console_consumer_target_mid_log=$base_dir/console_consumer_target_mid.log +console_consumer_target_mid_sorted_log=$base_dir/console_consumer_target_mid_sorted.log +console_consumer_target_mid_sorted_uniq_log=$base_dir/console_consumer_target_mid_sorted_uniq.log -console_consumer_mirror_crc_log=$base_dir/console_consumer_mirror_crc.log -console_consumer_mirror_crc_sorted_log=$base_dir/console_consumer_mirror_crc_sorted.log -console_consumer_mirror_crc_sorted_uniq_log=$base_dir/console_consumer_mirror_crc_sorted_uniq.log +# ==================================== +# producer +# ==================================== +background_producer_pid= +producer_performance_log=$base_dir/producer_performance.log +producer_performance_mid_log=$base_dir/producer_performance_mid.log +producer_performance_mid_sorted_log=$base_dir/producer_performance_mid_sorted.log +producer_performance_mid_sorted_uniq_log=$base_dir/producer_performance_mid_sorted_uniq.log +tmp_file_to_stop_background_producer=/tmp/tmp_file_to_stop_background_producer -producer_performance_crc_log=$base_dir/producer_performance_crc.log -producer_performance_crc_sorted_log=$base_dir/producer_performance_crc_sorted.log -producer_performance_crc_sorted_uniq_log=$base_dir/producer_performance_crc_sorted_uniq.log - -consumer_rebalancing_log=$base_dir/consumer_rebalancing_verification.log - -consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties +# ==================================== +# test reports +# ==================================== checksum_diff_log=$base_dir/checksum_diff.log -info() { - echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*" -} -info_no_newline() { - echo -e -n "$(date +"%Y-%m-%d %H:%M:%S") $*" -} - +# ==================================== +# initialize prop and log files +# ==================================== initialize() { for ((i=1; i<=$num_kafka_target_server; i++)) do kafka_target_prop_files[${i}]=$base_dir/config/server_target${i}.properties kafka_target_log_files[${i}]=$base_dir/kafka_target${i}.log - mirror_producer_prop_files[${i}]=$base_dir/config/mirror_producer${i}.properties + kafka_mirror_maker_log_files[${i}]=$base_dir/kafka_mirror_maker${i}.log done for ((i=1; i<=$num_kafka_source_server; i++)) @@ -166,109 +149,69 @@ kafka_source_prop_files[${i}]=$base_dir/config/server_source${i}.properties kafka_source_log_files[${i}]=$base_dir/kafka_source${i}.log done + + for ((i=1; i<=$num_kafka_mirror_maker; i++)) + do + mirror_producer_prop_files[${i}]=$base_dir/config/mirror_producer${i}.properties + done + + zk_log4j_log=`grep "log4j.appender.zookeeperAppender.File=" $log4j_prop_file | awk -F '=' '{print $2}'` + kafka_log4j_log=`grep "log4j.appender.kafkaAppender.File=" $log4j_prop_file | awk -F '=' '{print $2}'` } # ========================================= -# get_random_range - return a random number -# between the lower & upper bounds -# usage: -# get_random_range $lower $upper -# random_no=$? +# cleanup # ========================================= -get_random_range() { - lo=$1 - up=$2 - range=$(($up - $lo + 1)) +cleanup() { + info "cleaning up" - return $(($(($RANDOM % range)) + $lo)) -} + rm -rf $tmp_file_to_stop_background_producer + rm -rf $kafka_topic_creation_log_file -verify_consumer_rebalancing() { + rm -rf /tmp/zookeeper_source + rm -rf /tmp/zookeeper_target - info "Verifying consumer rebalancing operation" + rm -rf /tmp/kafka-source{1..4}-logs + rm -rf /tmp/kafka-target{1..3}-logs - CONSUMER_REBALANCING_RESULT=`$base_dir/bin/kafka-run-class.sh \ - kafka.tools.VerifyConsumerRebalance \ - --zk.connect=localhost:2181 \ - --group $consumer_grp` - echo "$CONSUMER_REBALANCING_RESULT" >> $consumer_rebalancing_log + rm -rf $zk_log4j_log + rm -rf $kafka_log4j_log - REBALANCE_STATUS_LINE=`grep "Rebalance operation" $consumer_rebalancing_log | tail -1` - # info "REBALANCE_STATUS_LINE: $REBALANCE_STATUS_LINE" - REBALANCE_STATUS=`echo $REBALANCE_STATUS_LINE | grep "Rebalance operation successful" || echo -n "Rebalance operation failed"` - info "REBALANCE_STATUS: $REBALANCE_STATUS" - - if [ "${REBALANCE_STATUS}_x" == "Rebalance operation failed_x" ]; then - info "setting abort_test to true due to Rebalance operation failed" - abort_test="true" - fi -} - -wait_for_zero_consumer_lags() { - - topic_id=$1 - - # no of times to check for zero lagging - no_of_zero_to_verify=3 - - while [ 'x' == 'x' ] + for ((i=1; i<=$num_kafka_target_server; i++)) do - TOTAL_LAG=0 - CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ - --group $consumer_grp --zkconnect localhost:$zk_source_port \ - --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` - - for lag in $CONSUMER_LAGS; - do - TOTAL_LAG=$(($TOTAL_LAG + $lag)) - done - - info "mirror TOTAL_LAG = $TOTAL_LAG" - if [ $TOTAL_LAG -eq 0 ]; then - if [ $no_of_zero_to_verify -eq 0 ]; then - echo - return 0 - fi - no_of_zero_to_verify=$(($no_of_zero_to_verify - 1)) - fi - sleep 1 + rm -rf ${kafka_target_log_files[${i}]} + rm -rf ${kafka_mirror_maker_log_files[${i}]} done -} -wait_for_zero_source_console_consumer_lags() { + rm -f $base_dir/zookeeper_source.log + rm -f $base_dir/zookeeper_target.log + rm -f $base_dir/kafka_source{1..4}.log - topic_id=$1 + rm -f $producer_performance_log + rm -f $producer_performance_mid_log + rm -f $producer_performance_mid_sorted_log + rm -f $producer_performance_mid_sorted_uniq_log - # no of times to check for zero lagging - no_of_zero_to_verify=3 + rm -f $console_consumer_target_log + rm -f $console_consumer_source_log + rm -f $console_consumer_target_mid_log + rm -f $console_consumer_source_mid_log - while [ 'x' == 'x' ] - do - TOTAL_LAG=0 - CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ - --group $source_console_consumer_grp --zkconnect localhost:$zk_source_port \ - --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + rm -f $checksum_diff_log - for lag in $CONSUMER_LAGS; - do - TOTAL_LAG=$(($TOTAL_LAG + $lag)) - done - - info "source console consumer TOTAL_LAG = $TOTAL_LAG" - if [ $TOTAL_LAG -eq 0 ]; then - if [ $no_of_zero_to_verify -eq 0 ]; then - echo - return 0 - fi - no_of_zero_to_verify=$(($no_of_zero_to_verify - 1)) - fi - sleep 1 - done + rm -f $console_consumer_target_mid_sorted_log + rm -f $console_consumer_source_mid_sorted_log + rm -f $console_consumer_target_mid_sorted_uniq_log + rm -f $console_consumer_source_mid_sorted_uniq_log } -wait_for_zero_mirror_console_consumer_lags() { +# ========================================= +# wait_for_zero_consumer_lags +# ========================================= +wait_for_zero_consumer_lags() { - topic_id=$1 + this_group_name=$1 + this_zk_port=$2 # no of times to check for zero lagging no_of_zero_to_verify=3 @@ -277,8 +220,10 @@ do TOTAL_LAG=0 CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ - --group $mirror_console_consumer_grp --zkconnect localhost:$zk_mirror_port \ - --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + --group $target_console_consumer_grp \ + --zkconnect localhost:$zk_target_port \ + --topic $test_topic \ + | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` for lag in $CONSUMER_LAGS; do @@ -297,58 +242,25 @@ done } -kill_child_processes() { - isTopmost=$1 - curPid=$2 - childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}') +# ========================================= +# create_topic +# ========================================= +create_topic() { + this_topic_to_create=$1 + this_zk_conn_str=$2 + this_replica_factor=$3 - for childPid in $childPids - do - kill_child_processes 0 $childPid - done - if [ $isTopmost -eq 0 ]; then - kill -15 $curPid 2> /dev/null - fi + info "creating topic [$this_topic_to_create] on [$this_zk_conn_str]" + $base_dir/../../bin/kafka-create-topic.sh \ + --topic $this_topic_to_create \ + --zookeeper $this_zk_conn_str \ + --replica $this_replica_factor \ + 2> $kafka_topic_creation_log_file } -cleanup() { - info "cleaning up" - - rm -rf /tmp/zookeeper_source - rm -rf /tmp/zookeeper_target - - rm -rf /tmp/kafka-source{1..4}-logs - rm -rf /tmp/kafka-target{1..3}-logs - - for ((i=1; i<=$num_kafka_target_server; i++)) - do - rm -rf ${kafka_target_log_files[${i}]} - done - - rm -f $base_dir/zookeeper_source.log - rm -f $base_dir/zookeeper_target.log - rm -f $base_dir/kafka_source{1..4}.log - - rm -f $producer_performance_log - rm -f $producer_performance_crc_log - rm -f $producer_performance_crc_sorted_log - rm -f $producer_performance_crc_sorted_uniq_log - - rm -f $console_consumer_mirror_log - rm -f $console_consumer_source_log - rm -f $console_consumer_mirror_crc_log - rm -f $console_consumer_source_crc_log - - rm -f $checksum_diff_log - - rm -f $console_consumer_mirror_crc_sorted_log - rm -f $console_consumer_source_crc_sorted_log - rm -f $console_consumer_mirror_crc_sorted_uniq_log - rm -f $console_consumer_source_crc_sorted_uniq_log - - rm -f $consumer_rebalancing_log -} - +# ========================================= +# start_zk +# ========================================= start_zk() { info "starting zookeepers" @@ -363,6 +275,9 @@ pid_zk_target=$! } +# ========================================= +# start_source_servers_cluster +# ========================================= start_source_servers_cluster() { info "starting source cluster" @@ -372,112 +287,130 @@ done } +# ========================================= +# start_source_server +# ========================================= start_source_server() { s_idx=$1 $base_dir/bin/kafka-run-class.sh kafka.Kafka \ ${kafka_source_prop_files[$s_idx]} \ - 2>&1 >> ${kafka_source_log_files[$s_idx]} & # append log msg after restarting + 2>&1 >> ${kafka_source_log_files[$s_idx]} & kafka_source_pids[${s_idx}]=$! info " -> kafka_source_pids[$s_idx]: ${kafka_source_pids[$s_idx]}" } +# ========================================= +# start_target_servers_cluster +# ========================================= start_target_servers_cluster() { info "starting mirror cluster" for ((i=1; i<=$num_kafka_target_server; i++)) do - start_embedded_consumer_server $i + start_target_server $i done } -start_embedded_consumer_server() { +# ========================================= +# start_target_server +# ========================================= +start_target_server() { s_idx=$1 $base_dir/bin/kafka-run-class.sh kafka.Kafka \ ${kafka_target_prop_files[${s_idx}]} \ - $consumer_prop_file \ - ${mirror_producer_prop_files[${s_idx}]} \ - 2>&1 >> ${kafka_target_log_files[${s_idx}]} & # append log msg after restarting + 2>&1 >> ${kafka_target_log_files[${s_idx}]} & kafka_target_pids[$s_idx]=$! info " -> kafka_target_pids[$s_idx]: ${kafka_target_pids[$s_idx]}" } -start_console_consumer_for_source_producer() { +# ========================================= +# start_target_mirror_maker +# ========================================= +start_target_mirror_maker() { + info "starting mirror maker" - topic_id=$1 + for ((i=1; i<=$num_kafka_mirror_maker; i++)) + do + start_mirror_maker $i + done +} - info "starting console consumers for source producer on topic id [$topic_id]" +# ========================================= +# start_mirror_maker +# ========================================= +start_mirror_maker() { + s_idx=$1 - $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ - --zookeeper localhost:$zk_source_port \ - --topic ${topic_prefix}_${topic_id} \ - --group $source_console_consumer_grp \ - --from-beginning --consumer-timeout-ms 5000 \ - --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \ - --property topic=${topic_prefix}_${topic_id} \ - 2>&1 >> ${console_consumer_source_log} + $base_dir/bin/kafka-run-class.sh kafka.tools.MirrorMaker \ + --consumer.config $consumer_prop_file \ + --producer.config ${mirror_producer_prop_files[${s_idx}]} \ + --whitelist=\".*\" \ + 2>&1 >> ${kafka_mirror_maker_log_files[$s_idx]} & + kafka_mirror_maker_pids[${s_idx}]=$! + + info " -> kafka_mirror_maker_pids[$s_idx]: ${kafka_mirror_maker_pids[$s_idx]}" } -start_console_consumer_for_mirror_producer() { +# ========================================= +# start_console_consumer +# ========================================= +start_console_consumer() { - topic_id=$1 + this_consumer_grp=$1 + this_consumer_zk_port=$2 + this_consumer_log=$3 + this_msg_formatter=$4 - info "starting console consumers for mirroring producer on topic id [$topic_id]" + info "starting console consumers for $this_consumer_grp" $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ - --zookeeper localhost:$zk_mirror_port \ - --topic ${topic_prefix}_${topic_id} \ - --group $mirror_console_consumer_grp \ - --from-beginning --consumer-timeout-ms 5000 \ - --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \ - --property topic=${topic_prefix}_${topic_id} \ - 2>&1 >> ${console_consumer_mirror_log} -} + --zookeeper localhost:$this_consumer_zk_port \ + --topic $test_topic \ + --group $this_consumer_grp \ + --from-beginning \ + --consumer-timeout-ms $console_consumer_timeout_ms \ + --formatter "kafka.consumer.ConsoleConsumer\$${this_msg_formatter}" \ + 2>&1 > ${this_consumer_log} & + console_consumer_pid=$! -consume_source_producer_messages() { - consumer_counter=1 - while [ $consumer_counter -le $max_topic_id ] - do - start_console_consumer_for_source_producer $consumer_counter - consumer_counter=$(( $consumer_counter + 1 )) - done + info " -> console consumer pid: $console_consumer_pid" } -consume_mirror_producer_messages() { - consumer_counter=1 - while [ $consumer_counter -le $max_topic_id ] - do - start_console_consumer_for_mirror_producer $consumer_counter - consumer_counter=$(( $consumer_counter + 1 )) - done +# ========================================= +# force_shutdown_background_producer +# - to be called when user press Ctrl-C +# ========================================= +force_shutdown_background_producer() { + info "force shutting down producer" + `ps auxw | grep "run\-test\|ProducerPerformance" | grep -v grep | awk '{print $2}' | xargs kill -9` } -shutdown_producer() { - info "shutting down producer" - if [ "x${background_producer_pid_1}" != "x" ]; then - # kill_child_processes 0 ${background_producer_pid_1}; - kill -TERM ${background_producer_pid_1} 2> /dev/null; - fi - - if [ "x${background_producer_pid_2}" != "x" ]; then - # kill_child_processes 0 ${background_producer_pid_2}; - kill -TERM ${background_producer_pid_2} 2> /dev/null; - fi +# ========================================= +# force_shutdown_consumer +# - to be called when user press Ctrl-C +# ========================================= +force_shutdown_consumer() { + info "force shutting down consumer" + `ps auxw | grep ChecksumMessageFormatter | grep -v grep | awk '{print $2}' | xargs kill -9` } +# ========================================= +# shutdown_servers +# ========================================= shutdown_servers() { - info "shutting down mirror console consumer" - if [ "x${console_consumer_mirror_pid}" != "x" ]; then - kill_child_processes 0 ${console_consumer_mirror_pid}; - fi - info "shutting down source console consumer" - if [ "x${console_consumer_source_pid}" != "x" ]; then - kill_child_processes 0 ${console_consumer_source_pid}; - fi + info "shutting down mirror makers" + for ((i=1; i<=$num_kafka_mirror_maker; i++)) + do + #info "stopping mm pid: ${kafka_mirror_maker_pids[$i]}" + if [ "x${kafka_mirror_maker_pids[$i]}" != "x" ]; then + kill_child_processes 0 ${kafka_mirror_maker_pids[$i]}; + fi + done info "shutting down target servers" for ((i=1; i<=$num_kafka_target_server; i++)) @@ -500,84 +433,82 @@ if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi } +# ========================================= +# start_background_producer +# ========================================= start_background_producer() { - bkrinfo_str=$1 - start_topic_id=$2 - end_topic_id=$3 + topic=$1 + batch_no=0 - topic_id=${start_topic_id} - while [ 'x' == 'x' ] + while [ ! -e $tmp_file_to_stop_background_producer ] do sleeptime= - get_random_range $sleep_min $sleep_max + get_random_range $producer_sleep_min $producer_sleep_max sleeptime=$? - batch_no=$(($batch_no + 1)) - - if [ $topic_id -gt $end_topic_id ]; then - topic_id=${start_topic_id} - fi - + info "producing $num_msg_per_batch messages on topic '$topic'" $base_dir/bin/kafka-run-class.sh \ kafka.perf.ProducerPerformance \ - --brokerinfo $bkrinfo_str \ - --topic ${topic_prefix}_${topic_id} \ + --brokerinfo zk.connect=localhost:2181 \ + --topic $topic \ --messages $num_msg_per_batch \ --message-size $message_size \ - --batch-size 50 \ - --vary-message-size \ - --threads 1 \ - --reporting-interval $num_msg_per_batch --async \ + --threads $num_producer_threads \ + --seq-id-starting-from $batch_no \ 2>&1 >> $base_dir/producer_performance.log # appending all producers' msgs - topic_id=$(( $topic_id + 1 )) - + batch_no=$(($batch_no + $num_msg_per_batch)) sleep $sleeptime done } +# ========================================= +# cmp_checksum +# ========================================= cmp_checksum() { cmp_result=0 - grep checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log - grep checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log - grep checksum $producer_performance_log | tr -d ' ' | cut -f4 -d ':' | cut -f1 -d '(' > $producer_performance_crc_log + grep MessageID $console_consumer_source_log | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $console_consumer_source_mid_log + grep MessageID $console_consumer_target_log | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $console_consumer_target_mid_log + grep MessageID $producer_performance_log | sed s'/^.*MessageID://g' | awk -F ':' '{print $1}' > $producer_performance_mid_log - sort $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_log - sort $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_log - sort $producer_performance_crc_log > $producer_performance_crc_sorted_log + sort $console_consumer_target_mid_log > $console_consumer_target_mid_sorted_log + sort $console_consumer_source_mid_log > $console_consumer_source_mid_sorted_log + sort $producer_performance_mid_log > $producer_performance_mid_sorted_log - sort -u $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_uniq_log - sort -u $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_uniq_log - sort -u $producer_performance_crc_log > $producer_performance_crc_sorted_uniq_log + sort -u $console_consumer_target_mid_log > $console_consumer_target_mid_sorted_uniq_log + sort -u $console_consumer_source_mid_log > $console_consumer_source_mid_sorted_uniq_log + sort -u $producer_performance_mid_log > $producer_performance_mid_sorted_uniq_log - msg_count_from_source_consumer=`cat $console_consumer_source_crc_log | wc -l | tr -d ' '` - uniq_msg_count_from_source_consumer=`cat $console_consumer_source_crc_sorted_uniq_log | wc -l | tr -d ' '` + msg_count_from_source_consumer=`cat $console_consumer_source_mid_log | wc -l | tr -d ' '` + uniq_msg_count_from_source_consumer=`cat $console_consumer_source_mid_sorted_uniq_log | wc -l | tr -d ' '` - msg_count_from_mirror_consumer=`cat $console_consumer_mirror_crc_log | wc -l | tr -d ' '` - uniq_msg_count_from_mirror_consumer=`cat $console_consumer_mirror_crc_sorted_uniq_log | wc -l | tr -d ' '` + msg_count_from_mirror_consumer=`cat $console_consumer_target_mid_log | wc -l | tr -d ' '` + uniq_msg_count_from_mirror_consumer=`cat $console_consumer_target_mid_sorted_uniq_log | wc -l | tr -d ' '` - uniq_msg_count_from_producer=`cat $producer_performance_crc_sorted_uniq_log | wc -l | tr -d ' '` + uniq_msg_count_from_producer=`cat $producer_performance_mid_sorted_uniq_log | wc -l | tr -d ' '` - total_msg_published=`cat $producer_performance_crc_log | wc -l | tr -d ' '` + total_msg_published=`cat $producer_performance_mid_log | wc -l | tr -d ' '` duplicate_msg_in_producer=$(( $total_msg_published - $uniq_msg_count_from_producer )) - crc_only_in_mirror_consumer=`comm -23 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` - crc_only_in_source_consumer=`comm -13 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` - crc_common_in_both_consumer=`comm -12 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` + crc_only_in_mirror_consumer=`comm -23 $console_consumer_target_mid_sorted_uniq_log $console_consumer_source_mid_sorted_uniq_log` + crc_only_in_source_consumer=`comm -13 $console_consumer_target_mid_sorted_uniq_log $console_consumer_source_mid_sorted_uniq_log` + crc_common_in_both_consumer=`comm -12 $console_consumer_target_mid_sorted_uniq_log $console_consumer_source_mid_sorted_uniq_log` - crc_only_in_producer=`comm -23 $producer_performance_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` + crc_only_in_producer=`comm -23 $producer_performance_mid_sorted_uniq_log $console_consumer_source_mid_sorted_uniq_log` - duplicate_mirror_crc=`comm -23 $console_consumer_mirror_crc_sorted_log $console_consumer_mirror_crc_sorted_uniq_log` + duplicate_mirror_mid=`comm -23 $console_consumer_target_mid_sorted_log $console_consumer_target_mid_sorted_uniq_log` no_of_duplicate_msg=$(( $msg_count_from_mirror_consumer - $uniq_msg_count_from_mirror_consumer \ + $msg_count_from_source_consumer - $uniq_msg_count_from_source_consumer - \ 2*$duplicate_msg_in_producer )) + source_mirror_uniq_msg_diff=$(($uniq_msg_count_from_source_consumer - $uniq_msg_count_from_mirror_consumer)) + echo "" echo "========================================================" echo "no. of messages published : $total_msg_published" @@ -587,8 +518,7 @@ echo "mirror consumer msg rec'd : $msg_count_from_mirror_consumer" echo "mirror consumer unique msg rec'd : $uniq_msg_count_from_mirror_consumer" echo "total source/mirror duplicate msg : $no_of_duplicate_msg" - echo "source/mirror uniq msg count diff : $(($uniq_msg_count_from_source_consumer - \ - $uniq_msg_count_from_mirror_consumer))" + echo "source/mirror uniq msg count diff : $source_mirror_uniq_msg_diff" echo "========================================================" echo "(Please refer to $checksum_diff_log for more details)" echo "" @@ -611,96 +541,76 @@ echo "========================================================" >> $checksum_diff_log echo "duplicate crc in mirror consumer" >> $checksum_diff_log echo "========================================================" >> $checksum_diff_log - echo "${duplicate_mirror_crc}" >> $checksum_diff_log + echo "${duplicate_mirror_mid}" >> $checksum_diff_log - topic_chksum_counter=1 - while [ $topic_chksum_counter -le $max_topic_id ] - do - # get producer topic counts - this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $producer_performance_log` - echo "PRODUCER topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" - - topic_chksum_counter=$(($topic_chksum_counter + 1)) - done + echo "=================" + if [[ $source_mirror_uniq_msg_diff -eq 0 && $uniq_msg_count_from_source_consumer -gt 0 ]]; then + echo "## Test PASSED" + else + echo "## Test FAILED" + fi + echo "=================" echo - topic_chksum_counter=1 - while [ $topic_chksum_counter -le $max_topic_id ] - do - this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_source_log` - echo "SOURCE consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" - - topic_chksum_counter=$(($topic_chksum_counter + 1)) - done - echo - - topic_chksum_counter=1 - while [ $topic_chksum_counter -le $max_topic_id ] - do - this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_mirror_log` - echo "MIRROR consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" - - topic_chksum_counter=$(($topic_chksum_counter + 1)) - done - echo - return $cmp_result } +# ========================================= +# start_test +# ========================================= start_test() { + echo + info "===========================================================" + info "#### Starting Kafka Broker / Mirror Maker Failure Test ####" + info "===========================================================" + echo + start_zk sleep 2 + start_source_servers_cluster sleep 2 + + create_topic $test_topic localhost:$zk_source_port 1 + sleep 2 + start_target_servers_cluster sleep 2 - start_background_producer $producer_4_brokerinfo_str 1 $(( $unbalanced_start_id - 1 )) & - background_producer_pid_1=$! + start_target_mirror_maker + sleep 2 - info "==========================================" - info "Started background producer pid [${background_producer_pid_1}]" - info "==========================================" + start_background_producer $test_topic & + background_producer_pid=$! - sleep 10 + info "Started background producer pid [${background_producer_pid}]" + sleep 5 - start_background_producer $producer_3_brokerinfo_str $unbalanced_start_id $max_topic_id & - background_producer_pid_2=$! - - info "==========================================" - info "Started background producer pid [${background_producer_pid_2}]" - info "==========================================" - - sleep 10 - - verify_consumer_rebalancing - - info "abort_test: [${abort_test}]" - if [ "${abort_test}_x" == "true_x" ]; then - info "aborting test" - iter=$((${num_iterations} + 1)) - fi - + # loop for no. of iterations specified in $num_iterations while [ $num_iterations -ge $iter ] do - echo - info "==========================================" - info "Iteration $iter of ${num_iterations}" - info "==========================================" + # if $svr_to_bounce is '0', it means no bouncing + if [[ $num_iterations -ge $iter && $svr_to_bounce -gt 0 ]]; then + idx= - # terminate the broker if not the last iteration: - if [[ $num_iterations -gt $iter && $no_bouncing -eq 0 ]]; then + # check which type of broker bouncing is requested: source, mirror_maker or target - idx= + # $svr_to_bounce contains $bounce_target_id - eg. '3', '123', ... etc + svr_idx=`expr index $svr_to_bounce $bounce_target_id` + if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then + echo + info "==========================================" + info "Iteration $iter of ${num_iterations}" + info "==========================================" - if [ $(( $iter % 2 )) -eq 0 ]; then - # even iterations -> bounce target kafka borker + # bounce target kafka broker get_random_range 1 $num_kafka_target_server idx=$? + if [ "x${kafka_target_pids[$idx]}" != "x" ]; then echo - info "#### Bouncing kafka TARGET broker ####" + info "#### Bouncing Kafka TARGET Broker ####" info "terminating kafka target[$idx] with process id ${kafka_target_pids[$idx]}" kill_child_processes 0 ${kafka_target_pids[$idx]} @@ -709,19 +619,58 @@ sleep $wait_time_after_killing_broker info "starting kafka target server" - start_embedded_consumer_server $idx + start_target_server $idx + fi + iter=$(($iter+1)) + info "sleeping for ${wait_time_after_restarting_broker}s" + sleep $wait_time_after_restarting_broker + fi - info "sleeping for ${wait_time_after_restarting_broker}s" - sleep $wait_time_after_restarting_broker + # $svr_to_bounce contains $bounce_mir_mkr_id - eg. '2', '123', ... etc + svr_idx=`expr index $svr_to_bounce $bounce_mir_mkr_id` + if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then + echo + info "==========================================" + info "Iteration $iter of ${num_iterations}" + info "==========================================" + + # bounce mirror maker + get_random_range 1 $num_kafka_mirror_maker + idx=$? + + if [ "x${kafka_mirror_maker_pids[$idx]}" != "x" ]; then + echo + info "#### Bouncing Kafka Mirror Maker ####" + + info "terminating kafka mirror maker [$idx] with process id ${kafka_mirror_maker_pids[$idx]}" + kill_child_processes 0 ${kafka_mirror_maker_pids[$idx]} + + info "sleeping for ${wait_time_after_killing_broker}s" + sleep $wait_time_after_killing_broker + + info "starting kafka mirror maker" + start_mirror_maker $idx fi - else - # odd iterations -> bounce source kafka broker + iter=$(($iter+1)) + info "sleeping for ${wait_time_after_restarting_broker}s" + sleep $wait_time_after_restarting_broker + fi + + # $svr_to_bounce contains $bounce_source_id - eg. '1', '123', ... etc + svr_idx=`expr index $svr_to_bounce $bounce_source_id` + if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then + echo + info "==========================================" + info "Iteration $iter of ${num_iterations}" + info "==========================================" + + # bounce source kafka broker get_random_range 1 $num_kafka_source_server idx=$? if [ "x${kafka_source_pids[$idx]}" != "x" ]; then echo - info "#### Bouncing kafka SOURCE broker ####" + info "#### Bouncing Kafka SOURCE Broker ####" info "terminating kafka source[$idx] with process id ${kafka_source_pids[$idx]}" kill_child_processes 0 ${kafka_source_pids[$idx]} @@ -731,69 +680,142 @@ info "starting kafka source server" start_source_server $idx - - info "sleeping for ${wait_time_after_restarting_broker}s" - sleep $wait_time_after_restarting_broker fi - fi + iter=$(($iter+1)) + info "sleeping for ${wait_time_after_restarting_broker}s" + sleep $wait_time_after_restarting_broker + fi + else + echo + info "==========================================" + info "Iteration $iter of ${num_iterations}" + info "==========================================" - verify_consumer_rebalancing - - info "abort_test: [${abort_test}]" - if [ "${abort_test}_x" == "true_x" ]; then - info "aborting test" - iter=$((${num_iterations} + 1)) - fi - - else info "No bouncing performed" + iter=$(($iter+1)) + info "sleeping for ${wait_time_after_restarting_broker}s" + sleep $wait_time_after_restarting_broker fi + done - info "sleeping for 10 sec" - sleep 10 + # notify background producer to stop + `touch $tmp_file_to_stop_background_producer` - iter=$(($iter+1)) - done - echo info "Tests completed. Waiting for consumers to catch up " - - shutdown_producer - wait_for_zero_consumer_lags + # ======================================================= + # remove the following 'sleep 30' when KAFKA-313 is fixed + # ======================================================= + info "sleeping 30 sec" + sleep 30 } +# ========================================= +# print_usage +# ========================================= +print_usage() { + echo + echo "Error : invalid no. of arguments" + echo "Usage : $0 -n -s " + echo + echo " num of iterations - the number of iterations that the test runs" + echo + echo " servers to bounce - the servers to be bounced in a round-robin fashion" + echo " Values of the servers:" + echo " 0 - no bouncing" + echo " 1 - source broker" + echo " 2 - mirror maker" + echo " 3 - target broker" + echo " Example:" + echo " * To bounce only mirror maker and target broker" + echo " in turns, enter the value 23" + echo " * To bounce only mirror maker, enter the value 2" + echo " * To run the test without bouncing, enter 0" + echo + echo "Usage Example : $0 -n 10 -s 12" + echo " (run 10 iterations and bounce source broker (1) + mirror maker (2) in turn)" + echo +} -# ===================== -# main test begins here -# ===================== -echo -info "============================================" -info "#### Starting Kafka Broker Failure Test ####" -info "============================================" -echo +# ========================================= +# +# Main test begins here +# +# ========================================= +# get command line arguments +while getopts "hb:i:n:s:x:" opt +do + case $opt in + b) + num_msg_per_batch=$OPTARG + ;; + h) + print_usage + exit + ;; + i) + producer_sleep_min=$OPTARG + ;; + n) + num_iterations=$OPTARG + ;; + s) + svr_to_bounce=$OPTARG + ;; + x) + producer_sleep_max=$OPTARG + ;; + ?) + print_usage + exit + ;; + esac +done + +# initialize and cleanup initialize cleanup sleep 5 # Ctrl-c trap. Catches INT signal -trap "shutdown_producer; shutdown_servers; cmp_checksum; exit 0" INT +trap "shutdown_servers; force_shutdown_consumer; force_shutdown_background_producer; cmp_checksum; exit 0" INT +# starting the test start_test -consume_source_producer_messages -consume_mirror_producer_messages +# starting consumer to consume data in source +start_console_consumer $source_console_consumer_grp $zk_source_port $console_consumer_source_log DecodedMessageFormatter -wait_for_zero_source_console_consumer_lags -wait_for_zero_mirror_console_consumer_lags +# starting consumer to consume data in target +start_console_consumer $target_console_consumer_grp $zk_target_port $console_consumer_target_log DecodedMessageFormatter -verify_consumer_rebalancing +# wait for zero source consumer lags +wait_for_zero_consumer_lags $source_console_consumer_grp $zk_source_port +# wait for zero target consumer lags +wait_for_zero_consumer_lags $target_console_consumer_grp $zk_target_port + +# ======================================================= +# remove the following 'sleep 30' when KAFKA-313 is fixed +# ======================================================= +info "sleeping 30 sec" +sleep 30 + shutdown_servers cmp_checksum result=$? +# =============================================== +# Report the time taken +# =============================================== +test_end_time="$(date +%s)" +total_test_time_sec=$(( $test_end_time - $test_start_time )) +total_test_time_min=$(( $total_test_time_sec / 60 )) +info "Total time taken: $total_test_time_min min for $num_iterations iterations" +echo + exit $result Index: system_test/broker_failure/README =================================================================== --- system_test/broker_failure/README (revision 1356376) +++ system_test/broker_failure/README (working copy) @@ -1,23 +1,34 @@ -This script performs broker failure tests with the following -setup in a single local machine: +** Please note that the following commands should be executed + after downloading the kafka source code to build all the + required binaries: + 1. / $ ./sbt update + 2. / $ ./sbt package -1. A cluster of Kafka source brokers -2. A cluster of Kafka mirror brokers with embedded consumers in - point-to-point mode -3. An independent ConsoleConsumer in publish/subcribe mode to + Now you are ready to follow the steps below. + +This script performs broker failure tests in an environment with +Mirrored Source & Target clusters in a single machine: + +1. Start a cluster of Kafka source brokers +2. Start a cluster of Kafka target brokers +3. Start one or more Mirror Maker to create mirroring between + source and target clusters +4. A producer produces batches of messages to the SOURCE brokers + in the background +5. The Kafka SOURCE, TARGET brokers and Mirror Maker will be + terminated in a round-robin fashion and wait for the consumer + to catch up. +6. Repeat step 5 as many times as specified in the script +7. An independent ConsoleConsumer in publish/subcribe mode to consume messages from the SOURCE brokers cluster -4. An independent ConsoleConsumer in publish/subcribe mode to - consume messages from the MIRROR brokers cluster -5. A producer produces batches of messages to the SOURCE brokers -6. One of the Kafka SOURCE or MIRROR brokers in the cluster will - be randomly terminated and waiting for the consumer to catch up. -7. Repeat Step 4 & 5 as many times as specified in the script +8. An independent ConsoleConsumer in publish/subcribe mode to + consume messages from the TARGET brokers cluster Expected results: ================== There should not be any discrepancies by comparing the unique message checksums from the source ConsoleConsumer and the -mirror ConsoleConsumer. +target ConsoleConsumer. Notes: ================== @@ -26,17 +37,36 @@ 2. Make sure that there are corresponding number of prop files: $base_dir/config/server_source{1..4}.properties -The number of Kafka MIRROR brokers can be increased as follows: +The number of Kafka TARGET brokers can be increased as follows: 1. Update the value of $num_kafka_target_server in this script 2. Make sure that there are corresponding number of prop files: $base_dir/config/server_target{1..3}.properties Quick Start: ================== -Execute this script as follows: - /system_test/broker_failure $ bin/run-test.sh +In the directory /system_test/broker_failure, +execute this script as following: + $ bin/run-test.sh -n -s +num of iterations - the number of iterations that the test runs + +servers to bounce - the servers to be bounced in a round-robin fashion. + + Values to be entered: + 1 - source broker + 2 - mirror maker + 3 - target broker + + Example: + * To bounce only mirror maker and target broker + in turns, enter the value 23. + * To bounce only mirror maker, enter the value 2. + * To run the test without bouncing, enter 0. + +At the end of the test, the received messages checksums in both +SOURCE & TARGET will be compared. If all checksums are matched, +the test is PASSED. Otherwise, the test is FAILED. + In the event of failure, by default the brokers and zookeepers remain running to make it easier to debug the issue - hit Ctrl-C to shut them down. - Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (revision 1356376) +++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (working copy) @@ -27,6 +27,7 @@ import kafka.message._ import kafka.utils.{Utils, Logging} import kafka.utils.ZKStringSerializer +import kafka.serializer.StringDecoder /** * Consumer that dumps messages out to standard out. @@ -235,7 +236,7 @@ override def init(props: Properties) { topicStr = props.getProperty("topic") if (topicStr != null) - topicStr = topicStr + "-" + topicStr = topicStr + ":" else topicStr = "" } @@ -246,6 +247,27 @@ } } + class DecodedMessageFormatter extends MessageFormatter { + var topicStr: String = _ + val decoder = new StringDecoder() + + override def init(props: Properties) { + topicStr = props.getProperty("topic") + if (topicStr != null) + topicStr = topicStr + ":" + else + topicStr = "" + } + + def writeTo(message: Message, output: PrintStream) { + try { + output.println(topicStr + decoder.toEvent(message) + ":payloadsize:" + message.payloadSize) + } catch { + case e => e.printStackTrace() + } + } + } + def tryCleanupZookeeper(zkUrl: String, groupId: String) { try { val dir = "/consumers/" + groupId Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala =================================================================== --- perf/src/main/scala/kafka/perf/ProducerPerformance.scala (revision 1356376) +++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala (working copy) @@ -97,6 +97,15 @@ .describedAs("compression codec ") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) + val seqIdStartFromOpt = parser.accepts("seq-id-starting-from", "If set, messages will be tagged with an " + + "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + + "starting with 'Message:0000000001:xxxxxxx...' and padded with zeros to make up the required no. of " + + "digits for the max value of a 32 bit int for the messageid. After the message id, it will append " + + "extra characters to make up the required message string length") + .withRequiredArg() + .describedAs("begin sequence id") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) val options = parser.parse(args : _*) for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) { @@ -114,11 +123,21 @@ val hideHeader = options.has(hideHeaderOpt) val brokerInfo = options.valueOf(brokerInfoOpt) val messageSize = options.valueOf(messageSizeOpt).intValue - val isFixSize = !options.has(varyMessageSizeOpt) - val isAsync = options.has(asyncOpt) + var isFixSize = !options.has(varyMessageSizeOpt) + var isAsync = options.has(asyncOpt) var batchSize = options.valueOf(batchSizeOpt).intValue - val numThreads = options.valueOf(numThreadsOpt).intValue + var numThreads = options.valueOf(numThreadsOpt).intValue val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue) + val seqIdStartFrom = options.valueOf(seqIdStartFromOpt).intValue() + val seqIdMode = options.has(seqIdStartFromOpt) + + // override necessary flags in seqIdMode + if (seqIdMode) { + if (!options.has(numThreadsOpt)) { numThreads = 1 } + if (!options.has(asyncOpt)) { isAsync = false } + batchSize = 1 + isFixSize = true + } } private def getStringOfLength(len: Int) : String = { @@ -157,18 +176,35 @@ } val producerConfig = new ProducerConfig(props) val producer = new Producer[Message, Message](producerConfig) + val leftPaddedSeqIdNumDigit = 10 // no. of digits for max int value override def run { var bytesSent = 0L var lastBytesSent = 0L var nSends = 0 var lastNSends = 0 - val message = new Message(new Array[Byte](config.messageSize)) + var message = new Message(new Array[Byte](config.messageSize)) var reportTime = System.currentTimeMillis() var lastReportTime = reportTime val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize else config.numMessages / config.numThreads debug("Messages per thread = " + messagesPerThread) + + // generate the sequential message ID + val msgIDPrefix = "MessageID:" + var leftPaddedSeqId : String = "" + var sb = new StringBuilder() + sb.append(":") + + // add extra characters up to the required message size + if (config.seqIdMode) { + val extraNumChar = config.messageSize - leftPaddedSeqIdNumDigit - msgIDPrefix.length() - 1 // minus 1 due to the last ':' appended + for (i <- 0 until (extraNumChar) ) { + sb.append('x') + } + } + val msgString = sb.toString() + var messageSet: List[Message] = Nil if(config.isFixSize) { for(k <- 0 until config.batchSize) { @@ -178,6 +214,17 @@ var j: Long = 0L while(j < messagesPerThread) { var strLength = config.messageSize + + // each thread gets a range of sequential no. for the ids + val msgId = config.seqIdStartFrom + (messagesPerThread * threadId) + j + leftPaddedSeqId = String.format("%0"+leftPaddedSeqIdNumDigit+"d", long2Long(msgId)) + + if (config.seqIdMode) { + val seqMsgString = msgIDPrefix + leftPaddedSeqId + msgString + debug(config.topic+":ThreadID:"+threadId+":"+seqMsgString) + message = new Message(seqMsgString.getBytes()) + } + if (!config.isFixSize) { for(k <- 0 until config.batchSize) { strLength = rand.nextInt(config.messageSize) @@ -189,8 +236,14 @@ bytesSent += config.batchSize*message.payloadSize } try { + debug(config.topic + "-checksum:" + message.checksum) if(!config.isAsync) { - producer.send(new ProducerData[Message,Message](config.topic, null, messageSet)) + if (config.seqIdMode) { + producer.send(new ProducerData[Message,Message](config.topic, null, message)) + } + else { + producer.send(new ProducerData[Message,Message](config.topic, null, messageSet)) + } if(!config.isFixSize) messageSet = Nil nSends += config.batchSize }else { @@ -200,11 +253,9 @@ rand.nextBytes(messageBytes) val message = new Message(messageBytes) producer.send(new ProducerData[Message,Message](config.topic, message)) - debug(config.topic + "-checksum:" + message.checksum) bytesSent += message.payloadSize }else { producer.send(new ProducerData[Message,Message](config.topic, message)) - debug(config.topic + "-checksum:" + message.checksum) bytesSent += message.payloadSize } nSends += 1