diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index be872dc..4269219 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -46,13 +46,13 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
           fetcherThread.start
       }
       fetcherThread.addPartition(topic, partitionId, initialOffset)
-      info("adding fetcher on topic %s, partition %d, initOffset %d to broker %d with fetcherId %d"
+      info("Adding fetcher for partition [%s,%d], initOffset %d to broker %d with fetcherId %d"
         .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
     }
   }
 
   def removeFetcher(topic: String, partitionId: Int) {
-    info("removing fetcher on topic %s, partition %d".format(topic, partitionId))
+    info("Removing fetcher for partition [%s,%d]".format(topic, partitionId))
     mapLock synchronized {
       for ((key, fetcher) <- fetcherThreadMap) {
         fetcher.removePartition(topic, partitionId)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6d849ac..4a41bde 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -120,11 +120,11 @@ class ReplicaManager(val config: KafkaConfig,
         leaderPartitionsLock synchronized {
           leaderPartitions -= replica.partition
         }
-        allPartitions.remove((topic, partitionId))
-        info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
+        if(deletePartition)
+          allPartitions.remove((topic, partitionId))
       case None => //do nothing if replica no longer exists
     }
-    stateChangeLogger.trace("Broker %d finished handling stop replica [%s,%d]".format(localBrokerId, topic, partitionId))
+    stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
     errorCode
   }
 
@@ -168,7 +168,7 @@ class ReplicaManager(val config: KafkaConfig,
     if(replicaOpt.isDefined)
       return replicaOpt.get
     else
-      throw new ReplicaNotAvailableException("Replica %d is not available for partiton [%s,%d] yet".format(config.brokerId, topic, partition))
+      throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition))
   }
 
   def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica = {
@@ -230,10 +230,9 @@ class ReplicaManager(val config: KafkaConfig,
           errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
       }
       responseMap.put(topicAndPartition, errorCode)
-      leaderAndISRRequest.partitionStateInfos.foreach(p =>
-        stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
-          .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
-            leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2)))
+      stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
+        .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch,
+          topicAndPartition._1, topicAndPartition._2))
     }
     info("Handled leader and isr request %s".format(leaderAndISRRequest))
     // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions