Index: system_test/broker_failure/config/server_source1.properties =================================================================== --- system_test/broker_failure/config/server_source1.properties (revision 1353356) +++ system_test/broker_failure/config/server_source1.properties (working copy) @@ -41,7 +41,8 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=10000000 +#log.file.size=1000000 +log.file.size=10000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/bin/kafka-run-class.sh =================================================================== --- system_test/broker_failure/bin/kafka-run-class.sh (revision 1353356) +++ system_test/broker_failure/bin/kafka-run-class.sh (working copy) @@ -28,6 +28,11 @@ CLASSPATH=$CLASSPATH:$file done +for file in $kafka_inst_dir/examples/target/scala_2.8.0/*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + for file in $kafka_inst_dir/core/target/scala_2.8.0/*.jar; do CLASSPATH=$CLASSPATH:$file Index: system_test/broker_failure/bin/run-test-debug.sh =================================================================== --- system_test/broker_failure/bin/run-test-debug.sh (revision 0) +++ system_test/broker_failure/bin/run-test-debug.sh (revision 0) @@ -0,0 +1,268 @@ +#!/bin/bash + +# ==================================== +# Do not change the followings +# (keep this section at the beginning +# of this script) +# ==================================== +readonly system_test_root=$(dirname $0)/../.. # path of /system_test +readonly common_dir=${system_test_root}/common # common util scripts for system_test +source ${common_dir}/util.sh # include the util script +readonly base_dir=$(dirname $0)/.. # the base dir of this test suite + +# ==================================== +# Change the followings as needed +# ==================================== +readonly num_kafka_source_server=1 # requires same no. of property files such as: + # $base_dir/config/server_source{1..4}.properties +# ==================================== +# No need to change the following +# configurations in most cases +# ==================================== +readonly zk_source_port=2181 # source zk port +readonly zk_target_port=2182 # target zk port +readonly test_topic=test01 # topic used in this test +readonly source_console_consumer_grp=source +readonly target_console_consumer_grp=target +readonly console_consumer_timeout_ms=15000 + +# ==================================== +# zookeeper +# ==================================== +pid_zk_source= + +# ==================================== +# kafka source +# ==================================== +kafka_source_pids= +kafka_source_prop_files= +kafka_source_log_files= +kafka_topic_creation_log_file=$base_dir/kafka_topic_creation.log + +# ==================================== +# console consumer source +# ==================================== +console_consumer_source_pid= +console_consumer_source_log=$base_dir/console_consumer_source.log +console_consumer_source_crc_log=$base_dir/console_consumer_source_crc.log +console_consumer_source_crc_sorted_log=$base_dir/console_consumer_source_crc_sorted.log +console_consumer_source_crc_sorted_uniq_log=$base_dir/console_consumer_source_crc_sorted_uniq.log + +# ==================================== +# producer +# ==================================== +background_producer_pid= +producer_performance_log=$base_dir/producer_performance.log +producer_performance_crc_log=$base_dir/producer_performance_crc.log +producer_performance_crc_sorted_log=$base_dir/producer_performance_crc_sorted.log +producer_performance_crc_sorted_uniq_log=$base_dir/producer_performance_crc_sorted_uniq.log +tmp_file_to_stop_background_producer=/tmp/tmp_file_to_stop_background_producer + +# ==================================== +# test reports +# ==================================== +checksum_diff_log=$base_dir/checksum_diff.log + + +# ==================================== +# initialize prop and log files +# ==================================== +initialize() { + for ((i=1; i<=$num_kafka_source_server; i++)) + do + kafka_source_prop_files[${i}]=$base_dir/config/server_source${i}.properties + kafka_source_log_files[${i}]=$base_dir/kafka_source${i}.log + done +} + + +# ========================================= +# cleanup +# ========================================= +cleanup() { + info "cleaning up" + + rm -rf $tmp_file_to_stop_background_producer + rm -rf $kafka_topic_creation_log_file + + rm -rf /tmp/zookeeper_source + rm -rf /tmp/zookeeper_target + + rm -rf /tmp/kafka-source{1..4}-logs + rm -rf /tmp/kafka-target{1..3}-logs + + rm -f $base_dir/zookeeper_source.log + rm -f $base_dir/zookeeper_target.log + rm -f $base_dir/kafka_source{1..4}.log + + rm -f $producer_performance_log + rm -f $producer_performance_crc_log + rm -f $producer_performance_crc_sorted_log + rm -f $producer_performance_crc_sorted_uniq_log + + rm -f $console_consumer_source_log + rm -f $console_consumer_source_crc_log + + rm -f $checksum_diff_log + + rm -f $console_consumer_source_crc_sorted_log + rm -f $console_consumer_source_crc_sorted_uniq_log +} + +# ========================================= +# shutdown_producer +# ========================================= +shutdown_producer() { + info "shutting down producer" + if [ "x${background_producer_pid}" != "x" ]; then + # kill_child_processes 0 ${background_producer_pid}; + kill -TERM ${background_producer_pid} 2> /dev/null; + fi +} + +# ========================================= +# shutdown_servers +# ========================================= +shutdown_servers() { + info "shutting down source console consumer" + if [ "x${console_consumer_source_pid}" != "x" ]; then + kill_child_processes 0 ${console_consumer_source_pid}; + fi + + # ============================================================================== + # workaround for hanging ConsoleConsumer as --consumer-timeout-ms is not working + # ============================================================================== + `ps auxw | grep ChecksumMessageFormatter | grep -v grep | awk '{print $2}' | xargs kill -9` + `ps auxw | grep NewlineMessageFormatter | grep -v grep | awk '{print $2}' | xargs kill -9` + + info "shutting down source servers" + for ((i=1; i<=$num_kafka_source_server; i++)) + do + if [ "x${kafka_source_pids[$i]}" != "x" ]; then + kill_child_processes 0 ${kafka_source_pids[$i]}; + fi + done + + info "shutting down zookeeper servers" + if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi + + # ============================================================================== + # shutting down any lingering processes + # ============================================================================== + `ps axuw | grep -i "java\|run\-" | grep -v grep | grep Kafka | grep server_target | awk '{print $2}' | xargs kill -9` + + # ============================================================================== + # workaround for hanging MirrorMaker + # ============================================================================== + `ps auxw | grep MirrorMaker | grep -v grep | awk '{print $2}' | xargs kill -9` +} + +# ========================================= +# cmp_checksum +# ========================================= +cmp_checksum() { + + cmp_result=0 + + grep ^checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log + grep checksum $producer_performance_log | tr ' ' '\n' | grep checksum | awk -F ':' '{print $2}' > $producer_performance_crc_log + + sort $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_log + sort $producer_performance_crc_log > $producer_performance_crc_sorted_log + + sort -u $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_uniq_log + sort -u $producer_performance_crc_log > $producer_performance_crc_sorted_uniq_log + + msg_count_from_source_consumer=`cat $console_consumer_source_crc_log | wc -l | tr -d ' '` + uniq_msg_count_from_source_consumer=`cat $console_consumer_source_crc_sorted_uniq_log | wc -l | tr -d ' '` + + uniq_msg_count_from_producer=`cat $producer_performance_crc_sorted_uniq_log | wc -l | tr -d ' '` + + total_msg_published=`cat $producer_performance_crc_log | wc -l | tr -d ' '` + + duplicate_msg_in_producer=$(( $total_msg_published - $uniq_msg_count_from_producer )) + + crc_only_in_producer=`comm -23 $producer_performance_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log` + + echo "" + echo "========================================================" + echo "no. of messages published : $total_msg_published" + echo "producer unique msg rec'd : $uniq_msg_count_from_producer" + echo "source consumer msg rec'd : $msg_count_from_source_consumer" + echo "source consumer unique msg rec'd : $uniq_msg_count_from_source_consumer" + echo "========================================================" + echo "" + + return $cmp_result +} + +# ========================================= +# start_test +# ========================================= +start_test() { + + msg_length=$1 + + info "starting zookeeper" + $base_dir/../../bin/zookeeper-server-start.sh \ + $base_dir/config/zookeeper_source.properties \ + 2>&1 > $base_dir/zookeeper_source.log & + pid_zk_source=$! + sleep 2 + + info "starting kafka" + $base_dir/bin/kafka-run-class.sh kafka.Kafka \ + ${kafka_source_prop_files[1]} \ + 2>&1 > ${kafka_source_log_files[1]} & + kafka_source_pids[1]=$! + sleep 2 + + info "producing 300 messages on topic '$test_topic'" + + $base_dir/bin/kafka-run-class.sh kafka.examples.QaProducer \ + -h localhost:2181 -l $msg_length -t test01 -s 50 -n 300 \ + 2>&1 >> $base_dir/producer_performance.log + + echo + info "sleeping 10 sec" + sleep 10 + + info "starting console consumers for source" + $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ + --zookeeper localhost:$zk_source_port \ + --topic $test_topic \ + --group $source_console_consumer_grp \ + --from-beginning \ + --consumer-timeout-ms $console_consumer_timeout_ms \ + --formatter "kafka.consumer.ConsoleConsumer\$NewlineMessageFormatter" \ + 2>&1 > ${console_consumer_source_log} & + + info "sleeping 20 sec for consumer to catch up" + sleep 20 +} + + +# ========================================= +# main test begins here +# ========================================= + +if [ $# -eq 1 ]; then + msg_len=$1 +else + msg_len=50 +fi + +# initializing and cleanup +initialize +cleanup +sleep 5 + +# Ctrl-c trap. Catches INT signal +trap "shutdown_producer; shutdown_servers; cmp_checksum; exit 0" INT + +# starting the test +start_test $msg_len + +shutdown_servers + +cmp_checksum Property changes on: system_test/broker_failure/bin/run-test-debug.sh ___________________________________________________________________ Added: svn:executable + * Index: examples/src/main/java/kafka/examples/QaProducer.java =================================================================== --- examples/src/main/java/kafka/examples/QaProducer.java (revision 0) +++ examples/src/main/java/kafka/examples/QaProducer.java (revision 0) @@ -0,0 +1,107 @@ +package kafka.examples; + +import kafka.javaapi.producer.ProducerData; +import kafka.producer.ProducerConfig; +import java.util.Properties; + +import kafka.message.Message; + +public class QaProducer extends Thread implements KafkaProperties +{ + private String largeData = "x"; + private kafka.javaapi.producer.Producer producer; + private String topic = "test01"; + private String hostPort = "localhost:2181"; + private String groupId = "mygroup"; + private int sleepMs = 500; + private int numMsg = 100; + private int msgLength = 100; + private Properties props = new Properties(); + + public QaProducer(String[] args) + { + readCmdLine(args); + + StringBuilder sb = new StringBuilder(msgLength); + for (int i = 0; i < msgLength; i++) { + sb.append('x'); + } + + largeData = sb.toString(); + + System.out.println("#### topic : " + topic); + System.out.println("#### hostPort : " + hostPort); + System.out.println("#### numMsg : " + numMsg); + System.out.println("#### msgLength : " + msgLength); + + props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("zk.connect", hostPort); + + // Use random partitioner. Don't need the key type. Just set it to Integer. + // The message is of type String. + producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props)); + } + + public static void main(String[] args) { + QaProducer producerThread = new QaProducer(args); + producerThread.start(); + } + + public void run() { + for (int i = 1; i <= this.numMsg; i++) { + String messageStr = new String("checksum:" + i + ":" + largeData); + producer.send(new ProducerData(this.topic, messageStr)); + System.out.println("Message no. [" + i + "] {" + messageStr + "} sent"); + + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + producer.close(); + } + + public void readCmdLine(String args[]) { + + int i = 0; + + while(i < args.length && args[i].startsWith("-")) { + if (args[i].toLowerCase().equals("-t")) { + this.topic = args[i+1]; + i += 2; + } + else if (args[i].toLowerCase().equals("-g")) { + this.groupId = args[i+1]; + i += 2; + } + else if (args[i].toLowerCase().equals("-h")) { + this.hostPort = args[i+1]; + i += 2; + } + else if (args[i].toLowerCase().equals("-s")) { + this.sleepMs = Integer.parseInt(args[i+1]); + i += 2; + } + else if (args[i].toLowerCase().equals("-n")) { + this.numMsg = Integer.parseInt(args[i+1]); + i += 2; + } + else if (args[i].toLowerCase().equals("-l")) { + this.msgLength = Integer.parseInt(args[i+1]); + i += 2; + } + else { + System.out.println("\nUnregcognized cmd line option "+args[i]); + showHelp(); + System.exit(-1); + } + } + } + + public static void showHelp() { + System.out.println("usage: java Producer -t -h "); + System.out.println("eg: java Producer -t tests -h 127.0.0.1:2181"); + } +}