diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b9d2260..0513e45 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -27,7 +27,6 @@ import scala.collection._ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic._ import kafka.metrics.KafkaMetricsGroup -import org.I0Itec.zkclient.ZkClient import kafka.common._ import kafka.utils.{ZkUtils, Pool, SystemTime, Logging} import kafka.network.RequestChannel.Response @@ -65,11 +64,10 @@ class KafkaApis(val requestChannel: RequestChannel, private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]() private val partitionMetadataLock = new ReentrantReadWriteLock() - def getTopicMetadata(topics: Set[String]): Tuple2[mutable.ListBuffer[TopicMetadata], mutable.ListBuffer[String]] = { + def getTopicMetadata(topics: Set[String]) = { val isAllTopics = topics.isEmpty val topicsRequested = if(isAllTopics) cache.keySet else topics val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata] - val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String] inLock(partitionMetadataLock.readLock()) { for (topic <- topicsRequested) { if (isAllTopics || this.containsTopic(topic)) { @@ -82,11 +80,11 @@ class KafkaApis(val requestChannel: RequestChannel, val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch val leader = leaderIsrAndEpoch.leaderAndIsr.leader val isr = leaderIsrAndEpoch.leaderAndIsr.isr - debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader) + val topicPartition = TopicAndPartition(topic, partitionId) try { leaderInfo = aliveBrokers.get(leader) if (!leaderInfo.isDefined) - throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId)) + throw new LeaderNotAvailableException("Leader not available for %s.".format(topicPartition)) isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null) if (replicaInfo.size < replicas.size) throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + @@ -97,20 +95,16 @@ class KafkaApis(val requestChannel: RequestChannel, new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) } catch { case e: Throwable => - debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage)) + debug("Error while fetching metadata for %s. Possible cause: %s".format(topicPartition, e.getMessage)) new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } } topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq) - } else if (config.autoCreateTopicsEnable) { - topicsToBeCreated += topic - } else { - topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode) } } } - (topicResponses, topicsToBeCreated) + topicResponses } def addPartitionInfo(topic: String, @@ -654,17 +648,19 @@ class KafkaApis(val requestChannel: RequestChannel, } private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = { - val (topicResponses, topicsToBeCreated) = metadataCache.getTopicMetadata(topics) - - topicResponses.appendAll(topicsToBeCreated.map { topic => - try { - AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor) - info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor)) - } catch { - case e: TopicExistsException => // let it go, possibly another broker created this topic - } - new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode) - }) + val topicResponses = metadataCache.getTopicMetadata(topics) + if (topics.size > 0 && topicResponses.size != topics.size && config.autoCreateTopicsEnable) { + val topicsToBeCreated = topics -- topicResponses.map(_.topic).toSet + topicResponses.appendAll(topicsToBeCreated.map { topic => + try { + AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor) + info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor)) + } catch { + case e: TopicExistsException => // let it go, possibly another broker created this topic + } + new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode) + }) + } topicResponses }