diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index e3af0c3..0f5ebde 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -189,13 +189,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    */
   private def initializePartitionState() {
     for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
-      val topic = topicPartition.topic
-      val partition = topicPartition.partition
       // check if leader and isr path exists for partition. If not, then it is in NEW state
-      ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-        case Some(currentLeaderAndIsr) =>
+      controllerContext.partitionLeadershipInfo.get(topicPartition) match {
+        case Some(currentLeaderIsrAndEpoch) =>
           // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
-          controllerContext.liveBrokerIds.contains(currentLeaderAndIsr.leader) match {
+          controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader) match {
             case true => // leader is alive
               partitionState.put(topicPartition, OnlinePartition)
             case false =>
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index e237805..f260714 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -59,7 +59,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     initializeReplicaState()
     hasStarted.set(true)
     // move all Online replicas to Online
-    handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq,
+    handleStateChanges(getAllReplicasOnBroker(controllerContext.allTopics.toSeq,
       controllerContext.liveBrokerIds.toSeq), OnlineReplica)
     info("Started replica state machine with initial state -> " + replicaState.toString())
   }
@@ -229,6 +229,16 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
+  private def getAllReplicasOnBroker(topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
+    Set.empty[PartitionAndReplica] ++ brokerIds.map { brokerId =>
+      val partitionsAssignedToThisBroker =
+        controllerContext.partitionReplicaAssignment.filter(p => topics.contains(p._1.topic) && p._2.contains(brokerId))
+      if(partitionsAssignedToThisBroker.size == 0)
+        info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
+      partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1.topic, p._1.partition, brokerId))
+    }.flatten
+  }
+
   def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
     controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
   }
