diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 161f581..19607a9 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -328,8 +328,7 @@ class ReplicaManager(val config: KafkaConfig, * the error message will be set on each partition since we do not know which partition caused it * TODO: the above may need to be fixed later */ - private def makeFollowers(controllerId: Int, epoch: Int, - partitionState: Map[Partition, PartitionStateInfo], + private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) { stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partitions %s") @@ -351,11 +350,21 @@ class ReplicaManager(val config: KafkaConfig, .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId)) if (!isShuttingDown.get()) { - replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition, partitionStateInfo) => - new TopicAndPartition(partition) -> - BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get, - partition.getReplica().get.logEndOffset)} - ) + val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]() + partitionState.foreach { + case (partition, partitionStateInfo) => + val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader + if (leaders.find(_.id == leader) != None) { + partitionAndOffsets(new TopicAndPartition(partition)) = + BrokerAndInitialOffset(leaders.find(_.id == leader).get, partition.getReplica().get.logEndOffset) + } else { + stateChangeLogger.trace("Broker %d ignored the become-follower state change with correlation id %d " + + "controller %d epoch %d for topic-partition %s since the designated leader %d" + + "cannot be found in live or shutting down brokers %s" + .format(localBrokerId, correlationId, controllerId, epoch, leader, partition, leaders)) + } + } + replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) } else { stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +