diff --git core/src/main/scala/kafka/admin/ShutdownBroker.scala core/src/main/scala/kafka/admin/ShutdownBroker.scala
new file mode 100644
index 0000000..9571fd5
--- /dev/null
+++ core/src/main/scala/kafka/admin/ShutdownBroker.scala
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import javax.management.remote.{JMXServiceURL, JMXConnectorFactory}
+import javax.management.ObjectName
+import kafka.controller.KafkaController
+import scala.Some
+
+
+object ShutdownBroker extends Logging {
+
+  private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer, jmxUrl: String)
+
+  private def invokeShutdown(params: ShutdownParams): Boolean = {
+    var zkClient: ZkClient = null
+    try {
+      zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer)
+      val controllerBrokerId = ZkUtils.getController(zkClient)
+      val controllerOpt = ZkUtils.getBrokerInfo(zkClient, controllerBrokerId)
+      controllerOpt match {
+        case Some(controller) =>
+          val jmxUrl = new JMXServiceURL(params.jmxUrl)
+          val jmxc = JMXConnectorFactory.connect(jmxUrl, null)
+          val mbsc = jmxc.getMBeanServerConnection
+          val leaderPartitionsRemaining = mbsc.invoke(new ObjectName(KafkaController.MBeanName),
+            "shutdownBroker",
+            Array(params.brokerId),
+            Array(classOf[Int].getName)).asInstanceOf[Int]
+          val shutdownComplete = (leaderPartitionsRemaining == 0)
+          info("Shutdown status: " + (if (shutdownComplete)
+                  "complete" else
+                  "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
+          shutdownComplete
+        case None =>
+          error("Operation failed due to controller failure on broker %d.".format(controllerBrokerId))
+          false
+      }
+    }
+    catch {
+      case t: Throwable =>
+        error("Operation failed due to %s.".format(t.getMessage), t)
+        false
+    }
+    finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val brokerOpt = parser.accepts("broker", "REQUIRED: The broker to shutdown.")
+            .withRequiredArg
+            .describedAs("Broker Id")
+            .ofType(classOf[java.lang.Integer])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+            "Multiple URLs can be given to allow fail-over.")
+            .withRequiredArg
+            .describedAs("urls")
+            .ofType(classOf[String])
+    val numRetriesOpt = parser.accepts("num.retries", "Number of attempts to retry if shutdown does not complete.")
+            .withRequiredArg
+            .describedAs("number of retries")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(0)
+    val retryIntervalOpt = parser.accepts("retry.interval.ms", "Retry interval if retries requested.")
+            .withRequiredArg
+            .describedAs("retry interval in ms (>= 1000)")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(1000)
+    val jmxUrlOpt = parser.accepts("jmx.url", "Controller's JMX URL.")
+            .withRequiredArg
+            .describedAs("JMX url.")
+            .ofType(classOf[String])
+            .defaultsTo("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi")
+
+    val options = parser.parse(args : _*)
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt)
+
+    val retryIntervalMs = options.valueOf(retryIntervalOpt).intValue.max(1000)
+    val numRetries = options.valueOf(numRetriesOpt).intValue
+
+    val shutdownParams =
+      ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt), options.valueOf(jmxUrlOpt))
+
+    if (!invokeShutdown(shutdownParams)) {
+      (1 to numRetries).takeWhile(attempt => {
+        info("Retry " + attempt)
+        try {
+          Thread.sleep(retryIntervalMs)
+        }
+        catch {
+          case ie: InterruptedException => // ignore
+        }
+        !invokeShutdown(shutdownParams)
+      })
+    }
+  }
+
+}
+
diff --git core/src/main/scala/kafka/api/StopReplicaRequest.scala core/src/main/scala/kafka/api/StopReplicaRequest.scala
index c3db6f9..4c8b1df 100644
--- core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -20,8 +20,10 @@ package kafka.api
 
 import java.nio._
 import kafka.api.ApiUtils._
+import kafka.utils.{Utils, Logging}
 
-object StopReplicaRequest {
+
+object StopReplicaRequest extends Logging {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
   val DefaultAckTimeout = 100
@@ -30,28 +32,39 @@ object StopReplicaRequest {
     val versionId = buffer.getShort
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val deletePartitions = buffer.get match {
+      case 1 => true
+      case 0 => false
+      case x =>
+        warn("Invalid byte %d in delete partitions field. (Assuming false.)".format(x))
+        false
+    }
     val topicPartitionPairCount = buffer.getInt
     val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]()
-    for (i <- 0 until topicPartitionPairCount)
+    (1 to topicPartitionPairCount) foreach { _ =>
       topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
-    new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet.toSet)
+    }
+    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet)
   }
 }
 
 case class StopReplicaRequest(versionId: Short,
                               clientId: String,
                               ackTimeoutMs: Int,
+                              deletePartitions: Boolean,
                               partitions: Set[(String, Int)])
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
-  def this(partitions: Set[(String, Int)]) = {
+
+  def this(deletePartitions: Boolean, partitions: Set[(String, Int)]) = {
     this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
-        partitions)
+         deletePartitions, partitions)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
     buffer.putInt(partitions.size)
     for ((topic, partitionId) <- partitions){
       writeShortString(buffer, topic)
@@ -60,9 +73,15 @@ case class StopReplicaRequest(versionId: Short,
   }
 
   def sizeInBytes(): Int = {
-    var size = 2 + (2 + clientId.length()) + 4 + 4
+    var size =
+      2 + /* versionId */
+      ApiUtils.shortStringLength(clientId) +
+      4 + /* ackTimeoutMs */
+      1 + /* deletePartitions */
+      4 /* partition count */
     for ((topic, partitionId) <- partitions){
-      size += (2 + topic.length()) + 4
+      size += (ApiUtils.shortStringLength(topic)) +
+              4 /* partition id */
     }
     size
   }
diff --git core/src/main/scala/kafka/common/BrokerNotExistException.scala core/src/main/scala/kafka/common/BrokerNotExistException.scala
deleted file mode 100644
index e69de29..0000000
diff --git core/src/main/scala/kafka/controller/ControllerChannelManager.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 3791a03..b7a2a5b 100644
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -123,7 +123,7 @@ class RequestSendThread(val controllerId: Int,
           case RequestKeys.StopReplicaKey =>
             response = StopReplicaResponse.readFrom(receive.buffer)
         }
-        trace("got a response %s".format(controllerId, response, toBrokerId))
+        trace("Controller %d request to broker %d got a response %s".format(controllerId, toBrokerId, response))
 
         if(callback != null){
           callback(response)
@@ -141,6 +141,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
   extends  Logging {
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
+  val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
 
   def newBatch() {
     // raise error if the previous batch is not empty
@@ -149,6 +150,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
         "a new one. Some state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
     leaderAndIsrRequestMap.clear()
     stopReplicaRequestMap.clear()
+    stopAndDeleteReplicaRequestMap.clear()
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr, replicationFactor: Int) {
@@ -160,10 +162,18 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
     }
   }
 
-  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int) {
+  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean) {
     brokerIds.foreach { brokerId =>
       stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)])
-      stopReplicaRequestMap(brokerId) :+ (topic, partition)
+      stopAndDeleteReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)])
+      if (deletePartition) {
+        val v = stopAndDeleteReplicaRequestMap(brokerId)
+        stopAndDeleteReplicaRequestMap(brokerId) = v :+ (topic, partition)
+      }
+      else {
+        val v = stopReplicaRequestMap(brokerId)
+        stopReplicaRequestMap(brokerId) = v :+ (topic, partition)
+      }
     }
   }
 
@@ -176,12 +186,19 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     leaderAndIsrRequestMap.clear()
-    stopReplicaRequestMap.foreach { r =>
-      val broker = r._1
-      debug("The stop replica request sent to broker %d is %s".format(broker, r._2.mkString(",")))
-      sendRequest(broker, new StopReplicaRequest(Set.empty[(String, Int)] ++ r._2), null)
+    Seq((stopReplicaRequestMap, false), (stopAndDeleteReplicaRequestMap, true)) foreach {
+      case(m, deletePartitions) => {
+        m.foreach(r => {
+          val broker = r._1
+          if (r._2.size > 0) {
+            debug("The stop replica request (delete = %s) sent to broker %d is %s"
+                          .format(deletePartitions, broker, r._2.mkString(",")))
+            sendRequest(broker, new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ r._2), null)
+          }
+        })
+        m.clear()
+      }
     }
-    stopReplicaRequestMap.clear()
   }
 }
 
diff --git core/src/main/scala/kafka/controller/KafkaController.scala core/src/main/scala/kafka/controller/KafkaController.scala
index 2022c9f..ffba168 100644
--- core/src/main/scala/kafka/controller/KafkaController.scala
+++ core/src/main/scala/kafka/controller/KafkaController.scala
@@ -31,21 +31,45 @@ import kafka.utils.{Utils, ZkUtils, Logging}
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import java.lang.{IllegalStateException, Object}
 import kafka.admin.PreferredReplicaLeaderElectionCommand
-import kafka.common.{TopicAndPartition, KafkaException}
+import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
-                        var liveBrokers: Set[Broker] = null,
-                        var liveBrokerIds: Set[Int] = null,
-                        var allTopics: Set[String] = null,
-                        var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = null,
-                        var allLeaders: mutable.Map[TopicAndPartition, Int] = null,
+                        var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
+                        var allTopics: Set[String] = Set.empty,
+                        var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
+                        var allLeaders: mutable.Map[TopicAndPartition, Int] = mutable.Map.empty,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
-                        new mutable.HashMap,
-                        var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet)
+                          new mutable.HashMap,
+                        var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
+                          new mutable.HashSet) {
 
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup {
+  private var liveBrokersUnderlying: Set[Broker] = Set.empty
+  private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
+
+  // setter
+  def liveBrokers_=(brokers: Set[Broker]) {
+    liveBrokersUnderlying = brokers.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
+    liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id)
+  }
+
+  // getter
+  def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
+  def liveBrokerIds = liveBrokerIdsUnderlying.filter(brokerId => !shuttingDownBrokerIds.contains(brokerId))
+
+  def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying ++ shuttingDownBrokerIds
+}
+
+trait KafkaControllerMBean {
+  def shutdownBroker(id: Int): Int
+}
+
+object KafkaController {
+  val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
+}
+
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   val controllerContext = new ControllerContext(zkClient)
@@ -55,6 +79,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     config.brokerId)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
+  private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
 
   newGauge(
     "ActiveControllerCount",
@@ -63,6 +89,87 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   )
 
+  def shutdownBroker(id: Int) = {
+
+    info("Shutting down broker " + id)
+
+    controllerContext.controllerLock synchronized {
+      if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
+        throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+
+      controllerContext.shuttingDownBrokerIds.add(id)
+
+      debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
+      debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
+    }
+
+    val allPartitionsAndReplicationFactorOnBroker = controllerContext.controllerLock synchronized {
+      getPartitionsAssignedToBroker(zkClient, controllerContext.allTopics.toSeq, id).map {
+        case(topic, partition) =>
+          val topicAndPartition = TopicAndPartition(topic, partition)
+          (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size)
+      }
+    }
+
+    def replicatedPartitionsBrokerLeads = controllerContext.controllerLock.synchronized {
+      trace("All leaders = " + controllerContext.allLeaders.mkString(","))
+      controllerContext.allLeaders.filter {
+        case (topicAndPartition, leader) =>
+          leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+      }.map(_._1)
+    }
+    debug("Partitions to move leadership from broker %d: %s".format(id, replicatedPartitionsBrokerLeads.mkString(",")))
+
+    val partitionsToMove = replicatedPartitionsBrokerLeads.toSet
+
+    partitionsToMove.foreach(topicAndPartition => {
+      val (topic, partition) = topicAndPartition.asTuple
+      // move leadership serially to relinquish lock.
+      controllerContext.controllerLock synchronized {
+        controllerContext.allLeaders.get(topicAndPartition).foreach(currLeader => {
+          if (currLeader == id) {
+            partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
+                                                     controlledShutdownPartitionLeaderSelector)
+            val newLeader = controllerContext.allLeaders(topicAndPartition)
+
+            // mark replica offline only if leadership was moved successfully
+            if (newLeader != currLeader)
+              replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica)
+          }
+          else
+            debug("Partition %s moved from leader %d to new leader %d during shutdown."
+                  .format(topicAndPartition, id, currLeader))
+        })
+      }
+    })
+
+    val partitionsRemaining = replicatedPartitionsBrokerLeads.toSet
+    debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
+
+    /*
+     * Force the shutting down broker out of the ISR of partitions that it
+     * follows, and shutdown the corresponding replica fetcher threads.
+     * This is really an optimization, so no need to register any callback
+     * to wait until completion.
+     */
+    brokerRequestBatch.newBatch()
+    allPartitionsAndReplicationFactorOnBroker.filter(e => !partitionsRemaining.contains(e._1)) foreach {
+      case(topicAndPartition, replicationFactor) =>
+        val (topic, partition) = topicAndPartition.asTuple
+        brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
+        maybeRemoveReplicaFromIsr(topic, partition, id) match {
+          case Some(updatedLeaderAndIsr) =>
+            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
+              Seq(updatedLeaderAndIsr.leader), topic, partition, updatedLeaderAndIsr, replicationFactor)
+          case None =>
+          // ignore
+        }
+    }
+    brokerRequestBatch.sendRequestsToBrokers()
+
+    partitionsRemaining.size
+  }
+
   /**
    * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
    * It does the following things on the become-controller state change -
@@ -84,7 +191,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       partitionStateMachine.startup()
       replicaStateMachine.startup()
       info("Broker %d is ready to serve as the new controller".format(config.brokerId))
-    }else
+    }
+    else
       info("Controller has been shut down, aborting startup/failover")
   }
 
@@ -105,6 +213,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
+
     // update leader and isr cache for broker
     updateLeaderAndIsrCache()
     // update partition state machine
@@ -128,6 +237,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   def onBrokerFailure(deadBrokers: Seq[Int]) {
     info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
+
+    val deadBrokersThatWereShuttingDown =
+      deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
+    info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
+
     // update leader and isr cache for broker
     updateLeaderAndIsrCache()
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
@@ -228,6 +342,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       registerSessionExpirationListener()
       isRunning = true
       controllerElector.startup
+      Utils.registerMBean(this, KafkaController.MBeanName)
       info("Controller startup complete")
     }
   }
@@ -260,7 +375,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   private def initializeControllerContext() {
     controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
-    controllerContext.liveBrokerIds = controllerContext.liveBrokers.map(_.id)
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,
       controllerContext.allTopics.toSeq)
@@ -270,6 +384,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     // start the channel manager
     startChannelManager()
     info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
+    info("Currently shutting down brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
     info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
     initializeAndMaybeTriggerPartitionReassignment()
     initializeAndMaybeTriggerPreferredReplicaElection()
@@ -341,7 +456,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
       // move the leader to one of the alive and caught up new replicas
       partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
-    }else {
+    }
+    else {
       // check if the leader is alive or not
       controllerContext.liveBrokerIds.contains(currentLeader) match {
         case true =>
@@ -440,7 +556,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
       if(currentLeader == preferredReplica) {
         info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
-      }else {
+      }
+      else {
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
@@ -455,6 +572,47 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }.flatten
   }
 
+  /**
+   * Removes a given partition replica from the ISR; if it is not the current
+   * leader and there are sufficient remaining replicas in ISR.
+   * @param topic topic
+   * @param partition partition
+   * @param replicaId replica Id
+   * @return the new leaderAndIsr with the replica removed if it was present,
+   *         or None if leaderAndIsr is empty.
+   */
+  def maybeRemoveReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderAndIsr] = {
+    var finalLeaderAndIsr: Option[LeaderAndIsr] = None
+    var zkWriteCompleteOrUnnecessary = false
+    while (!zkWriteCompleteOrUnnecessary) {
+      // refresh leader and isr from zookeeper again
+      val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+      zkWriteCompleteOrUnnecessary = leaderAndIsrOpt match {
+        case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
+          if (leaderAndIsr.leader != replicaId && leaderAndIsr.isr.size > 1 && leaderAndIsr.isr.contains(replicaId)) {
+            val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+                                               leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
+            info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
+            // update the new leadership decision in zookeeper or retry
+            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
+              zkClient,
+              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
+              leaderAndIsr.zkVersion)
+            newLeaderAndIsr.zkVersion = newVersion
+            finalLeaderAndIsr = Some(newLeaderAndIsr)
+            updateSucceeded
+          }
+          else {
+            finalLeaderAndIsr = Some(leaderAndIsr)
+            true
+          }
+        case None =>
+          true
+      }
+    }
+    finalLeaderAndIsr
+  }
+
   class SessionExpirationListener() extends IZkStateListener with Logging {
     this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
     @throws(classOf[Exception])
@@ -523,14 +681,16 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
                 throw new KafkaException("Partition %s to be reassigned is already assigned to replicas"
                   .format(topicAndPartition) +
                   " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
-              }else {
+              }
+              else {
                 if(aliveNewReplicas == newReplicas) {
                   info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition,
                     newReplicas.mkString(",")))
                   val context = createReassignmentContextForPartition(topic, partition, newReplicas)
                   controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
                   controller.onPartitionReassignment(topicAndPartition, context)
-                }else {
+                }
+                else {
                   // some replica in RAR is not alive. Fail partition reassignment
                   throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
                     " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
@@ -609,7 +769,8 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
                     .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
                     "Resuming partition reassignment")
                   controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-                }else {
+                }
+                else {
                   info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
                     .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
                     "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
diff --git core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index cb19edd..a5213fb 100644
--- core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -25,10 +25,9 @@ trait PartitionLeaderSelector {
   /**
    * @param topic                      The topic of the partition whose leader needs to be elected
    * @param partition                  The partition whose leader needs to be elected
-   * @param assignedReplicas           The list of replicas assigned to the input partition
    * @param currentLeaderAndIsr        The current leader and isr of input partition read from zookeeper
    * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
-   * @returns The leader and isr request, with the newly selected leader info, to send to the brokers
+   * @return The leader and isr request, with the newly selected leader info, to send to the brokers
    * Also, returns the list of replicas the returned leader and isr request should be sent to
    * This API selects a new leader for the input partition
    */
@@ -146,4 +145,40 @@ with Logging {
       }
     }
   }
-}
\ No newline at end of file
+}
+
+/**
+ * Picks one of the alive replicas (other than the current leader) in ISR as
+ * new leader, fails if there are no other replicas in ISR.
+ */
+class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
+        extends PartitionLeaderSelector
+        with Logging {
+
+  this.logIdent = "[ControlledShutdownLeaderSelector]: "
+
+  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
+    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+
+    val currentLeader = currentLeaderAndIsr.leader
+
+    val assignedReplicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
+    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
+    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
+
+    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => brokerId != currentLeader &&
+                                                            !controllerContext.shuttingDownBrokerIds.contains(brokerId))
+    val newLeaderOpt = newIsr.headOption
+    newLeaderOpt match {
+      case Some(newLeader) =>
+        debug("Partition %s-%d : current leader = %d, new leader = %d".format(topic, partition, currentLeader, newLeader))
+        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
+         liveAssignedReplicas)
+      case None =>
+        throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
+    }
+  }
+
+}
+
diff --git core/src/main/scala/kafka/controller/PartitionStateMachine.scala core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index cd1becd..e813da7 100644
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -121,8 +121,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                                 leaderSelector: PartitionLeaderSelector) {
     val topicAndPartition = TopicAndPartition(topic, partition)
+    val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
     try {
-      partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
       targetState match {
         case NewPartition =>
           // pre: partition did not exist before this
@@ -163,8 +163,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           // post: partition state is deleted from all brokers and zookeeper
       }
     }catch {
-      case e => error("State change for partition [%s, %d] ".format(topic, partition) +
-        "from %s to %s failed".format(partitionState(topicAndPartition), targetState), e)
+      case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) +
+        "from %s to %s failed".format(currState, targetState), t)
     }
   }
 
@@ -203,8 +203,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   /**
    * Invoked on the NonExistentPartition->NewPartition state transition to update the controller's cache with the
    * partition's replica assignment.
-   * @topic     The topic of the partition whose replica assignment is to be cached
-   * @partition The partition whose replica assignment is to be cached
+   * @param topic     The topic of the partition whose replica assignment is to be cached
+   * @param partition The partition whose replica assignment is to be cached
    */
   private def assignReplicasToPartitions(topic: String, partition: Int) {
     val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition)
@@ -216,10 +216,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * a leader and isr path in zookeeper. Once the partition moves to the OnlinePartition state, it's leader and isr
    * path gets initialized and it never goes back to the NewPartition state. From here, it can only go to the
    * OfflinePartition state.
-   * @topic               The topic of the partition whose leader and isr path is to be initialized
-   * @partition           The partition whose leader and isr path is to be initialized
-   * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
-   *                      this state change
+   * @param topicAndPartition   The topic/partition whose leader and isr path is to be initialized
    */
   private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
     debug("Initializing leader and isr for partition %s".format(topicAndPartition))
@@ -258,10 +255,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   /**
    * Invoked on the OfflinePartition->OnlinePartition state change. It invokes the leader election API to elect a leader
    * for the input offline partition
-   * @topic               The topic of the offline partition
-   * @partition           The offline partition
-   * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
-   *                      this state change
+   * @param topic               The topic of the offline partition
+   * @param partition           The offline partition
+   * @param leaderSelector      Specific leader selector (e.g., offline/reassigned/etc.)
    */
   def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
     /** handle leader election for the partitions whose leader is no longer alive **/
@@ -291,8 +287,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     }catch {
       case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
         .format(topic, partition) + " Marking this partition offline", poe)
-      case sce => throw new StateChangeFailedException(("Error while electing leader for partition" +
-        " [%s, %d]").format(topic, partition), sce)
+      case sce => throw new StateChangeFailedException(("Error while electing leader for partition " +
+        " [%s, %d] due to: %s.").format(topic, partition, sce.getMessage), sce)
     }
     debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
   }
diff --git core/src/main/scala/kafka/controller/ReplicaStateMachine.scala core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index f0ccb89..2b4a950 100644
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -75,7 +75,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
 
   /**
    * This API is invoked by the broker change controller callbacks and the startup API of the state machine
-   * @param brokerIds    The list of brokers that need to be transitioned to the target state
+   * @param replicas     The list of replicas (brokers) that need to be transitioned to the target state
    * @param targetState  The state that the replicas should be moved to
    * The controller's allLeaders cache should have been updated before this
    */
@@ -122,7 +122,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
         case NonExistentReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
           // send stop replica command
-          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition)
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
           // remove this replica from the assigned replicas list for its partition
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
           controllerContext.partitionReplicaAssignment.put(topicAndPartition,
@@ -159,34 +159,24 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
         case OfflineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
           // As an optimization, the controller removes dead replicas from the ISR
-          var zookeeperPathUpdateSucceeded: Boolean = false
-          var newLeaderAndIsr: LeaderAndIsr = null
-          while(!zookeeperPathUpdateSucceeded) {
-            // refresh leader and isr from zookeeper again
-            val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-            leaderAndIsrOpt match {
-              case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
-                newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
-                  leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
-                info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
-                // update the new leadership decision in zookeeper or retry
-                val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-                  ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
-                  leaderAndIsr.zkVersion)
-                newLeaderAndIsr.zkVersion = newVersion
-                zookeeperPathUpdateSucceeded = updateSucceeded
-              case None => throw new StateChangeFailedException("Failed to change state of replica %d".format(replicaId) +
-                " for partition [%s, %d] since the leader and isr path in zookeeper is empty".format(topic, partition))
-            }
+          controller.maybeRemoveReplicaFromIsr(topic, partition, replicaId) match {
+            case Some(updatedLeaderAndIsr) =>
+              // send the shrunk ISR state change request only to the leader
+              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderAndIsr.leader),
+                                                                  topic, partition, updatedLeaderAndIsr,
+                                                                  replicaAssignment.size)
+              // update the local leader and isr cache
+              controllerContext.allLeaders.put(topicAndPartition, updatedLeaderAndIsr.leader)
+              replicaState.put((topic, partition, replicaId), OfflineReplica)
+              info("Replica %d for partition [%s, %d] state changed to OfflineReplica"
+                   .format(replicaId, topic, partition))
+              info("Removed offline replica %d from ISR for partition [%s, %d]"
+                   .format(replicaId, topic, partition))
+            case None =>
+              throw new StateChangeFailedException(
+                "Failed to change state of replica %d for partition [%s, %d] since the leader and isr path in zookeeper is empty"
+                .format(replicaId, topic, partition))
           }
-          // send the shrunk ISR state change request only to the leader
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader),
-                                                              topic, partition, newLeaderAndIsr, replicaAssignment.size)
-          // update the local leader and isr cache
-          controllerContext.allLeaders.put(topicAndPartition, newLeaderAndIsr.leader)
-          replicaState.put((topic, partition, replicaId), OfflineReplica)
-          info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
-          info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
       }
     }catch {
       case e => error("Error while changing state of replica %d for partition ".format(replicaId) +
@@ -239,12 +229,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           controllerContext.controllerLock synchronized {
             try {
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet
-              val newBrokerIds = curBrokerIds -- controllerContext.liveBrokerIds
+              val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
               val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
-              val deadBrokerIds = controllerContext.liveBrokerIds -- curBrokerIds
+              val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
               controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
-              controllerContext.liveBrokerIds = controllerContext.liveBrokers.map(_.id)
-              info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
+              info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                 .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
               newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
               deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
diff --git core/src/main/scala/kafka/server/AbstractFetcherManager.scala core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index c956a02..66728e3 100644
--- core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -25,7 +25,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
     // map of (source brokerid, fetcher Id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread]
   private val mapLock = new Object
-  this.logIdent = "[" + name + "], "
+  this.logIdent = "[" + name + "] "
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
     (topic.hashCode() + 31 * partitionId) % numFetchers
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index 4a868d2..3baf27f 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -88,11 +88,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace("Handling stop replica request " + stopReplicaRequest)
 
     val responseMap = new HashMap[(String, Int), Short]
-
     for((topic, partitionId) <- stopReplicaRequest.partitions) {
-      val errorCode = replicaManager.stopReplica(topic, partitionId)
+      val errorCode = replicaManager.stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
       responseMap.put((topic, partitionId), errorCode)
     }
+
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
   }
@@ -455,11 +455,18 @@ class KafkaApis(val requestChannel: RequestChannel,
      * When a request expires just answer it with whatever data is present
      */
     def expire(delayed: DelayedFetch) {
-      val topicData = readMessageSets(delayed.fetch)
-      val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
-      val fromFollower = delayed.fetch.isFromFollower
-      delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
-      requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
+      debug("Expiring fetch request %s.".format(delayed.fetch))
+      try {
+        val topicData = readMessageSets(delayed.fetch)
+        val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+        val fromFollower = delayed.fetch.isFromFollower
+        delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
+        requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
+      }
+      catch {
+        case e: LeaderNotAvailableException =>
+          debug("Leader changed before fetch request %s expired.".format(delayed.fetch))
+      }
     }
   }
 
diff --git core/src/main/scala/kafka/server/ReplicaFetcherManager.scala core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 69db208..9f696dd 100644
--- core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -20,7 +20,7 @@ package kafka.server
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
-        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
+        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
diff --git core/src/main/scala/kafka/server/ReplicaManager.scala core/src/main/scala/kafka/server/ReplicaManager.scala
index 8461dbe..11d08f4 100644
--- core/src/main/scala/kafka/server/ReplicaManager.scala
+++ core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -91,14 +91,15 @@ class ReplicaManager(val config: KafkaConfig,
     kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
   }
 
-  def stopReplica(topic: String, partitionId: Int): Short  = {
+  def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
     trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId))
     val errorCode = ErrorMapping.NoError
     getReplica(topic, partitionId) match {
       case Some(replica) =>
         replicaFetcherManager.removeFetcher(topic, partitionId)
         /* TODO: handle deleteLog in a better way */
-        //logManager.deleteLog(topic, partition)
+        //if (deletePartition)
+        //  logManager.deleteLog(topic, partition)
         leaderPartitionsLock synchronized {
           leaderPartitions -= replica.partition
         }
diff --git core/src/main/scala/kafka/utils/ZkUtils.scala core/src/main/scala/kafka/utils/ZkUtils.scala
index 114bc98..c87ea69 100644
--- core/src/main/scala/kafka/utils/ZkUtils.scala
+++ core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -650,7 +650,7 @@ object ZkUtils extends Logging {
    * or throws an exception if the broker dies before the query to zookeeper finishes
    * @param brokerId The broker id
    * @param zkClient The zookeeper client connection
-   * @returns An optional Broker object encapsulating the broker metadata
+   * @return An optional Broker object encapsulating the broker metadata
    */
   def getBrokerInfo(zkClient: ZkClient, brokerId: Int): Option[Broker] = {
     ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
diff --git core/src/test/scala/unit/kafka/admin/AdminTest.scala core/src/test/scala/unit/kafka/admin/AdminTest.scala
index e5cfc33..69973b8 100644
--- core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -22,7 +22,8 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ZkUtils, TestUtils}
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -357,6 +358,53 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
     servers.foreach(_.shutdown())
   }
 
+  @Test
+  def testShutdownBroker() {
+    val expectedReplicaAssignment = Map(1  -> List("0", "1", "2"))
+    val topic = "test"
+    val partition = 1
+    // create brokers
+    val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
+    // create the topic
+    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
+
+    // servers are created in reverse config order, so broker 2 starts first and should be the leader
+    var leaderBeforeShutdown = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
+    var controllerId = ZkUtils.getController(zkClient)
+    var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+    var partitionsRemaining = controller.shutdownBroker(2)
+    assertEquals(0, partitionsRemaining)
+    var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+    var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+    assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
+    assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
+
+    leaderBeforeShutdown = leaderAfterShutdown
+    controllerId = ZkUtils.getController(zkClient)
+    controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+    partitionsRemaining = controller.shutdownBroker(1)
+    assertEquals(0, partitionsRemaining)
+    topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+    leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+    assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
+    assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+
+    leaderBeforeShutdown = leaderAfterShutdown
+    controllerId = ZkUtils.getController(zkClient)
+    controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+    partitionsRemaining = controller.shutdownBroker(0)
+    assertEquals(1, partitionsRemaining)
+    topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+    leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+    assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
+    assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+
+    servers.foreach(_.shutdown())
+
+
+  }
+
   private def checkIfReassignPartitionPathExists(): Boolean = {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }
diff --git core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index c50f91a..9d212bf 100644
--- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -97,7 +97,7 @@ object SerializationTestUtils{
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(collection.immutable.Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {
