diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 02d2c44..f0346fa 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -347,7 +347,7 @@ class Partition(val topic: String,
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
       ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
       ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
-    if (updateSucceeded){
+    if (updateSucceeded) {
       inSyncReplicas = newIsr
       zkVersion = newVersion
       trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f334685..d51e7ef 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -164,18 +164,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(",")))
 
       partitionsToMove.foreach { topicAndPartition =>
-        val (topic, partition) = topicAndPartition.asTuple
         // move leadership serially to relinquish lock.
         controllerContext.controllerLock synchronized {
           controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
             if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
               partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                 controlledShutdownPartitionLeaderSelector)
-              val newLeaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(topicAndPartition)
-
-              // mark replica offline only if leadership was moved successfully
-              if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)
-                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica)
             } else
               debug("Partition %s moved from leader %d to new leader %d during shutdown."
                 .format(topicAndPartition, id, currLeaderIsrAndControllerEpoch.leaderAndIsr.leader))
@@ -192,23 +186,24 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       * to wait until completion.
       */
       if (partitionsRemaining.size == 0) {
-        brokerRequestBatch.newBatch()
-        allPartitionsAndReplicationFactorOnBroker foreach {
-          case(topicAndPartition, replicationFactor) =>
-            val (topic, partition) = topicAndPartition.asTuple
-            if (controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader != id) {
-              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
-              removeReplicaFromIsr(topic, partition, id) match {
-                case Some(updatedLeaderIsrAndControllerEpoch) =>
-                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
-                    Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
-                    updatedLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(topicAndPartition))
-                case None =>
-                // ignore
+        controllerContext.controllerLock synchronized {
+          brokerRequestBatch.newBatch()
+          allPartitionsAndReplicationFactorOnBroker foreach {
+            case(topicAndPartition, replicationFactor) =>
+              val (topic, partition) = topicAndPartition.asTuple
+              val currentLeaderAndIsr = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr
+              val reducedLeaderAndIsr = LeaderAndIsr(currentLeaderAndIsr.leader, currentLeaderAndIsr.leaderEpoch+1,
+                                            currentLeaderAndIsr.isr.filter(_ != id), currentLeaderAndIsr.zkVersion+1)
+              val newLeaderIsrAndEpoch = LeaderIsrAndControllerEpoch(reducedLeaderAndIsr, epoch)
+              controllerContext.partitionLeadershipInfo.put(topicAndPartition, newLeaderIsrAndEpoch)
+              if (controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader != id) {
+                brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
+                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(currentLeaderAndIsr.leader), topic, partition,
+                  newLeaderIsrAndEpoch, controllerContext.partitionReplicaAssignment(topicAndPartition))
               }
-            }
+          }
+          brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
         }
-        brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
       }
 
       debug("Remaining partitions to move from broker %d: %s".format(id, partitionsRemaining.mkString(",")))
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 95e7218..483fa43 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -340,11 +340,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val controllerId = ZkUtils.getController(zkClient)
     val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
     var partitionsRemaining = controller.shutdownBroker(2)
+    val leaderId = controller.controllerContext.partitionLeadershipInfo(TopicAndPartition(topic, partition)).leaderAndIsr.leader
     var activeServers = servers.filter(s => s.config.brokerId != 2)
+    val leaderBroker = servers.filter(s => s.config.brokerId == leaderId).head
     try {
       // wait for the update metadata request to trickle to the brokers
       assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
+        leaderBroker.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3, 1000))
       assertEquals(0, partitionsRemaining)
       var partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
