diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 3aa7b08..16aede3 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -43,7 +43,9 @@ class ConsumerFetcherManager(private val consumerIdString: String, private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition] private val lock = new ReentrantLock private val cond = lock.newCondition() - private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){ + private var leaderFinderThread: ShutdownableThread = null + + private class LeaderFinderThread(name: String) extends ShutdownableThread(name) { // thread responsible for adding the fetcher to the right broker when leader is available override def doWork() { lock.lock() @@ -93,8 +95,6 @@ class ConsumerFetcherManager(private val consumerIdString: String, Thread.sleep(config.refreshLeaderBackoffMs) } } - leaderFinderThread.start() - override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { new ConsumerFetcherThread( @@ -103,8 +103,9 @@ class ConsumerFetcherManager(private val consumerIdString: String, } def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) { - if (!leaderFinderThread.isRunning.get()) - throw new RuntimeException("%s already shutdown".format(name)) + leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread") + leaderFinderThread.start() + lock.lock() try { partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap @@ -116,16 +117,17 @@ class ConsumerFetcherManager(private val consumerIdString: String, } } - def stopAllConnections() { - lock.lock() - // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread - noLeaderPartitionSet.clear() - // second, clear partitionMap - partitionMap = null - lock.unlock() + def stopConnections() { + info("Stopping leader finder thread") + if (leaderFinderThread != null) { + leaderFinderThread.shutdown() + leaderFinderThread = null + } - // third, stop all existing fetchers + info("Stopping all fetchers") closeAllFetchers() + + info("All connections stopped") } def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) { @@ -141,11 +143,4 @@ class ConsumerFetcherManager(private val consumerIdString: String, lock.unlock() } } - - def shutdown() { - info("shutting down") - leaderFinderThread.shutdown() - stopAllConnections() - info("shutdown completed") - } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index dcbcf21..9a5fbfe 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -162,7 +162,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (config.autoCommitEnable) scheduler.shutdownNow() fetcher match { - case Some(f) => f.shutdown + case Some(f) => f.stopConnections case None => } sendShutdownToAllQueues() @@ -483,7 +483,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten fetcher match { case Some(f) => - f.stopAllConnections + f.stopConnections clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams) info("Committing all offsets after clearing the fetcher queues") /** diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 4ee23cd..087979f 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -159,7 +159,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } def addPartition(topic: String, partitionId: Int, initialOffset: Long) { - partitionMapLock.lock() + partitionMapLock.lockInterruptibly() try { val topicPartition = TopicAndPartition(topic, partitionId) partitionMap.put( @@ -172,7 +172,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } def removePartition(topic: String, partitionId: Int) { - partitionMapLock.lock() + partitionMapLock.lockInterruptibly() try { partitionMap.remove(TopicAndPartition(topic, partitionId)) } finally { @@ -180,17 +180,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } } - def hasPartition(topic: String, partitionId: Int): Boolean = { - partitionMapLock.lock() - try { - partitionMap.get(TopicAndPartition(topic, partitionId)).isDefined - } finally { - partitionMapLock.unlock() - } - } - def partitionCount() = { - partitionMapLock.lock() + partitionMapLock.lockInterruptibly() try { partitionMap.size } finally { diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 5a57bd1..c5cddea 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -58,12 +58,12 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient) - fetcher.stopAllConnections() + fetcher.stopConnections() fetcher.startConnections(topicInfos, cluster) } override def tearDown() { - fetcher.shutdown() + fetcher.stopConnections() super.tearDown }