diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index a8b7bf7..3198cdf 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -96,18 +96,11 @@ private[server] class MetadataCache {
     }
   }
 
-  def getPartitionInfos(topic: String) = {
-    inLock(partitionMetadataLock.readLock()) {
-      cache(topic)
-    }
-  }
-
-  def containsTopicAndPartition(topic: String,
-                                partitionId: Int): Boolean = {
+  def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = {
     inLock(partitionMetadataLock.readLock()) {
       cache.get(topic) match {
-        case Some(partitionInfos) => partitionInfos.contains(partitionId)
-        case None => false
+        case Some(partitionInfos) => partitionInfos.get(partitionId)
+        case None => None
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index a8d92f6..4f6ddca 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -330,10 +330,10 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
      // wait for the update metadata request to trickle to the brokers
       TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
+        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
         "Topic test not created after timeout")
       assertEquals(0, partitionsRemaining.size)
-      var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
+      var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
       assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
@@ -342,15 +342,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining.size)
       activeServers = servers.filter(s => s.config.brokerId == 0)
-      partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
+      partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
 
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
     finally {
       servers.foreach(_.shutdown())
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f5a7a5b..384c74e 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -581,7 +581,15 @@ object TestUtils extends Logging {
 
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = {
     TestUtils.waitUntilTrue(() =>
-      servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)),
+      servers.foldLeft(true) {
+        (result, server) =>
+          val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition)
+          partitionStateOpt match {
+            case None => false
+            case Some(partitionState) =>
+              result && Request.isValidBrokerId(partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader)
+          }
+      },
       "Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout),
       waitTime = timeout)
   }
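
Call-site migration note (illustrative, not part of the patch): the new getPartitionInfo folds the old two-step presence check and lookup into a single Option-returning call performed under one acquisition of the read lock, so a cache miss becomes an ordinary None rather than the NoSuchElementException that cache(topic) could throw. The sketch below shows the before/after shape at a hypothetical call site; doSomething is a made-up handler, and metadataCache, topic, and partition are assumed to be in scope as in the tests above.

    // Before (removed APIs): check, then look up; the lookup could throw if
    // the topic disappeared from the cache between the two calls.
    //   if (metadataCache.containsTopicAndPartition(topic, partition))
    //     doSomething(metadataCache.getPartitionInfos(topic)(partition))

    // After: one Option-based lookup, with the miss handled explicitly.
    metadataCache.getPartitionInfo(topic, partition) match {
      case Some(partitionState) => doSomething(partitionState) // hypothetical handler
      case None => () // topic or partition not (yet) in this broker's metadata cache
    }

Note also that the TestUtils change strengthens the wait condition: waitUntilMetadataIsPropagated now requires each broker's cached partition state to carry a valid leader (Request.isValidBrokerId) rather than merely being present in the cache.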