diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 6e73003..2ca7ee6 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -195,7 +195,7 @@ class Partition(val topic: String, replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) case None => // leader went down stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " + - " controller %d epoch %d since leader %d for partition [%s,%d] became unavailable during the state change operation" + " controller %d epoch %d since leader %d for partition [%s,%d] is unavailable during the state change operation" .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, newLeaderBrokerId, topic, partitionId)) } diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 199640b..3cf1da3 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -143,21 +143,18 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica" .format(controllerId, controller.epoch, replicaId, topicAndPartition)) case _ => - // check if the leader for this partition is alive or even exists - controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { + // check if the leader for this partition ever existed + controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { case Some(leaderIsrAndControllerEpoch) => - controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match { - case true => // leader is alive - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), - topic, partition, leaderIsrAndControllerEpoch, - replicaAssignment.size) - replicaState.put((topic, partition, replicaId), OnlineReplica) - stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica" - .format(controllerId, controller.epoch, replicaId, topicAndPartition)) - case false => // ignore partitions whose leader is not alive - } - case None => // ignore partitions who don't have a leader yet + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch, + replicaAssignment.size) + replicaState.put((topic, partition, replicaId), OnlineReplica) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) + case None => // that means the partition was never in OnlinePartition state, this means the broker never + // started a log for that partition and does not have a high watermark value for this partition } + } replicaState.put((topic, partition, replicaId), OnlineReplica) case OfflineReplica =>