diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index a993e8c..cd4ca2f 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -249,22 +249,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     // re-close producer is fine
   }
 
-  /* Temporarily disables the test since it hangs occasionally on the following stacktrace. Also, the test takes too long.
-"Test worker" prio=5 tid=7fb23bb48800 nid=0x10dc79000 waiting for monitor entry [10dc76000]
-   java.lang.Thread.State: BLOCKED (on object monitor)
-  at java.nio.HeapByteBuffer.slice(HeapByteBuffer.java:80)
-  at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:165)
-  at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:191)
-  at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:145)
-  at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
-  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
-  at scala.collection.Iterator$class.foreach(Iterator.scala:631)
-  at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
-  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
-  at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:399)
-  at kafka.utils.IteratorTemplate.toList(IteratorTemplate.scala:32)
-  at kafka.api.ProducerFailureHandlingTest.testBrokerFailure(ProducerFailureHandlingTest.scala:305)
-
   /**
    * With replication, producer should able able to find new leader after it detects broker failure
    */
@@ -273,14 +257,13 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     // create topic
     val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
     val partition = 0
-    var leader = leaders(partition)
-    assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined)
+    assertTrue("Leader of partition 0 of the topic should exist", leaders(partition).isDefined)
 
     val scheduler = new ProducerScheduler()
     scheduler.start
 
     // rolling bounce brokers
-    for (i <- 0 until 5) {
+    for (i <- 0 until 2) {
       server1.shutdown()
       server1.awaitShutdown()
       server1.startup
@@ -293,13 +276,24 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
 
       Thread.sleep(2000)
 
+      // Make sure the producer does not see any exception
+      // in the returned metadata due to broker failures
       assertTrue(scheduler.failed == false)
+
+      // Make sure the leader still exists after bouncing brokers
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition)
     }
 
     scheduler.shutdown
-    leader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition, 500)
 
-    val fetchResponse = if(leader.get == server1.config.brokerId) {
+    // Make sure the producer does not see any exception
+    // when draining the remaining messages on shutdown
+    assertTrue(scheduler.failed == false)
+
+    // double check that the leader info has been propagated after consecutive bounces
+    val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition)
+
+    val fetchResponse = if(leader == server1.config.brokerId) {
       consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
     } else {
      consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
@@ -311,7 +305,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
 
     assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
   }
-  */
 
   private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) {
 
@@ -319,13 +312,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     var sent = 0
     var failed = false
 
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.ACKS_CONFIG, (-1).toString)
-    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
-    producerProps.put(ProducerConfig.RETRIES_CONFIG, 10.toString)
-    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
-    val producer = new KafkaProducer(producerProps)
+    val producer = TestUtils.createNewProducer(brokerList, bufferSize = bufferSize, retries = 10)
 
     override def doWork(): Unit = {
       val responses =
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index af11a49..3c2bf36 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -246,7 +246,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
       assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
 
       // double check that the topic is created with leader elected
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
 
     } finally {
       if (producer != null) {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 034f361..4da0f2c 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -506,7 +506,7 @@ object TestUtils extends Logging {
                                        oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Option[Int] = {
     require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
     val startTime = System.currentTimeMillis()
-    var isLeaderElectedOrChanged = false;
+    var isLeaderElectedOrChanged = false
 
     trace("Waiting for leader to be elected or changed for partition [%s,%d], older leader is %s, new leader is %s"
           .format(topic, partition, oldLeaderOpt, newLeaderOpt))
@@ -603,7 +603,18 @@ object TestUtils extends Logging {
     byteBuffer
   }
 
-  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = {
+
+  /**
+   * Wait until a valid leader is propagated to the metadata cache in each broker.
+   * It assumes that the leader propagated to each broker is the same.
+   * @param servers The list of servers that the metadata should reach
+   * @param topic The topic name
+   * @param partition The partition id
+   * @param timeout The amount of time to wait for the condition before the assertion fails
+   * @return The leader of the partition.
+   */
+  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L): Int = {
+    var leader: Int = -1
     TestUtils.waitUntilTrue(() =>
       servers.foldLeft(true) {
         (result, server) =>
@@ -611,11 +622,14 @@ object TestUtils extends Logging {
           partitionStateOpt match {
             case None => false
             case Some(partitionState) =>
-              result && Request.isValidBrokerId(partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader)
+              leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+              result && Request.isValidBrokerId(leader)
           }
       },
       "Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout),
       waitTime = timeout)
+
+    leader
   }
 
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {