Index: core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala	(revision 1408865)
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala	(working copy)
@@ -75,6 +75,8 @@
             addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
           }
           noLeaderPartitionSet --= leaderForPartitionsMap.keySet
+
+          shutdownEmptyFetcherThread()
         } catch {
           case t => warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
         }
@@ -136,6 +138,7 @@
     lock.lock()
     try {
       if (partitionMap != null) {
+        partitionList.foreach(tp => removeFetcher(tp.topic, tp.partition))
         noLeaderPartitionSet ++= partitionList
         cond.signalAll()
       }
Index: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherManager.scala	(revision 1408865)
+++ core/src/main/scala/kafka/server/AbstractFetcherManager.scala	(working copy)
@@ -23,7 +23,7 @@
 
 abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
   // map of (source brokerid, fetcher Id per source broker) => fetcher
-  private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread]
+  private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 
@@ -37,17 +37,17 @@
   def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
     mapLock synchronized {
       var fetcherThread: AbstractFetcherThread = null
-      val key = (sourceBroker, getFetcherId(topic, partitionId))
+      val key = new BrokerAndFetcherId(sourceBroker, getFetcherId(topic, partitionId))
       fetcherThreadMap.get(key) match {
         case Some(f) => fetcherThread = f
         case None =>
-          fetcherThread = createFetcherThread(key._2, sourceBroker)
+          fetcherThread = createFetcherThread(key.fetcherId, sourceBroker)
           fetcherThreadMap.put(key, fetcherThread)
           fetcherThread.start
       }
       fetcherThread.addPartition(topic, partitionId, initialOffset)
       info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId %d"
-            .format(topic, partitionId, initialOffset, sourceBroker.id, key._2))
+            .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
     }
   }
 
@@ -56,11 +56,20 @@
     mapLock synchronized {
       for ((key, fetcher) <- fetcherThreadMap) {
         fetcher.removePartition(topic, partitionId)
+      }
+    }
+  }
+
+  def shutdownEmptyFetcherThread() {
+    mapLock synchronized {
+      val keysToBeRemoved = new mutable.HashSet[BrokerAndFetcherId]
+      for ((key, fetcher) <- fetcherThreadMap) {
         if (fetcher.partitionCount <= 0) {
           fetcher.shutdown()
-          fetcherThreadMap.remove(key)
+          keysToBeRemoved += key
         }
       }
+      fetcherThreadMap --= keysToBeRemoved
     }
   }
 
@@ -73,3 +82,5 @@
     }
   }
 }
+
+case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
\ No newline at end of file
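Two things in the AbstractFetcherManager hunks above are worth calling out: the fetcher map is now keyed by a named case class rather than a bare (Broker, Int) tuple, and shutdownEmptyFetcherThread() collects the keys of idle fetchers first and removes them afterwards, so fetcherThreadMap is no longer mutated while it is being iterated. A minimal standalone sketch of both patterns (hypothetical names and values, not the patch code):

import scala.collection.mutable

object FetcherMapSketch extends App {
  // Named key instead of an anonymous tuple; field access is self-documenting.
  case class BrokerAndFetcherId(brokerId: Int, fetcherId: Int)

  // value = number of partitions the fetcher currently serves
  val fetchers = new mutable.HashMap[BrokerAndFetcherId, Int]
  fetchers.put(BrokerAndFetcherId(0, 0), 2)
  fetchers.put(BrokerAndFetcherId(1, 0), 0)

  // Two-pass removal: collect keys first, mutate the map afterwards,
  // so the map is never modified while it is being iterated.
  val keysToBeRemoved = new mutable.HashSet[BrokerAndFetcherId]
  for ((key, partitionCount) <- fetchers)
    if (partitionCount <= 0)
      keysToBeRemoved += key
  fetchers --= keysToBeRemoved

  println(fetchers.keys) // only BrokerAndFetcherId(0,0) remains
}

Deferring the removals costs one extra pass over the collected keys, but it avoids removing entries from the map in the middle of the for-comprehension, which is what the pre-patch code did.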
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1408865)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -181,6 +181,9 @@
       responseMap.put(topicAndPartition, errorCode)
     }
 
+    info("Completed leader and isr request %s".format(leaderAndISRRequest))
+    replicaFetcherManager.shutdownEmptyFetcherThread()
+
     /**
      *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
      *  as deleted.
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1408865)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -152,6 +152,8 @@
 
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
+
+    replicaManager.replicaFetcherManager.shutdownEmptyFetcherThread()
   }
 
   /**
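The AbstractFetcherThread hunks below replace the plain Object monitor with a ReentrantLock paired with a Condition, so that doWork() can block until at least one partition is registered instead of building empty fetch requests; addPartition() signals the condition after inserting. A self-contained sketch of that wait/signal pattern, using a hypothetical PartitionQueue rather than the real fetcher code:

import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

class PartitionQueue {
  private val lock = new ReentrantLock
  private val nonEmpty = lock.newCondition()
  private val offsets = new mutable.HashMap[String, Long]

  def add(partition: String, offset: Long) {
    lock.lock()
    try {
      offsets.put(partition, offset)
      nonEmpty.signalAll() // wake any worker blocked in snapshot()
    } finally {
      lock.unlock()
    }
  }

  // Blocks while the map is empty, then returns an immutable copy.
  def snapshot(): Map[String, Long] = {
    lock.lock()
    try {
      while (offsets.isEmpty)
        nonEmpty.await()
      offsets.toMap
    } finally {
      lock.unlock()
    }
  }
}

Note the while loop rather than an if: Condition.await() permits spurious wakeups, so the predicate must be re-checked after every wakeup.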
Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(revision 1408865)
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(working copy)
@@ -29,6 +29,7 @@
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.{Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
 
 
 /**
@@ -39,16 +40,11 @@
   extends ShutdownableThread(name) {
 
   private val fetchMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
-  private val fetchMapLock = new Object
+  private val fetchMapLock = new ReentrantLock
+  private val fetchMapCond = fetchMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
   val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
-  val fetchRequestuilder = new FetchRequestBuilder().
-          clientId(clientId).
-          replicaId(fetcherBrokerId).
-          maxWait(maxWait).
-          minBytes(minBytes)
-
   /* callbacks to be defined in subclass */
 
   // process fetched data
@@ -67,12 +63,23 @@
   }
 
   override def doWork() {
-    fetchMapLock synchronized {
+    val fetchRequestuilder = new FetchRequestBuilder().
+            clientId(clientId).
+            replicaId(fetcherBrokerId).
+            maxWait(maxWait).
+            minBytes(minBytes)
+
+    fetchMapLock.lock()
+    try {
+      while (fetchMap.isEmpty)
+        fetchMapCond.await()
       fetchMap.foreach {
         case((topicAndPartition, offset)) =>
           fetchRequestuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                                       offset, fetchSize)
       }
+    } finally {
+      fetchMapLock.unlock()
     }
 
     val fetchRequest = fetchRequestuilder.build()
@@ -87,7 +94,9 @@
       if (isRunning.get) {
-        fetchMapLock synchronized {
+        fetchMapLock.lock()
+        try {
           partitionsWithError ++= fetchMap.keys
-          fetchMap.clear()
+        } finally {
+          fetchMapLock.unlock()
         }
       }
     }
@@ -95,7 +104,8 @@
 
     if (response != null) {
       // process fetched data
-      fetchMapLock synchronized {
+      fetchMapLock.lock()
+      try {
         response.data.foreach {
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
@@ -120,13 +130,14 @@
               warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
                 .format(currentOffset.get, topic, partitionId, newOffset))
             case _ =>
-              error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
+              warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
                 ErrorMapping.exceptionFor(partitionData.error))
               partitionsWithError += topicAndPartition
-              fetchMap.remove(topicAndPartition)
           }
         }
-      }
+      } finally {
+        fetchMapLock.unlock()
+      }
     }
@@ -137,27 +148,44 @@
     }
   }
 
   def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
-    fetchMapLock synchronized {
+    fetchMapLock.lock()
+    try {
       fetchMap.put(TopicAndPartition(topic, partitionId), initialOffset)
+      fetchMapCond.signalAll()
+    } finally {
+      fetchMapLock.unlock()
     }
   }
 
   def removePartition(topic: String, partitionId: Int) {
-    fetchMapLock synchronized {
+    fetchMapLock.lock()
+    try {
       fetchMap.remove(TopicAndPartition(topic, partitionId))
+    } finally {
+      fetchMapLock.unlock()
     }
   }
 
   def hasPartition(topic: String, partitionId: Int): Boolean = {
-    fetchMapLock synchronized {
-      fetchMap.get(TopicAndPartition(topic, partitionId)).isDefined
+    var hasPart = false
+    fetchMapLock.lock()
+    try {
+      hasPart = fetchMap.get(TopicAndPartition(topic, partitionId)).isDefined
+    } finally {
+      fetchMapLock.unlock()
     }
+    hasPart
  }
 
   def partitionCount() = {
-    fetchMapLock synchronized {
-      fetchMap.size
+    var partCount = 0
+    fetchMapLock.lock()
+    try {
+      partCount = fetchMap.size
+    } finally {
+      fetchMapLock.unlock()
     }
+    partCount
   }
 }
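A possible follow-up, not part of this patch: the repeated lock()/try/finally blocks introduced in AbstractFetcherThread could be factored into a small higher-order helper so each accessor stays a single expression. A sketch, with the hypothetical name inLock:

import java.util.concurrent.locks.ReentrantLock

object LockUtils {
  // Runs body while holding the lock, releasing it even if body throws.
  def inLock[T](lock: ReentrantLock)(body: => T): T = {
    lock.lock()
    try {
      body
    } finally {
      lock.unlock()
    }
  }
}

// e.g. partitionCount() would collapse to:
//   def partitionCount() = inLock(fetchMapLock) { fetchMap.size }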