Index: core/src/main/scala/kafka/api/TopicMetadataResponse.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/api/TopicMetadataResponse.scala (revision 8e444a3562d6723b9f33cbdaa6a461409c84c98b) +++ core/src/main/scala/kafka/api/TopicMetadataResponse.scala (revision ) @@ -29,33 +29,26 @@ val brokerMap = brokers.map(b => (b.id, b)).toMap val topicCount = buffer.getInt val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap)) - new TopicMetadataResponse(topicsMetadata, correlationId) + new TopicMetadataResponse(brokers, topicsMetadata, correlationId) } } -case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata], +case class TopicMetadataResponse(brokers: Seq[Broker], + topicsMetadata: Seq[TopicMetadata], override val correlationId: Int) extends RequestOrResponse(correlationId = correlationId) { val sizeInBytes: Int = { - val brokers = extractBrokers(topicsMetadata).values 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum } def writeTo(buffer: ByteBuffer) { buffer.putInt(correlationId) /* brokers */ - val brokers = extractBrokers(topicsMetadata).values buffer.putInt(brokers.size) brokers.foreach(_.writeTo(buffer)) /* topic metadata */ buffer.putInt(topicsMetadata.length) topicsMetadata.foreach(_.writeTo(buffer)) - } - - def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = { - val parts = topicsMetadata.flatMap(_.partitionsMetadata) - val brokers = (parts.flatMap(_.replicas)) ++ (parts.map(_.leader).collect{case Some(l) => l}) - brokers.map(b => (b.id, b)).toMap } override def describe(details: Boolean):String = { toString } Index: core/src/main/scala/kafka/api/TopicMetadataRequest.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/api/TopicMetadataRequest.scala (revision 8e444a3562d6723b9f33cbdaa6a461409c84c98b) +++ core/src/main/scala/kafka/api/TopicMetadataRequest.scala (revision ) @@ -79,7 +79,7 @@ val topicMetadata = topics.map { topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } - val errorResponse = TopicMetadataResponse(topicMetadata, correlationId) + val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } \ No newline at end of file Index: core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala (revision 8e444a3562d6723b9f33cbdaa6a461409c84c98b) +++ core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala (revision ) @@ -118,7 +118,7 @@ info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) - val brokerMap = topicsMetadataResponse.extractBrokers(topicsMetadataResponse.topicsMetadata) + val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) true \ No newline at end of file Index: core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (revision 8e444a3562d6723b9f33cbdaa6a461409c84c98b) +++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (revision ) @@ -143,7 +143,7 @@ } def createTestTopicMetadataResponse: TopicMetadataResponse = { - new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1) + new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1) } def createTestOffsetCommitRequest: OffsetCommitRequest = { Index: core/src/main/scala/kafka/server/KafkaApis.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/server/KafkaApis.scala (revision 8e444a3562d6723b9f33cbdaa6a461409c84c98b) +++ core/src/main/scala/kafka/server/KafkaApis.scala (revision ) @@ -591,8 +591,9 @@ def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet) - trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) - val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId) + val brokers = metadataCache.getAliveBrokers + trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) + val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } Index: core/src/main/scala/kafka/server/MetadataCache.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/server/MetadataCache.scala (revision 8e444a3562d6723b9f33cbdaa6a461409c84c98b) +++ core/src/main/scala/kafka/server/MetadataCache.scala (revision ) @@ -81,6 +81,12 @@ topicResponses } + def getAliveBrokers = { + inLock(partitionMetadataLock.readLock()) { + aliveBrokers.values.toList + } + } + def addOrUpdatePartitionInfo(topic: String, partitionId: Int, stateInfo: PartitionStateInfo) {