Index: core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala	(revision 1403469)
+++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala	(working copy)
@@ -87,7 +87,7 @@
     val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map)
+    new LeaderAndIsrRequest(map, collection.immutable.Set[Broker]())
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(revision 1403469)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(working copy)
@@ -112,73 +112,72 @@
 
 
   /**
-   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make it the new leader of follower to the new leader.
+   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
+   *  1. stop the existing replica fetcher
+   *  2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
+   *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
+   *  4. set the new leader and ISR
    */
-  def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, isMakingLeader: Boolean): Boolean = {
+  def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr): Boolean = {
     leaderIsrUpdateLock synchronized {
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become %s request"
-          .format(leaderEpoch, leaderAndIsr.leaderEpoch, if(isMakingLeader) "leader" else "follower"))
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
+          .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      if(isMakingLeader)
-        makeLeader(topic, partitionId, leaderAndIsr)
-      else
-        makeFollower(topic, partitionId, leaderAndIsr)
+      trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
+      // stop replica fetcher thread, if any
+      replicaFetcherManager.removeFetcher(topic, partitionId)
+
+      val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
+      // reset LogEndOffset for remote replicas
+      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
+      inSyncReplicas = newInSyncReplicas
+      leaderEpoch = leaderAndIsr.leaderEpoch
+      zkVersion = leaderAndIsr.zkVersion
+      leaderReplicaIdOpt = Some(localBrokerId)
+      // we may need to increment high watermark since ISR could be down to 1
+      maybeIncrementLeaderHW(getReplica().get)
       true
     }
   }
 
   /**
-   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
-   *  1. stop the existing replica fetcher
-   *  2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
-   *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
-   *  4. set the new leader and ISR
-   */
-  private def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) {
-    trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
-    // stop replica fetcher thread, if any
-    replicaFetcherManager.removeFetcher(topic, partitionId)
-
-    val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
-    // reset LogEndOffset for remote replicas
-    assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
-    inSyncReplicas = newInSyncReplicas
-    leaderEpoch = leaderAndIsr.leaderEpoch
-    zkVersion = leaderAndIsr.zkVersion
-    leaderReplicaIdOpt = Some(localBrokerId)
-    // we may need to increment high watermark since ISR could be down to 1
-    maybeIncrementLeaderHW(getReplica().get)
-  }
-
-  /**
+   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the follower in the following steps.
    *  1. stop any existing fetcher on this partition from the local replica
    *  2. make sure local replica exists and truncate the log to high watermark
    *  3. set the leader and set ISR to empty
    *  4. start a fetcher to the new leader
    */
-  private def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) = {
-    trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
-    val newLeaderBrokerId: Int = leaderAndIsr.leader
-    info("Starting the follower state transition to follow leader %d for topic %s partition %d"
-      .format(newLeaderBrokerId, topic, partitionId))
-    ZkUtils.getBrokerInfo(zkClient, newLeaderBrokerId) match {
-      case Some(leaderBroker) =>
-        // stop fetcher thread to previous leader
-        replicaFetcherManager.removeFetcher(topic, partitionId)
-        // make sure local replica exists
-        val localReplica = getOrCreateReplica()
-        localReplica.log.get.truncateTo(localReplica.highWatermark)
-        inSyncReplicas = Set.empty[Replica]
-        leaderEpoch = leaderAndIsr.leaderEpoch
-        zkVersion = leaderAndIsr.zkVersion
-        leaderReplicaIdOpt = Some(newLeaderBrokerId)
-        // start fetcher thread to current leader
-        replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
-      case None => // leader went down
-        warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
-        " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId))
+  def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, liveBrokers: Set[Broker]): Boolean = {
+    leaderIsrUpdateLock synchronized {
+      if (leaderEpoch >= leaderAndIsr.leaderEpoch){
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follwer request"
+          .format(leaderEpoch, leaderAndIsr.leaderEpoch))
+        return false
+      }
+      trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
+      val newLeaderBrokerId: Int = leaderAndIsr.leader
+      info("Starting the follower state transition to follow leader %d for topic %s partition %d"
+        .format(newLeaderBrokerId, topic, partitionId))
+      liveBrokers.find(_.id == newLeaderBrokerId) match {
+        case Some(leaderBroker) =>
+          // stop fetcher thread to previous leader
+          replicaFetcherManager.removeFetcher(topic, partitionId)
+          // make sure local replica exists
+          val localReplica = getOrCreateReplica()
+          localReplica.log.get.truncateTo(localReplica.highWatermark)
+          inSyncReplicas = Set.empty[Replica]
+          leaderEpoch = leaderAndIsr.leaderEpoch
+          zkVersion = leaderAndIsr.zkVersion
+          leaderReplicaIdOpt = Some(newLeaderBrokerId)
+          // start fetcher thread to current leader
+          replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+        case None => // leader went down
+          warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
+          " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId))
+      }
+      true
     }
   }
 
@@ -325,4 +324,4 @@
     partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
     partitionString.toString()
   }
-}
\ No newline at end of file
+}
Index: core/src/main/scala/kafka/controller/PartitionStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala	(revision 1403469)
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala	(working copy)
@@ -86,7 +86,7 @@
         partitionAndState => handleStateChange(partitionAndState._1.topic, partitionAndState._1.partition, OnlinePartition,
                                                offlinePartitionSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to the online state", e)
     }
@@ -105,7 +105,7 @@
       partitions.foreach { topicAndPartition =>
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
     }
Index: core/src/main/scala/kafka/controller/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/controller/KafkaController.scala	(revision 1403469)
+++ core/src/main/scala/kafka/controller/KafkaController.scala	(working copy)
@@ -177,7 +177,7 @@
             }
           }
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
 
       val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
       debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(revision 1403469)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(working copy)
@@ -83,7 +83,7 @@
     try {
       brokerRequestBatch.newBatch()
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
     }
Index: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
===================================================================
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala	(revision 1403469)
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala	(working copy)
@@ -183,11 +183,13 @@
     }
   }
 
-  def sendRequestsToBrokers() {
+  def sendRequestsToBrokers(liveBrokers: Set[Broker]) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
       val partitionStateInfos = m._2
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos)
+      val leaderIds = partitionStateInfos.map(_._2.leaderAndIsr.leader).toSeq
+      val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders)
       debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1403469)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -24,7 +24,6 @@
 import java.io.{InputStream, ByteArrayOutputStream, DataOutputStream}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.IteratorTemplate
-import kafka.common.InvalidMessageSizeException
 
 object ByteBufferMessageSet {
   
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1403469)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -16,7 +16,7 @@
  */
 package kafka.server
 
-import kafka.cluster.{Partition, Replica}
+import kafka.cluster.{Broker, Partition, Replica}
 import collection._
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
@@ -173,7 +173,7 @@
         if(requestedLeaderId == config.brokerId)
           makeLeader(topic, partitionId, partitionStateInfo)
         else
-          makeFollower(topic, partitionId, partitionStateInfo)
+          makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
       } catch {
         case e =>
           error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
@@ -201,7 +201,7 @@
     val leaderAndIsr = partitionStateInfo.leaderAndIsr
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, true)) {
+    if (partition.makeLeader(topic, partitionId, leaderAndIsr)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -210,14 +210,14 @@
     info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) {
+  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker]) {
     val leaderAndIsr = partitionStateInfo.leaderAndIsr
     val leaderBrokerId: Int = leaderAndIsr.leader
     info("Starting the follower state transition to follow leader %d for topic %s partition %d"
                  .format(leaderBrokerId, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, false)) {
+    if (partition.makeFollower(topic, partitionId, leaderAndIsr, liveBrokers)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
Index: core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala	(revision 1403469)
+++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala	(working copy)
@@ -23,6 +23,7 @@
 import kafka.api.ApiUtils._
 import collection.mutable.Map
 import collection.mutable.HashMap
+import kafka.cluster.Broker
 
 
 object LeaderAndIsr {
@@ -92,7 +93,13 @@
 
       partitionStateInfos.put((topic, partition), partitionStateInfo)
     }
-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos)
+
+    val leadersCount = buffer.getInt
+    var leaders = Set[Broker]()
+    for (i <- 0 until leadersCount)
+      leaders += Broker.readFrom(buffer)
+
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos, leaders)
   }
 }
 
@@ -100,11 +107,12 @@
 case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
                                 ackTimeoutMs: Int,
-                                partitionStateInfos: Map[(String, Int), PartitionStateInfo])
+                                partitionStateInfos: Map[(String, Int), PartitionStateInfo],
+                                leaders: Set[Broker])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos)
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker]) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos, liveBrokers)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -117,12 +125,17 @@
       buffer.putInt(key._2)
       value.writeTo(buffer)
     }
+    buffer.putInt(leaders.size)
+    leaders.foreach(_.writeTo(buffer))
   }
 
   def sizeInBytes(): Int = {
     var size = 1 + 2 + (2 + clientId.length) + 4 + 4
     for((key, value) <- partitionStateInfos)
       size += (2 + key._1.length) + 4 + value.sizeInBytes
+    size += 4
+    for(broker <- leaders)
+      size += broker.sizeInBytes
     size
   }
 }
\ No newline at end of file
