diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 5146f12..199640b 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -138,7 +138,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case NewReplica => // add this replica to the assigned replicas list for its partition val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) - controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId) + if(!currentAssignedReplicas.contains(replicaId)) + controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId) stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica" .format(controllerId, controller.epoch, replicaId, topicAndPartition)) case _ =>