diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d96229e..c573615 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -52,20 +52,116 @@ class KafkaApis(val requestChannel: RequestChannel, private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) private val delayedRequestMetrics = new DelayedRequestMetrics - /* following 3 data structures are updated by the update metadata request - * and is queried by the topic metadata request. */ var metadataCache = new MetadataCache - private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]() - private val partitionMetadataLock = new ReentrantReadWriteLock() this.logIdent = "[KafkaApi-%d] ".format(brokerId) class MetadataCache { private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] = new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]() + private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]() + private val partitionMetadataLock = new ReentrantReadWriteLock() + + def getTopicMetadata(topics: Set[String]): Tuple2[mutable.ListBuffer[TopicMetadata], mutable.ListBuffer[String]] = { + val isAllTopics = topics.isEmpty + val topicsRequested = if(isAllTopics) cache.keySet else topics + val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata] + val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String] + inLock(partitionMetadataLock.readLock()) { + for (topic <- topicsRequested) { + if (isAllTopics || this.containsTopic(topic)) { + val partitionStateInfos = cache(topic) + val partitionMetadata = partitionStateInfos.map { + case (partitionId, partitionState) => + val replicas = partitionState.allReplicas + val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq + var leaderInfo: Option[Broker] = None + var isrInfo: Seq[Broker] = Nil + val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch + val leader = leaderIsrAndEpoch.leaderAndIsr.leader + val isr = leaderIsrAndEpoch.leaderAndIsr.isr + debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader) + try { + leaderInfo = aliveBrokers.get(leader) + if (!leaderInfo.isDefined) + throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId)) + isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null) + if (replicaInfo.size < replicas.size) + throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + + replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(",")) + if (isrInfo.size < isr.size) + throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " + + isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) + new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) + } catch { + case e: Throwable => + debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage)) + new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, + ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + } + } + topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq) + } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) { + topicsToBeCreated += topic + } else { + topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode) + } + } + } + (topicResponses, topicsToBeCreated) + } def addPartitionInfo(topic: String, partitionId: Int, stateInfo: PartitionStateInfo) { + inLock(partitionMetadataLock.writeLock()) { + addPartitionInfoInternal(topic, partitionId, stateInfo) + } + } + + def getPartitionInfos(topic: String) = { + inLock(partitionMetadataLock.readLock()) { + cache(topic) + } + } + + def containsTopicAndPartition(topic: String, + partitionId: Int): Boolean = { + inLock(partitionMetadataLock.readLock()) { + cache.get(topic) match { + case Some(partitionInfos) => partitionInfos.contains(partitionId) + case None => false + } + } + } + + def containsTopic(topic: String) = cache.contains(topic) + + def updateCache(updateMetadataRequest: UpdateMetadataRequest, + brokerId: Int, + stateChangeLogger: StateChangeLogger) { + inLock(partitionMetadataLock.writeLock()) { + updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b)) + updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) => + if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) { + removePartitionInfo(tp.topic, tp.partition) + stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " + + "sent by controller %d epoch %d with correlation id %d") + .format(brokerId, tp, updateMetadataRequest.controllerId, + updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) + } else { + addPartitionInfo(tp.topic, tp.partition, info) + stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " + + "sent by controller %d epoch %d with correlation id %d") + .format(brokerId, info, tp, updateMetadataRequest.controllerId, + updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) + } + } + } + } + + private def addPartitionInfoInternal(topic: String, + partitionId: Int, + stateInfo: PartitionStateInfo) { cache.get(topic) match { case Some(infos) => infos.put(partitionId, stateInfo) case None => { @@ -76,7 +172,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def removePartitionInfo(topic: String, partitionId: Int) = { + private def removePartitionInfo(topic: String, partitionId: Int) = { cache.get(topic) match { case Some(infos) => { infos.remove(partitionId) @@ -88,42 +184,6 @@ class KafkaApis(val requestChannel: RequestChannel, case None => false } } - - def getPartitionInfos(topic: String) = cache(topic) - - def containsTopicAndPartition(topic: String, - partitionId: Int): Boolean = { - cache.get(topic) match { - case Some(partitionInfos) => partitionInfos.contains(partitionId) - case None => false - } - } - - def allTopics = cache.keySet - - def removeTopic(topic: String) = cache.remove(topic) - - def containsTopic(topic: String) = cache.contains(topic) - - def updateCache(updateMetadataRequest: UpdateMetadataRequest, - brokerId: Int, - stateChangeLogger: StateChangeLogger) = { - updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) => - if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) { - removePartitionInfo(tp.topic, tp.partition) - stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " + - "sent by controller %d epoch %d with correlation id %d") - .format(brokerId, tp, updateMetadataRequest.controllerId, - updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) - } else { - addPartitionInfo(tp.topic, tp.partition, info) - stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " + - "sent by controller %d epoch %d with correlation id %d") - .format(brokerId, info, tp, updateMetadataRequest.controllerId, - updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) - } - } - } } /** @@ -156,10 +216,8 @@ class KafkaApis(val requestChannel: RequestChannel, // ensureTopicExists is only for client facing requests private def ensureTopicExists(topic: String) = { - inLock(partitionMetadataLock.readLock()) { - if (!metadataCache.containsTopic(topic)) - throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted") - } + if (!metadataCache.containsTopic(topic)) + throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted") } def handleLeaderAndIsrRequest(request: RequestChannel.Request) { @@ -194,21 +252,10 @@ class KafkaApis(val requestChannel: RequestChannel, // ensureTopicExists is only for client facing requests // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they // stop serving data to clients for the topic being deleted + replicaManager.maybeUpdateControllerEpoch(updateMetadataRequest) + val stateChangeLogger = replicaManager.stateChangeLogger - if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) { - val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " + - "old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId, - updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, - replicaManager.controllerEpoch) - stateChangeLogger.warn(stateControllerEpochErrorMessage) - throw new ControllerMovedException(stateControllerEpochErrorMessage) - } - inLock(partitionMetadataLock.writeLock()) { - replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch - // cache the list of alive brokers in the cluster - updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b)) - metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger) - } + metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger) val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse))) } @@ -659,54 +706,7 @@ class KafkaApis(val requestChannel: RequestChannel, private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = { val config = replicaManager.config - - // Returning all topics when requested topics are empty - val isAllTopics = topics.isEmpty - val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata] - val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String] - - inLock(partitionMetadataLock.readLock()) { - val topicsRequested = if (isAllTopics) metadataCache.allTopics else topics - for (topic <- topicsRequested) { - if (isAllTopics || metadataCache.containsTopic(topic)) { - val partitionStateInfos = metadataCache.getPartitionInfos(topic) - val partitionMetadata = partitionStateInfos.map { - case (partitionId, partitionState) => - val replicas = partitionState.allReplicas - val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq - var leaderInfo: Option[Broker] = None - var isrInfo: Seq[Broker] = Nil - val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch - val leader = leaderIsrAndEpoch.leaderAndIsr.leader - val isr = leaderIsrAndEpoch.leaderAndIsr.isr - debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader) - try { - leaderInfo = aliveBrokers.get(leader) - if (!leaderInfo.isDefined) - throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId)) - isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null) - if (replicaInfo.size < replicas.size) - throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + - replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(",")) - if (isrInfo.size < isr.size) - throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " + - isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) - new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) - } catch { - case e: Throwable => - debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage)) - new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, - ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - } - } - topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq) - } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) { - topicsToBeCreated += topic - } else { - topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode) - } - } - } + val (topicResponses, topicsToBeCreated) = metadataCache.getTopicMetadata(topics) topicResponses.appendAll(topicsToBeCreated.map { topic => try { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0f5aaa9..648ab6b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -23,7 +23,7 @@ import kafka.utils._ import kafka.log.LogManager import kafka.metrics.KafkaMetricsGroup import kafka.common._ -import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} +import kafka.api.{UpdateMetadataRequest, StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} import kafka.controller.KafkaController import org.apache.log4j.Logger import org.I0Itec.zkclient.ZkClient @@ -205,6 +205,21 @@ class ReplicaManager(val config: KafkaConfig, } } + def maybeUpdateControllerEpoch(updateMetadataRequest: UpdateMetadataRequest) { + replicaStateChangeLock synchronized { + if(updateMetadataRequest.controllerEpoch < controllerEpoch) { + val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " + + "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId, + updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, + controllerEpoch) + stateChangeLogger.warn(stateControllerEpochErrorMessage) + throw new ControllerMovedException(stateControllerEpochErrorMessage) + } else if (updateMetadataRequest.controllerEpoch > controllerEpoch) { + controllerEpoch = updateMetadataRequest.controllerEpoch + } + } + } + def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>