diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 3a6c5ff..bc7ec09 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -159,13 +159,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
       ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 1000)
+    })
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     // in sync replicas should not have any replica that is not in the new assigned replicas
     checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
-    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet)
     servers.foreach(_.shutdown())
   }
 
@@ -188,12 +188,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
       ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 1000)
+    })
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
-    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet)
     servers.foreach(_.shutdown())
   }
 
@@ -216,12 +216,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
       ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 2000)
+    })
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
-    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet)
     servers.foreach(_.shutdown())
   }
 
@@ -256,13 +256,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     reassignPartitionsCommand.reassignPartitions
     // create brokers
     val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new KafkaConfig(b)))
-    TestUtils.waitUntilTrue(() => checkIfReassignPartitionPathExists(zkClient), 1000)
+
+    // wait until reassignment completes
+    TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient))
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     // ensure that there are no under replicated partitions
     ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
-    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet)
     servers.foreach(_.shutdown())
   }
 
@@ -318,8 +320,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     var activeServers = servers.filter(s => s.config.brokerId != 2)
     try {
       // wait for the update metadata request to trickle to the brokers
-      assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
+      TestUtils.waitUntilTrue(() =>
+        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
+        msg = "Topic test not created after timeout")
       assertEquals(0, partitionsRemaining.size)
       var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 6d489ad..e7ea6cb 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -234,17 +234,21 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     }
 
     // wait until the messages are published
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 }, 1000)
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 }, 1000)
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 }, 1000)
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 }, 1000)
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 })
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 })
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 })
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 })
 
     val replicaId = servers.head.config.brokerId
     val hwWaitMs = config.replicaHighWatermarkCheckpointIntervalMs
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark == 2 }, hwWaitMs)
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark == 2 }, hwWaitMs)
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark == 2 }, hwWaitMs)
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark == 2 }, hwWaitMs)
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark == 2 },
+                            waitTime = hwWaitMs)
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark == 2 },
+                            waitTime = hwWaitMs)
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark == 2 },
+                            waitTime = hwWaitMs)
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark == 2 },
+                            waitTime = hwWaitMs)
 
     // test if the consumer received the messages in the correct order when producer has enabled request pipelining
     val request = builder.build()
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index c3b1ac4..3086051 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -119,10 +119,8 @@ class SocketServerTest extends JUnitSuite {
 
     server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null))
     // After the response is sent to the client (which is async and may take a bit of time), the socket key should be available for reads.
-    Assert.assertTrue(
-      TestUtils.waitUntilTrue(
-        () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ },
-        5000))
+    TestUtils.waitUntilTrue(
+      () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ })
   }
 
   @Test(expected = classOf[SocketException])
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index c1219a8..6a27e0d 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -307,8 +307,9 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     // create topic
     AdminUtils.createTopic(zkClient, "new-topic", 2, 1)
-    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+    TestUtils.waitUntilTrue(() =>
+      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime,
+      msg = "Topic new-topic not created after timeout")
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0)
 
     producer.send(new KeyedMessage[String, String]("new-topic", "key", null))
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index b278bb6..c619e1d 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -132,7 +132,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
       staleControllerEpoch, 0, "")
 
     controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
-    TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
+    TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true)
     assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
 
     controllerChannelManager.shutdown()
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 3fb08e6..93e31d9 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -82,8 +82,8 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     AdminUtils.createTopic(zkClient, topic, 1, 1)
 
     val logManager = server.getLogManager
-    assertTrue("Log for partition [topic,0] should be created",
-               waitUntilTrue(() => logManager.getLog(TopicAndPartition(topic, part)).isDefined, 5000))
+    waitUntilTrue(() => logManager.getLog(TopicAndPartition(topic, part)).isDefined,
+                  msg = "Log for partition [topic,0] should be created")
 
     val log = logManager.getLog(TopicAndPartition(topic, part)).get
     val message = new Message(Integer.toString(42).getBytes())
@@ -94,8 +94,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
     assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 
-    assertTrue("Leader should be elected",
-               waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 5000))
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), msg = "Leader should be elected")
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest = OffsetRequest(
       Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
@@ -158,8 +157,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
     assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 
-    assertTrue("Leader should be elected",
-               waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 5000))
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), msg = "Leader should be elected")
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
     val consumerOffsets =
@@ -187,8 +185,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     assertEquals(Seq(0L), offsets)
 
-    assertTrue("Leader should be elected",
-               waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 5000))
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), msg = "Leader should be elected")
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
 
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 7a0ef6f..106fd04 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -77,9 +77,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     sendMessages(numMessages.toInt)
 
     // give some time for the follower 1 to record leader HW
-    assertTrue("Failed to update highwatermark for follower after 1000 ms",
-      TestUtils.waitUntilTrue(() =>
-        server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, 10000))
+    TestUtils.waitUntilTrue(() =>
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages,
+      msg = "Failed to update high watermark for follower after timeout")
     servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
     producer.close()
 
@@ -135,8 +135,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     hw += 1
 
     // give some time for follower 1 to record leader HW of 60
-    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
+    TestUtils.waitUntilTrue(() =>
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      msg = "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
@@ -161,8 +162,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     sendMessages(20)
     var hw = 20L
    // give some time for follower 1 to record leader HW of 600
-    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
+    TestUtils.waitUntilTrue(() =>
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      msg = "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
@@ -191,8 +193,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     var hw = 2L
 
     // allow some time for the follower to get the leader HW
-    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
+    TestUtils.waitUntilTrue(() =>
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      msg = "Failed to update high watermark for follower after timeout")
     // kill the server hosting the preferred replica
     server1.shutdown()
     server2.shutdown()
@@ -216,8 +219,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     hw += 2
 
     // allow some time for the follower to get the leader HW
-    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
-      server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
+    TestUtils.waitUntilTrue(() =>
+      server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      msg = "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 481a400..7c09077 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -71,6 +71,6 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
       }
       result
     }
-    assertTrue("Broker logs should be identical", waitUntilTrue(logsMatch, 6000))
+    waitUntilTrue(logsMatch, msg = "Broker logs should be identical")
   }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 130b6be..eb2d7a5 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -533,15 +533,15 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Wait until the given condition is true or the given wait time ellapses
+   * Wait until the given condition is true or throw an exception if the given wait time elapses.
    */
-  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
+  def waitUntilTrue(condition: () => Boolean, waitTime: Long = 5000L, msg: String = null): Boolean = {
    val startTime = System.currentTimeMillis()
    while (true) {
      if (condition())
        return true
      if (System.currentTimeMillis() > startTime + waitTime)
-        return false
+        throw new RuntimeException(if (msg != null) msg else "Condition is not satisfied after %d ms".format(waitTime))
      Thread.sleep(waitTime.min(100L))
    }
    // should never hit here
@@ -570,9 +570,10 @@
   }
 
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = {
-    assertTrue("Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout),
-      TestUtils.waitUntilTrue(() =>
-        servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), timeout))
+    TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)),
+      waitTime = timeout,
+      msg = "Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout))
   }
 
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
@@ -600,14 +601,22 @@
 
   def ensureNoUnderReplicatedPartitions(zkClient: ZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
                                         servers: Seq[KafkaServer]) {
-    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertFalse("Reassigned partition [%s,%d] is underreplicated".format(topic, partitionToBeReassigned),
-      inSyncReplicas.size < assignedReplicas.size)
-    val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned)
-    assertTrue("Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned), leader.isDefined)
-    val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
-    assertTrue("Reassigned partition [%s,%d] is underreplicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get),
-      leaderBroker.replicaManager.underReplicatedPartitionCount() == 0)
+    TestUtils.waitUntilTrue(() => {
+        val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+        inSyncReplicas.size == assignedReplicas.size
+      },
+      msg = "Reassigned partition [%s,%d] is underreplicated".format(topic, partitionToBeReassigned))
+    var leader: Option[Int] = None
+    TestUtils.waitUntilTrue(() => {
+        leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned)
+        leader.isDefined
+      },
+      msg = "Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned))
+    TestUtils.waitUntilTrue(() => {
+        val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
+        leaderBroker.replicaManager.underReplicatedPartitionCount() == 0
+      },
+      msg = "Reassigned partition [%s,%d] is under-replicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get))
   }
 
   def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = {