diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index a993e8c..143d7b0 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -249,22 +249,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness // re-close producer is fine } - /* Temporarily disables the test since it hangs occasionally on the following stacktrace. Also, the test takes too long. -"Test worker" prio=5 tid=7fb23bb48800 nid=0x10dc79000 waiting for monitor entry [10dc76000] - java.lang.Thread.State: BLOCKED (on object monitor) - at java.nio.HeapByteBuffer.slice(HeapByteBuffer.java:80) - at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:165) - at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:191) - at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:145) - at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) - at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) - at scala.collection.Iterator$class.foreach(Iterator.scala:631) - at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) - at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) - at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:399) - at kafka.utils.IteratorTemplate.toList(IteratorTemplate.scala:32) - at kafka.api.ProducerFailureHandlingTest.testBrokerFailure(ProducerFailureHandlingTest.scala:305) - /** * With replication, producer should able able to find new leader after it detects broker failure */ @@ -280,7 +264,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness scheduler.start // rolling bounce brokers - for (i <- 0 until 5) { + for (i <- 0 until 2) { server1.shutdown() server1.awaitShutdown() server1.startup @@ -297,7 +281,9 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness } scheduler.shutdown - leader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition, 500) + + // double check that the leader info has been propagated after consecutive bounces + leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition) val fetchResponse = if(leader.get == server1.config.brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) @@ -311,7 +297,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize) } - */ private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) { @@ -319,13 +304,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness var sent = 0 var failed = false - val producerProps = new Properties() - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - producerProps.put(ProducerConfig.ACKS_CONFIG, (-1).toString) - producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) - producerProps.put(ProducerConfig.RETRIES_CONFIG, 10.toString) - producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString) - val producer = new KafkaProducer(producerProps) + val producer = TestUtils.createNewProducer(brokerList, bufferSize = bufferSize, retries = 10) override def doWork(): Unit = { val responses = diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index af11a49..3c2bf36 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -246,7 +246,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals("Should have offset 0", 0L, producer.send(record).get.offset) // double check that the topic is created with leader elected - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) } finally { if (producer != null) { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 034f361..e1d82aa 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -506,7 +506,7 @@ object TestUtils extends Logging { oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Option[Int] = { require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader") val startTime = System.currentTimeMillis() - var isLeaderElectedOrChanged = false; + var isLeaderElectedOrChanged = false trace("Waiting for leader to be elected or changed for partition [%s,%d], older leader is %s, new leader is %s" .format(topic, partition, oldLeaderOpt, newLeaderOpt)) @@ -603,7 +603,8 @@ object TestUtils extends Logging { byteBuffer } - def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = { + def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L): Int = { + var leader = -1 TestUtils.waitUntilTrue(() => servers.foldLeft(true) { (result, server) => @@ -611,11 +612,14 @@ object TestUtils extends Logging { partitionStateOpt match { case None => false case Some(partitionState) => - result && Request.isValidBrokerId(partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader) + leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader + result && Request.isValidBrokerId(leader) } }, "Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout), waitTime = timeout) + + leader } def writeNonsenseToFile(fileName: File, position: Long, size: Int) { diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index de02e47..6917ea1 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -750,7 +750,6 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker", "--consumer.config " + configPathName + "/" + mmConsumerConfigFile, "--producer.config " + configPathName + "/" + mmProducerConfigFile, - "--new.producer", "--whitelist=\".*\" >> ", logPathName + "/" + logFile + " & echo pid:$! > ", logPathName + "/entity_" + entityId + "_pid'"]