diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f334685..f92ddfb 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -164,18 +164,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(",")))
 
       partitionsToMove.foreach { topicAndPartition =>
-        val (topic, partition) = topicAndPartition.asTuple
         // move leadership serially to relinquish lock.
         controllerContext.controllerLock synchronized {
           controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
             if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
               partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                 controlledShutdownPartitionLeaderSelector)
-              val newLeaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(topicAndPartition)
-
-              // mark replica offline only if leadership was moved successfully
-              if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)
-                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica)
             } else
               debug("Partition %s moved from leader %d to new leader %d during shutdown."
                 .format(topicAndPartition, id, currLeaderIsrAndControllerEpoch.leaderAndIsr.leader))
@@ -192,23 +186,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       * to wait until completion.
       */
       if (partitionsRemaining.size == 0) {
-        brokerRequestBatch.newBatch()
-        allPartitionsAndReplicationFactorOnBroker foreach {
-          case(topicAndPartition, replicationFactor) =>
-            val (topic, partition) = topicAndPartition.asTuple
-            if (controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader != id) {
-              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
-              removeReplicaFromIsr(topic, partition, id) match {
-                case Some(updatedLeaderIsrAndControllerEpoch) =>
-                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
-                    Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
-                    updatedLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(topicAndPartition))
-                case None =>
-                // ignore
-              }
-            }
+        controllerContext.controllerLock synchronized {
+          brokerRequestBatch.newBatch()
+          allPartitionsAndReplicationFactorOnBroker foreach {
+            case(topicAndPartition, replicationFactor) =>
+              val (topic, partition) = topicAndPartition.asTuple
+              if (controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader != id)
+                brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
+          }
+          brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
         }
-        brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
       }
 
       debug("Remaining partitions to move from broker %d: %s".format(id, partitionsRemaining.mkString(",")))
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index e237805..927eac0 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -83,7 +83,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * @param targetState  The state that the replicas should be moved to
    * The controller's allLeaders cache should have been updated before this
    */
-  def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState) {
+  def handleStateChanges(replicas: immutable.Set[PartitionAndReplica], targetState: ReplicaState) {
     info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
     try {
       brokerRequestBatch.newBatch()
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4f6fcd4..9d1072e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -618,15 +618,15 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
-    Set.empty[PartitionAndReplica] ++ brokerIds.map { brokerId =>
+  def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]): immutable.Set[PartitionAndReplica] = {
+    brokerIds.map { brokerId =>
       // read all the partitions and their assigned replicas into a map organized by
       // { replica id -> partition 1, partition 2...
       val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(zkClient, topics, brokerId)
       if(partitionsAssignedToThisBroker.size == 0)
         info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
       partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId))
-    }.flatten
+    }.flatten.toSet
   }
   
   def getPartitionsUndergoingPreferredReplicaElection(zkClient: ZkClient): Set[TopicAndPartition] = {
