diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 9e9a428..132a766 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -23,6 +23,7 @@ import kafka.cluster.Broker import kafka.utils.{Logging, Utils, ZkUtils} import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException +import scala.collection._ import scala.collection.mutable import kafka.common.{BrokerNotAvailableException, LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping} @@ -85,57 +86,61 @@ object AdminUtils extends Logging { case e2 => throw new AdministrationException(e2.toString) } } + + def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata = + fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker]) - def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[TopicMetadata] = { + def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = { val cachedBrokerInfo = new mutable.HashMap[Int, Broker]() - topics.map { topic => - if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) { - val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get - val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) + topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo)) + } + + private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = { + if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) { + val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get + val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) - val partitionMetadata = sortedPartitions.map { partitionMap => - val partition = partitionMap._1 - val replicas = partitionMap._2 - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition) - val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) - debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) + val partitionMetadata = sortedPartitions.map { partitionMap => + val partition = partitionMap._1 + val replicas = partitionMap._2 + val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition) + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) + debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) - var leaderInfo: Option[Broker] = None - var replicaInfo: Seq[Broker] = Nil - var isrInfo: Seq[Broker] = Nil - try { - try { - leaderInfo = leader match { - case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) - case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) - } - }catch { - case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d" - .format(topic, partition)) - } + var leaderInfo: Option[Broker] = None + var replicaInfo: Seq[Broker] = Nil + var isrInfo: Seq[Broker] = Nil + try { + try { + leaderInfo = leader match { + case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) + case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) + } + } catch { + case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition)) + } - try { - replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)) - isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas) - }catch { - case e => throw new ReplicaNotAvailableException(e) - } + try { + replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)) + isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas) + } catch { + case e => throw new ReplicaNotAvailableException(e) + } - new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) - }catch { - case e: ReplicaNotAvailableException => - new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, - ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - case le: LeaderNotAvailableException => - new PartitionMetadata(partition, None, replicaInfo, isrInfo, - ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]])) - } + new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) + } catch { + case e: ReplicaNotAvailableException => + new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, + ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + case le: LeaderNotAvailableException => + new PartitionMetadata(partition, None, replicaInfo, isrInfo, + ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]])) } - new TopicMetadata(topic, partitionMetadata) - } else { - // topic doesn't exist, send appropriate error code - new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode) } + new TopicMetadata(topic, partitionMetadata) + } else { + // topic doesn't exist, send appropriate error code + new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode) } } diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala index 42ed3ef..f91eca2 100644 --- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala @@ -77,7 +77,7 @@ object ListTopicCommand { } def showTopic(topic: String, zkClient: ZkClient) { - val topicMetaData = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head + val topicMetaData = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) topicMetaData.errorCode match { case ErrorMapping.UnknownTopicOrPartitionCode => println("topic " + topic + " doesn't exist!") diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 514bd59..29e29f3 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -52,7 +52,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, cond.await() val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = getTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSeq, brokers).topicsMetadata + val topicsMetadata = getTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata val leaderForPartitionsMap = new HashMap[(String, Int), Broker] topicsMetadata.foreach( tmd => { diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index f5df1fc..9cf6428 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -414,7 +414,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic val consumersPerTopicMap = getConsumersPerTopic(zkClient, group) val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = getTopicMetadata(myTopicThreadIdsMap.keySet.toSeq, brokers).topicsMetadata + val topicsMetadata = getTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int] topicsMetadata.foreach(m =>{ diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index 4c1e2a9..f881b29 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -45,7 +45,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, case Some(m) => m case None => // refresh the topic metadata cache - updateInfo(List(topic)) + updateInfo(Set(topic)) val topicMetadata = topicPartitionInfo.get(topic) topicMetadata match { case Some(m) => m @@ -69,7 +69,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, * It updates the cache by issuing a get topic metadata request to a random broker. * @param topic the topic for which the metadata is to be fetched */ - def updateInfo(topics: Seq[String]) = { + def updateInfo(topics: Set[String]) = { var topicsMetadata: Seq[TopicMetadata] = Nil val topicMetadataResponse = Utils.getTopicMetadata(topics, brokers) topicsMetadata = topicMetadataResponse.topicsMetadata diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 1133867..c2388b4 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -55,7 +55,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, // back off and update the topic metadata cache before attempting another send operation Thread.sleep(config.producerRetryBackoffMs) // get topics of the outstanding produce requests and refresh metadata for those - Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic))) + Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic).toSet)) remainingRetries -= 1 ProducerStats.resendRate.mark() } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f648df3..2296cb3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -358,33 +358,33 @@ class KafkaApis(val requestChannel: RequestChannel, val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]() val config = replicaManager.config - val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient) - metadataRequest.topics.zip(topicMetadataList).foreach( + val uniqueTopics = metadataRequest.topics.toSet + val topicMetadataList = AdminUtils.fetchTopicMetadataFromZk(uniqueTopics, zkClient) + topicMetadataList.foreach( topicAndMetadata => { - val topic = topicAndMetadata._1 - topicAndMetadata._2.errorCode match { - case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2 + topicAndMetadata.errorCode match { + case ErrorMapping.NoError => topicsMetadata += topicAndMetadata case ErrorMapping.UnknownTopicOrPartitionCode => try { /* check if auto creation of topics is turned on */ if (config.autoCreateTopics) { - CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor) + CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" - .format(topic, config.numPartitions, config.defaultReplicationFactor)) - val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head + .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)) + val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicAndMetadata.topic, zkClient) topicsMetadata += newTopicMetadata newTopicMetadata.errorCode match { case ErrorMapping.NoError => - case _ => throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic)) + case _ => throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topicAndMetadata.topic)) } } } catch { case e => error("Error while retrieving topic metadata", e) } case _ => - error("Error while fetching topic metadata for topic " + topic, - ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause) - topicsMetadata += topicAndMetadata._2 + error("Error while fetching topic metadata for topic " + topicAndMetadata.topic, + ErrorMapping.exceptionFor(topicAndMetadata.errorCode).getCause) + topicsMetadata += topicAndMetadata } }) topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString)) diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 70d3b02..9fbc30b 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -677,10 +677,10 @@ object Utils extends Logging { } } - def getTopicMetadata(topics: Seq[String], brokers: Seq[Broker]): TopicMetadataResponse = { + def getTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = { var fetchMetaDataSucceeded: Boolean = false var i: Int = 0 - val topicMetadataRequest = new TopicMetadataRequest(topics) + val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq) var topicMetadataResponse: TopicMetadataResponse = null var t: Throwable = null while(i < brokers.size && !fetchMetaDataSucceeded) { diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index c9e2229..80a1fb6 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -158,13 +158,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap) - val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head - .partitionsMetadata.map(p => p.replicas) + val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas) val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList assertEquals(expectedReplicaAssignment.size, actualReplicaList.size) - for( i <- 0 until actualReplicaList.size ) { + for(i <- 0 until actualReplicaList.size) assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i)) - } try { AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) @@ -191,7 +189,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap) - val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head + val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) newTopicMetadata.errorCode match { case ErrorMapping.UnknownTopicOrPartitionCode => fail("Topic " + topic + " should've been automatically created") diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index ef33238..2ce5d37 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -318,8 +318,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val newTopic = "new-topic" CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.getTopicMetaDataFromZK(List(newTopic), - zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) + AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500) val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build()) assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 2077ec1..6f27ffc 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -88,8 +88,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ def testUpdateBrokerPartitionInfo() { CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.getTopicMetaDataFromZK(List("new-topic"), - zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) + AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) val props1 = new util.Properties() @@ -154,8 +153,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // create topic with 1 partition and await leadership CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.getTopicMetaDataFromZK(List("new-topic"), - zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) + AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) val producer1 = new Producer[String, String](producerConfig1) @@ -206,8 +204,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // create topic CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0") assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.getTopicMetaDataFromZK(List("new-topic"), - zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) + AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500) @@ -267,8 +264,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // create topics in ZK CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1") assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.getTopicMetaDataFromZK(List("new-topic"), - zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) + AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) // do a simple test to make sure plumbing is okay