diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 161f581..54f6e16 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -328,8 +328,7 @@ class ReplicaManager(val config: KafkaConfig, * the error message will be set on each partition since we do not know which partition caused it * TODO: the above may need to be fixed later */ - private def makeFollowers(controllerId: Int, epoch: Int, - partitionState: Map[Partition, PartitionStateInfo], + private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) { stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partitions %s") @@ -351,11 +350,22 @@ class ReplicaManager(val config: KafkaConfig, .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId)) if (!isShuttingDown.get()) { - replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition, partitionStateInfo) => - new TopicAndPartition(partition) -> - BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get, - partition.getReplica().get.logEndOffset)} - ) + val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]() + partitionState.foreach { + case (partition, partitionStateInfo) => + val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader + leaders.find(_.id == leader) match { + case Some(leaderBroker) => + partitionAndOffsets.put(new TopicAndPartition(partition), + BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset)) + case None => + stateChangeLogger.trace("Broker %d ignored the become-follower state change with correlation id %d " + + "controller %d epoch %d for topic-partition %s since the designated leader %d " + + "cannot be found in live or shutting down brokers %s" + .format(localBrokerId, correlationId, controllerId, epoch, partition, leader, leaders)) + } + } + replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) } else { stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +