diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9e569eb..aabe62d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -651,17 +651,22 @@ class KafkaApis(val requestChannel: RequestChannel, private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = { val topicResponses = metadataCache.getTopicMetadata(topics) - if (topics.size > 0 && topicResponses.size != topics.size && config.autoCreateTopicsEnable) { - val topicsToBeCreated = topics -- topicResponses.map(_.topic).toSet - topicResponses.appendAll(topicsToBeCreated.map { topic => - try { - AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor) - info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor)) - } catch { - case e: TopicExistsException => // let it go, possibly another broker created this topic + if (topics.size > 0 && topicResponses.size != topics.size) { + val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet + val responsesForNonExistentTopics = nonExistentTopics.map { topic => + if (config.autoCreateTopicsEnable) { + try { + AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor) + info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor)) + } catch { + case e: TopicExistsException => // let it go, possibly another broker created this topic + } + new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode) + } else { + new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode) } - new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode) - }) + } + topicResponses.appendAll(responsesForNonExistentTopics) } topicResponses