diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 71ae640..ae3ae04 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -92,7 +92,20 @@ class ConsumerFetcherManager(private val consumerIdString: String, leaderForPartitionsMap.foreach { case(topicAndPartition, leaderBroker) => val pti = partitionMap(topicAndPartition) - addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker) + try { + addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker) + } catch { + case t => { + if (!isRunning.get()) + throw t /* If this thread is stopped, propagate this exception to kill the thread. */ + else { + warn("Failed to add leader for partition %s".format(topicAndPartition), t) + lock.lock() + noLeaderPartitionSet += topicAndPartition + lock.unlock() + } + } + } } shutdownIdleFetcherThreads()