diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 2ca7ee6..597990c 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -179,13 +179,13 @@ class Partition(val topic: String, // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch + // make sure local replica exists + val localReplica = getOrCreateReplica() val newLeaderBrokerId: Int = leaderAndIsr.leader liveBrokers.find(_.id == newLeaderBrokerId) match { case Some(leaderBroker) => // stop fetcher thread to previous leader replicaFetcherManager.removeFetcher(topic, partitionId) - // make sure local replica exists - val localReplica = getOrCreateReplica() localReplica.log.get.truncateTo(localReplica.highWatermark) inSyncReplicas = Set.empty[Replica] leaderEpoch = leaderAndIsr.leaderEpoch