Description
Exception in Mirror Maker log:
=========================
[2012-06-20 10:56:04,364] DEBUG Getting broker partition info for topic test01 (kafka.producer.BrokerPartitionInfo)
[2012-06-20 10:56:04,365] INFO Fetching metadata for topic test01 (kafka.producer.BrokerPartitionInfo)
[2012-06-20 10:56:04,366] ERROR Error in handling batch of 200 events (kafka.producer.async.ProducerSendThread)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:185)
at scala.None$.get(Option.scala:183)
at kafka.producer.ProducerPool.getAnyProducer(ProducerPool.scala:76)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:73)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:45)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:129)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:95)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:94)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:44)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:94)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:65)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:49)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:96)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:82)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:60)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:59)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:37)
Steps to reproduce
=================
It cannot be reproduced consistently. However, running the following script 2 or 3 times (step 2) will show the error:
1. Apply kafka-306-v2.patch to 0.8 branch (revision 1352192 is used to reproduce this)
2. Under the directory <kafka home>/system_test/broker_failure, execute the following command:
=> $ bin/run-test.sh 5 0
3. Check the log under the directory <kafka home>/system_test/broker_failure:
=> $ grep Exception `ls kafka_mirror_maker*.log`
=> kafka_mirror_maker2.log:java.util.NoSuchElementException: None.get
4. Also the kafka log sizes between source and target will not match:
[/tmp] $ find kafka* -name *.kafka -ls
19400444 6104 rw-rr- 1 jfung eng 6246655 Jun 20 10:56 kafka-source4-logs/test01-0/00000000000000000000.kafka
19400819 5356 rw-rr- 1 jfung eng 5483627 Jun 20 10:56 kafka-target3-logs/test01-0/00000000000000000000.kafka
Notes about the patch kafka-306-v2.patch
===============================
This patch fix the broker_failure test suite to do the followings:
a. Start 4 kafka brokers as source cluster
b. Start 3 kafka brokers as target cluster
c. Start 3 mirror maker to enable mirroring
d. Send n messages to source cluster
e. No bouncing is performed in this test for simplicity
f. After the producer is stopped, validate the data count is matched between source & target