From 56dab3ddd2db9892abe822d50f1afcaaada8eb0d Mon Sep 17 00:00:00 2001 From: abhinigam Date: Mon, 23 Mar 2015 11:53:54 -0700 Subject: [PATCH] Updated the RB with Gwen's comments, Beckett's comments and a subset of Guozhang's comments --- bin/kafka-run-class.sh | 16 +- .../kafka/tools/ContinuousValidationTest.java | 536 +++++++++++++++++++++ .../scala/kafka/utils/ShutdownableThread.scala | 3 +- .../broker_upgrade/bin/test-broker-upgrade.sh | 276 +++++++++++ 4 files changed, 829 insertions(+), 2 deletions(-) create mode 100644 core/src/main/scala/kafka/tools/ContinuousValidationTest.java create mode 100644 system_test/broker_upgrade/bin/test-broker-upgrade.sh diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 881f578..65c27eb 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -20,6 +20,15 @@ then exit 1 fi +KAFKA_JAR_DIR= +if [ "$1" = "-d" ]; then + echo "Command-line arguments are $@" + KAFKA_JAR_DIR=$2 + #skip the first two arguments now that they have been processed + shift + shift +fi + base_dir=$(dirname $0)/.. 
# create logs directory @@ -71,7 +80,12 @@ do CLASSPATH=$CLASSPATH:$file done -for file in $base_dir/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar; +if [ "$KAFKA_JAR_DIR" = "" ]; then + echo "inside here" + KAFKA_JAR_DIR=$base_dir/core/build/libs +fi + +for file in ${KAFKA_JAR_DIR}/kafka_${SCALA_BINARY_VERSION}*.jar; do CLASSPATH=$CLASSPATH:$file done diff --git a/core/src/main/scala/kafka/tools/ContinuousValidationTest.java b/core/src/main/scala/kafka/tools/ContinuousValidationTest.java new file mode 100644 index 0000000..33af1ab --- /dev/null +++ b/core/src/main/scala/kafka/tools/ContinuousValidationTest.java @@ -0,0 +1,536 @@ +package kafka.tools; + +//import com.google.common.util.concurrent.RateLimiter; +import kafka.serializer.DefaultDecoder; +import kafka.utils.ShutdownableThread; +import org.apache.kafka.clients.producer.Callback; +import org.apache.log4j.Logger; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.KafkaStream; +import kafka.consumer.Consumer; +import kafka.message.MessageAndMetadata; +import kafka.consumer.ConsumerIterator; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.NotLeaderForPartitionException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.clients.producer.KafkaProducer; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.InterruptedException; +import java.lang.Long; +import java.lang.Override; +import java.lang.RuntimeException; +import java.lang.String; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; +import 
java.nio.ByteBuffer; + +import org.apache.log4j.SimpleLayout; +import org.apache.log4j.FileAppender; +import org.apache.log4j.Level; + + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +/* + * This test manages bringup and shutdown of 3 different threads + * real-time consumer which consumes continuously from the [topic,partition] from the latest offset + * bootstrap consumer which consumes from the beginning of [topic,partition] + * producer which produces sequentially increasing long numbers 1,2, .... Long.MAX_VALUE,1,2,3..... + */ +class ContinuousValidationTest { + class RateLimiter { + long nextAcquired; + double rate; + RateLimiter(double rate) { + this.rate = rate; + nextAcquired = System.currentTimeMillis(); + } + void acquire(int permit) { + //ignore everything but 1 + if(permit != 1) { + logger.fatal("Rate limiter only supports acquiring 1 permit at a time"); + System.exit(1); + } + try { + long sleepTime = nextAcquired - System.currentTimeMillis(); + System.out.println("Started sleeping here"); + Thread.sleep(sleepTime<0?0:sleepTime); + System.out.println("Stopped sleeping here"); + } catch (InterruptedException e) { + //Do nothing. This should only happend during shutdown. 
+ } + //setup the nextAcquisition time in milliseconds since epoch + nextAcquired += 1/rate * 1000L; + } + } + private static final double defaultRate = 1.0; + private static final long defaultTimeVerificationSeconds = 10L; + private static final String defaultGroupId = String.format("ContinuousValidationTestRealTimeConsumer%d", System.currentTimeMillis()); + private static final String defaultGroupIdBootstrap = String.format("ContinuousValidationTestBootstrapConsumer%d", System.currentTimeMillis()); + + private boolean bootstrapConsumerRunning = false; + private long timeVerificationSeconds; + + private Logger logger; + + private ValidatingRealtimeConsumer consumer; + private BaseProducer producer; + private ValidatingRealtimeConsumer bootstrapConsumer; + + // Useful for printing debug messages + void validate() { + logger.debug(String.format("The event id at the receiver %d, the event was produced at %d, the delta was:%d, epoch:%d", + consumer.lastEventSeenSequenceId.get(), consumer.lastEventSeenTimeProduced.get(), consumer.lastEventSeenDelta.get(), consumer.numEpochs.get())); + logger.debug(String.format("The event id at the sender %d, epoch:%d", producer.sequenceId, producer.numEpochs.get())); + logger.debug(String.format("The event id at the bootstrap receiver %d, the event was produced at %d, the delta was:%d, epoch:%d", + bootstrapConsumer.lastEventSeenSequenceId.get(), bootstrapConsumer.lastEventSeenTimeProduced.get(), + bootstrapConsumer.lastEventSeenDelta.get(), bootstrapConsumer.numEpochs.get())); + } + + ContinuousValidationTest(String zookeeperString, String topic, String brokersInfo, double rate, long timeVerificationSeconds, FileAppender appender, + boolean batchSend, Properties consumerProperties, Properties serverProperties, int partitionToSendTo, String groupId, String groupIdBootstrap) { + this.timeVerificationSeconds = timeVerificationSeconds; + logger = Logger.getLogger(ContinuousValidationTest.class.getName()); + logger.addAppender(appender); 
+ logger.setLevel(Level.ALL); + consumer = new ValidatingRealtimeConsumer(zookeeperString, topic, appender, false, groupId, consumerProperties,"realtime-consumer"); + producer = batchSend ? new ValidatingBatchSendProducer(brokersInfo, topic, rate, appender, serverProperties, partitionToSendTo): + new ValidatingProducer(brokersInfo, topic, rate, appender, serverProperties, partitionToSendTo); + bootstrapConsumer = new ValidatingRealtimeConsumer(zookeeperString, topic, appender, true, groupIdBootstrap, consumerProperties, "bootstrap-consumer"); + } + + // Launches the continuous producer and real-time consumer thread. + void init() { + consumer.start(); + producer.start(); + } + + // The following two methods are used for serialization of the custom message used to perform validation. + // Encodes x as its 8-byte big-endian representation. + private byte[] longToBytes(long x) { + // Long.SIZE is a bit count (64); a long occupies Long.SIZE / Byte.SIZE = 8 bytes. + ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE); + buffer.putLong(x); + return buffer.array(); + } + + // Decodes the long stored in the leading 8 bytes of the given array. + private long bytesToLong(byte[] bytes) { + // wrap() reads from the start of the array, so payloads written with the old 64-byte padding still decode. + return ByteBuffer.wrap(bytes).getLong(); + } + + // This starts a thread which rewinds to the beginning of the topic and runs indefinitely. + // Call startBootstrap() from the same thread as shutdown() + public void startBootstrap() { + bootstrapConsumerRunning = true; + bootstrapConsumer.start(); + } + + // This shutdown method stops producer and consumer threads and if the bootstrap thread has been + // started it interrupts them. It does validation to make sure the producer/consumer/bootstrap consumer + // threads are in sync. + public void shutDown() { + logger.info("State of the producer, consumer and bootstrap consumer before we start shutdown"); + validate(); + final long interruptIntervalMs=2000; + + //Stop producer.
+ if (!producer.isAlive()) { + logger.fatal("Producer thread died prematurely"); + return; + } + producer.initiateShutdown(); + try { + Thread.sleep(interruptIntervalMs); + if (producer.isAlive()) { + logger.fatal("Producer thread refuses to die even after interruption"); + return; + } else { + if (bootstrapConsumerRunning) { + //give the bootstrap consumer time to catchup. + if (!bootstrapConsumer.isAlive()) { + logger.fatal("Bootstrap consumer died prematurely"); + return; + } + logger.info("Waiting for the bootstrap consumer to finish for seconds:" + timeVerificationSeconds); + //timeVerificationSeconds is expressed in seconds while Thread.sleep() takes milliseconds. + Thread.sleep(timeVerificationSeconds * 1000L); + if(bootstrapConsumer.lastEventSeenSequenceId.get() != producer.sequenceId || + bootstrapConsumer.numEpochs.get() != producer.numEpochs.get()) { + //report the failure instead of returning silently. + logger.fatal("Bootstrap consumer has not caught up"); + return; + } + } else { + //give some time for the real-time consumer to catch up + Thread.sleep(interruptIntervalMs); + } + if(!consumer.isAlive()) { + logger.fatal("Realtime consumer died prematurely"); + //a dead consumer is a failed run; do not continue towards SUCCESS. + return; + } + if (((consumer.lastEventSeenSequenceId.get() != producer.sequenceId) || + consumer.numEpochs.get() != producer.numEpochs.get())) { + logger.fatal("Real-time consumer has not caught up"); + return; + } + } + } catch (InterruptedException e) { + //an interrupted shutdown was never validated, so it must not fall through to SUCCESS. + logger.fatal("Unexpected interrupted exception", e); + Thread.currentThread().interrupt(); + return; + } + //make sure everything is in sync + logger.info("State of the producer, consumer and bootstrap consumer after shutdown"); + validate(); + logger.info("SUCCESS"); + } + + /* + * Sends messages using async mode. + */ + class ValidatingBatchSendProducer extends BaseProducer { + /* + * This callback is used to log exceptions during async sends.
+ */ + class ExceptionLogCallback implements Callback { + private Logger logger = null; + private Long messageKey = -1L; + ExceptionLogCallback(Logger logger, long key) { + this.logger = logger; + this.messageKey = key; + } + + public void onCompletion(RecordMetadata metadata, Exception ex) { + if(ex!=null) { + //pass the exception along so the cause of the failed send ends up in the log. + logger.fatal(String.format("Producer did not send the message out with key:%d", messageKey), ex); + } + } + } + private KafkaProducer producer; + private String brokersInfo; + private String topic; + private RateLimiter rateLimiter; + private int partitionToSendTo; + ValidatingBatchSendProducer(String brokersInfo, String topic, double rate, FileAppender appender, Properties props, int partitionToSendTo) { + super("validating-batch-send-producer"); + logger.addAppender(appender); + logger.setLevel(Level.ALL); + this.brokersInfo = brokersInfo; + this.topic = topic; + this.partitionToSendTo = partitionToSendTo; + props.put("bootstrap.servers", brokersInfo); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + producer = new KafkaProducer(props); + //rateLimiter = RateLimiter.create(rate); + rateLimiter = new RateLimiter(rate); + } + + public void doWork() { + //Throttle the producer so as not to have any impact on the regular traffic on the broker.
+ rateLimiter.acquire(1); + //deal with the overflow case + if (sequenceId != Long.MAX_VALUE) + ++sequenceId; + else { + sequenceId = 1; + numEpochs.incrementAndGet(); + } + Event evt = new Event(sequenceId, System.currentTimeMillis()); + //single partition the key does not really matter + ProducerRecord record = new ProducerRecord(topic, partitionToSendTo, longToBytes(evt.sequenceId), longToBytes(evt.eventProducedTimestamp)); + producer.send(record, new ExceptionLogCallback(logger, evt.sequenceId)); + } + } + + abstract class BaseProducer extends ShutdownableThread { + BaseProducer(String threadName) { + super(threadName, true); + } + protected long sequenceId = 0L; + protected AtomicLong numEpochs = new AtomicLong(1L); + } + + /* + This producer generates events with key which is a sequentially increasing number from 1,2,3... Long.LONG_MAX + 1,2,3,..... so on and so forth. Every time we wraparound we bump the number of epochs. + */ + class ValidatingProducer extends BaseProducer { + private KafkaProducer producer; + //These are atomic so that we can read them from another thread w/o much bother. + private String brokersInfo; + private String topic; + private RateLimiter rateLimiter; + private int partitionToSendTo; + + /* + * brokersInfo is a list of IP:port pairs. + * the broker will throttle the rate at which produces the events. 
+ */ + ValidatingProducer(String brokersInfo, String topic, double rate, FileAppender appender, Properties props, int partitionToSendTo) { + super("validating-producer-blocking-send"); + logger.addAppender(appender); + logger.setLevel(Level.ALL); + this.brokersInfo = brokersInfo; + this.topic = topic; + this.partitionToSendTo = partitionToSendTo; + props.put("bootstrap.servers", brokersInfo); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + producer = new KafkaProducer(props); + //rateLimiter = RateLimiter.create(rate); + rateLimiter = new RateLimiter(rate); + } + + //These exceptions are called by shutdownable thread framework. + public void doWork() throws InterruptedException, ExecutionException { + //Throttle the producer so as not to have any impact on the regular traffic on the broker. + rateLimiter.acquire(1); + //deal with the overflow case + if (sequenceId != Long.MAX_VALUE) + ++sequenceId; + else { + sequenceId = 1; + numEpochs.incrementAndGet(); + } + Event evt = new Event(sequenceId, System.currentTimeMillis()); + //single partition the key does not really matter + ProducerRecord record = new ProducerRecord(topic, partitionToSendTo, longToBytes(evt.sequenceId), longToBytes(evt.eventProducedTimestamp)); + //implement backoff logic here + final long sleepIntervalMs = 1000L; + boolean notLeaderForPartitionException; + do { + notLeaderForPartitionException = false; + Future future = producer.send(record); + try { + //Make sure the send succeeds. + future.get(); + } catch (ExecutionException e) { + if (e.getCause() != null && (e.getCause() instanceof NotLeaderForPartitionException || e.getCause() instanceof UnknownTopicOrPartitionException)) { + logger.error("The leader for this partition moved", e); + //wait for some time for the new leader to settle down. 
+ try { + logger.info(String.format("Sleeping for %d milliseconds", sleepIntervalMs)); + Thread.sleep(sleepIntervalMs); + logger.info("Done sleeping"); + notLeaderForPartitionException = true; + } catch (InterruptedException ef) { + logger.error("While waiting for new leader discovery", ef); + throw ef; + } + } else { + throw e; + } + } + } while (notLeaderForPartitionException); + } + } + + class ValidatingRealtimeConsumer extends ShutdownableThread { + Logger logger = Logger.getLogger(ValidatingRealtimeConsumer.class.getName()); + final String inBufferSize = "10000"; + ConsumerConnector consumer; + KafkaStream msgStream; + AtomicLong numEpochs = new AtomicLong(0L); + AtomicLong lastEventSeenSequenceId = new AtomicLong(0L); + AtomicLong lastEventSeenTimeProduced = new AtomicLong(0L); + AtomicLong lastEventSeenDelta = new AtomicLong(0L); + Event evt; + ConsumerIterator msgIter; + ValidatingRealtimeConsumer(String zookeeperString, String topic, FileAppender appender, boolean isBeginning, String groupId, Properties props, String threadName) { + super(threadName, true); + try { + logger.addAppender(appender); + logger.setLevel(Level.ALL); + props.put("group.id", groupId); + props.put("zookeeper.connect", zookeeperString); + props.put("fetch.message.max.bytes", inBufferSize); + if (isBeginning) + props.put("auto.offset.reset", "smallest"); + ConsumerConfig config = new ConsumerConfig(props); + consumer = Consumer.createJavaConsumerConnector(config); + Map mapTopic = new HashMap(1, 1); + mapTopic.put(topic, 1); + DefaultDecoder decoder = new DefaultDecoder(null); + msgStream = consumer.createMessageStreams(mapTopic, decoder, decoder).get(topic).get(0); + msgIter = msgStream.iterator(); + } catch (RuntimeException e) { + System.out.println("Error message " + e.getMessage()); + e.printStackTrace(); + } + } + @Override + public void doWork() { + MessageAndMetadata messageMetadata = msgIter.next(); + evt = new Event(bytesToLong(messageMetadata.key()), 
bytesToLong(messageMetadata.message())); + if(!validate(evt)) { + logger.fatal("Unexpected event received by real-time consumer" + evt); + System.exit(1); + } + } + + boolean validate(Event evt) { + if (lastEventSeenSequenceId.get() == 0L) { + logger.info("Received first event"); + update(evt); + return true; + } + if(evt.eventProducedTimestamp < lastEventSeenTimeProduced.get() || + (evt.sequenceId= lastEventSeenSequenceId.get() && evt.eventProducedTimestamp == lastEventSeenTimeProduced.get()) { + return true; + } else if (evt.sequenceId == lastEventSeenSequenceId.get() + 1) { + update(evt); + return true; + } else { + return false; + } + } + } + + void update(Event evt) { + if (evt.sequenceId == 1L) + numEpochs.incrementAndGet(); + lastEventSeenSequenceId.set(evt.sequenceId); + lastEventSeenDelta.set(System.currentTimeMillis() - evt.eventProducedTimestamp); + lastEventSeenTimeProduced.set(evt.eventProducedTimestamp); + } + } + + class Event { + long sequenceId; + long eventProducedTimestamp; + Event(long eventId, long eventTimestamp) { + this.sequenceId = eventId; + this.eventProducedTimestamp = eventTimestamp; + } + + @Override + public String toString() { + String msg = "EventId:" + sequenceId + " timestamp:" + eventProducedTimestamp; + System.out.println(msg); + return msg; + } + } + + //start simple only need the broker and the topic name + //arguments should be timed run or infinite running process which will process signal 0. + //log file to write to. + //zookeeper string + //topic + //brokers information + //optional argument on whether to spawn a consumer which consumes from the very beginning. 
+ public static void main(String args[]) { + OptionParser parser = new OptionParser(); + parser.accepts("zookeeperString").withRequiredArg().ofType(String.class); + parser.accepts("brokersInfo").withRequiredArg().ofType(String.class); + parser.accepts("topic").withRequiredArg().ofType(String.class); + parser.accepts("partition").withRequiredArg().ofType(Integer.class); + parser.accepts("fileName").withRequiredArg().ofType(String.class); + //"rate" is read below, so it has to be declared here; otherwise it can never be set from the command line. + parser.accepts("rate").withRequiredArg().ofType(Double.class); + parser.accepts("serverProperties").withOptionalArg().ofType(File.class); + parser.accepts("consumerProperties").withOptionalArg().ofType(File.class); + parser.accepts("timedRun").withOptionalArg().ofType(Long.class); + parser.accepts("timeToSpawn").withOptionalArg().ofType(Long.class); + parser.accepts("timeVerificationSeconds").withOptionalArg().ofType(Long.class); + parser.accepts("batchSend").withOptionalArg(); + parser.accepts("realtimeConsumerGroupId").withOptionalArg().ofType(String.class); + parser.accepts("bootstrapConsumerGroupId").withOptionalArg().ofType(String.class); + parser.accepts("signalFileToParent").withOptionalArg().ofType(String.class); + parser.accepts("signalFileFromParent").withOptionalArg().ofType(String.class); + + OptionSet options = parser.parse(args); + //need to keep compiler happy by having the casts. + long timeToRun = options.has("timedRun") ? (Long)options.valueOf("timedRun") : -1; + long timeToSpawn = options.has("timeToSpawn") ? (Long)options.valueOf("timeToSpawn") : -1; + double rate = options.has("rate") ? (Double)options.valueOf("rate") : ContinuousValidationTest.defaultRate; + long timeVerificationSeconds = options.has("timeVerificationSeconds") ?
(Long)options.valueOf("timeVerificationSeconds"): ContinuousValidationTest.defaultTimeVerificationSeconds; + + try { + SimpleLayout layout = new SimpleLayout(); + FileAppender appender = new FileAppender(layout, (String)options.valueOf("fileName"), false); + appender.setImmediateFlush(true); + + Properties serverProperties = new Properties(); + Properties consumerProperties = new Properties(); + if (options.has("serverProperties")) + serverProperties.load(new FileInputStream((File)options.valueOf("serverProperties"))); + if (options.has("consumerProperties")) + consumerProperties.load(new FileInputStream((File)options.valueOf("consumerProperties"))); + String realtimeGroupId = options.has("realtimeConsumerGroupId") ? (String)options.valueOf("realtimeConsumerGroupId"):ContinuousValidationTest.defaultGroupId; + String bootstrapGroupId = options.has("bootstrapConsumerGroupId") ? (String)options.valueOf("bootstrapConsumerGroupId"):ContinuousValidationTest.defaultGroupIdBootstrap; + final ContinuousValidationTest test = new ContinuousValidationTest((String)options.valueOf("zookeeperString"), + (String)options.valueOf("topic"), + (String)options.valueOf("brokersInfo"), + rate, + timeVerificationSeconds, + appender, + options.has("batchSend"), + consumerProperties, + serverProperties, + (Integer)options.valueOf("partition"), + realtimeGroupId, + bootstrapGroupId + ); + // We would need to take bootstrap consumer and regular consumer groupIds. Additionally we would need file to signal to parent program. + // and a file to receive signal from parent program. + // I need to signal to the parent program that my consumer has been instantiated. + if (options.has("signalFileToParent")) { + File file = new File((String) options.valueOf("signalFileToParent")); + file.delete(); + System.out.println("Signalled to the parent that my consumer has been created." 
+ Thread.currentThread().getId()); + } + // now I need to wait for my parent process to signal that it is ready to start the producer. + // poll/while this parent files is available + if (options.has("signalFileFromParent")) { + System.out.println("Awaiting parent to signal that it is ok to consume events." + Thread.currentThread().getId()); + File file1 = new File((String) options.valueOf("signalFileFromParent")); + while (!file1.exists()) Thread.sleep(100); + System.out.println("Starting the producer/consumer." + Thread.currentThread().getId()); + } + + test.init(); + if (timeToSpawn != -1) { + Thread.sleep(timeToSpawn); + test.startBootstrap(); + } + if (timeToRun == -1) { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + test.shutDown(); + } + }); + //untimed run: park the main thread until the JVM is signalled. Without this, control reaches the + //System.exit(0) at the end of main() right away and the test stops as soon as it has started. + Thread.sleep(Long.MAX_VALUE); + } else { + Thread.sleep(timeToRun); + test.shutDown(); + } + } catch (FileNotFoundException e) { + e.printStackTrace(); + System.exit(1); + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); + } catch (InterruptedException e) { + e.printStackTrace(); + System.exit(1); + } + //some kafka threads are not daemon threads and require explicit shutdown. Simple workaround since + //I will re-build state later and do not care about clean shutdown.
+ System.exit(0); + } +} diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala index fc226c8..472ecc1 100644 --- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala +++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala @@ -18,7 +18,7 @@ package kafka.utils import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{TimeUnit, CountDownLatch} abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true) extends Thread(name) with Logging { @@ -51,6 +51,7 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean info("Shutdown completed") } + @throws[Throwable] ("This could be any exception but is primarily for java to override this and not run into checked exceptions issue") def doWork(): Unit override def run(): Unit = { diff --git a/system_test/broker_upgrade/bin/test-broker-upgrade.sh b/system_test/broker_upgrade/bin/test-broker-upgrade.sh new file mode 100644 index 0000000..254c1ae --- /dev/null +++ b/system_test/broker_upgrade/bin/test-broker-upgrade.sh @@ -0,0 +1,276 @@ +# ==================================== +# Do not change the followings +# (keep this section at the beginning +# of this script) +# ==================================== +readonly system_test_root=$(dirname $0)/../.. # path of /system_test +readonly common_dir=${system_test_root}/common # common util scripts for system_test +source ${common_dir}/util.sh # include the util script + +readonly base_dir=$(dirname $0)/.. 
# the base dir of this test suite +# ==================================== +# No need to change the following +# configurations in most cases +# ==================================== +readonly test_topic=Norbert1234 +readonly realtime_groupid=ContinuousValidationTestRealTimeConsumer +readonly bootstrap_groupid=ContinuousValidationTestBootstrapConsumer +# ==================================== +# zookeeper +# ==================================== +pid_zk_source= +zk_source_port=2181 + +# ==================================== +# kafka source +# ==================================== +kafka_pids= +server1_port= +server2_port= +server_properties= +server_log= +kafka_topic_creation_log_file=$base_dir/kafka_topic_creation.log +continuous_validation_test_log_file[1]=$base_dir/validationtest.log +continuous_validation_test_log_file[2]=$base_dir/validationtest1.log + +# =============================================== +# notification files to bind consumer to partition +# =============================================== +readonly signal_from_cvt=/tmp/signalFromCVT +readonly signal_to_cvt=/tmp/signalToCVT + +# ==================================== +# java process +# ==================================== +test_program_logfile= +pid_test_program= + +# =============================================== +# poll for existence of signal_from_cvt file +# =============================================== +wait_for_cvt_init() { + while [ -f ${signal_from_cvt} ] ; + do + echo "file still exists" + sleep 2 + done +} + +# ==================================== +# validate if the test program ran +# successfully +# ==================================== +validate_test_program_logs() { + #grep for fatal in the logs. + for i in `seq 1 2`; + do + grep SUCCESS ${continuous_validation_test_log_file[$i]} + if [ $? 
-ne 0 ]; then + echo "ContinuousValidationTest failed" + cleanup_procs + exit 1 + fi + done +} + +cleanup_procs() { + kill -9 ${pid_zk_source} + kill -9 ${kafka_pids[1]} + kill -9 ${kafka_pids[2]} +} + +#uses rm -f because this also runs before the test starts, when none of these files exist yet. +cleanup_logs() { + rm -f $base_dir/zookeeper_source.log + rm -f ${server_log[1]} + rm -f ${server_log[2]} + rm -f /tmp/ContinuousValidationTest1 + rm -f /tmp/ContinuousValidationTest2 + rm -rf /tmp/kafka-logs1 + rm -rf /tmp/kafka-logs2 + rm -rf /tmp/zookeeper_source + rm -f ${signal_from_cvt} + rm -f ${signal_to_cvt} + rm -f ${kafka_topic_creation_log_file} + rm -f ${continuous_validation_test_log_file[1]} + rm -f ${continuous_validation_test_log_file[2]} +} + +#TODO split this into clean pids and clean log files. +#In case of failure we want to clean pids but leave the log files. +#We also want to cleanup the log files before starting this test. +cleanup() { + cleanup_procs + + cleanup_logs +} + +# ==================================== +# initialize prop and log files +# ==================================== +initialize() { + info "started initialization" + for ((i=1; i<=2; i++)) + do + server_properties[${i}]=$base_dir/configs/server${i}.properties + server_log[${i}]=$base_dir/server${i}.log + done + info "done initialization" +} + + +# ========================================= +# start_zk +# ========================================= +start_zk() { + info "starting zookeepers" + + #stdout is redirected first and stderr is then duplicated onto it, so both streams reach the log file. + $base_dir/../../bin/zookeeper-server-start.sh \ + $base_dir/configs/zookeeper_source.properties \ + > $base_dir/zookeeper_source.log 2>&1 & + pid_zk_source=$! + info "done with zookeeper setup $pid_zk_source" +} + +# ========================================= +# start_source_server +# $1: broker index, $2: directory holding the kafka jars to run +# ========================================= +start_source_server() { + info "starting kafka server" + + s_idx=$1 + #stdout is redirected first and stderr is then duplicated onto it, so both streams reach the log file. + $base_dir/../../bin/kafka-run-class.sh -d $2 kafka.Kafka \ + ${server_properties[$s_idx]} \ + >> "${server_log[$s_idx]}" 2>&1 & + kafka_pids[${s_idx}]=$!
+ + info "done with kafka server setup" +} + +# ========================================= +# start_continuous_validation_test +# $1: directory holding the kafka jars, $2: instance index (1-based), +# remaining arguments are passed through to ContinuousValidationTest +# ========================================= +start_continuous_validation_test() { + info "starting the continuous validation test: $@" + + sourceJars=$1 + cvt_idx=$2 + let "partition = $cvt_idx - 1" + shift + shift + #kafka-run-class.sh lives in the top-level bin directory and takes the jar directory through -d; + #stdout is redirected first and stderr is then duplicated onto it, so both streams reach the log file. + $base_dir/../../bin/kafka-run-class.sh -d ${sourceJars} kafka.tools.ContinuousValidationTest "$@" -partition $partition -fileName ${continuous_validation_test_log_file[$cvt_idx]} \ + >> "/tmp/ContinuousValidationTest$cvt_idx" 2>&1 & + + pid_test_program[$cvt_idx]=$! +} + +# ========================================= +# create_topic +# $1: topic name, $2: zookeeper connect string, $3: replication factor +# ========================================= +create_topic() { + this_zk_conn_str=$2 + this_replica_factor=$3 + this_topic_to_create=$1 + + info "creating topic [$this_topic_to_create] on [$this_zk_conn_str]" + $base_dir/../../bin/kafka-topics.sh \ + --create \ + --topic $this_topic_to_create \ + --zookeeper $this_zk_conn_str \ + --replication-factor $this_replica_factor \ + --partitions 2 \ + 2> $kafka_topic_creation_log_file +} + +# ========================================= +# await_reentry_isr_num_brokers +# $1: number of brokers expected in the ISR +# ========================================= +await_reentry_isr_num_brokers() { + #await re-entry ISR. Essentially emulate what the SREs do. + num_retries=10 + brokernumexpected=$1 + echo "Awaiting $brokernumexpected broker(s) to become part of ISR" + while [ ${num_retries} -gt 0 ]; + do + num_retries=$[$num_retries-1] + test=`$base_dir/../../bin/kafka-topics.sh --describe --topic ${test_topic} --zookeeper localhost:${zk_source_port} | grep Isr` + if [ $?
-ne 0 ]; then + #back off before retrying so the retries are not used up immediately + sleep 1 + continue + fi + #make sure the list of brokers contains the given broker id + broker1=`$base_dir/../../bin/kafka-topics.sh --describe --topic Norbert1234 --zookeeper localhost:2181 | grep Isr | cut -d' ' -f6 | cut -d',' -f1` + #-s suppresses lines without a comma; without it cut echoes a single-broker ISR back as the "second" broker + broker2=`$base_dir/../../bin/kafka-topics.sh --describe --topic Norbert1234 --zookeeper localhost:2181 | grep Isr | cut -d' ' -f6 | cut -s -d',' -f2` + brokernumActual=0 + #test the value itself: ${var+1} expands to 1 for any assigned variable, even an empty one + if [ -n "${broker1}" ]; then + let brokernumActual+=1 + fi + if [ -n "${broker2}" ]; then + let brokernumActual+=1 + fi + if [ $brokernumActual -lt $brokernumexpected ]; then + #wait before retrying + echo "BrokernumActual $brokernumActual, brokernumexpected $brokernumexpected" + sleep 1 + else + return + fi + done + echo "Not able to get back into ISR even after 10 seconds" + cleanup + exit 1 +} + +initialize +cleanup_logs +start_zk +#serializing the broker bringup order to make management easier." +start_source_server 1 ${1} +echo "Wait for broker 1 to become available." +sleep 10 +echo "Done." +start_source_server 2 ${1} +echo "Wait for broker 2 to become available." +sleep 10 +echo "Done." +create_topic ${test_topic} localhost:${zk_source_port} 2 +#Make sure both the brokers are ready to serve the partition.
+await_reentry_isr_num_brokers 2 +touch ${signal_from_cvt} +echo "Starting CVT instance 1" +start_continuous_validation_test ${1} 1 -zookeeperString "localhost:${zk_source_port}" -topic ${test_topic} -brokersInfo "localhost:9092,localhost:9093" -timeToSpawn 5000 -timedRun 20000 -signalFileToParent ${signal_from_cvt} -signalFileFromParent ${signal_to_cvt} -realtimeConsumerGroupId ${realtime_groupid} -bootstrapConsumerGroupId ${bootstrap_groupid} +#TODO wait for file to be created by CVT +echo "Waiting for CVT 1 to initialize consumer" +wait_for_cvt_init +touch ${signal_from_cvt} +echo "Starting CVT instance 2" +start_continuous_validation_test ${1} 2 -zookeeperString "localhost:${zk_source_port}" -topic ${test_topic} -brokersInfo "localhost:9092,localhost:9093" -timeToSpawn 5000 -timedRun 20000 -signalFileToParent ${signal_from_cvt} -signalFileFromParent ${signal_to_cvt} -realtimeConsumerGroupId ${realtime_groupid} -bootstrapConsumerGroupId ${bootstrap_groupid} +echo "Waiting for CVT 2 to initialize consumer" +wait_for_cvt_init +#notify the cvts that they should start running the producers +touch ${signal_to_cvt} +echo "Killing first broker" +kill_child_processes 0 ${kafka_pids[1]} +echo "Waiting for kafka broker to die" +wait ${kafka_pids[1]} +echo "Done." +echo "Deploy broker with a different jar" +start_source_server 1 ${2} +echo "Waiting for broker 1 to become available again." +#waiting for the ISR to include this broker for this partition +await_reentry_isr_num_brokers 2 + +echo "Killing the second broker" +kill_child_processes 0 ${kafka_pids[2]} +echo "Waiting for broker to die" +wait ${kafka_pids[2]} +echo "Done" +echo "Starting the second kafka process with different jar" +start_source_server 2 ${2} +#wait for the continuous validation test process to finish. 
+echo "Waiting for the continuous validation test process to finish.".${pid_test_program} +wait ${pid_test_program[1]} +wait ${pid_test_program[2]} +echo "Done" +validate_test_program_logs +#TODO add cleanup logic remove the log files, kill the processes +cleanup +exit 0 -- 1.8.2.1