diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1a4ffce..32d915f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -52,78 +52,125 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
   private val delayedRequestMetrics = new DelayedRequestMetrics
-  /* following 3 data structures are updated by the update metadata request
-   * and is queried by the topic metadata request. */
   var metadataCache = new MetadataCache
-  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
-  private val partitionMetadataLock = new ReentrantReadWriteLock()
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   class MetadataCache {
     private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
       new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
-
-    def addPartitionInfo(topic: String,
-                         partitionId: Int,
-                         stateInfo: PartitionStateInfo) {
-      cache.get(topic) match {
-        case Some(infos) => infos.put(partitionId, stateInfo)
-        case None => {
-          val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
-          cache.put(topic, newInfos)
-          newInfos.put(partitionId, stateInfo)
+    private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+    private val partitionMetadataLock = new ReentrantReadWriteLock()
+
+    def getTopicMetadata(topics: Set[String]) = {
+      val isAllTopics = topics.isEmpty
+      val topicsRequested = if(isAllTopics) cache.keySet else topics
+      val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+      inLock(partitionMetadataLock.readLock()) {
+        for (topic <- topicsRequested) {
+          if (isAllTopics || cache.contains(topic)) {
+            val partitionStateInfos = cache(topic)
+            val partitionMetadata = partitionStateInfos.map {
+              case (partitionId, partitionState) =>
+                val replicas = partitionState.allReplicas
+                val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+                var leaderInfo: Option[Broker] = None
+                var isrInfo: Seq[Broker] = Nil
+                val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+                val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+                val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+                val topicPartition = TopicAndPartition(topic, partitionId)
+                try {
+                  leaderInfo = aliveBrokers.get(leader)
+                  if (!leaderInfo.isDefined)
+                    throw new LeaderNotAvailableException("Leader not available for %s".format(topicPartition))
+                  isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                  if (replicaInfo.size < replicas.size)
+                    throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                      replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                  if (isrInfo.size < isr.size)
+                    throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                      isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+                } catch {
+                  case e: Throwable =>
+                    debug("Error while fetching metadata for %s. Possible cause: %s".format(topicPartition, e.getMessage))
+                    new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
+                      ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+                }
+            }
+            topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+          }
         }
       }
+      topicResponses
     }
 
-    def removePartitionInfo(topic: String, partitionId: Int) = {
-      cache.get(topic) match {
-        case Some(infos) => {
-          infos.remove(partitionId)
-          if(infos.isEmpty) {
-            cache.remove(topic)
+    def addOrUpdatePartitionInfo(topic: String,
+                                 partitionId: Int,
+                                 stateInfo: PartitionStateInfo) {
+      inLock(partitionMetadataLock.writeLock()) {
+        cache.get(topic) match {
+          case Some(infos) => infos.put(partitionId, stateInfo)
+          case None => {
+            val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
+            cache.put(topic, newInfos)
+            newInfos.put(partitionId, stateInfo)
           }
-          true
         }
-        case None => false
       }
     }
 
-    def getPartitionInfos(topic: String) = cache(topic)
+    def getPartitionInfos(topic: String) = {
+      inLock(partitionMetadataLock.readLock()) {
+        cache(topic)
+      }
+    }
 
     def containsTopicAndPartition(topic: String, partitionId: Int): Boolean = {
-      cache.get(topic) match {
-        case Some(partitionInfos) => partitionInfos.contains(partitionId)
-        case None => false
+      inLock(partitionMetadataLock.readLock()) {
+        cache.get(topic) match {
+          case Some(partitionInfos) => partitionInfos.contains(partitionId)
+          case None => false
+        }
       }
     }
 
-    def allTopics = cache.keySet
-
-    def removeTopic(topic: String) = cache.remove(topic)
-
-    def containsTopic(topic: String) = cache.contains(topic)
-
     def updateCache(updateMetadataRequest: UpdateMetadataRequest,
                     brokerId: Int,
-                    stateChangeLogger: StateChangeLogger) = {
-      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
-        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
-          removePartitionInfo(tp.topic, tp.partition)
-          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d")
-            .format(brokerId, tp, updateMetadataRequest.controllerId,
-            updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        } else {
-          addPartitionInfo(tp.topic, tp.partition, info)
-          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d")
-            .format(brokerId, info, tp, updateMetadataRequest.controllerId,
-            updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+                    stateChangeLogger: StateChangeLogger) {
+      inLock(partitionMetadataLock.writeLock()) {
+        updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+        updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
+          if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
+            removePartitionInfo(tp.topic, tp.partition)
+            stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+              "sent by controller %d epoch %d with correlation id %d")
+              .format(brokerId, tp, updateMetadataRequest.controllerId,
+              updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+          } else {
+            addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
+            stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+              "sent by controller %d epoch %d with correlation id %d")
+              .format(brokerId, info, tp, updateMetadataRequest.controllerId,
+              updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+          }
         }
       }
     }
+
+    private def removePartitionInfo(topic: String, partitionId: Int) = {
+      cache.get(topic) match {
+        case Some(infos) => {
+          infos.remove(partitionId)
+          if(infos.isEmpty) {
+            cache.remove(topic)
+          }
+          true
+        }
+        case None => false
+      }
+    }
   }
 
   /**
@@ -154,14 +201,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
 
-  // ensureTopicExists is only for client facing requests
-  private def ensureTopicExists(topic: String) = {
-    inLock(partitionMetadataLock.readLock()) {
-      if (!metadataCache.containsTopic(topic))
-        throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
-    }
-  }
-
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
     // ensureTopicExists is only for client facing requests
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
@@ -194,21 +233,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     // ensureTopicExists is only for client facing requests
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
    // stop serving data to clients for the topic being deleted
+    replicaManager.maybeUpdateControllerEpoch(updateMetadataRequest)
+
     val stateChangeLogger = replicaManager.stateChangeLogger
-    if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) {
-      val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
-        "old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId,
-        updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
-        replicaManager.controllerEpoch)
-      stateChangeLogger.warn(stateControllerEpochErrorMessage)
-      throw new ControllerMovedException(stateControllerEpochErrorMessage)
-    }
-    inLock(partitionMetadataLock.writeLock()) {
-      replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
-      // cache the list of alive brokers in the cluster
-      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
-      metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
-    }
+    metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
   }
@@ -388,7 +416,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
 
       try {
-        ensureTopicExists(topicAndPartition.topic)
         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
         val info = partitionOpt match {
@@ -491,7 +518,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
         val partitionData =
           try {
-            ensureTopicExists(topic)
             val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
             BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
             BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
@@ -562,7 +588,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseMap = offsetRequest.requestInfo.map(elem => {
       val (topicAndPartition, partitionOffsetRequestInfo) = elem
       try {
-        ensureTopicExists(topicAndPartition.topic)
         // ensure leader exists
         val localReplica = if(!offsetRequest.isFromDebuggingClient)
           replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -658,69 +683,24 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
-    val config = replicaManager.config
-
-    // Returning all topics when requested topics are empty
-    val isAllTopics = topics.isEmpty
-    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
-    val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
-
-    inLock(partitionMetadataLock.readLock()) {
-      val topicsRequested = if (isAllTopics) metadataCache.allTopics else topics
-      for (topic <- topicsRequested) {
-        if (isAllTopics || metadataCache.containsTopic(topic)) {
-          val partitionStateInfos = metadataCache.getPartitionInfos(topic)
-          val partitionMetadata = partitionStateInfos.map {
-            case (partitionId, partitionState) =>
-              val replicas = partitionState.allReplicas
-              val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-              var leaderInfo: Option[Broker] = None
-              var isrInfo: Seq[Broker] = Nil
-              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-              debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
-              try {
-                leaderInfo = aliveBrokers.get(leader)
-                if (!leaderInfo.isDefined)
-                  throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
-                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
-                if (replicaInfo.size < replicas.size)
-                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-                if (isrInfo.size < isr.size)
-                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-              } catch {
-                case e: Throwable =>
-                  debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage))
-                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
-                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-              }
+    val topicResponses = metadataCache.getTopicMetadata(topics)
+    if (topics.size > 0 && topicResponses.size != topics.size) {
+      val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
+      val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
+        if (config.autoCreateTopicsEnable) {
+          try {
+            AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+            info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
+          } catch {
+            case e: TopicExistsException => // let it go, possibly another broker created this topic
           }
-          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
-        } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) {
-          topicsToBeCreated += topic
+          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
        } else {
-          topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
        }
      }
+      topicResponses.appendAll(responsesForNonExistentTopics)
    }
-
-    topicResponses.appendAll(topicsToBeCreated.map { topic =>
-      try {
-        if (topic == OffsetManager.OffsetsTopicName)
-          AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
-        else
-          AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-        info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
-      } catch {
-        case e: TopicExistsException => // let it go, possibly another broker created this topic
-      }
-      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
-    })
-
    topicResponses
  }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5588f59..f099e93 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -23,9 +23,8 @@ import kafka.utils._
 import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common._
-import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.api.{UpdateMetadataRequest, StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.KafkaController
-import org.apache.log4j.Logger
 import org.I0Itec.zkclient.ZkClient
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicBoolean
@@ -205,6 +204,21 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  def maybeUpdateControllerEpoch(updateMetadataRequest: UpdateMetadataRequest) {
+    replicaStateChangeLock synchronized {
+      if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
+        val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
+          "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
+          updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
+          controllerEpoch)
+        stateChangeLogger.warn(stateControllerEpochErrorMessage)
+        throw new ControllerMovedException(stateControllerEpochErrorMessage)
+      } else if (updateMetadataRequest.controllerEpoch > controllerEpoch) {
+        controllerEpoch = updateMetadataRequest.controllerEpoch
+      }
+    }
+  }
+
   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
                              offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
     leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 17b08e1..b1c4ce9 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -96,7 +96,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
+    apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -169,7 +169,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val requestChannel = new RequestChannel(2, 5)
     val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
+    apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
 
     /**