diff --git core/src/main/scala/kafka/server/AbstractFetcherManager.scala core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 8b26be3..c956a02 100644 --- core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -23,7 +23,7 @@ import kafka.cluster.Broker abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging { // map of (source brokerid, fetcher Id per source broker) => fetcher - private val fetcherThreadMap = new mutable.HashMap[(Int, Int), AbstractFetcherThread] + private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread] private val mapLock = new Object this.logIdent = "[" + name + "], " @@ -37,7 +37,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) { mapLock synchronized { var fetcherThread: AbstractFetcherThread = null - val key = (sourceBroker.id, getFetcherId(topic, partitionId)) + val key = (sourceBroker, getFetcherId(topic, partitionId)) fetcherThreadMap.get(key) match { case Some(f) => fetcherThread = f case None => @@ -64,15 +64,6 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I } } - def fetcherSourceBroker(topic: String, partitionId: Int): Option[Int] = { - mapLock synchronized { - for ( ((sourceBrokerId, _), fetcher) <- fetcherThreadMap) - if (fetcher.hasPartition(topic, partitionId)) - return Some(sourceBrokerId) - } - None - } - def closeAllFetchers() { mapLock synchronized { for ( (_, fetcher) <- fetcherThreadMap) { @@ -81,4 +72,4 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I fetcherThreadMap.clear() } } -} \ No newline at end of file +}