diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 0513a59..51380a6 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -32,9 +32,11 @@ object TopicMetadata {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
     val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
-    val partitionsMetadata = new ArrayBuffer[PartitionMetadata]()
-    for(i <- 0 until numPartitions)
-      partitionsMetadata += PartitionMetadata.readFrom(buffer, brokers)
+    val partitionsMetadata: Array[PartitionMetadata] = new Array[PartitionMetadata](numPartitions)
+    for(i <- 0 until numPartitions) {
+      val partitionMetadata = PartitionMetadata.readFrom(buffer, brokers)
+      partitionsMetadata(partitionMetadata.partitionId) = partitionMetadata
+    }
     new TopicMetadata(topic, partitionsMetadata, errorCode)
   }
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d6c0321..e225226 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -34,7 +34,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
-import org.apache.log4j.Logger
 import java.util.concurrent.locks.ReentrantLock
 import scala.Some
 import kafka.common.TopicAndPartition
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0f137c5..b9d2260 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -33,7 +33,10 @@ import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
 import kafka.cluster.Broker
 import kafka.controller.KafkaController
-
+import kafka.utils.Utils.inLock
+import org.I0Itec.zkclient.ZkClient
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.controller.KafkaController.StateChangeLogger
 
 /**
  * Logic to handle the various Kafka requests
@@ -52,12 +55,130 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val delayedRequestMetrics = new DelayedRequestMetrics
   /* following 3 data structures are updated by the update metadata request
   * and is queried by the topic metadata request. */
-  var metadataCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
-    new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
-  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
-  private val partitionMetadataLock = new Object
+  var metadataCache = new MetadataCache
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
+  class MetadataCache {
+    private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
+      new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
+
+    private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+    private val partitionMetadataLock = new ReentrantReadWriteLock()
+
+    def getTopicMetadata(topics: Set[String]): Tuple2[mutable.ListBuffer[TopicMetadata], mutable.ListBuffer[String]] = {
+      val isAllTopics = topics.isEmpty
+      val topicsRequested = if(isAllTopics) cache.keySet else topics
+      val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+      val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
+      inLock(partitionMetadataLock.readLock()) {
+        for (topic <- topicsRequested) {
+          if (isAllTopics || this.containsTopic(topic)) {
+            val partitionStateInfos = cache(topic)
+            val partitionMetadata = partitionStateInfos.map { case (partitionId, partitionState) =>
+              val replicas = partitionState.allReplicas
+              val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+              var leaderInfo: Option[Broker] = None
+              var isrInfo: Seq[Broker] = Nil
+              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+              debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+              try {
+                leaderInfo = aliveBrokers.get(leader)
+                if (!leaderInfo.isDefined)
+                  throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
+                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                if (replicaInfo.size < replicas.size)
+                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                if (isrInfo.size < isr.size)
+                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+              } catch {
+                case e: Throwable =>
+                  debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage))
+                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
+                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+              }
+            }
+            topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+          } else if (config.autoCreateTopicsEnable) {
+            topicsToBeCreated += topic
+          } else {
+            topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+          }
+        }
+      }
+      (topicResponses, topicsToBeCreated)
+    }
+
+    def addPartitionInfo(topic: String,
+                         partitionId: Int,
+                         stateInfo: PartitionStateInfo) {
+      inLock(partitionMetadataLock.writeLock()) {
+        addPartitionInfoInternal(topic, partitionId, stateInfo)
+      }
+    }
+
+    def getPartitionInfos(topic: String) = {
+      inLock(partitionMetadataLock.readLock()) {
+        cache(topic)
+      }
+    }
+
+    def containsTopicAndPartition(topic: String,
+                                  partitionId: Int): Boolean = {
+      inLock(partitionMetadataLock.readLock()) {
+        cache.get(topic) match {
+          case Some(partitionInfos) => partitionInfos.contains(partitionId)
+          case None => false
+        }
+      }
+    }
+
+    def containsTopic(topic: String) = cache.contains(topic)
+
+    def updateCache(updateMetadataRequest: UpdateMetadataRequest,
+                    brokerId: Int,
+                    stateChangeLogger: StateChangeLogger) {
+      inLock(partitionMetadataLock.writeLock()) {
+        updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+        updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
+          addPartitionInfoInternal(partitionState._1.topic, partitionState._1.partition, partitionState._2)
+          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
+            updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        }
+        // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
+        // currently being deleted by the controller
+        val topicsKnownToThisBroker = cache.keySet
+        val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map {
+          case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
+        val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
+        deletedTopics.foreach { topic =>
+          cache.remove(topic)
+          stateChangeLogger.trace(("Broker %d deleted partitions for topic %s from metadata cache in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d").format(brokerId, topic,
+            updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        }
+      }
+    }
+
+    private def addPartitionInfoInternal(topic: String,
+                                         partitionId: Int,
+                                         stateInfo: PartitionStateInfo) {
+      cache.get(topic) match {
+        case Some(infos) => infos.put(partitionId, stateInfo)
+        case None => {
+          val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
+          cache.put(topic, newInfos)
+          newInfos.put(partitionId, stateInfo)
+        }
+      }
+    }
+  }
+
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
@@ -87,7 +208,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   // ensureTopicExists is only for client facing requests
   private def ensureTopicExists(topic: String) = {
-    if(!metadataCache.exists { case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic.equals(topic)} )
+    if (!metadataCache.containsTopic(topic))
       throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
   }
 
@@ -132,33 +253,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       stateChangeLogger.warn(stateControllerEpochErrorMessage)
       throw new ControllerMovedException(stateControllerEpochErrorMessage)
     }
-    partitionMetadataLock synchronized {
-      replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
+    replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
     // cache the list of alive brokers in the cluster
-      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
-      updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
-        metadataCache.put(partitionState._1, partitionState._2)
-        stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-          "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
-          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-      }
-      // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
-      // currently being deleted by the controller
-      val topicsKnownToThisBroker = metadataCache.map {
-        case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
-      val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map {
-        case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
-      val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
-      val partitionsToBeDeleted = metadataCache.filter {
-        case(topicAndPartition, partitionStateInfo) => deletedTopics.contains(topicAndPartition.topic)
-      }.keySet
-      partitionsToBeDeleted.foreach { partition =>
-        metadataCache.remove(partition)
-        stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-          "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
-          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-      }
-    }
+    metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
   }
@@ -550,89 +647,26 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
-    val config = replicaManager.config
-    var uniqueTopics = Set.empty[String]
-    uniqueTopics = {
-      if(metadataRequest.topics.size > 0)
-        metadataRequest.topics.toSet
-      else {
-        partitionMetadataLock synchronized {
-          metadataCache.keySet.map(_.topic)
-        }
-      }
-    }
-    val topicMetadataList =
-      partitionMetadataLock synchronized {
-        uniqueTopics.map { topic =>
-          if(metadataCache.keySet.map(_.topic).contains(topic)) {
-            debug("Topic %s exists in metadata cache on broker %d".format(topic, config.brokerId))
-            val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
-            val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
-            val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
-              val replicas = metadataCache(topicAndPartition).allReplicas
-              var replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-              var leaderInfo: Option[Broker] = None
-              var isrInfo: Seq[Broker] = Nil
-              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-              debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
-              try {
-                if(aliveBrokers.keySet.contains(leader))
-                  leaderInfo = Some(aliveBrokers(leader))
-                else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
-                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
-                if(replicaInfo.size < replicas.size)
-                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-                if(isrInfo.size < isr.size)
-                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-              } catch {
-                case e: Throwable =>
-                  error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
-                  new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
-                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-              }
-            }
-            new TopicMetadata(topic, partitionMetadata)
-          } else {
-            debug("Topic %s does not exist in metadata cache on broker %d".format(topic, config.brokerId))
-            // topic doesn't exist, send appropriate error code
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
-          }
-        }
-      }
+    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
+    trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
+    val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
 
-    // handle auto create topics
-    topicMetadataList.foreach { topicMetadata =>
-      topicMetadata.errorCode match {
-        case ErrorMapping.NoError => topicsMetadata += topicMetadata
-        case ErrorMapping.UnknownTopicOrPartitionCode =>
-          if (config.autoCreateTopicsEnable) {
-            try {
-              AdminUtils.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
-            } catch {
-              case e: TopicExistsException => // let it go, possibly another broker created this topic
-            }
-            topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
-          } else {
-            debug("Auto create topic skipped for %s".format(topicMetadata.topic))
-            topicsMetadata += topicMetadata
-          }
-        case _ =>
-          debug("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic,
-            ErrorMapping.exceptionFor(topicMetadata.errorCode).getClass.getName))
-          topicsMetadata += topicMetadata
+  private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
+    val (topicResponses, topicsToBeCreated) = metadataCache.getTopicMetadata(topics)
+
+    topicResponses.appendAll(topicsToBeCreated.map { topic =>
+      try {
+        AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+        info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
+      } catch {
+        case e: TopicExistsException => // let it go, possibly another broker created this topic
       }
-      }
-    trace("Sending topic metadata %s for correlation id %d to client %s".format(topicsMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
-    val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+    })
+
+    topicResponses
   }
 
   /*
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index d5644ea..00b17c4 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -320,9 +320,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
       // wait for the update metadata request to trickle to the brokers
       assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
+        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
       assertEquals(0, partitionsRemaining.size)
-      var partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
+      var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
       assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
@@ -331,15 +331,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining.size)
       activeServers = servers.filter(s => s.config.brokerId == 0)
-      partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
+      partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
 
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
     finally {
       servers.foreach(_.shutdown())
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1317b4c..a6b4b2d 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -93,7 +93,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val requestChannel = new RequestChannel(2, 5)
     val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
+    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -164,7 +164,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val requestChannel = new RequestChannel(2, 5)
     val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
+    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
 
     /**
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 500eeca..4ada42b 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -513,7 +513,7 @@ object TestUtils extends Logging {
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
     Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
       TestUtils.waitUntilTrue(() =>
-        servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
+        servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), timeout))
   }
 
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {