Index: system_test/broker_failure/config/server_target1.properties =================================================================== --- system_test/broker_failure/config/server_target1.properties (revision 1241267) +++ system_test/broker_failure/config/server_target1.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9093 +port=9081 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/mirror_producer1.properties =================================================================== --- system_test/broker_failure/config/mirror_producer1.properties (revision 1241267) +++ system_test/broker_failure/config/mirror_producer1.properties (working copy) @@ -15,7 +15,7 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9093 +broker.list=0:localhost:9081 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_target2.properties =================================================================== --- system_test/broker_failure/config/server_target2.properties (revision 1241267) +++ system_test/broker_failure/config/server_target2.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9094 +port=9082 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/mirror_producer2.properties =================================================================== --- system_test/broker_failure/config/mirror_producer2.properties (revision 1241267) +++ system_test/broker_failure/config/mirror_producer2.properties (working copy) @@ -15,7 +15,7 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9094 +broker.list=0:localhost:9082 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_target3.properties =================================================================== --- system_test/broker_failure/config/server_target3.properties (revision 1241267) +++ system_test/broker_failure/config/server_target3.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9095 +port=9083 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/mirror_producer3.properties =================================================================== --- system_test/broker_failure/config/mirror_producer3.properties (revision 1241267) +++ system_test/broker_failure/config/mirror_producer3.properties (working copy) @@ -15,7 +15,7 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9095 +broker.list=0:localhost:9083 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/whitelisttest.consumer.properties =================================================================== --- system_test/broker_failure/config/whitelisttest.consumer.properties (revision 1241267) +++ system_test/broker_failure/config/whitelisttest.consumer.properties (working copy) @@ -25,5 +25,5 @@ #consumer group id groupid=group1 -mirror.topics.whitelist=test01 - +mirror.topics.whitelist=test_1,test_2 +autooffset.reset=smallest Index: system_test/broker_failure/config/server_source1.properties =================================================================== --- system_test/broker_failure/config/server_source1.properties (revision 1241267) +++ system_test/broker_failure/config/server_source1.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9092 +port=9091 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/server_source2.properties =================================================================== --- system_test/broker_failure/config/server_source2.properties (revision 1241267) +++ system_test/broker_failure/config/server_source2.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9091 +port=9092 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/server_source3.properties =================================================================== --- system_test/broker_failure/config/server_source3.properties (revision 1241267) +++ system_test/broker_failure/config/server_source3.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9090 +port=9093 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/server_source4.properties =================================================================== --- system_test/broker_failure/config/server_source4.properties (revision 1241267) +++ system_test/broker_failure/config/server_source4.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9096 +port=9094 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/log4j.properties =================================================================== --- system_test/broker_failure/config/log4j.properties (revision 1241267) +++ system_test/broker_failure/config/log4j.properties (working copy) @@ -12,30 +12,74 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=INFO, stdout +log4j.rootLogger=INFO, stdout, kafkaAppender + +# ==================================== +# messages going to kafkaAppender +# ==================================== +log4j.logger.kafka=DEBUG, kafkaAppender +log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG, kafkaAppender +log4j.logger.org.apache.zookeeper=DEBUG, kafkaAppender + +# ==================================== +# messages going to zookeeperAppender +# ==================================== +# (comment out this line to redirect ZK-related messages to kafkaAppender +# to allow reading both Kafka and ZK debugging messages in a single file) +#log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender + +# ==================================== +# stdout +# ==================================== log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n -#log4j.appender.fileAppender=org.apache.log4j.FileAppender -#log4j.appender.fileAppender.File=kafka-request.log -#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout -#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n +# ==================================== +# fileAppender +# ==================================== +log4j.appender.fileAppender=org.apache.log4j.FileAppender +log4j.appender.fileAppender.File=/tmp/kafka_all_request.log +log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.fileAppender.layout.ConversionPattern=[%d] %p %m (%c)%n +# ==================================== +# kafkaAppender +# ==================================== +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.File=/tmp/kafka.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n +log4j.additivity.kafka=true -# Turn on all our debugging info -#log4j.logger.kafka=INFO -log4j.logger.org.I0Itec.zkclient.ZkClient=INFO -log4j.logger.org.apache.zookeeper=INFO -log4j.logger.kafka.consumer=DEBUG -log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE -log4j.logger.kafka.server.KafkaRequestHandlers=TRACE +# ==================================== +# zookeeperAppender +# ==================================== +log4j.appender.zookeeperAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.zookeeperAppender.File=/tmp/zookeeper.log +log4j.appender.zookeeperAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.zookeeperAppender.layout.ConversionPattern=[%d] %p %m (%c)%n +log4j.additivity.org.apache.zookeeper=false + +# ==================================== +# other available debugging info +# ==================================== +#log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE +#log4j.logger.kafka.server.KafkaRequestHandlers=TRACE #log4j.logger.kafka.producer.async.AsyncProducer=TRACE #log4j.logger.kafka.producer.async.ProducerSendThread=TRACE -log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE +#log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE + +log4j.logger.kafka.consumer=DEBUG log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG # to print message checksum from ProducerPerformance -log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG +# to print socket buffer size validated by Kafka broker +log4j.logger.kafka.network.Acceptor=DEBUG + +# to print socket buffer size validated by SimpleConsumer +log4j.logger.kafka.consumer.SimpleConsumer=TRACE + Index: system_test/broker_failure/bin/run-test.sh =================================================================== --- system_test/broker_failure/bin/run-test.sh (revision 1241267) +++ system_test/broker_failure/bin/run-test.sh (working copy) @@ -68,12 +68,14 @@ readonly num_msg_per_batch=500 readonly batches_per_iteration=5 -readonly num_iterations=12 +readonly num_iterations=10 readonly zk_source_port=2181 readonly zk_mirror_port=2182 -readonly topic_1=test01 +readonly topic_prefix=test +readonly max_topic_id=2 +readonly unbalanced_start_id=2 readonly consumer_grp=group1 readonly source_console_consumer_grp=source readonly mirror_console_consumer_grp=mirror @@ -96,10 +98,16 @@ readonly wait_time_after_killing_broker=0 readonly wait_time_after_restarting_broker=5 -background_producer_pid= +readonly producer_4_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093,4:localhost:9094" +readonly producer_3_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093" + +background_producer_pid_1= +background_producer_pid_2= + no_bouncing=$# iter=1 +abort_test=false pid_zk_source= pid_zk_target= @@ -177,17 +185,29 @@ verify_consumer_rebalancing() { - info "Verifying consumer rebalancing operation" + info "Verifying consumer rebalancing operation" - $base_dir/bin/kafka-run-class.sh \ - kafka.tools.VerifyConsumerRebalance \ - --zk.connect=localhost:2181 \ - --group $consumer_grp \ - 2>&1 >> $consumer_rebalancing_log + CONSUMER_REBALANCING_RESULT=`$base_dir/bin/kafka-run-class.sh \ + kafka.tools.VerifyConsumerRebalance \ + --zk.connect=localhost:2181 \ + --group $consumer_grp` + echo "$CONSUMER_REBALANCING_RESULT" >> $consumer_rebalancing_log + + REBALANCE_STATUS_LINE=`grep "Rebalance operation" $consumer_rebalancing_log | tail -1` + # info "REBALANCE_STATUS_LINE: $REBALANCE_STATUS_LINE" + REBALANCE_STATUS=`echo $REBALANCE_STATUS_LINE | grep "Rebalance operation successful" || echo -n "Rebalance operation failed"` + info "REBALANCE_STATUS: $REBALANCE_STATUS" + + if [ "${REBALANCE_STATUS}_x" == "Rebalance operation failed_x" ]; then + info "setting abort_test to true due to Rebalance operation failed" + abort_test="true" + fi } wait_for_zero_consumer_lags() { + topic_id=$1 + # no of times to check for zero lagging no_of_zero_to_verify=3 @@ -196,7 +216,7 @@ TOTAL_LAG=0 CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ --group $consumer_grp --zkconnect localhost:$zk_source_port \ - --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` for lag in $CONSUMER_LAGS; do @@ -217,6 +237,8 @@ wait_for_zero_source_console_consumer_lags() { + topic_id=$1 + # no of times to check for zero lagging no_of_zero_to_verify=3 @@ -225,7 +247,7 @@ TOTAL_LAG=0 CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ --group $source_console_consumer_grp --zkconnect localhost:$zk_source_port \ - --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` for lag in $CONSUMER_LAGS; do @@ -246,6 +268,8 @@ wait_for_zero_mirror_console_consumer_lags() { + topic_id=$1 + # no of times to check for zero lagging no_of_zero_to_verify=3 @@ -254,7 +278,7 @@ TOTAL_LAG=0 CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ --group $mirror_console_consumer_grp --zkconnect localhost:$zk_mirror_port \ - --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` for lag in $CONSUMER_LAGS; do @@ -321,6 +345,8 @@ rm -f $console_consumer_source_crc_sorted_log rm -f $console_consumer_mirror_crc_sorted_uniq_log rm -f $console_consumer_source_crc_sorted_uniq_log + + rm -f $consumer_rebalancing_log } start_zk() { @@ -380,41 +406,66 @@ } start_console_consumer_for_source_producer() { - info "starting console consumers for source producer" + topic_id=$1 + + info "starting console consumers for source producer on topic id [$topic_id]" + $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ --zookeeper localhost:$zk_source_port \ - --topic $topic_1 \ + --topic ${topic_prefix}_${topic_id} \ --group $source_console_consumer_grp \ - --from-beginning \ + --from-beginning --consumer-timeout-ms 5000 \ --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \ - 2>&1 > ${console_consumer_source_log} & - console_consumer_source_pid=$! - - info " -> console consumer source pid: $console_consumer_source_pid" + --property topic=${topic_prefix}_${topic_id} \ + 2>&1 >> ${console_consumer_source_log} } start_console_consumer_for_mirror_producer() { - info "starting console consumers for mirroring producer" + topic_id=$1 + + info "starting console consumers for mirroring producer on topic id [$topic_id]" + $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ --zookeeper localhost:$zk_mirror_port \ - --topic $topic_1 \ + --topic ${topic_prefix}_${topic_id} \ --group $mirror_console_consumer_grp \ - --from-beginning \ + --from-beginning --consumer-timeout-ms 5000 \ --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \ - 2>&1 > ${console_consumer_mirror_log} & - console_consumer_mirror_pid=$! + --property topic=${topic_prefix}_${topic_id} \ + 2>&1 >> ${console_consumer_mirror_log} +} - info " -> console consumer mirror pid: $console_consumer_mirror_pid" +consume_source_producer_messages() { + consumer_counter=1 + while [ $consumer_counter -le $max_topic_id ] + do + start_console_consumer_for_source_producer $consumer_counter + consumer_counter=$(( $consumer_counter + 1 )) + done } +consume_mirror_producer_messages() { + consumer_counter=1 + while [ $consumer_counter -le $max_topic_id ] + do + start_console_consumer_for_mirror_producer $consumer_counter + consumer_counter=$(( $consumer_counter + 1 )) + done +} + shutdown_producer() { info "shutting down producer" - if [ "x${background_producer_pid}" != "x" ]; then - # kill_child_processes 0 ${background_producer_pid}; - kill -TERM ${background_producer_pid} 2> /dev/null; + if [ "x${background_producer_pid_1}" != "x" ]; then + # kill_child_processes 0 ${background_producer_pid_1}; + kill -TERM ${background_producer_pid_1} 2> /dev/null; fi + + if [ "x${background_producer_pid_2}" != "x" ]; then + # kill_child_processes 0 ${background_producer_pid_2}; + kill -TERM ${background_producer_pid_2} 2> /dev/null; + fi } shutdown_servers() { @@ -450,13 +501,15 @@ } start_background_producer() { + bkrinfo_str=$1 + start_topic_id=$2 + end_topic_id=$3 batch_no=0 - curr_iter=0 + topic_id=${start_topic_id} - while [ $num_iterations -gt $curr_iter ] + while [ 'x' == 'x' ] do - topic=$1 sleeptime= get_random_range $sleep_min $sleep_max @@ -464,19 +517,24 @@ batch_no=$(($batch_no + 1)) + if [ $topic_id -gt $end_topic_id ]; then + topic_id=${start_topic_id} + fi + $base_dir/bin/kafka-run-class.sh \ kafka.perf.ProducerPerformance \ - --brokerinfo zk.connect=localhost:2181 \ - --topic $topic \ + --brokerinfo $bkrinfo_str \ + --topic ${topic_prefix}_${topic_id} \ --messages $num_msg_per_batch \ --message-size $message_size \ --batch-size 50 \ --vary-message-size \ --threads 1 \ - --reporting-interval $num_msg_per_batch \ - --async \ + --reporting-interval $num_msg_per_batch --async \ 2>&1 >> $base_dir/producer_performance.log # appending all producers' msgs + topic_id=$(( $topic_id + 1 )) + sleep $sleeptime done } @@ -485,9 +543,9 @@ cmp_result=0 - grep ^checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log - grep ^checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log - grep ^checksum $producer_performance_log | tr -d ' ' | cut -f2 -d ':' > $producer_performance_crc_log + grep checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log + grep checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log + grep checksum $producer_performance_log | tr -d ' ' | cut -f2 -d ':' > $producer_performance_crc_log sort $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_log sort $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_log @@ -555,6 +613,37 @@ echo "========================================================" >> $checksum_diff_log echo "${duplicate_mirror_crc}" >> $checksum_diff_log + topic_chksum_counter=1 + while [ $topic_chksum_counter -le $max_topic_id ] + do + # get producer topic counts + this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $producer_performance_log` + echo "PRODUCER topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" + + topic_chksum_counter=$(($topic_chksum_counter + 1)) + done + echo + + topic_chksum_counter=1 + while [ $topic_chksum_counter -le $max_topic_id ] + do + this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_source_log` + echo "SOURCE consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" + + topic_chksum_counter=$(($topic_chksum_counter + 1)) + done + echo + + topic_chksum_counter=1 + while [ $topic_chksum_counter -le $max_topic_id ] + do + this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_mirror_log` + echo "MIRROR consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" + + topic_chksum_counter=$(($topic_chksum_counter + 1)) + done + echo + return $cmp_result } @@ -567,15 +656,32 @@ start_target_servers_cluster sleep 2 - start_background_producer $topic_1 & - background_producer_pid=$! + start_background_producer $producer_4_brokerinfo_str 1 $(( $unbalanced_start_id - 1 )) & + background_producer_pid_1=$! info "==========================================" - info "Started background producer pid [${background_producer_pid}]" + info "Started background producer pid [${background_producer_pid_1}]" info "==========================================" - sleep 5 - + sleep 10 + + start_background_producer $producer_3_brokerinfo_str $unbalanced_start_id $max_topic_id & + background_producer_pid_2=$! + + info "==========================================" + info "Started background producer pid [${background_producer_pid_2}]" + info "==========================================" + + sleep 10 + + verify_consumer_rebalancing + + info "abort_test: [${abort_test}]" + if [ "${abort_test}_x" == "true_x" ]; then + info "aborting test" + iter=$((${num_iterations} + 1)) + fi + while [ $num_iterations -ge $iter ] do echo @@ -592,7 +698,6 @@ # even iterations -> bounce target kafka borker get_random_range 1 $num_kafka_target_server idx=$? - if [ "x${kafka_target_pids[$idx]}" != "x" ]; then echo info "#### Bouncing kafka TARGET broker ####" @@ -631,7 +736,15 @@ sleep $wait_time_after_restarting_broker fi fi + verify_consumer_rebalancing + + info "abort_test: [${abort_test}]" + if [ "${abort_test}_x" == "true_x" ]; then + info "aborting test" + iter=$((${num_iterations} + 1)) + fi + else info "No bouncing performed" fi @@ -670,8 +783,8 @@ start_test -start_console_consumer_for_source_producer -start_console_consumer_for_mirror_producer +consume_source_producer_messages +consume_mirror_producer_messages wait_for_zero_source_console_consumer_lags wait_for_zero_mirror_console_consumer_lags Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (revision 1241267) +++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (working copy) @@ -210,9 +210,19 @@ } class ChecksumMessageFormatter extends MessageFormatter { + private var topicStr: String = _ + + override def init(props: Properties) { + topicStr = props.getProperty("topic") + if (topicStr != null) + topicStr = topicStr + "-" + else + topicStr = "" + } + def writeTo(message: Message, output: PrintStream) { val chksum = message.checksum - output.println("checksum:" + chksum) + output.println(topicStr + "checksum:" + chksum) } } Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala =================================================================== --- perf/src/main/scala/kafka/perf/ProducerPerformance.scala (revision 1241267) +++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala (working copy) @@ -203,11 +203,11 @@ rand.nextBytes(messageBytes) val message = new Message(messageBytes) producer.send(new ProducerData[Message,Message](config.topic, message)) - if(logger.isDebugEnabled) println("checksum:" + message.checksum) + if(logger.isDebugEnabled) println(config.topic + "-checksum:" + message.checksum) bytesSent += message.payloadSize }else { producer.send(new ProducerData[Message,Message](config.topic, message)) - if(logger.isDebugEnabled) println("checksum:" + message.checksum) + if(logger.isDebugEnabled) println(config.topic + "-checksum:" + message.checksum) bytesSent += message.payloadSize } nSends += 1