Index: core/src/main/scala/kafka/controller/KafkaController.scala =================================================================== --- core/src/main/scala/kafka/controller/KafkaController.scala (revision 1394632) +++ core/src/main/scala/kafka/controller/KafkaController.scala (working copy) @@ -147,8 +147,20 @@ info("New partition creation callback for %s".format(newPartitions.mkString(","))) partitionStateMachine.handleStateChanges(newPartitions, NewPartition) partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition) + val replicas = getAllReplicasForPartition(newPartitions) + replicas.foreach { + case(topic, partitionId, replicaId) => + replicaStateMachine.replicaState.put((topic, partitionId, replicaId), OnlineReplica) + } } + private def getAllReplicasForPartition(partitions: Seq[(String, Int)]): Seq[Tuple3[String, Int, Int]] = { + partitions.map { p => + val replicas = controllerContext.partitionReplicaAssignment(p) + replicas.map(r => (p._1, p._2, r)) + }.flatten + } + /* TODO: kafka-330 This API is unused until we introduce the delete topic functionality. remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/ def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) { Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala =================================================================== --- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (revision 1394632) +++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (working copy) @@ -100,7 +100,7 @@ * @param replicaId The replica for which the state transition is invoked * @param targetState The end state that the replica should be moved to */ - private def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) { + def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) { try { targetState match { case OnlineReplica =>