diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 0513a59..51380a6 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -32,9 +32,11 @@ object TopicMetadata {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
     val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
-    val partitionsMetadata = new ArrayBuffer[PartitionMetadata]()
-    for(i <- 0 until numPartitions)
-      partitionsMetadata += PartitionMetadata.readFrom(buffer, brokers)
+    val partitionsMetadata: Array[PartitionMetadata] = new Array[PartitionMetadata](numPartitions)
+    for(i <- 0 until numPartitions) {
+      val partitionMetadata = PartitionMetadata.readFrom(buffer, brokers)
+      partitionsMetadata(partitionMetadata.partitionId) = partitionMetadata
+    }
     new TopicMetadata(topic, partitionsMetadata, errorCode)
   }
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index c8c02ce..933de9d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -36,7 +36,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
-import org.apache.log4j.Logger
 import java.util.concurrent.locks.ReentrantLock
 import scala.Some
 import kafka.common.TopicAndPartition
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 705d87e..4e11785 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,7 +31,10 @@ import kafka.utils.{Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
 import kafka.cluster.Broker
 import kafka.controller.KafkaController
+import kafka.utils.Utils.inLock
 import org.I0Itec.zkclient.ZkClient
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.controller.KafkaController.StateChangeLogger
 
 /**
  * Logic to handle the various Kafka requests
@@ -51,12 +54,78 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val delayedRequestMetrics = new DelayedRequestMetrics
   /* following 3 data structures are updated by the update metadata request
   * and is queried by the topic metadata request. */
-  var metadataCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
-    new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
+  var metadataCache = new MetadataCache
   private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
-  private val partitionMetadataLock = new Object
+  private val partitionMetadataLock = new ReentrantReadWriteLock()
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
+  class MetadataCache {
+    private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
+      new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
+
+    def addPartitionInfo(topic: String,
+                         partitionId: Int,
+                         stateInfo: PartitionStateInfo) {
+      cache.get(topic) match {
+        case Some(infos) => infos.put(partitionId, stateInfo)
+        case None => {
+          val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
+          cache.put(topic, newInfos)
+          newInfos.put(partitionId, stateInfo)
+        }
+      }
+    }
+
+    def removePartitionInfo(topic: String, partitionId: Int) = {
+      cache.get(topic) match {
+        case Some(infos) => {
+          infos.remove(partitionId)
+          if(infos.isEmpty) {
+            cache.remove(topic)
+          }
+          true
+        }
+        case None => false
+      }
+    }
+
+    def getPartitionInfos(topic: String) = cache(topic)
+
+    def containsTopicAndPartition(topic: String,
+                                  partitionId: Int): Boolean = {
+      cache.get(topic) match {
+        case Some(partitionInfos) => partitionInfos.contains(partitionId)
+        case None => false
+      }
+    }
+
+    def allTopics = cache.keySet
+
+    def removeTopic(topic: String) = cache.remove(topic)
+
+    def containsTopic(topic: String) = cache.contains(topic)
+
+    def updateCache(updateMetadataRequest: UpdateMetadataRequest,
+                    brokerId: Int,
+                    stateChangeLogger: StateChangeLogger) = {
+      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
+        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
+          removePartitionInfo(tp.topic, tp.partition)
+          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d")
+            .format(brokerId, tp, updateMetadataRequest.controllerId,
+            updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        } else {
+          addPartitionInfo(tp.topic, tp.partition, info)
+          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d")
+            .format(brokerId, info, tp, updateMetadataRequest.controllerId,
+            updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        }
+      }
+    }
+  }
+
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
@@ -87,8 +156,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   // ensureTopicExists is only for client facing requests
   private def ensureTopicExists(topic: String) = {
-    if(!metadataCache.exists { case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic.equals(topic)} )
-      throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
+    inLock(partitionMetadataLock.readLock()) {
+      if (!metadataCache.containsTopic(topic))
+        throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
+    }
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
@@ -132,26 +203,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       stateChangeLogger.warn(stateControllerEpochErrorMessage)
       throw new ControllerMovedException(stateControllerEpochErrorMessage)
     }
-    partitionMetadataLock synchronized {
+    inLock(partitionMetadataLock.writeLock()) {
       replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
       // cache the list of alive brokers in the cluster
       updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
-      updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
-        if (partitionState._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
-          val partition = partitionState._1
-          metadataCache.remove(partition)
-          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d")
-            .format(brokerId, partition, updateMetadataRequest.controllerId,
-            updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        } else {
-          metadataCache.put(partitionState._1, partitionState._2)
-          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d")
-            .format(brokerId, partitionState._2, partitionState._1, updateMetadataRequest.controllerId,
-            updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        }
-      }
+      metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
     }
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
@@ -604,62 +660,68 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
     val config = replicaManager.config
-    partitionMetadataLock synchronized {
-      topics.map { topic =>
-        if(metadataCache.keySet.map(_.topic).contains(topic)) {
-          val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
-          val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
-          val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
-            val replicas = metadataCache(topicAndPartition).allReplicas
-            val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-            var leaderInfo: Option[Broker] = None
-            var isrInfo: Seq[Broker] = Nil
-            val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-            val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-            val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-            debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
-            try {
-              if(aliveBrokers.keySet.contains(leader))
-                leaderInfo = Some(aliveBrokers(leader))
-              else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
-              isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
-              if(replicaInfo.size < replicas.size)
-                throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                  replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-              if(isrInfo.size < isr.size)
-                throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                  isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-              new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-            } catch {
-              case e: Throwable =>
-                error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
-                new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
-                  ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-            }
+    // Returning all topics when requested topics are empty
+    val isAllTopics = topics.isEmpty
+    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+    val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
+
+    inLock(partitionMetadataLock.readLock()) {
+      val topicsRequested = if (isAllTopics) metadataCache.allTopics else topics
+      for (topic <- topicsRequested) {
+        if (isAllTopics || metadataCache.containsTopic(topic)) {
+          val partitionStateInfos = metadataCache.getPartitionInfos(topic)
+          val partitionMetadata = partitionStateInfos.map {
+            case (partitionId, partitionState) =>
+              val replicas = partitionState.allReplicas
+              val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+              var leaderInfo: Option[Broker] = None
+              var isrInfo: Seq[Broker] = Nil
+              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+              debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+              try {
+                leaderInfo = aliveBrokers.get(leader)
+                if (!leaderInfo.isDefined)
+                  throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
+                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                if (replicaInfo.size < replicas.size)
+                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                if (isrInfo.size < isr.size)
+                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+              } catch {
+                case e: Throwable =>
+                  error("Error while fetching metadata for topic %s partition %s".format(topic, partitionId), e)
+                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
+                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+              }
           }
-          new TopicMetadata(topic, partitionMetadata)
+          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+        } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) {
+          topicsToBeCreated += topic
         } else {
-          // topic doesn't exist, send appropriate error code after handling auto create topics
-          val isOffsetsTopic = topic == OffsetManager.OffsetsTopicName
-          if (config.autoCreateTopicsEnable || isOffsetsTopic) {
-            try {
-              if (isOffsetsTopic)
-                AdminUtils.createTopic(zkClient, topic,
-                  config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
-              else
-                AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topic, config.numPartitions, config.defaultReplicationFactor))
-            } catch {
-              case e: TopicExistsException => // let it go, possibly another broker created this topic
-            }
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
-          } else {
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
-          }
+          topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
         }
       }
-    }.toSeq
+    }
+
+    topicResponses.appendAll(topicsToBeCreated.map { topic =>
+      try {
+        if (topic == OffsetManager.OffsetsTopicName)
+          AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
+        else
+          AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+        info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
+      } catch {
+        case e: TopicExistsException => // let it go, possibly another broker created this topic
+      }
+      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+    })
+
+    topicResponses
   }
 
   /**
@@ -667,17 +729,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    var uniqueTopics = Set.empty[String]
-    uniqueTopics = {
-      if(metadataRequest.topics.size > 0)
-        metadataRequest.topics.toSet
-      else {
-        partitionMetadataLock synchronized {
-          metadataCache.keySet.map(_.topic)
-        }
-      }
-    }
-    val topicMetadata = getTopicMetadata(uniqueTopics)
+    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
     trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
     val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index d5644ea..00b17c4 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -320,9 +320,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
       // wait for the update metadata request to trickle to the brokers
       assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
+        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
       assertEquals(0, partitionsRemaining.size)
-      var partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
+      var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
       assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
@@ -331,15 +331,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining.size)
       activeServers = servers.filter(s => s.config.brokerId == 0)
-      partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
+      partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
 
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
 
     } finally {
       servers.foreach(_.shutdown())
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 22bb6f2..17b08e1 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -96,7 +96,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
+    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
 
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -169,7 +169,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val requestChannel = new RequestChannel(2, 5)
     val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
+    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
 
     /**
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2054c25..71ab6e1 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -531,7 +531,7 @@ object TestUtils extends Logging {
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
     assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
       TestUtils.waitUntilTrue(() =>
-        servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
+        servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), timeout))
   }
 
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
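
Note, not part of the patch: the KafkaApis change above replaces the coarse `partitionMetadataLock synchronized { ... }` blocks with a ReentrantReadWriteLock, so topic-metadata reads no longer serialize behind UpdateMetadata writes, and it re-keys the cache as topic -> (partition id -> PartitionStateInfo) instead of a flat TopicAndPartition map. The sketch below is a minimal, self-contained illustration of that locking and cache layout; the PartitionState case class and the local inLock helper are simplified stand-ins for the real PartitionStateInfo and kafka.utils.Utils.inLock, not the actual Kafka types.

import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
import scala.collection.mutable

object MetadataCacheSketch {
  // Stand-in for PartitionStateInfo; only the fields needed for the illustration.
  case class PartitionState(leader: Int, isr: Seq[Int])

  // topic -> (partition id -> state), mirroring the cache layout introduced by the patch
  private val cache = new mutable.HashMap[String, mutable.Map[Int, PartitionState]]()
  private val lock = new ReentrantReadWriteLock()

  // Simplified equivalent of kafka.utils.Utils.inLock: run `body` while holding `l`.
  private def inLock[T](l: Lock)(body: => T): T = {
    l.lock()
    try body finally l.unlock()
  }

  // Writers (the UpdateMetadata path) take the write lock.
  def addPartitionInfo(topic: String, partition: Int, state: PartitionState): Unit =
    inLock(lock.writeLock()) {
      cache.getOrElseUpdate(topic, new mutable.HashMap[Int, PartitionState]).put(partition, state)
    }

  // Readers (the TopicMetadata path) take the read lock and do not block each other.
  def containsTopicAndPartition(topic: String, partition: Int): Boolean =
    inLock(lock.readLock()) {
      cache.get(topic).exists(_.contains(partition))
    }

  def main(args: Array[String]): Unit = {
    addPartitionInfo("test-topic", 0, PartitionState(leader = 0, isr = Seq(0, 1, 2)))
    println(containsTopicAndPartition("test-topic", 0)) // true
    println(containsTopicAndPartition("test-topic", 1)) // false
  }
}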