diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a9bb3c8..9ab5bd6 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -215,6 +215,33 @@ class Partition(val topic: String,
     }
   }
 
+  /**
+   *  Check whether the local replica is the leader; if it is:
+   *  1. truncate the log to high watermark
+   *  2. set leaderId to None
+   *
+   *  Otherwise:
+   *  1. stop any existing fetcher on this partition from the local replica
+   *
+   *  Note that this function might be called multiple times
+   */
+  def stopReplica(topic: String, partitionId: Int): Boolean = {
+    val localReplica = getOrCreateReplica()
+
+    val leaderReplicaOpt = leaderReplicaIfLocal()
+    leaderReplicaOpt match {
+      case Some(leaderReplica) =>
+        // truncate the log to high watermark
+        localReplica.log.foreach(_.truncateTo(localReplica.highWatermark))
+        // mark the current leader as None; we depend on the controller to elect the new leader
+        leaderReplicaIdOpt = None
+      case None =>
+        // stop fetcher thread to previous leader
+        replicaFetcherManager.removeFetcher(topic, partitionId)
+    }
+    true
+  }
+
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
     leaderIsrUpdateLock synchronized {
       debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ed1ce0b..0ce4f2e 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -20,12 +20,13 @@ import kafka.network.{Receive, BlockingChannel}
 import kafka.utils.{Logging, ShutdownableThread}
 import collection.mutable.HashMap
 import kafka.cluster.Broker
-import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, BlockingQueue}
 import kafka.server.KafkaConfig
 import collection.mutable
 import kafka.api._
 import org.apache.log4j.Logger
 import kafka.common.TopicAndPartition
+import java.util.concurrent.atomic.AtomicBoolean
 
 class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@@ -51,7 +52,10 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
-          stateInfo.messageQueue.put((request, callback))
+          if (!stateInfo.beingRemoved)
+            stateInfo.messageQueue.put((request, callback))
+          else
+            warn("Not sending request %s to broker %d, since it is being removed.".format(request, brokerId))
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
@@ -70,6 +74,9 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 
   def removeBroker(brokerId: Int) {
     brokerLock synchronized {
+      // Stop pushing new requests to this broker's queue, then wait for the sender thread to drain it
+      brokerStateInfo(brokerId).beingRemoved = true
+      brokerStateInfo(brokerId).requestSendThread.finishSendAllRequest()
       removeExistingBroker(brokerId)
     }
   }
@@ -112,8 +119,23 @@ class RequestSendThread(val controllerId: Int,
   extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
   private val lock = new Object()
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val reportFinish = new AtomicBoolean(false)
+  private val finishSendLatch = new CountDownLatch(1)
+
+  def finishSendAllRequest(): Unit = {
+    info("Asking the thread to finish all requests and report")
+    reportFinish.set(true)
+    finishSendLatch.await()
+    info("All requests finished sending")
+  }
 
   override def doWork(): Unit = {
+    // If finish has been requested and the queue has been drained,
+    // release the caller blocked in finishSendAllRequest
+    if (reportFinish.get && queue.isEmpty) {
+      finishSendLatch.countDown()
+    }
+
     val queueItem = queue.take()
     val request = queueItem._1
     val callback = queueItem._2
@@ -276,5 +298,7 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
 case class ControllerBrokerStateInfo(channel: BlockingChannel,
                                      broker: Broker,
                                      messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
-                                     requestSendThread: RequestSendThread)
+                                     requestSendThread: RequestSendThread) {
+  var beingRemoved : Boolean = false
+}
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ab18b7a..2af8b47 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -186,13 +186,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                 controlledShutdownPartitionLeaderSelector)
             }
             else {
-              // Stop the replica first. The state change below initiates ZK changes which should take some time
-              // before which the stop replica request should be completed (in most cases)
-              brokerRequestBatch.newBatch()
-              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false)
-              brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
-
               // If the broker is a follower, updates the isr in ZK and notifies the current leader
+              // this will also send the stop replica command to the broker
               replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
                 topicAndPartition.partition, id)), OfflineReplica)
             }
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index c964857..c30e6a4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -165,6 +165,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           replicaState.put((topic, partition, replicaId), OnlineReplica)
         case OfflineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
+          // send stop replica command to make sure the replica is dropped by the broker
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
           // As an optimization, the controller removes dead replicas from the ISR
           val leaderAndIsrIsEmpty: Boolean =
             controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
@@ -267,11 +269,12 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                 .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
               newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
-              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
               if(newBrokerIds.size > 0)
                 controller.onBrokerStartup(newBrokerIds.toSeq)
               if(deadBrokerIds.size > 0)
                 controller.onBrokerFailure(deadBrokerIds.toSeq)
+              // Remove dead brokers only after handling the broker failure, since a broker in a long GC pause may not have actually failed
+              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
             } catch {
               case e => error("Error while handling broker changes", e)
             }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f551243..5b93c21 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -114,16 +114,17 @@ class ReplicaManager(val config: KafkaConfig,
     val errorCode = ErrorMapping.NoError
     getReplica(topic, partitionId) match {
       case Some(replica) =>
-        replicaFetcherManager.removeFetcher(topic, partitionId)
-        /* TODO: handle deleteLog in a better way */
-        //if (deletePartition)
-        //  logManager.deleteLog(topic, partition)
+        replica.partition.stopReplica(topic, partitionId)
+        // update the leader partition cache
         leaderPartitionsLock synchronized {
           leaderPartitions -= replica.partition
         }
+        /* TODO: handle deleteLog in a better way */
+        //if (deletePartition)
+        //  logManager.deleteLog(topic, partition)
         if(deletePartition)
           allPartitions.remove((topic, partitionId))
-      case None => //do nothing if replica no longer exists
+      case None => // do nothing if replica no longer exists
     }
     stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
     errorCode
