diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index a993e8c..a178101 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -249,22 +249,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness // re-close producer is fine } - /* Temporarily disables the test since it hangs occasionally on the following stacktrace. Also, the test takes too long. -"Test worker" prio=5 tid=7fb23bb48800 nid=0x10dc79000 waiting for monitor entry [10dc76000] - java.lang.Thread.State: BLOCKED (on object monitor) - at java.nio.HeapByteBuffer.slice(HeapByteBuffer.java:80) - at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:165) - at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:191) - at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:145) - at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) - at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) - at scala.collection.Iterator$class.foreach(Iterator.scala:631) - at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) - at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) - at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:399) - at kafka.utils.IteratorTemplate.toList(IteratorTemplate.scala:32) - at kafka.api.ProducerFailureHandlingTest.testBrokerFailure(ProducerFailureHandlingTest.scala:305) - /** * With replication, producer should able able to find new leader after it detects broker failure */ @@ -280,7 +264,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness scheduler.start // rolling bounce brokers - for (i <- 0 until 5) { + for (i <- 0 until 2) { server1.shutdown() server1.awaitShutdown() server1.startup @@ -297,7 +281,9 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness } scheduler.shutdown - leader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition, 500) + + // double check that the leader info has been propagated after consecutive bounces + leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition) val fetchResponse = if(leader.get == server1.config.brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) @@ -311,7 +297,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize) } - */ private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) { diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index af11a49..3c2bf36 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -246,7 +246,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals("Should have offset 0", 0L, producer.send(record).get.offset) // double check that the topic is created with leader elected - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) } finally { if (producer != null) { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 034f361..a637790 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -506,7 +506,7 @@ object TestUtils extends Logging { oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Option[Int] = { require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader") val startTime = System.currentTimeMillis() - var isLeaderElectedOrChanged = false; + var isLeaderElectedOrChanged = false trace("Waiting for leader to be elected or changed for partition [%s,%d], older leader is %s, new leader is %s" .format(topic, partition, oldLeaderOpt, newLeaderOpt))