From e01d19c03c5fff05ea52a2c321bb4e378d1454ac Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 8 Jan 2013 20:16:46 -0500 Subject: [PATCH] KAFKA-690 Return all topics if non are specified in TopicMetadataRequest Also fix a problematic debug statement --- .../scala/kafka/api/TopicMetadataRequest.scala | 1 - core/src/main/scala/kafka/server/KafkaApis.scala | 7 ++++- .../unit/kafka/integration/TopicMetadataTest.scala | 31 ++++++++++++++++---- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 5bdb2c1..e659532 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -40,7 +40,6 @@ object TopicMetadataRequest extends Logging { for(i <- 0 until numTopics) topics += readShortString(buffer) val topicsList = topics.toList - debug("topic = %s".format(topicsList.head)) new TopicMetadataRequest(versionId, clientId, topics.toList, correlationId) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4283973..11e8e8e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -509,7 +509,12 @@ class KafkaApis(val requestChannel: RequestChannel, val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]() val config = replicaManager.config - val uniqueTopics = metadataRequest.topics.toSet + val uniqueTopics = { + if(metadataRequest.topics.size > 0) + metadataRequest.topics.toSet + else + ZkUtils.getAllTopics(zkClient).toSet + } val topicMetadataList = AdminUtils.fetchTopicMetadataFromZk(uniqueTopics, zkClient) topicMetadataList.foreach( topicAndMetadata => { diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 230119b..fc76234 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -70,7 +70,27 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { 0 -> configs.head.brokerId ) TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) - val topicMetadata = mockLogManagerAndTestTopic(topic) + val topicMetadataRequest = new TopicMetadataRequest(List(topic)) + val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest) + assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size) + assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic) + val partitionMetadata = topicMetadata.head.partitionsMetadata + assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size) + assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId) + assertEquals(1, partitionMetadata.head.replicas.size) + } + + def testGetAllTopicMetadata { + // create topic + val topic = "test" + CreateTopicCommand.createTopic(zkClient, topic, 1) + // set up leader for topic partition 0 + val leaderForPartitionMap = Map( + 0 -> configs.head.brokerId + ) + TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) + val topicMetadataRequest = new TopicMetadataRequest(List()) + val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest) assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size) assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic) val partitionMetadata = topicMetadata.head.partitionsMetadata @@ -83,7 +103,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { // auto create topic val topic = "test" - val topicMetadata = mockLogManagerAndTestTopic(topic) + val topicMetadataRequest = new TopicMetadataRequest(List(topic)) + val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest) assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size) assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic) val partitionMetadata = topicMetadata.head.partitionsMetadata @@ -94,16 +115,14 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode) } - private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = { + private def mockLogManagerAndTestTopic(request: TopicMetadataRequest): Seq[TopicMetadata] = { // topic metadata request only requires 1 call from the replica manager val replicaManager = EasyMock.createMock(classOf[ReplicaManager]) EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes() EasyMock.replay(replicaManager) - // create a topic metadata request - val topicMetadataRequest = new TopicMetadataRequest(List(topic)) - val serializedMetadataRequest = TestUtils.createRequestByteBuffer(topicMetadataRequest) + val serializedMetadataRequest = TestUtils.createRequestByteBuffer(request) // create the kafka request handler val requestChannel = new RequestChannel(2, 5) -- 1.7.5.4