Index: core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (revision 1365864) +++ core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (working copy) @@ -107,14 +107,18 @@ } def stopAllConnections() { + // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread lock.lock() - try { - partitionMap = null - noLeaderPartitionSet.clear() - } finally { - lock.unlock() - } + noLeaderPartitionSet.clear() + lock.unlock() + + // second, stop all existing fetchers closeAllFetchers() + + // finally clear partitionMap + lock.lock() + partitionMap = null + lock.unlock() } def getPartitionTopicInfo(key: (String, Int)) : PartitionTopicInfo = {