Index: core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (revision 1401308) +++ core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (working copy) @@ -51,31 +51,32 @@ if (noLeaderPartitionSet.isEmpty) cond.await() - val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata - val leaderForPartitionsMap = new HashMap[(String, Int), Broker] - topicsMetadata.foreach( - tmd => { - val topic = tmd.topic - tmd.partitionsMetadata.foreach( - pmd => { - if(pmd.leader.isDefined){ - val partition = pmd.partitionId - val leaderBroker = pmd.leader.get - leaderForPartitionsMap.put((topic, partition), leaderBroker) - } + try { + trace("Partitions without leader %s".format(noLeaderPartitionSet)) + val brokers = getAllBrokersInCluster(zkClient) + val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata + val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker] + topicsMetadata.foreach( + tmd => { + val topic = tmd.topic + tmd.partitionsMetadata.foreach( + pmd => { + val topicAndPartition = TopicAndPartition(topic, pmd.partitionId) + if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) { + val leaderBroker = pmd.leader.get + leaderForPartitionsMap.put(topicAndPartition, leaderBroker) + } + }) }) - }) - noLeaderPartitionSet.foreach - { - case(TopicAndPartition(topic, partitionId)) => - // find the leader for this partition - val leaderBrokerOpt = leaderForPartitionsMap.get((topic, partitionId)) - if(leaderBrokerOpt.isDefined){ - val pti = partitionMap(TopicAndPartition(topic, partitionId)) - addFetcher(topic, partitionId, pti.getFetchOffset(), leaderBrokerOpt.get) - noLeaderPartitionSet.remove(TopicAndPartition(topic, partitionId)) - } + + leaderForPartitionsMap.foreach{ + case(topicAndPartition, leaderBroker) => + val pti = partitionMap(topicAndPartition) + addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker) + } + noLeaderPartitionSet --= leaderForPartitionsMap.keySet + } catch { + case t => warn("Failed to find leader for %s".format(noLeaderPartitionSet), t) } } finally { lock.unlock()