diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index d1373c9..c6250dc 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -129,6 +129,10 @@ class ConsumerFetcherManager(private val consumerIdString: String, info("Stopping all fetchers") closeAllFetchers() + // no need to hold the lock for the following since leaderFindThread and all fetchers have been stopped + partitionMap = null + noLeaderPartitionSet.clear() + info("All connections stopped") }