Index: system_test/broker_failure/config/server_target1.properties =================================================================== --- system_test/broker_failure/config/server_target1.properties (revision 1341171) +++ system_test/broker_failure/config/server_target1.properties (working copy) @@ -15,7 +15,7 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=1 +brokerid=0 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost Index: system_test/broker_failure/config/mirror_producer1.properties =================================================================== --- system_test/broker_failure/config/mirror_producer1.properties (revision 1341171) +++ system_test/broker_failure/config/mirror_producer1.properties (working copy) @@ -15,7 +15,8 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9093 +#broker.list=0:localhost:9093 +zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_target2.properties =================================================================== --- system_test/broker_failure/config/server_target2.properties (revision 1341171) +++ system_test/broker_failure/config/server_target2.properties (working copy) @@ -15,7 +15,7 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=2 +brokerid=1 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost Index: system_test/broker_failure/config/mirror_producer2.properties =================================================================== --- system_test/broker_failure/config/mirror_producer2.properties (revision 1341171) +++ system_test/broker_failure/config/mirror_producer2.properties (working copy) @@ -15,7 +15,8 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9094 +#broker.list=0:localhost:9094 +zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_target3.properties =================================================================== --- system_test/broker_failure/config/server_target3.properties (revision 1341171) +++ system_test/broker_failure/config/server_target3.properties (working copy) @@ -15,7 +15,7 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=3 +brokerid=2 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost Index: system_test/broker_failure/config/mirror_producer3.properties =================================================================== --- system_test/broker_failure/config/mirror_producer3.properties (revision 1341171) +++ system_test/broker_failure/config/mirror_producer3.properties (working copy) @@ -15,7 +15,8 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9095 +#broker.list=0:localhost:9095 +zk.connect=localhost:2182 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/whitelisttest.consumer.properties =================================================================== --- system_test/broker_failure/config/whitelisttest.consumer.properties (revision 1341171) +++ system_test/broker_failure/config/whitelisttest.consumer.properties (working copy) @@ -26,4 +26,5 @@ groupid=group1 mirror.topics.whitelist=test01 +#mirror.topics.whitelist=mytest_1 Index: system_test/broker_failure/config/server_source1.properties =================================================================== --- system_test/broker_failure/config/server_source1.properties (revision 1341171) +++ system_test/broker_failure/config/server_source1.properties (working copy) @@ -15,7 +15,7 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=1 +brokerid=0 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost Index: system_test/broker_failure/config/server_source2.properties =================================================================== --- system_test/broker_failure/config/server_source2.properties (revision 1341171) +++ system_test/broker_failure/config/server_source2.properties (working copy) @@ -15,7 +15,7 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=2 +brokerid=1 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost Index: system_test/broker_failure/config/server_source3.properties =================================================================== --- system_test/broker_failure/config/server_source3.properties (revision 1341171) +++ system_test/broker_failure/config/server_source3.properties (working copy) @@ -15,7 +15,7 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=3 +brokerid=2 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost Index: system_test/broker_failure/config/server_source4.properties =================================================================== --- system_test/broker_failure/config/server_source4.properties (revision 1341171) +++ system_test/broker_failure/config/server_source4.properties (working copy) @@ -15,7 +15,7 @@ # see kafka.server.KafkaConfig for additional details and defaults # the id of the broker -brokerid=4 +brokerid=3 # hostname of broker. If not set, will pick up from the value returned # from getLocalHost. If there are multiple interfaces getLocalHost Index: system_test/broker_failure/config/log4j.properties =================================================================== --- system_test/broker_failure/config/log4j.properties (revision 1341171) +++ system_test/broker_failure/config/log4j.properties (working copy) @@ -26,16 +26,15 @@ # Turn on all our debugging info #log4j.logger.kafka=INFO -log4j.logger.org.I0Itec.zkclient.ZkClient=INFO -log4j.logger.org.apache.zookeeper=INFO +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG log4j.logger.kafka.consumer=DEBUG log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE log4j.logger.kafka.server.KafkaRequestHandlers=TRACE #log4j.logger.kafka.producer.async.AsyncProducer=TRACE #log4j.logger.kafka.producer.async.ProducerSendThread=TRACE log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE -log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG # to print message checksum from ProducerPerformance log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG +log4j.logger.kafka.tools.GetZkOffsets$=DEBUG Index: system_test/broker_failure/bin/run-test.sh =================================================================== --- system_test/broker_failure/bin/run-test.sh (revision 1341171) +++ system_test/broker_failure/bin/run-test.sh (working copy) @@ -96,6 +96,8 @@ readonly wait_time_after_killing_broker=0 readonly wait_time_after_restarting_broker=5 +readonly console_consumer_timeout_ms=15000 + background_producer_pid= no_bouncing=$# @@ -132,8 +134,6 @@ producer_performance_crc_sorted_log=$base_dir/producer_performance_crc_sorted.log producer_performance_crc_sorted_uniq_log=$base_dir/producer_performance_crc_sorted_uniq.log -consumer_rebalancing_log=$base_dir/consumer_rebalancing_verification.log - consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties checksum_diff_log=$base_dir/checksum_diff.log @@ -175,17 +175,6 @@ return $(($(($RANDOM % range)) + $lo)) } -verify_consumer_rebalancing() { - - info "Verifying consumer rebalancing operation" - - $base_dir/bin/kafka-run-class.sh \ - kafka.tools.VerifyConsumerRebalance \ - --zk.connect=localhost:2181 \ - --group $consumer_grp \ - 2>&1 >> $consumer_rebalancing_log -} - wait_for_zero_consumer_lags() { # no of times to check for zero lagging @@ -357,6 +346,16 @@ info " -> kafka_source_pids[$s_idx]: ${kafka_source_pids[$s_idx]}" } +create_topic() { + this_topic_to_create=$1 + this_zk_conn_str=$2 + this_replica_factor=$3 + + info "creating topic [$this_topic_to_create] on [$this_zk_conn_str]" + $base_dir/../../bin/kafka-create-topic.sh --topic $this_topic_to_create \ + --zookeeper $this_zk_conn_str --replica $this_replica_factor +} + start_target_servers_cluster() { info "starting mirror cluster" @@ -387,6 +386,7 @@ --topic $topic_1 \ --group $source_console_consumer_grp \ --from-beginning \ + --consumer-timeout-ms $console_consumer_timeout_ms \ --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \ 2>&1 > ${console_consumer_source_log} & console_consumer_source_pid=$! @@ -402,6 +402,7 @@ --topic $topic_1 \ --group $mirror_console_consumer_grp \ --from-beginning \ + --consumer-timeout-ms $console_consumer_timeout_ms \ --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \ 2>&1 > ${console_consumer_mirror_log} & console_consumer_mirror_pid=$! @@ -428,6 +429,11 @@ kill_child_processes 0 ${console_consumer_source_pid}; fi + # ============================================================================== + # workaround for hanging ConsoleConsumer as --consumer-timeout-ms is not working + # ============================================================================== + `ps auxw | grep ChecksumMessageFormatter | grep -v grep | awk '{print $2}' | xargs kill -9` + info "shutting down target servers" for ((i=1; i<=$num_kafka_target_server; i++)) do @@ -447,6 +453,11 @@ info "shutting down zookeeper servers" if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi + + # ============================================================================== + # shutting down any lingering processes + # ============================================================================== + `ps axuw | grep -i "java\|run\-" | grep -v grep | grep Kafka | grep server_target | awk '{print $2}' | xargs kill -9` } start_background_producer() { @@ -558,12 +569,44 @@ return $cmp_result } +start_simple_target_server() { + s_idx=$1 + + $base_dir/bin/kafka-run-class.sh kafka.Kafka \ + ${kafka_target_prop_files[${s_idx}]} \ + 2>&1 >> ${kafka_target_log_files[${s_idx}]} & # append log msg after restarting + kafka_target_pids[$s_idx]=$! + + info " -> kafka_target_pids[$s_idx]: ${kafka_target_pids[$s_idx]}" + info "sleeping for 10 sec" + sleep 10 + + info "killing kafka target server pid [${kafka_target_pids[$s_idx]}]" + kill_child_processes 0 ${kafka_target_pids[$s_idx]} + + info "sleeping for 10 sec" + sleep 10 +} + start_test() { start_zk sleep 2 + start_source_servers_cluster sleep 2 + + create_topic $topic_1 localhost:$zk_source_port 1 + sleep 2 + + # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + # work around for error: + # org.I0Itec.zkclient.exception.ZkNoNodeException: + # org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids + start_simple_target_server 1 + sleep 2 + # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + start_target_servers_cluster sleep 2 @@ -631,7 +674,6 @@ sleep $wait_time_after_restarting_broker fi fi - verify_consumer_rebalancing else info "No bouncing performed" fi @@ -648,6 +690,11 @@ shutdown_producer wait_for_zero_consumer_lags + # =============================================== + # No Consumer Lag info available, so sleep 10 sec + # =============================================== + sleep 10 + } @@ -675,9 +722,11 @@ wait_for_zero_source_console_consumer_lags wait_for_zero_mirror_console_consumer_lags +# =============================================== +# No Consumer Lag info available, so sleep 10 sec +# =============================================== +sleep 10 -verify_consumer_rebalancing - shutdown_servers cmp_checksum