diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 3a6c5ff..a8d92f6 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -156,16 +156,18 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, - Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; - }, 1000) + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; + }, + "Partition reassignment should complete") val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) // in sync replicas should not have any replica that is not in the new assigned replicas checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) - assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000)) + TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, + "New replicas should exist on brokers") servers.foreach(_.shutdown()) } @@ -185,15 +187,18 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, - Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; - }, 1000) + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; + }, + "Partition reassignment should complete") val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) - assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000)) + TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, + "New replicas should exist on brokers") + servers.foreach(_.shutdown()) } @@ -213,15 +218,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, - Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; - }, 2000) + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; + }, + "Partition reassignment should complete") val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas) checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) - assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000)) + TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, + "New replicas should exist on brokers") servers.foreach(_.shutdown()) } @@ -256,13 +263,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { reassignPartitionsCommand.reassignPartitions // create brokers val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new KafkaConfig(b))) - TestUtils.waitUntilTrue(() => checkIfReassignPartitionPathExists(zkClient), 1000) + + // wait until reassignment completes + TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient), + "Partition reassignment should complete") val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas) checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) // ensure that there are no under replicated partitions ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) - assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000)) + TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, + "New replicas should exist on brokers") servers.foreach(_.shutdown()) } @@ -318,8 +329,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { var activeServers = servers.filter(s => s.config.brokerId != 2) try { // wait for the update metadata request to trickle to the brokers - assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() => - activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000)) + TestUtils.waitUntilTrue(() => + activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), + "Topic test not created after timeout") assertEquals(0, partitionsRemaining.size) var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition) var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 6d489ad..97e3b14 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -234,17 +234,24 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with } // wait until the messages are published - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 }, 1000) - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 }, 1000) - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 }, 1000) - TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 }, 1000) + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 }, + "Published messages should be in the log") + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 }, + "Published messages should be in the log") + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 }, + "Published messages should be in the log") + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 }, + "Published messages should be in the log") val replicaId = servers.head.config.brokerId - val hwWaitMs = config.replicaHighWatermarkCheckpointIntervalMs - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark == 2 }, hwWaitMs) - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark == 2 }, hwWaitMs) - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark == 2 }, hwWaitMs) - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark == 2 }, hwWaitMs) + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark == 2 }, + "High watermark should equal to log end offset") + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark == 2 }, + "High watermark should equal to log end offset") + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark == 2 }, + "High watermark should equal to log end offset") + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark == 2 }, + "High watermark should equal to log end offset") // test if the consumer received the messages in the correct order when producer has enabled request pipelining val request = builder.build() diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index c3b1ac4..75bd41b 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -119,10 +119,9 @@ class SocketServerTest extends JUnitSuite { server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null)) // After the response is sent to the client (which is async and may take a bit of time), the socket key should be available for reads. - Assert.assertTrue( - TestUtils.waitUntilTrue( - () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ }, - 5000)) + TestUtils.waitUntilTrue( + () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ }, + "Socket key should be available for reads") } @Test(expected = classOf[SocketException]) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index dc6a5ea..c8f8f2c 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -313,8 +313,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // create topic AdminUtils.createTopic(zkClient, "new-topic", 2, 1) - assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) + TestUtils.waitUntilTrue(() => + AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, + "Topic new-topic not created after timeout", + waitTime = zookeeper.tickTime) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0) producer.send(new KeyedMessage[String, String]("new-topic", "key", null)) diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index b278bb6..25dffcf 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -132,7 +132,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { staleControllerEpoch, 0, "") controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback) - TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000) + TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, + "Controller epoch should be stale") assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected) controllerChannelManager.shutdown() diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 3fb08e6..9556ed9 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -82,8 +82,8 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.createTopic(zkClient, topic, 1, 1) val logManager = server.getLogManager - assertTrue("Log for partition [topic,0] should be created", - waitUntilTrue(() => logManager.getLog(TopicAndPartition(topic, part)).isDefined, 5000)) + waitUntilTrue(() => logManager.getLog(TopicAndPartition(topic, part)).isDefined, + "Log for partition [topic,0] should be created") val log = logManager.getLog(TopicAndPartition(topic, part)).get val message = new Message(Integer.toString(42).getBytes()) @@ -94,8 +94,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10) assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets) - assertTrue("Leader should be elected", - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 5000)) + waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") val topicAndPartition = TopicAndPartition(topic, part) val offsetRequest = OffsetRequest( Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)), @@ -158,8 +157,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10) assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets) - assertTrue("Leader should be elected", - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 5000)) + waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") val topicAndPartition = TopicAndPartition(topic, part) val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0) val consumerOffsets = @@ -187,8 +185,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(Seq(0L), offsets) - assertTrue("Leader should be elected", - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 5000)) + waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") val topicAndPartition = TopicAndPartition(topic, part) val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10))) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 7a0ef6f..1b87acf 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -77,9 +77,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { sendMessages(numMessages.toInt) // give some time for the follower 1 to record leader HW - assertTrue("Failed to update highwatermark for follower after 1000 ms", - TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, 10000)) + TestUtils.waitUntilTrue(() => + server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, + "Failed to update high watermark for follower after timeout") servers.foreach(server => server.replicaManager.checkpointHighWatermarks()) producer.close() @@ -135,8 +135,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { hw += 1 // give some time for follower 1 to record leader HW of 60 - assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000)) + TestUtils.waitUntilTrue(() => + server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) producer.close() @@ -161,8 +162,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { sendMessages(20) var hw = 20L // give some time for follower 1 to record leader HW of 600 - assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000)) + TestUtils.waitUntilTrue(() => + server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) producer.close() @@ -191,8 +193,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { var hw = 2L // allow some time for the follower to get the leader HW - assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000)) + TestUtils.waitUntilTrue(() => + server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + "Failed to update high watermark for follower after timeout") // kill the server hosting the preferred replica server1.shutdown() server2.shutdown() @@ -216,8 +219,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { hw += 2 // allow some time for the follower to get the leader HW - assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => - server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000)) + TestUtils.waitUntilTrue(() => + server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) producer.close() diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 481a400..faf466b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -71,6 +71,6 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { } result } - assertTrue("Broker logs should be identical", waitUntilTrue(logsMatch, 6000)) + waitUntilTrue(logsMatch, "Broker logs should be identical") } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 130b6be..498941d 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -533,15 +533,15 @@ object TestUtils extends Logging { } /** - * Wait until the given condition is true or the given wait time ellapses + * Wait until the given condition is true or throw an exception if the given wait time elapses. */ - def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = { + def waitUntilTrue(condition: () => Boolean, msg: String, waitTime: Long = 5000L): Boolean = { val startTime = System.currentTimeMillis() while (true) { if (condition()) return true if (System.currentTimeMillis() > startTime + waitTime) - return false + fail(msg) Thread.sleep(waitTime.min(100L)) } // should never hit here @@ -570,9 +570,10 @@ object TestUtils extends Logging { } def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = { - assertTrue("Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout), - TestUtils.waitUntilTrue(() => - servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), timeout)) + TestUtils.waitUntilTrue(() => + servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), + "Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout), + waitTime = timeout) } def writeNonsenseToFile(fileName: File, position: Long, size: Int) { @@ -600,14 +601,22 @@ object TestUtils extends Logging { def ensureNoUnderReplicatedPartitions(zkClient: ZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int], servers: Seq[KafkaServer]) { - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned) - assertFalse("Reassigned partition [%s,%d] is underreplicated".format(topic, partitionToBeReassigned), - inSyncReplicas.size < assignedReplicas.size) - val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned) - assertTrue("Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned), leader.isDefined) - val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head - assertTrue("Reassigned partition [%s,%d] is underreplicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get), - leaderBroker.replicaManager.underReplicatedPartitionCount() == 0) + TestUtils.waitUntilTrue(() => { + val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned) + inSyncReplicas.size == assignedReplicas.size + }, + "Reassigned partition [%s,%d] is under replicated".format(topic, partitionToBeReassigned)) + var leader: Option[Int] = None + TestUtils.waitUntilTrue(() => { + leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned) + leader.isDefined + }, + "Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned)) + TestUtils.waitUntilTrue(() => { + val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head + leaderBroker.replicaManager.underReplicatedPartitionCount() == 0 + }, + "Reassigned partition [%s,%d] is under-replicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get)) } def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = {