diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 81bf0bd..881f51e 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -31,7 +31,6 @@ import java.util.UUID import kafka.serializer._ import kafka.utils.ZkUtils._ import kafka.common._ -import kafka.client.ClientUtils import com.yammer.metrics.core.Gauge import kafka.metrics._ import scala.Some @@ -422,17 +421,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, true } else { - val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, - brokers, - config.clientId, - config.socketTimeoutMs, - correlationId.getAndIncrement).topicsMetadata - val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] - topicsMetadata.foreach(m => { - val topic = m.topic - val partitions = m.partitionsMetadata.map(m1 => m1.partitionId) - partitionsPerTopicMap.put(topic, partitions) - }) + val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq) + val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq)) /** * fetchers must be stopped to avoid data duplication, since if the current