Index: system_test/single_host_multi_brokers/bin/run-test.sh
===================================================================
--- system_test/single_host_multi_brokers/bin/run-test.sh	(revision 1355130)
+++ system_test/single_host_multi_brokers/bin/run-test.sh	(working copy)
@@ -33,10 +33,14 @@
 readonly config_dir=${base_dir}/config
 
 readonly test_start_time="$(date +%s)" # time starting the test
 
+max_reelection_latency_ms=0
+min_reelection_latency_ms=10000
+
 # ====================================
-# Change the followings as needed
+# No need to change the following
+# configurations in most cases
 # ====================================
+
 readonly num_kafka_server=3 # same no. of property files such as server_{1..n}.properties
                             # will be automatically generated
 readonly replica_factor=3   # should be less than or equal to "num_kafka_server"
@@ -45,10 +49,6 @@
 readonly producer_msg_batch_size=200 # batch no. of messsages by producer
 readonly consumer_timeout_ms=10000   # elapsed time for consumer to timeout and exit
 
-# ====================================
-# No need to change the following
-# configurations in most cases
-# ====================================
 readonly test_topic=mytest
 readonly max_wait_for_consumer_complete=30
 readonly zk_prop_pathname=${config_dir}/zookeeper.properties
@@ -88,6 +88,7 @@
 pid_zk=
 kafka_pids=
 test_failure_counter=0
+leader_elected_timestamp=
 
 initialize() {
     info "initializing ..."
@@ -109,6 +110,7 @@
         info "kafka $i prop file : ${kafka_prop_pathnames[$i]}"
         info "kafka $i brokerid : ${kafka_brokerids[$i]}"
         info "kafka $i socket : ${kafka_sock_ports[$i]}"
+        echo
     done
 
     info "zookeeper port : $zk_port"
@@ -142,11 +144,42 @@
 get_leader_brokerid() {
     log_line=`grep -i -h 'is leader' ${base_dir}/kafka_server_*.log | sort | tail -1`
     info "found the log line: $log_line"
+
     broker_id=`echo $log_line | awk -F ' ' '{print $5}'`
     return $broker_id
 }
 
+get_elected_leader_unix_timestamp() {
+    log_line=`grep -i -h 'is leader' ${base_dir}/kafka_server_*.log | sort | tail -1`
+    info "found the log line: $log_line"
+
+    this_timestamp=`echo $log_line | cut -f2 -d '[' | cut -f1 -d ']'`
+    elected_leader_unix_timestamp_ms=`echo $this_timestamp | cut -f2 -d ','`
+    elected_leader_unix_timestamp=`date -d "$this_timestamp" +%s`
+
+    info "elected_leader_unix_timestamp    : $elected_leader_unix_timestamp"
+    info "elected_leader_unix_timestamp_ms : $elected_leader_unix_timestamp_ms"
+
+    full_elected_leader_unix_timestamp="${elected_leader_unix_timestamp}.${elected_leader_unix_timestamp_ms}"
+}
+
+get_server_shutdown_unix_timestamp() {
+    s_idx=$1
+
+    log_line=`grep -i -h 'shut down completed' ${kafka_log4j_log_pathnames[$s_idx]} | tail -1`
+    info "found the log line: $log_line"
+
+    this_timestamp=`echo $log_line | cut -f2 -d '[' | cut -f1 -d ']'`
+    server_shutdown_unix_timestamp_ms=`echo $this_timestamp | cut -f2 -d ','`
+    server_shutdown_unix_timestamp=`date -d "$this_timestamp" +%s`
+
+    info "server_shutdown_unix_timestamp    : $server_shutdown_unix_timestamp"
+    info "server_shutdown_unix_timestamp_ms : $server_shutdown_unix_timestamp_ms"
+
+    full_server_shutdown_unix_timestamp="${server_shutdown_unix_timestamp}.${server_shutdown_unix_timestamp_ms}"
+}
+
 start_zk() {
     info "starting zookeeper"
     $base_dir/../../bin/zookeeper-server-start.sh $zk_prop_pathname \
@@ -228,16 +261,19 @@
     info "shutting down zookeeper servers"
     if [ "x${pid_zk}" != "x" ]; then kill_child_processes 0 ${pid_zk}; fi
+
+    force_shutdown_producer
+    force_shutdown_consumer
 }
 
 force_shutdown_producer() {
     info "force shutdown producer"
-    `ps auxw | grep ProducerPerformance | awk '{print $2}' | xargs kill -9`
+    `ps auxw | grep ProducerPerformance | awk '{print $2}' | xargs kill -9 2> /dev/null`
 }
 
 force_shutdown_consumer() {
     info "force shutdown consumer"
-    `ps auxw | grep ConsoleConsumer | awk '{print $2}' | xargs kill -9`
+    `ps auxw | grep ConsoleConsumer | awk '{print $2}' | xargs kill -9 2> /dev/null`
 }
 
 create_topic() {
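
Note on the two timestamp helpers added above: both assume the default log4j line layout, where every line starts with a bracketed timestamp such as "[2012-07-05 17:20:01,123]". A minimal sketch of the shared cut/date pipeline, using a made-up log line (hypothetical; the real ones come from kafka_server_*.log and the per-broker log4j logs):

    log_line='[2012-07-05 17:20:01,123] INFO ... 2 is leader ...'        # hypothetical sample line

    this_timestamp=`echo $log_line | cut -f2 -d '[' | cut -f1 -d ']'`    # -> 2012-07-05 17:20:01,123
    ms_part=`echo $this_timestamp | cut -f2 -d ','`                      # -> 123
    sec_part=`date -d "${this_timestamp%,*}" +%s`                        # -> seconds since the epoch
    echo "${sec_part}.${ms_part}"                                        # -> e.g. 1341508801.123

The sketch strips the ",123" millisecond suffix before calling date -d, since date is not guaranteed to parse the comma form; the helpers above pass the full string, which is worth verifying on the target host. date -d is GNU coreutils-specific, so the test assumes a Linux box either way.
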
@@ -265,11 +301,11 @@
         first_data_file_pathname=${first_data_file_dir}/$first_data_file
         kafka_first_data_file_sizes[$i]=`stat -c%s ${first_data_file_pathname}`
         kafka_first_data_file_checksums[$i]=`cksum ${first_data_file_pathname} | awk '{print $1}'`
-        info "## broker[$i] data file: ${first_data_file_pathname} : [${kafka_first_data_file_sizes[$i]}]"
+        info "## broker[$i] data file: ${first_data_file_pathname} size : [${kafka_first_data_file_sizes[$i]}]"
         info "## ==> crc ${kafka_first_data_file_checksums[$i]}"
     done
 
-    # get the checksums from messages produced and consumed
+    # get the checksums from messages produced and consumed, sort them, and write them to log files
     grep checksum $console_consumer_log_pathname | tr -d ' ' | awk -F ':' '{print $2}' > $console_consumer_crc_log_pathname
     grep checksum $producer_perf_log_pathname | tr ' ' '\n' | grep checksum | awk -F ':' '{print $2}' > $producer_perf_crc_log_pathname
 
@@ -296,11 +332,14 @@
     validation_start_unix_ts=`date +%s`
     curr_unix_ts=`date +%s`
     size_unmatched_idx=1
+
+    # loop every 5 sec to check whether the replica file sizes match
+    # (up to the value of $max_wait_for_consumer_complete)
     while [[ $(( $curr_unix_ts - $validation_start_unix_ts )) -le $max_wait_for_consumer_complete && $size_unmatched_idx -gt 0 ]]
     do
         info "wait 5s (up to ${max_wait_for_consumer_complete}s) and check replicas data sizes"
         sleep 5
-        
+
         first_element_value=${kafka_first_data_file_sizes[1]}
         for ((i=2; i<=${#kafka_first_data_file_sizes[@]}; i++))
         do
@@ -315,13 +354,19 @@
         curr_unix_ts=`date +%s`
     done
 
+    echo
+
     # validate that sizes of all replicas should match
     first_element_value=${kafka_first_data_file_sizes[1]}
     for ((i=2; i<=${#kafka_first_data_file_sizes[@]}; i++))
     do
-        if [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
+        if [ ${kafka_first_data_file_sizes[$i]} -eq 0 ]; then
+            info "## FAILURE: File[$i] zero file size found"
+        elif [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
             info "## FAILURE: Unmatched size found"
             test_failure_counter=$(( $test_failure_counter + 1 ))
+        else
+            info "## PASSED: Data files sizes matched"
         fi
     done
 
@@ -329,17 +374,31 @@
     first_element_value=${kafka_first_data_file_checksums[1]}
     for ((i=2; i<=${#kafka_first_data_file_checksums[@]}; i++))
     do
-        if [ $first_element_value -ne ${kafka_first_data_file_checksums[$i]} ]; then
+        if [ ${kafka_first_data_file_sizes[$i]} -eq 0 ]; then
+            info "## FAILURE: Checksum cannot be validated because file[$i] has zero file size"
+        elif [ $first_element_value -ne ${kafka_first_data_file_checksums[$i]} ]; then
             info "## FAILURE: Unmatched checksum found"
             test_failure_counter=$(( $test_failure_counter + 1 ))
+        else
+            info "## PASSED: Data files checksums matched"
         fi
     done
 
     # validate that there is no data loss
     if [ $uniq_msg_count_from_producer_perf -ne $uniq_msg_count_from_console_consumer ]; then
+        info "## FAILURE: Data loss found"
         test_failure_counter=$(( $test_failure_counter + 1 ))
+    else
+        info "## PASSED: Message counts matched"
     fi
 
+    info "## Max latency : $max_reelection_latency_ms ms"
+    info "## Min latency : $min_reelection_latency_ms ms"
+
+    # =====================================================
+    # !!! This test will fail until KAFKA-350 is fixed !!!
+    # =====================================================
+
     # report PASSED or FAILED
     info "========================================================"
     if [ $test_failure_counter -eq 0 ]; then
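
Note on the checksum bookkeeping above: the per-message validation keys off lines containing "checksum" in the ConsoleConsumer and ProducerPerformance logs. A minimal sketch of the two extraction pipelines, with made-up input lines (the exact field layout of the real logs is assumed here):

    # consumer side: drop spaces, split on ':', keep the value
    echo 'checksum: 3209021112' | tr -d ' ' | awk -F ':' '{print $2}'
    # -> 3209021112

    # producer side: one field per line, keep only the checksum fields
    echo 'batchsize:200 checksum:3209021112' | tr ' ' '\n' | grep checksum | awk -F ':' '{print $2}'
    # -> 3209021112

The extracted values land in the *_crc_log files that presumably feed $uniq_msg_count_from_producer_perf and $uniq_msg_count_from_console_consumer, so any change to either tool's log format would silently break this validation.
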
@@ -382,21 +441,46 @@
     for ((i=1; i<=$num_kafka_server; i++))
     do
-        info "kafka server [$i] - reading leader"
+        echo
+        info "======================================="
+        info "Iteration $i of $num_kafka_server"
+        info "======================================="
+        echo
+        info "looking up leader"
         get_leader_brokerid
         ldr_bkr_id=$?
-        info "leader broker id: $ldr_bkr_id"
+        info "current leader's broker id : $ldr_bkr_id"
 
         svr_idx=$(($ldr_bkr_id + 1))
 
-        # ==========================================================
-        # If KAFKA-350 is fixed, uncomment the following 3 lines to
-        # STOP the server for failure test
-        # ==========================================================
-        #stop_server $svr_idx
-        #info "sleeping for 10s"
-        #sleep 10
+        stop_server $svr_idx
+        info "sleeping 10s before looking up the server shutdown unix timestamp"
+        sleep 10
+        get_server_shutdown_unix_timestamp $svr_idx
+        get_elected_leader_unix_timestamp
+
+        info "full_elected_leader_unix_timestamp  : $full_elected_leader_unix_timestamp"
+        info "full_server_shutdown_unix_timestamp : $full_server_shutdown_unix_timestamp"
+
+        reelected_leader_latency=`echo "$full_elected_leader_unix_timestamp - $full_server_shutdown_unix_timestamp" | bc`
+        reelected_leader_latency_ms_float=`echo "$reelected_leader_latency * 1000" | bc`
+        reelected_leader_latency_ms=${reelected_leader_latency_ms_float/.*}
+
+        info "---------------------------------------"
+        info "leader re-election latency : $reelected_leader_latency_ms ms"
+        info "---------------------------------------"
+
+        # update $max_reelection_latency_ms
+        if [ $reelected_leader_latency_ms -gt $max_reelection_latency_ms ]; then
+            max_reelection_latency_ms=$reelected_leader_latency_ms
+        fi
+
+        # update $min_reelection_latency_ms
+        if [ $reelected_leader_latency_ms -le $min_reelection_latency_ms ]; then
+            min_reelection_latency_ms=$reelected_leader_latency_ms
+        fi
+
         start_console_consumer $test_topic localhost:$zk_port
         info "sleeping for 5s"
         sleep 5
@@ -404,15 +488,11 @@
         start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size
         info "sleeping for 15s"
         sleep 15
-        echo
 
-        # ==========================================================
-        # If KAFKA-350 is fixed, uncomment the following 3 lines to
-        # START the server for failure test
-        # ==========================================================
-        #start_server $svr_idx
-        #info "sleeping for 30s"
-        #sleep 30
+        info "starting previously stopped kafka broker [$svr_idx]"
+        start_server $svr_idx
+        info "sleeping for 30s"
+        sleep 30
     done
 
     validate_results
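
A worked example of the latency arithmetic in the loop above, with made-up timestamps: the "full" timestamps are seconds.milliseconds strings, so the subtraction has to go through bc (shell $(( )) arithmetic is integer-only), and the ${var/.*} expansion then drops everything from the first dot, leaving whole milliseconds:

    full_server_shutdown_unix_timestamp="1341508801.950"     # hypothetical values
    full_elected_leader_unix_timestamp="1341508802.700"

    latency=`echo "$full_elected_leader_unix_timestamp - $full_server_shutdown_unix_timestamp" | bc`
    # -> .750 (bc prints sub-second results with a leading dot)
    latency_ms_float=`echo "$latency * 1000" | bc`           # -> 750.000
    latency_ms=${latency_ms_float/.*}                        # -> 750

Two caveats follow from this: the leading-dot form is only safe because the * 1000 step runs before the dot-strip (stripping ".750" directly would leave an empty string), and min_reelection_latency_ms starts at 10000, so a run whose fastest re-election takes longer than 10 s would still report a min of 10000 ms.
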