diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index be872dc..4269219 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -46,13 +46,13 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
           fetcherThread.start
       }
       fetcherThread.addPartition(topic, partitionId, initialOffset)
-      info("adding fetcher on topic %s, partition %d, initOffset %d to broker %d with fetcherId %d"
+      info("Adding fetcher for partition [%s,%d], initOffset %d to broker %d with fetcherId %d"
         .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
     }
   }
 
   def removeFetcher(topic: String, partitionId: Int) {
-    info("removing fetcher on topic %s, partition %d".format(topic, partitionId))
+    info("Removing fetcher for partition [%s,%d]".format(topic, partitionId))
     mapLock synchronized {
       for ((key, fetcher) <- fetcherThreadMap) {
         fetcher.removePartition(topic, partitionId)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6d849ac..bbfdbe4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,7 +25,7 @@ import kafka.utils._
 import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CopyOnWriteArraySet, TimeUnit}
 import kafka.common._
 import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.KafkaController
@@ -45,6 +45,7 @@ class ReplicaManager(val config: KafkaConfig,
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
   private val allPartitions = new Pool[(String, Int), Partition]
+  private val stoppedPartitions = new CopyOnWriteArraySet[(String, Int)]()
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
@@ -67,7 +68,7 @@ class ReplicaManager(val config: KafkaConfig,
   newGauge(
     "PartitionCount",
     new Gauge[Int] {
-      def getValue = allPartitions.size
+      def getValue = allPartitions.size - stoppedPartitions.size()
     }
   )
   newGauge(
@@ -120,11 +121,9 @@ class ReplicaManager(val config: KafkaConfig,
         leaderPartitionsLock synchronized {
           leaderPartitions -= replica.partition
         }
-        allPartitions.remove((topic, partitionId))
-        info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
       case None => //do nothing if replica no longer exists
     }
-    stateChangeLogger.trace("Broker %d finished handling stop replica [%s,%d]".format(localBrokerId, topic, partitionId))
+    stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
     errorCode
   }
@@ -142,6 +141,7 @@ class ReplicaManager(val config: KafkaConfig,
           val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
           responseMap.put((topic, partitionId), errorCode)
         }
+        stoppedPartitions.addAll(scala.collection.JavaConversions.asCollection(stopReplicaRequest.partitions))
         (responseMap, ErrorMapping.NoError)
       }
   }
@@ -159,8 +159,13 @@ class ReplicaManager(val config: KafkaConfig,
     val partition = allPartitions.get((topic, partitionId))
     if (partition == null)
       None
-    else
-      Some(partition)
+    else {
+      // check if the partitions has been removed using stop replica request. If so, do not return it
+      if(stoppedPartitions.contains((topic, partitionId)))
+        None
+      else
+        Some(partition)
+    }
   }
 
   def getReplicaOrException(topic: String, partition: Int): Replica = {
@@ -168,7 +173,7 @@ class ReplicaManager(val config: KafkaConfig,
     if(replicaOpt.isDefined)
       return replicaOpt.get
     else
-      throw new ReplicaNotAvailableException("Replica %d is not available for partiton [%s,%d] yet".format(config.brokerId, topic, partition))
+      throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition))
   }
 
   def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica = {
@@ -230,10 +235,9 @@ class ReplicaManager(val config: KafkaConfig,
             errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
         }
         responseMap.put(topicAndPartition, errorCode)
-        leaderAndISRRequest.partitionStateInfos.foreach(p =>
-          stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
-            .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
-                    leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2)))
+        stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
+          .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch,
+                  topicAndPartition._1, topicAndPartition._2))
       }
       info("Handled leader and isr request %s".format(leaderAndISRRequest))
       // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
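
For readers following the change, below is a minimal, self-contained Scala sketch of the pattern this patch introduces, under assumed, hypothetical names (StoppedPartitionFilterSketch, a TrieMap and a plain String standing in for Pool and Partition): a stopped replica is remembered in a CopyOnWriteArraySet rather than removed from the partition pool, getPartition filters such entries out, and the partition-count metric subtracts them. It is not the real ReplicaManager code, only an illustration of the bookkeeping.

import java.util.concurrent.CopyOnWriteArraySet
import scala.collection.concurrent.TrieMap

// Sketch only: illustrates the stopped-partition bookkeeping, not the actual ReplicaManager.
object StoppedPartitionFilterSketch {
  type TopicPartition = (String, Int)

  // Stand-ins for Pool[(String, Int), Partition] and the stoppedPartitions set added by the patch.
  private val allPartitions = TrieMap.empty[TopicPartition, String]
  private val stoppedPartitions = new CopyOnWriteArraySet[TopicPartition]()

  def addPartition(topic: String, partitionId: Int, replica: String): Unit =
    allPartitions.update((topic, partitionId), replica)

  // Mirrors the patched getPartition: a stopped partition is reported as absent
  // even though its entry still sits in the pool.
  def getPartition(topic: String, partitionId: Int): Option[String] = {
    val key = (topic, partitionId)
    allPartitions.get(key).filterNot(_ => stoppedPartitions.contains(key))
  }

  // Mirrors the stop-replica path: remember the partition instead of removing it.
  def stopReplica(topic: String, partitionId: Int): Unit =
    stoppedPartitions.add((topic, partitionId))

  // Mirrors the patched PartitionCount gauge: live count excludes stopped entries.
  def partitionCount: Int = allPartitions.size - stoppedPartitions.size()

  def main(args: Array[String]): Unit = {
    addPartition("test", 0, "replica-on-broker-0")
    stopReplica("test", 0)
    println(getPartition("test", 0)) // None
    println(partitionCount)          // 0
  }
}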