diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ed1ce0b..cbb3e8a 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -160,7 +160,7 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
   def newBatch() {
-    // raise error if the previous batch is not empty
+    // raise error if the previous batch is not empty; also do not need to clear the batch afterwards.
     if(leaderAndIsrRequestMap.size > 0)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
         "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
@@ -173,10 +173,6 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
     if(stopAndDeleteReplicaRequestMap.size > 0)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
         "new one. Some StopReplica with delete state changes %s might be lost ".format(stopAndDeleteReplicaRequestMap.toString()))
-    leaderAndIsrRequestMap.clear()
-    stopReplicaRequestMap.clear()
-    updateMetadataRequestMap.clear()
-    stopAndDeleteReplicaRequestMap.clear()
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index bde405a..25d8e82 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -282,7 +282,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     sendUpdateMetadataRequest(newBrokers)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
+    replicaStateMachine.handleStateChanges(replicaStateMachine.getAllReplicasOnBroker(controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
     // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
     // to see if these brokers can become leaders for some/all of those
     partitionStateMachine.triggerOnlinePartitionStateChange()
@@ -320,7 +320,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // handle dead replicas
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
+    replicaStateMachine.handleStateChanges(replicaStateMachine.getAllReplicasOnBroker(controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
   }
 
   /**
@@ -337,8 +337,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   /**
-   * This callback is invoked by the topic change callback with the list of failed brokers as input.
-   * It does the following -
+   * This callback is invoked by the partition change and new partition creation callback with the list of
+   * new partitions as input. It does the following -
    * 1. Move the newly created partitions to the NewPartition state
    * 2. Move the newly created partitions from NewPartition->OnlinePartition state
    */
@@ -595,8 +595,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     // stop watching the ISR changes for this partition
     zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
       controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
-    // update the assigned replica list
-    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
   }
 
   private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index a084830..2ccd482 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -272,8 +272,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   /**
-   * Invoked on the OfflinePartition->OnlinePartition state change. It invokes the leader election API to elect a leader
-   * for the input offline partition
+   * Invoked on the OfflinePartition->OnlinePartition or OnlinePartition->OnlinePartition state change.
+   * It invokes the leader election API to elect a leader for the input offline partition
    * @param topic               The topic of the offline partition
    * @param partition           The offline partition
    * @param leaderSelector      Specific leader selector (e.g., offline/reassigned/etc.)
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 9f752f4..5fee74f 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -132,9 +132,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
           // send stop replica command
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
-          // remove this replica from the assigned replicas list for its partition
-          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
+          // do not need to remove this replica from the assigned replicas list for its partition for now
+          // since this is only triggered by the partition reassignment tool and the assignment will be updated later along with ZK update
           replicaState.remove((topic, partition, replicaId))
           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
                                     .format(controllerId, controller.epoch, replicaId, topicAndPartition))
@@ -142,10 +141,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
           replicaState((topic, partition, replicaId)) match {
             case NewReplica =>
-              // add this replica to the assigned replicas list for its partition
-              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-              if(!currentAssignedReplicas.contains(replicaId))
-                controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
+              // do not need to add this replica to the assigned replicas list for its partition for now
+              // since this is only triggered by the partition reassignment tool or on new broker startup,
+              // and in the former/latter case the assignment will be updated later/has already be updated along with ZK update
               stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
                                         .format(controllerId, controller.epoch, replicaId, topicAndPartition))
             case _ =>
@@ -154,7 +152,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                 case Some(leaderIsrAndControllerEpoch) =>
                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                     replicaAssignment)
-                  replicaState.put((topic, partition, replicaId), OnlineReplica)
                   stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
                     .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                 case None => // that means the partition was never in OnlinePartition state, this means the broker never
@@ -228,7 +225,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
-  private def getAllReplicasOnBroker(topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
+  def getAllReplicasOnBroker(topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
     brokerIds.map { brokerId =>
       val partitionsAssignedToThisBroker =
         controllerContext.partitionReplicaAssignment.filter(p => topics.contains(p._1.topic) && p._2.contains(brokerId))
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ba5eacc..52705ed 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -662,6 +662,7 @@ object ZkUtils extends Logging {
     }
   }
 
+  /** Currently not used any more */
   def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
     Set.empty[PartitionAndReplica] ++ brokerIds.map { brokerId =>
       // read all the partitions and their assigned replicas into a map organized by
