diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 0513a59..649fcb6 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -32,9 +32,11 @@ object TopicMetadata {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
     val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
-    val partitionsMetadata = new ArrayBuffer[PartitionMetadata]()
-    for(i <- 0 until numPartitions)
-      partitionsMetadata += PartitionMetadata.readFrom(buffer, brokers)
+    val partitionsMetadata: Array[PartitionMetadata] = Array.fill(numPartitions){null}
+    for(i <- 0 until numPartitions) {
+      val partitionMetadata = PartitionMetadata.readFrom(buffer, brokers)
+      partitionsMetadata(partitionMetadata.partitionId) = partitionMetadata
+    }
     new TopicMetadata(topic, partitionsMetadata, errorCode)
   }
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index fcabd0d..d44b6af 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -36,7 +36,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
-import org.apache.log4j.Logger
 import java.util.concurrent.locks.ReentrantLock
 import scala.Some
 import kafka.common.TopicAndPartition
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c068ef6..4c35f0a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,7 +31,9 @@ import kafka.utils.{Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
 import kafka.cluster.Broker
 import kafka.controller.KafkaController
+import kafka.utils.Utils.inLock
 import org.I0Itec.zkclient.ZkClient
+import java.util.concurrent.locks.ReentrantReadWriteLock
 
 /**
  * Logic to handle the various Kafka requests
@@ -54,7 +56,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   var metadataCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
     new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
   private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
-  private val partitionMetadataLock = new Object
+  private val partitionMetadataLock = new ReentrantReadWriteLock()
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   /**
@@ -132,7 +134,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       stateChangeLogger.warn(stateControllerEpochErrorMessage)
       throw new ControllerMovedException(stateControllerEpochErrorMessage)
     }
-    partitionMetadataLock synchronized {
+    inLock(partitionMetadataLock.writeLock()) {
       replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
       // cache the list of alive brokers in the cluster
       updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
@@ -610,12 +612,16 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
     val config = replicaManager.config
-    partitionMetadataLock synchronized {
-      topics.map { topic =>
-        if(metadataCache.keySet.map(_.topic).contains(topic)) {
+    // Returning all topics when requested topics are empty
+    val isAllTopics = topics.isEmpty
+
+    inLock(partitionMetadataLock.readLock()) {
+      val allTopics = metadataCache.keySet.map(_.topic)
+      val topicsRequested = if(isAllTopics) allTopics else topics
+      topicsRequested.map { topic =>
+        if(isAllTopics || allTopics.contains(topic)) {
           val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
-          val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
-          val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
+          val partitionMetadata = partitionStateInfo.map { case(topicAndPartition, partitionState) =>
             val replicas = metadataCache(topicAndPartition).allReplicas
             val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
             var leaderInfo: Option[Broker] = None
@@ -625,9 +631,9 @@ class KafkaApis(val requestChannel: RequestChannel,
             val isr = leaderIsrAndEpoch.leaderAndIsr.isr
             debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
             try {
-              if(aliveBrokers.keySet.contains(leader))
-                leaderInfo = Some(aliveBrokers(leader))
-              else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
+              leaderInfo = aliveBrokers.get(leader)
+              if(!leaderInfo.isDefined)
+                throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
               isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
               if(replicaInfo.size < replicas.size)
                 throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
@@ -643,7 +649,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                                        ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
             }
           }
-          new TopicMetadata(topic, partitionMetadata)
+          new TopicMetadata(topic, partitionMetadata.toSeq)
         } else {
           // topic doesn't exist, send appropriate error code after handling auto create topics
           val isOffsetsTopic = topic == OffsetManager.OffsetsTopicName
@@ -673,17 +679,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    var uniqueTopics = Set.empty[String]
-    uniqueTopics = {
-      if(metadataRequest.topics.size > 0)
-        metadataRequest.topics.toSet
-      else {
-        partitionMetadataLock synchronized {
-          metadataCache.keySet.map(_.topic)
-        }
-      }
-    }
-    val topicMetadata = getTopicMetadata(uniqueTopics)
+    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
     trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
     val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
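Note on the locking change: partitionMetadataLock was previously a plain Object used with `synchronized`, which serialized every metadata read behind every UpdateMetadata write. With a ReentrantReadWriteLock, concurrent TopicMetadataRequest handlers can share the read lock while handleUpdateMetadataRequest alone takes the write lock. Folding the empty-topics ("give me everything") case into getTopicMetadata also removes the second lock acquisition the old handleTopicMetadataRequest needed. The kafka.utils.Utils.inLock helper imported above is not shown in this patch; it is presumably a small lock/try/finally wrapper along the lines of the sketch below (the object and field names here are illustrative, not from the patch):

    import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

    // Hypothetical container; Kafka's real helper lives in kafka.utils.Utils.
    object LockSketch {
      // Acquire the lock, run `fun`, and always release it, even if `fun` throws.
      def inLock[T](lock: Lock)(fun: => T): T = {
        lock.lock()
        try {
          fun
        } finally {
          lock.unlock()
        }
      }

      private val metadataLock = new ReentrantReadWriteLock()
      private var cacheSize = 0

      // Many readers may hold the read lock at once ...
      def readSize(): Int = inLock(metadataLock.readLock()) { cacheSize }

      // ... but a writer holds the write lock exclusively.
      def updateSize(n: Int): Unit = inLock(metadataLock.writeLock()) { cacheSize = n }
    }

This fits the access pattern in KafkaApis: metadata reads vastly outnumber cluster-metadata updates, so a read/write lock trades a slightly more expensive write path for read paths that no longer block one another.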