Index: system_test/broker_failure/config/server_target1.properties =================================================================== --- system_test/broker_failure/config/server_target1.properties (revision 1352297) +++ system_test/broker_failure/config/server_target1.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/mirror_producer1.properties =================================================================== --- system_test/broker_failure/config/mirror_producer1.properties (revision 1352297) +++ system_test/broker_failure/config/mirror_producer1.properties (working copy) @@ -15,7 +15,8 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9081 +#broker.list=0:localhost:9081 +zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_target2.properties =================================================================== --- system_test/broker_failure/config/server_target2.properties (revision 1352297) +++ system_test/broker_failure/config/server_target2.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/mirror_producer2.properties =================================================================== --- system_test/broker_failure/config/mirror_producer2.properties (revision 1352297) +++ system_test/broker_failure/config/mirror_producer2.properties (working copy) @@ -15,7 +15,8 @@ # zk connection string # 
comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9082 +#broker.list=0:localhost:9082 +zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_target3.properties =================================================================== --- system_test/broker_failure/config/server_target3.properties (revision 1352297) +++ system_test/broker_failure/config/server_target3.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/mirror_producer3.properties =================================================================== --- system_test/broker_failure/config/mirror_producer3.properties (revision 1352297) +++ system_test/broker_failure/config/mirror_producer3.properties (working copy) @@ -15,7 +15,8 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. 
"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9083 +#broker.list=0:localhost:9083 +zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_source1.properties =================================================================== --- system_test/broker_failure/config/server_source1.properties (revision 1352297) +++ system_test/broker_failure/config/server_source1.properties (working copy) @@ -75,7 +75,7 @@ log.default.flush.scheduler.interval.ms=1000 # set sendBufferSize -send.buffer.size=10000 +send.buffer.size=500000 # set receiveBufferSize -receive.buffer.size=10000 +receive.buffer.size=500000 Index: system_test/broker_failure/config/server_source2.properties =================================================================== --- system_test/broker_failure/config/server_source2.properties (revision 1352297) +++ system_test/broker_failure/config/server_source2.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/server_source3.properties =================================================================== --- system_test/broker_failure/config/server_source3.properties (revision 1352297) +++ system_test/broker_failure/config/server_source3.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/server_source4.properties =================================================================== --- system_test/broker_failure/config/server_source4.properties (revision 1352297) +++ 
system_test/broker_failure/config/server_source4.properties (working copy) @@ -41,7 +41,7 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=536870912 +log.file.size=10000000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/log4j.properties =================================================================== --- system_test/broker_failure/config/log4j.properties (revision 1352297) +++ system_test/broker_failure/config/log4j.properties (working copy) @@ -27,7 +27,7 @@ # ==================================== # (comment out this line to redirect ZK-related messages to kafkaAppender # to allow reading both Kafka and ZK debugging messages in a single file) -#log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender +log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender # ==================================== # stdout Index: system_test/broker_failure/bin/run-test.sh =================================================================== --- system_test/broker_failure/bin/run-test.sh (revision 1352297) +++ system_test/broker_failure/bin/run-test.sh (working copy) @@ -18,26 +18,28 @@ # run-test.sh # =========== # -# This script performs broker failure tests with the following -# setup in a single local machine: +# This script performs broker failure tests in an environment with +# Mirrored Source & Target clusters in a single machine: # -# 1. A cluster of Kafka source brokers -# 2. A cluster of Kafka mirror brokers with embedded consumers in -# point-to-point mode -# 3. An independent ConsoleConsumer in publish/subcribe mode to +# 1. Start a cluster of Kafka source brokers +# 2. Start a cluster of Kafka target brokers +# 3. Start one or more Mirror Maker to create mirroring between +# source and target clusters +# 4. A producer produces batches of messages to the SOURCE brokers +# in the background +# 5. 
One of the Kafka SOURCE or TARGET brokers in the cluster will +# be randomly terminated and waiting for the consumer to catch up. +# 6. Repeat step 5 as many times as specified in the script +# 7. An independent ConsoleConsumer in publish/subcribe mode to # consume messages from the SOURCE brokers cluster -# 4. An independent ConsoleConsumer in publish/subcribe mode to -# consume messages from the MIRROR brokers cluster -# 5. A producer produces batches of messages to the SOURCE brokers -# 6. One of the Kafka SOURCE or MIRROR brokers in the cluster will -# be randomly terminated and waiting for the consumer to catch up. -# 7. Repeat Step 4 & 5 as many times as specified in the script +# 8. An independent ConsoleConsumer in publish/subcribe mode to +# consume messages from the TARGET brokers cluster # # Expected results: # ================== # There should not be any discrepancies by comparing the unique # message checksums from the source ConsoleConsumer and the -# mirror ConsoleConsumer. +# target ConsoleConsumer. # # Notes: # ================== @@ -63,102 +65,126 @@ # to shut them down. # ================================================================= -readonly base_dir=$(dirname $0)/.. -readonly test_start_time="$(date +%s)" +# ==================================== +# Do not change the followings +# (keep this section at the beginning +# of this script) +# ==================================== +readonly system_test_root=$(dirname $0)/../.. # path of /system_test +readonly common_dir=${system_test_root}/common # common util scripts for system_test +source ${common_dir}/util.sh # include the util script +readonly base_dir=$(dirname $0)/.. # the base dir of this test suite +readonly test_start_time="$(date +%s)" # time starting this test +readonly num_iterations=$1 # total no. 
of iterations to run +readonly svr_to_bounce=$2 # servers to bounce: 1-source 2-mirror_maker 3-target + # 12 - source & mirror_maker + # 13 - source & target +readonly bounce_source_id=1 +readonly bounce_mir_mkr_id=2 +readonly bounce_target_id=3 +iter=1 # init a counter to keep track of iterations + + +# ==================================== +# Change the followings as needed +# ==================================== readonly num_msg_per_batch=500 -readonly batches_per_iteration=5 -readonly num_iterations=5 +readonly producer_sleep_min=5 # min & max sleep time (in sec) between each +readonly producer_sleep_max=5 # batch of messages sent from producer +readonly num_kafka_source_server=4 # requires same no. of property files such as: + # $base_dir/config/server_source{1..4}.properties +readonly num_kafka_target_server=3 # requires same no. of property files such as: + # $base_dir/config/server_target{1..3}.properties +readonly num_kafka_mirror_maker=3 # any values greater than 0 +readonly wait_time_after_killing_broker=0 # wait after broker is stopped but before starting again +readonly wait_time_after_restarting_broker=10 -readonly zk_source_port=2181 -readonly zk_mirror_port=2182 - -readonly topic_prefix=test -readonly max_topic_id=2 -readonly unbalanced_start_id=2 -readonly consumer_grp=group1 +# ==================================== +# No need to change the following +# configurations in most cases +# ==================================== +readonly zk_source_port=2181 # source zk port +readonly zk_target_port=2182 # target zk port +readonly test_topic=test01 # topic used in this test +readonly consumer_grp=group1 # consumer group readonly source_console_consumer_grp=source -readonly mirror_console_consumer_grp=mirror +readonly target_console_consumer_grp=target readonly message_size=5000 +readonly console_consumer_timeout_ms=15000 -# sleep time between each batch of messages published -# from producer - it will be randomly generated -# within the range of sleep_min & 
sleep_max -readonly sleep_min=3 -readonly sleep_max=3 - -# requires same no. of property files such as: -# $base_dir/config/server_source{1..4}.properties -readonly num_kafka_source_server=4 - -# requires same no. of property files such as: -# $base_dir/config/server_target{1..3}.properties -readonly num_kafka_target_server=3 - -readonly wait_time_after_killing_broker=0 -readonly wait_time_after_restarting_broker=5 - -readonly producer_4_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093,4:localhost:9094" -readonly producer_3_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093" - -background_producer_pid_1= -background_producer_pid_2= - -no_bouncing=$# - -iter=1 -abort_test=false - +# ==================================== +# zookeeper +# ==================================== pid_zk_source= pid_zk_target= +# ==================================== +# kafka source +# ==================================== kafka_source_pids= kafka_source_prop_files= kafka_source_log_files= +kafka_topic_creation_log_file=$base_dir/kafka_topic_creation.log +# ==================================== +# kafka target +# ==================================== kafka_target_pids= kafka_target_prop_files= kafka_target_log_files= + +# ==================================== +# mirror maker +# ==================================== +kafka_mirror_maker_pids= +kafka_mirror_maker_log_files= +consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties mirror_producer_prop_files= +# ==================================== +# console consumer source +# ==================================== console_consumer_source_pid= -console_consumer_mirror_pid= - console_consumer_source_log=$base_dir/console_consumer_source.log -console_consumer_mirror_log=$base_dir/console_consumer_mirror.log -producer_performance_log=$base_dir/producer_performance.log - console_consumer_source_crc_log=$base_dir/console_consumer_source_crc.log 
console_consumer_source_crc_sorted_log=$base_dir/console_consumer_source_crc_sorted.log console_consumer_source_crc_sorted_uniq_log=$base_dir/console_consumer_source_crc_sorted_uniq.log -console_consumer_mirror_crc_log=$base_dir/console_consumer_mirror_crc.log -console_consumer_mirror_crc_sorted_log=$base_dir/console_consumer_mirror_crc_sorted.log -console_consumer_mirror_crc_sorted_uniq_log=$base_dir/console_consumer_mirror_crc_sorted_uniq.log +# ==================================== +# console consumer target +# ==================================== +console_consumer_target_pid= +console_consumer_target_log=$base_dir/console_consumer_target.log +console_consumer_target_crc_log=$base_dir/console_consumer_target_crc.log +console_consumer_target_crc_sorted_log=$base_dir/console_consumer_target_crc_sorted.log +console_consumer_target_crc_sorted_uniq_log=$base_dir/console_consumer_target_crc_sorted_uniq.log +# ==================================== +# producer +# ==================================== +background_producer_pid= +producer_performance_log=$base_dir/producer_performance.log producer_performance_crc_log=$base_dir/producer_performance_crc.log producer_performance_crc_sorted_log=$base_dir/producer_performance_crc_sorted.log producer_performance_crc_sorted_uniq_log=$base_dir/producer_performance_crc_sorted_uniq.log +tmp_file_to_stop_background_producer=/tmp/tmp_file_to_stop_background_producer -consumer_rebalancing_log=$base_dir/consumer_rebalancing_verification.log - -consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties +# ==================================== +# test reports +# ==================================== checksum_diff_log=$base_dir/checksum_diff.log -info() { - echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*" -} -info_no_newline() { - echo -e -n "$(date +"%Y-%m-%d %H:%M:%S") $*" -} - +# ==================================== +# initialize prop and log files +# ==================================== initialize() { for ((i=1; 
i<=$num_kafka_target_server; i++)) do kafka_target_prop_files[${i}]=$base_dir/config/server_target${i}.properties kafka_target_log_files[${i}]=$base_dir/kafka_target${i}.log - mirror_producer_prop_files[${i}]=$base_dir/config/mirror_producer${i}.properties + kafka_mirror_maker_log_files[${i}]=$base_dir/kafka_mirror_maker${i}.log done for ((i=1; i<=$num_kafka_source_server; i++)) @@ -166,48 +192,18 @@ kafka_source_prop_files[${i}]=$base_dir/config/server_source${i}.properties kafka_source_log_files[${i}]=$base_dir/kafka_source${i}.log done -} -# ========================================= -# get_random_range - return a random number -# between the lower & upper bounds -# usage: -# get_random_range $lower $upper -# random_no=$? -# ========================================= -get_random_range() { - lo=$1 - up=$2 - range=$(($up - $lo + 1)) - - return $(($(($RANDOM % range)) + $lo)) + for ((i=1; i<=$num_kafka_mirror_maker; i++)) + do + mirror_producer_prop_files[${i}]=$base_dir/config/mirror_producer${i}.properties + done } -verify_consumer_rebalancing() { - - info "Verifying consumer rebalancing operation" - - CONSUMER_REBALANCING_RESULT=`$base_dir/bin/kafka-run-class.sh \ - kafka.tools.VerifyConsumerRebalance \ - --zk.connect=localhost:2181 \ - --group $consumer_grp` - echo "$CONSUMER_REBALANCING_RESULT" >> $consumer_rebalancing_log - - REBALANCE_STATUS_LINE=`grep "Rebalance operation" $consumer_rebalancing_log | tail -1` - # info "REBALANCE_STATUS_LINE: $REBALANCE_STATUS_LINE" - REBALANCE_STATUS=`echo $REBALANCE_STATUS_LINE | grep "Rebalance operation successful" || echo -n "Rebalance operation failed"` - info "REBALANCE_STATUS: $REBALANCE_STATUS" - - if [ "${REBALANCE_STATUS}_x" == "Rebalance operation failed_x" ]; then - info "setting abort_test to true due to Rebalance operation failed" - abort_test="true" - fi -} - +# ==================================== +# wait_for_zero_consumer_lags +# ==================================== wait_for_zero_consumer_lags() { - 
topic_id=$1 - # no of times to check for zero lagging no_of_zero_to_verify=3 @@ -216,7 +212,7 @@ TOTAL_LAG=0 CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ --group $consumer_grp --zkconnect localhost:$zk_source_port \ - --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + --topic $test_topic | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` for lag in $CONSUMER_LAGS; do @@ -235,10 +231,11 @@ done } +# ========================================= +# wait_for_zero_source_console_consumer_lags +# ========================================= wait_for_zero_source_console_consumer_lags() { - topic_id=$1 - # no of times to check for zero lagging no_of_zero_to_verify=3 @@ -247,7 +244,7 @@ TOTAL_LAG=0 CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ --group $source_console_consumer_grp --zkconnect localhost:$zk_source_port \ - --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + --topic $test_topic | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` for lag in $CONSUMER_LAGS; do @@ -266,10 +263,11 @@ done } +# ========================================= +# wait_for_zero_mirror_console_consumer_lags +# ========================================= wait_for_zero_mirror_console_consumer_lags() { - topic_id=$1 - # no of times to check for zero lagging no_of_zero_to_verify=3 @@ -277,8 +275,8 @@ do TOTAL_LAG=0 CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ - --group $mirror_console_consumer_grp --zkconnect localhost:$zk_mirror_port \ - --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + --group $target_console_consumer_grp --zkconnect localhost:$zk_target_port \ + --topic $test_topic | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` for lag in $CONSUMER_LAGS; do @@ -297,23 +295,15 @@ done } -kill_child_processes() { - isTopmost=$1 - curPid=$2 - childPids=$(ps a -o pid= 
-o ppid= | grep "${curPid}$" | awk '{print $1;}') - - for childPid in $childPids - do - kill_child_processes 0 $childPid - done - if [ $isTopmost -eq 0 ]; then - kill -15 $curPid 2> /dev/null - fi -} - +# ========================================= +# cleanup +# ========================================= cleanup() { info "cleaning up" + rm -rf $tmp_file_to_stop_background_producer + rm -rf $kafka_topic_creation_log_file + rm -rf /tmp/zookeeper_source rm -rf /tmp/zookeeper_target @@ -323,6 +313,7 @@ for ((i=1; i<=$num_kafka_target_server; i++)) do rm -rf ${kafka_target_log_files[${i}]} + rm -rf ${kafka_mirror_maker_log_files[${i}]} done rm -f $base_dir/zookeeper_source.log @@ -334,21 +325,22 @@ rm -f $producer_performance_crc_sorted_log rm -f $producer_performance_crc_sorted_uniq_log - rm -f $console_consumer_mirror_log + rm -f $console_consumer_target_log rm -f $console_consumer_source_log - rm -f $console_consumer_mirror_crc_log + rm -f $console_consumer_target_crc_log rm -f $console_consumer_source_crc_log rm -f $checksum_diff_log - rm -f $console_consumer_mirror_crc_sorted_log + rm -f $console_consumer_target_crc_sorted_log rm -f $console_consumer_source_crc_sorted_log - rm -f $console_consumer_mirror_crc_sorted_uniq_log + rm -f $console_consumer_target_crc_sorted_uniq_log rm -f $console_consumer_source_crc_sorted_uniq_log - - rm -f $consumer_rebalancing_log } +# ========================================= +# start_zk +# ========================================= start_zk() { info "starting zookeepers" @@ -363,6 +355,9 @@ pid_zk_target=$! 
} +# ========================================= +# start_source_servers_cluster +# ========================================= start_source_servers_cluster() { info "starting source cluster" @@ -372,6 +367,9 @@ done } +# ========================================= +# start_source_server +# ========================================= start_source_server() { s_idx=$1 @@ -383,95 +381,129 @@ info " -> kafka_source_pids[$s_idx]: ${kafka_source_pids[$s_idx]}" } +# ========================================= +# create_topic +# ========================================= +create_topic() { + this_topic_to_create=$1 + this_zk_conn_str=$2 + this_replica_factor=$3 + + info "creating topic [$this_topic_to_create] on [$this_zk_conn_str]" + $base_dir/../../bin/kafka-create-topic.sh --topic $this_topic_to_create \ + --zookeeper $this_zk_conn_str --replica $this_replica_factor 2> $kafka_topic_creation_log_file +} + +# ========================================= +# start_target_servers_cluster +# ========================================= start_target_servers_cluster() { info "starting mirror cluster" for ((i=1; i<=$num_kafka_target_server; i++)) do - start_embedded_consumer_server $i + start_target_server $i done } -start_embedded_consumer_server() { +# ========================================= +# start_target_server +# ========================================= +start_target_server() { s_idx=$1 $base_dir/bin/kafka-run-class.sh kafka.Kafka \ ${kafka_target_prop_files[${s_idx}]} \ - $consumer_prop_file \ - ${mirror_producer_prop_files[${s_idx}]} \ 2>&1 >> ${kafka_target_log_files[${s_idx}]} & # append log msg after restarting kafka_target_pids[$s_idx]=$! 
info " -> kafka_target_pids[$s_idx]: ${kafka_target_pids[$s_idx]}" } -start_console_consumer_for_source_producer() { +# ========================================= +# start_target_mirror_maker +# ========================================= +start_target_mirror_maker() { + info "starting mirror maker" - topic_id=$1 + for ((i=1; i<=$num_kafka_mirror_maker; i++)) + do + start_mirror_maker $i + done +} - info "starting console consumers for source producer on topic id [$topic_id]" +# ========================================= +# start_mirror_maker +# ========================================= +start_mirror_maker() { + s_idx=$1 + $base_dir/bin/kafka-run-class.sh kafka.tools.MirrorMaker \ + --consumer.config $consumer_prop_file \ + --producer.config ${mirror_producer_prop_files[${s_idx}]} \ + --whitelist=\".*\" \ + 2>&1 >> ${kafka_mirror_maker_log_files[$s_idx]} & # append log msg after restarting + kafka_mirror_maker_pids[${s_idx}]=$! + + info " -> kafka_mirror_maker_pids[$s_idx]: ${kafka_mirror_maker_pids[$s_idx]}" +} + +# ========================================= +# start_console_consumer_for_source +# ========================================= +start_console_consumer_for_source() { + info "starting console consumers for source" + $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ --zookeeper localhost:$zk_source_port \ - --topic ${topic_prefix}_${topic_id} \ + --topic $test_topic \ --group $source_console_consumer_grp \ - --from-beginning --consumer-timeout-ms 5000 \ + --from-beginning \ + --consumer-timeout-ms $console_consumer_timeout_ms \ --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \ - --property topic=${topic_prefix}_${topic_id} \ - 2>&1 >> ${console_consumer_source_log} + 2>&1 > ${console_consumer_source_log} & + console_consumer_source_pid=$! 
+ + info " -> console consumer source pid: $console_consumer_source_pid" } -start_console_consumer_for_mirror_producer() { +# ========================================= +# start_console_consumer_for_target +# ========================================= +start_console_consumer_for_target() { + info "starting console consumers for target" - topic_id=$1 - - info "starting console consumers for mirroring producer on topic id [$topic_id]" - $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ - --zookeeper localhost:$zk_mirror_port \ - --topic ${topic_prefix}_${topic_id} \ - --group $mirror_console_consumer_grp \ - --from-beginning --consumer-timeout-ms 5000 \ + --zookeeper localhost:$zk_target_port \ + --topic $test_topic \ + --group $target_console_consumer_grp \ + --from-beginning \ + --consumer-timeout-ms $console_consumer_timeout_ms \ --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \ - --property topic=${topic_prefix}_${topic_id} \ - 2>&1 >> ${console_consumer_mirror_log} -} + 2>&1 > ${console_consumer_target_log} & + console_consumer_target_pid=$! 
-consume_source_producer_messages() { - consumer_counter=1 - while [ $consumer_counter -le $max_topic_id ] - do - start_console_consumer_for_source_producer $consumer_counter - consumer_counter=$(( $consumer_counter + 1 )) - done + info " -> console consumer mirror pid: $console_consumer_target_pid" } -consume_mirror_producer_messages() { - consumer_counter=1 - while [ $consumer_counter -le $max_topic_id ] - do - start_console_consumer_for_mirror_producer $consumer_counter - consumer_counter=$(( $consumer_counter + 1 )) - done -} - +# ========================================= +# shutdown_producer +# ========================================= shutdown_producer() { info "shutting down producer" - if [ "x${background_producer_pid_1}" != "x" ]; then - # kill_child_processes 0 ${background_producer_pid_1}; - kill -TERM ${background_producer_pid_1} 2> /dev/null; + if [ "x${background_producer_pid}" != "x" ]; then + # kill_child_processes 0 ${background_producer_pid}; + kill -TERM ${background_producer_pid} 2> /dev/null; fi - - if [ "x${background_producer_pid_2}" != "x" ]; then - # kill_child_processes 0 ${background_producer_pid_2}; - kill -TERM ${background_producer_pid_2} 2> /dev/null; - fi } +# ========================================= +# shutdown_servers +# ========================================= shutdown_servers() { info "shutting down mirror console consumer" - if [ "x${console_consumer_mirror_pid}" != "x" ]; then - kill_child_processes 0 ${console_consumer_mirror_pid}; + if [ "x${console_consumer_target_pid}" != "x" ]; then + kill_child_processes 0 ${console_consumer_target_pid}; fi info "shutting down source console consumer" @@ -479,6 +511,20 @@ kill_child_processes 0 ${console_consumer_source_pid}; fi + # ============================================================================== + # workaround for hanging ConsoleConsumer as --consumer-timeout-ms is not working + # ============================================================================== + `ps auxw | 
grep ChecksumMessageFormatter | grep -v grep | awk '{print $2}' | xargs kill -9` + + info "shutting down mirror makers" + for ((i=1; i<=$num_kafka_mirror_maker; i++)) + do + #info "stopping mm pid: ${kafka_mirror_maker_pids[$i]}" + if [ "x${kafka_mirror_maker_pids[$i]}" != "x" ]; then + kill_child_processes 0 ${kafka_mirror_maker_pids[$i]}; + fi + done + info "shutting down target servers" for ((i=1; i<=$num_kafka_target_server; i++)) do @@ -498,68 +544,78 @@ info "shutting down zookeeper servers" if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi + + # ============================================================================== + # shutting down any lingering processes + # ============================================================================== + `ps axuw | grep -i "java\|run\-" | grep -v grep | grep Kafka | grep server_target | awk '{print $2}' | xargs kill -9` + + # ============================================================================== + # workaround for hanging MirrorMaker + # ============================================================================== + `ps auxw | grep MirrorMaker | grep -v grep | awk '{print $2}' | xargs kill -9` } +# ========================================= +# start_background_producer +# ========================================= start_background_producer() { - bkrinfo_str=$1 - start_topic_id=$2 - end_topic_id=$3 + topic=$1 + batch_no=0 - topic_id=${start_topic_id} - while [ 'x' == 'x' ] + while [ ! -e $tmp_file_to_stop_background_producer ] do sleeptime= - get_random_range $sleep_min $sleep_max + get_random_range $producer_sleep_min $producer_sleep_max sleeptime=$? 
batch_no=$(($batch_no + 1)) - if [ $topic_id -gt $end_topic_id ]; then - topic_id=${start_topic_id} - fi - + info "producing $num_msg_per_batch messages on topic '$topic'" $base_dir/bin/kafka-run-class.sh \ kafka.perf.ProducerPerformance \ - --brokerinfo $bkrinfo_str \ - --topic ${topic_prefix}_${topic_id} \ + --brokerinfo zk.connect=localhost:2181 \ + --topic $topic \ --messages $num_msg_per_batch \ --message-size $message_size \ --batch-size 50 \ --vary-message-size \ --threads 1 \ - --reporting-interval $num_msg_per_batch --async \ + --reporting-interval $num_msg_per_batch \ + --async \ 2>&1 >> $base_dir/producer_performance.log # appending all producers' msgs - topic_id=$(( $topic_id + 1 )) - sleep $sleeptime done } +# ========================================= +# cmp_checksum +# ========================================= cmp_checksum() { cmp_result=0 - grep checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log - grep checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log - grep checksum $producer_performance_log | tr -d ' ' | cut -f4 -d ':' | cut -f1 -d '(' > $producer_performance_crc_log + grep ^checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log + grep ^checksum $console_consumer_target_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_target_crc_log + grep checksum $producer_performance_log | tr ' ' '\n' | grep checksum | awk -F ':' '{print $2}' > $producer_performance_crc_log - sort $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_log + sort $console_consumer_target_crc_log > $console_consumer_target_crc_sorted_log sort $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_log sort $producer_performance_crc_log > $producer_performance_crc_sorted_log - sort -u $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_uniq_log + sort -u 
$console_consumer_target_crc_log > $console_consumer_target_crc_sorted_uniq_log sort -u $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_uniq_log sort -u $producer_performance_crc_log > $producer_performance_crc_sorted_uniq_log msg_count_from_source_consumer=`cat $console_consumer_source_crc_log | wc -l | tr -d ' '` uniq_msg_count_from_source_consumer=`cat $console_consumer_source_crc_sorted_uniq_log | wc -l | tr -d ' '` - msg_count_from_mirror_consumer=`cat $console_consumer_mirror_crc_log | wc -l | tr -d ' '` - uniq_msg_count_from_mirror_consumer=`cat $console_consumer_mirror_crc_sorted_uniq_log | wc -l | tr -d ' '` + msg_count_from_mirror_consumer=`cat $console_consumer_target_crc_log | wc -l | tr -d ' '` + uniq_msg_count_from_mirror_consumer=`cat $console_consumer_target_crc_sorted_uniq_log | wc -l | tr -d ' '` uniq_msg_count_from_producer=`cat $producer_performance_crc_sorted_uniq_log | wc -l | tr -d ' '` @@ -567,17 +623,19 @@ duplicate_msg_in_producer=$(( $total_msg_published - $uniq_msg_count_from_producer )) - crc_only_in_mirror_consumer=`comm -23 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` - crc_only_in_source_consumer=`comm -13 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` - crc_common_in_both_consumer=`comm -12 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` + crc_only_in_mirror_consumer=`comm -23 $console_consumer_target_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` + crc_only_in_source_consumer=`comm -13 $console_consumer_target_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` + crc_common_in_both_consumer=`comm -12 $console_consumer_target_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` crc_only_in_producer=`comm -23 $producer_performance_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` - duplicate_mirror_crc=`comm -23 
$console_consumer_mirror_crc_sorted_log $console_consumer_mirror_crc_sorted_uniq_log` + duplicate_mirror_crc=`comm -23 $console_consumer_target_crc_sorted_log $console_consumer_target_crc_sorted_uniq_log` no_of_duplicate_msg=$(( $msg_count_from_mirror_consumer - $uniq_msg_count_from_mirror_consumer \ + $msg_count_from_source_consumer - $uniq_msg_count_from_source_consumer - \ 2*$duplicate_msg_in_producer )) + source_mirror_uniq_msg_diff=$(($uniq_msg_count_from_source_consumer - $uniq_msg_count_from_mirror_consumer)) + echo "" echo "========================================================" echo "no. of messages published : $total_msg_published" @@ -587,8 +645,7 @@ echo "mirror consumer msg rec'd : $msg_count_from_mirror_consumer" echo "mirror consumer unique msg rec'd : $uniq_msg_count_from_mirror_consumer" echo "total source/mirror duplicate msg : $no_of_duplicate_msg" - echo "source/mirror uniq msg count diff : $(($uniq_msg_count_from_source_consumer - \ - $uniq_msg_count_from_mirror_consumer))" + echo "source/mirror uniq msg count diff : $source_mirror_uniq_msg_diff" echo "========================================================" echo "(Please refer to $checksum_diff_log for more details)" echo "" @@ -613,94 +670,68 @@ echo "========================================================" >> $checksum_diff_log echo "${duplicate_mirror_crc}" >> $checksum_diff_log - topic_chksum_counter=1 - while [ $topic_chksum_counter -le $max_topic_id ] - do - # get producer topic counts - this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $producer_performance_log` - echo "PRODUCER topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" - - topic_chksum_counter=$(($topic_chksum_counter + 1)) - done + echo "=================" + if [[ $source_mirror_uniq_msg_diff -eq 0 && $uniq_msg_count_from_source_consumer -gt 0 ]]; then + echo "## Test PASSED" + else + echo "## Test FAILED" + fi + echo "=================" echo - topic_chksum_counter=1 - while [ 
$topic_chksum_counter -le $max_topic_id ] - do - this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_source_log` - echo "SOURCE consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" - - topic_chksum_counter=$(($topic_chksum_counter + 1)) - done - echo - - topic_chksum_counter=1 - while [ $topic_chksum_counter -le $max_topic_id ] - do - this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_mirror_log` - echo "MIRROR consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" - - topic_chksum_counter=$(($topic_chksum_counter + 1)) - done - echo - return $cmp_result } +# ========================================= +# start_test +# ========================================= start_test() { start_zk sleep 2 + start_source_servers_cluster sleep 2 + + create_topic $test_topic localhost:$zk_source_port 1 + sleep 2 + start_target_servers_cluster sleep 2 - start_background_producer $producer_4_brokerinfo_str 1 $(( $unbalanced_start_id - 1 )) & - background_producer_pid_1=$! + start_target_mirror_maker + sleep 10 - info "==========================================" - info "Started background producer pid [${background_producer_pid_1}]" - info "==========================================" + start_background_producer $test_topic & + background_producer_pid=$! - sleep 10 + info "Started background producer pid [${background_producer_pid}]" + sleep 5 - start_background_producer $producer_3_brokerinfo_str $unbalanced_start_id $max_topic_id & - background_producer_pid_2=$! - - info "==========================================" - info "Started background producer pid [${background_producer_pid_2}]" - info "==========================================" - - sleep 10 - - verify_consumer_rebalancing - - info "abort_test: [${abort_test}]" - if [ "${abort_test}_x" == "true_x" ]; then - info "aborting test" - iter=$((${num_iterations} + 1)) - fi - + # loop for no. 
of iterations specified in $num_iterations while [ $num_iterations -ge $iter ] do - echo - info "==========================================" - info "Iteration $iter of ${num_iterations}" - info "==========================================" + # if $svr_to_bounce is '0', it means no bouncing + if [[ $num_iterations -ge $iter && $svr_to_bounce -gt 0 ]]; then + idx= - # terminate the broker if not the last iteration: - if [[ $num_iterations -gt $iter && $no_bouncing -eq 0 ]]; then + # check which type of broker bouncing is requested: source, mirror_maker or target - idx= + # $svr_to_bounce contains $bounce_target_id - eg. '3', '123', ... etc + svr_idx=`expr index $svr_to_bounce $bounce_target_id` + if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then + echo + info "==========================================" + info "Iteration $iter of ${num_iterations}" + info "==========================================" - if [ $(( $iter % 2 )) -eq 0 ]; then - # even iterations -> bounce target kafka borker + # bounce target kafka broker get_random_range 1 $num_kafka_target_server idx=$? + if [ "x${kafka_target_pids[$idx]}" != "x" ]; then echo - info "#### Bouncing kafka TARGET broker ####" + info "#### Bouncing Kafka TARGET Broker ####" info "terminating kafka target[$idx] with process id ${kafka_target_pids[$idx]}" kill_child_processes 0 ${kafka_target_pids[$idx]} @@ -709,19 +740,58 @@ sleep $wait_time_after_killing_broker info "starting kafka target server" - start_embedded_consumer_server $idx + start_target_server $idx + fi + iter=$(($iter+1)) + info "sleeping for ${wait_time_after_restarting_broker}s" + sleep $wait_time_after_restarting_broker + fi - info "sleeping for ${wait_time_after_restarting_broker}s" - sleep $wait_time_after_restarting_broker + # $svr_to_bounce contains $bounce_mir_mkr_id - eg. '2', '123', ... 
etc + svr_idx=`expr index $svr_to_bounce $bounce_mir_mkr_id` + if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then + echo + info "==========================================" + info "Iteration $iter of ${num_iterations}" + info "==========================================" + + # bounce mirror maker + get_random_range 1 $num_kafka_mirror_maker + idx=$? + + if [ "x${kafka_mirror_maker_pids[$idx]}" != "x" ]; then + echo + info "#### Bouncing Kafka Mirror Maker ####" + + info "terminating kafka mirror maker [$idx] with process id ${kafka_mirror_maker_pids[$idx]}" + kill_child_processes 0 ${kafka_mirror_maker_pids[$idx]} + + info "sleeping for ${wait_time_after_killing_broker}s" + sleep $wait_time_after_killing_broker + + info "starting kafka mirror maker" + start_mirror_maker $idx fi - else - # odd iterations -> bounce source kafka broker + iter=$(($iter+1)) + info "sleeping for ${wait_time_after_restarting_broker}s" + sleep $wait_time_after_restarting_broker + fi + + # $svr_to_bounce contains $bounce_source_id - eg. '1', '123', ... etc + svr_idx=`expr index $svr_to_bounce $bounce_source_id` + if [[ $num_iterations -ge $iter && $svr_idx -gt 0 ]]; then + echo + info "==========================================" + info "Iteration $iter of ${num_iterations}" + info "==========================================" + + # bounce source kafka broker get_random_range 1 $num_kafka_source_server idx=$? 
if [ "x${kafka_source_pids[$idx]}" != "x" ]; then echo - info "#### Bouncing kafka SOURCE broker ####" + info "#### Bouncing Kafka SOURCE Broker ####" info "terminating kafka source[$idx] with process id ${kafka_source_pids[$idx]}" kill_child_processes 0 ${kafka_source_pids[$idx]} @@ -731,49 +801,76 @@ info "starting kafka source server" start_source_server $idx - - info "sleeping for ${wait_time_after_restarting_broker}s" - sleep $wait_time_after_restarting_broker fi - fi + iter=$(($iter+1)) + info "sleeping for ${wait_time_after_restarting_broker}s" + sleep $wait_time_after_restarting_broker + fi + else + echo + info "==========================================" + info "Iteration $iter of ${num_iterations}" + info "==========================================" - verify_consumer_rebalancing - - info "abort_test: [${abort_test}]" - if [ "${abort_test}_x" == "true_x" ]; then - info "aborting test" - iter=$((${num_iterations} + 1)) - fi - - else info "No bouncing performed" + iter=$(($iter+1)) + info "sleeping for ${wait_time_after_restarting_broker}s" + sleep $wait_time_after_restarting_broker fi + done - info "sleeping for 10 sec" - sleep 10 + # notify background producer to stop + touch $tmp_file_to_stop_background_producer - iter=$(($iter+1)) - done - echo info "Tests completed. Waiting for consumers to catch up " - - shutdown_producer wait_for_zero_consumer_lags + # =============================================== + # No Consumer Lag info available, so sleep 30 sec + # =============================================== + info "sleeping for 30 sec" + sleep 30 } -# ===================== +# ========================================= # main test begins here -# ===================== +# ========================================= +# check correct number of arguments +if [ $# -lt 2 ]; then + echo + echo "Error : invalid no. 
of arguments" + echo "Usage : $0 <num of iterations> <servers to bounce>" + echo + echo " num of iterations - the number of iterations that the test runs" + echo + echo " servers to bounce - the servers to be bounced in a round-robin fashion" + echo " Values of the servers:" + echo " 0 - no bouncing" + echo " 1 - source broker" + echo " 2 - mirror maker" + echo " 3 - target broker" + echo " Example:" + echo " * To bounce only mirror maker and target broker" + echo " in turns, enter the value 23" + echo " * To bounce only mirror maker, enter the value 2" + echo " * To run the test without bouncing, enter 0" + echo + echo "Usage Example : $0 10 12" + echo " (run 10 iterations and bounce source broker (1) + mirror maker (2) in turn)" + echo + exit 1 +fi + echo info "============================================" info "#### Starting Kafka Broker Failure Test ####" info "============================================" echo +# initializing and cleanup initialize cleanup sleep 5 @@ -781,19 +878,34 @@ # Ctrl-c trap. Catches INT signal trap "shutdown_producer; shutdown_servers; cmp_checksum; exit 0" INT +# starting the test start_test -consume_source_producer_messages -consume_mirror_producer_messages +# starting consumer to consume data in source +start_console_consumer_for_source +# starting consumer to consume data in target +start_console_consumer_for_target + wait_for_zero_source_console_consumer_lags wait_for_zero_mirror_console_consumer_lags +# =============================================== +# No Consumer Lag info available, so sleep 10 sec +# =============================================== +sleep 10 -verify_consumer_rebalancing - shutdown_servers cmp_checksum result=$? 
+# =============================================== +# Report the time taken +# =============================================== +test_end_time="$(date +%s)" +total_test_time_sec=$(( $test_end_time - $test_start_time )) +total_test_time_min=$(( $total_test_time_sec / 60 )) +info "Total time taken: $total_test_time_min min for $num_iterations iterations" +echo + exit $result Index: system_test/broker_failure/README =================================================================== --- system_test/broker_failure/README (revision 1352297) +++ system_test/broker_failure/README (working copy) @@ -1,23 +1,26 @@ -This script performs broker failure tests with the following -setup in a single local machine: +This script performs broker failure tests in an environment with +mirrored Source & Target clusters on a single machine: -1. A cluster of Kafka source brokers -2. A cluster of Kafka mirror brokers with embedded consumers in - point-to-point mode -3. An independent ConsoleConsumer in publish/subcribe mode to +1. Start a cluster of Kafka source brokers +2. Start a cluster of Kafka target brokers +3. Start one or more Mirror Makers to create mirroring between + source and target clusters +4. A producer produces batches of messages to the SOURCE brokers + in the background +5. The Kafka SOURCE brokers, TARGET brokers and Mirror Maker are + terminated in a round-robin fashion, and the test waits for the + consumer to catch up. +6. Repeat step 5 as many times as specified in the script +7. An independent ConsoleConsumer in publish/subscribe mode to consume messages from the SOURCE brokers cluster -4. An independent ConsoleConsumer in publish/subcribe mode to - consume messages from the MIRROR brokers cluster -5. A producer produces batches of messages to the SOURCE brokers -6. One of the Kafka SOURCE or MIRROR brokers in the cluster will - be randomly terminated and waiting for the consumer to catch up. -7. Repeat Step 4 & 5 as many times as specified in the script +8. 
An independent ConsoleConsumer in publish/subscribe mode to + consume messages from the TARGET brokers cluster Expected results: ================== There should not be any discrepancies by comparing the unique message checksums from the source ConsoleConsumer and the -mirror ConsoleConsumer. +target ConsoleConsumer. Notes: ================== @@ -26,17 +29,36 @@ 2. Make sure that there are corresponding number of prop files: $base_dir/config/server_source{1..4}.properties -The number of Kafka MIRROR brokers can be increased as follows: +The number of Kafka TARGET brokers can be increased as follows: 1. Update the value of $num_kafka_target_server in this script 2. Make sure that there are corresponding number of prop files: $base_dir/config/server_target{1..3}.properties Quick Start: ================== -Execute this script as follows: - /system_test/broker_failure $ bin/run-test.sh +In the directory /system_test/broker_failure, +execute this script as follows: + $ bin/run-test.sh <num of iterations> <servers to bounce> +num of iterations - the number of iterations that the test runs + +servers to bounce - the servers to be bounced in a round-robin fashion. + + Values to be entered: + 1 - source broker + 2 - mirror maker + 3 - target broker + + Example: + * To bounce only mirror maker and target broker + in turns, enter the value 23. + * To bounce only mirror maker, enter the value 2. + * To run the test without bouncing, enter 0. + +At the end of the test, the received message checksums in both +SOURCE & TARGET will be compared. If all checksums match, +the test is PASSED. Otherwise, the test is FAILED. + In the event of failure, by default the brokers and zookeepers remain running to make it easier to debug the issue - hit Ctrl-C to shut them down. -
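The PASSED/FAILED rule described in the README can be tried in isolation with the minimal sketch below. It mirrors the count-based check that cmp_checksum performs in this patch (unique source count equals unique target count, and at least one message was received). The temporary directory, file names and sample checksum values are made up for illustration and are not the ones the test script uses.

```shell
#!/bin/bash
# Sketch only: file names and checksum values are hypothetical samples.
tmp_dir=$(mktemp -d)

# Pretend the source consumer saw one duplicate (222) and the target saw none.
printf '%s\n' 111 222 333 222 > "$tmp_dir/source_crc.log"
printf '%s\n' 333 111 222     > "$tmp_dir/target_crc.log"

# De-duplicate and sort, as the test does before comparing.
sort -u "$tmp_dir/source_crc.log" > "$tmp_dir/source_crc_sorted_uniq.log"
sort -u "$tmp_dir/target_crc.log" > "$tmp_dir/target_crc_sorted_uniq.log"

uniq_source=$(wc -l < "$tmp_dir/source_crc_sorted_uniq.log" | tr -d ' ')
uniq_target=$(wc -l < "$tmp_dir/target_crc_sorted_uniq.log" | tr -d ' ')
uniq_diff=$(( uniq_source - uniq_target ))

# Checksums present on one side only (empty when the two sets are identical).
crc_mismatch=$(comm -3 "$tmp_dir/source_crc_sorted_uniq.log" \
                       "$tmp_dir/target_crc_sorted_uniq.log")

# Same rule as the patch: zero difference in unique counts and a non-empty source.
if [[ $uniq_diff -eq 0 && $uniq_source -gt 0 ]]; then
    result="PASSED"
else
    result="FAILED"
fi

echo "## Test $result"
[ -n "$crc_mismatch" ] && echo "checksums on one side only: $crc_mismatch"

rm -rf "$tmp_dir"
```

Duplicates do not affect the verdict because both sides are reduced with sort -u first. A count-only check can still pass when the two sets hold different checksums of equal number, which is why the sketch also reports the comm -3 output.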