Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala	(revision 1405048)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala	(working copy)
@@ -372,7 +372,9 @@
     new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }
 
-  def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
+  def makeLeaderForPartition(zkClient: ZkClient, topic: String,
+                             leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
+                             controllerEpoch: Int) {
     leaderPerPartitionMap.foreach
     {
       leaderForPartition => {
@@ -390,7 +392,7 @@
             newLeaderAndIsr.zkVersion += 1
           }
           ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            newLeaderAndIsr.toString)
+            ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
         } catch {
           case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
         }
Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 1405048)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(working copy)
@@ -69,7 +69,7 @@
     val leaderForPartitionMap = Map(
       0 -> configs.head.brokerId
     )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val topicMetadata = mockLogManagerAndTestTopic(topic)
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
Index: core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala	(revision 1405048)
+++ core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala	(working copy)
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package kafka.zk
 
@@ -35,7 +35,7 @@
     try {
       ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
     } catch {                       
-      case e: Exception => println("Exception in creating ephemeral node")
+      case e: Exception =>
     }
 
     var testData: String = null
Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(revision 1405048)
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(working copy)
@@ -23,6 +23,10 @@
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.controller.ControllerChannelManager
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+import kafka.api._
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerId1 = 0
@@ -35,6 +39,8 @@
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
+  var staleControllerEpochDetected = false
+
   override def setUp() {
     super.setUp()
     // start both servers
@@ -95,4 +101,48 @@
     else
       assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
   }
+
+  def testLeaderElectionWithStaleControllerEpoch() {
+    val topic = "new-topic"
+    val partitionId = 0
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+    // wait until a leader is elected and verify the initial leader epoch
+    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
+    val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    debug("leader epoch: " + leaderEpoch1)
+    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
+    assertTrue("Leader should get elected", leader1.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+    assertEquals("First epoch value should be 0", 0, leaderEpoch1)
+
+    // start another controller and send it a leaderAndIsr request carrying a stale controller epoch
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
+    val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port))
+    val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
+    controllerChannelManager.startup()
+    try {
+      val staleControllerEpoch = 0
+      val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderAndIsr]
+      leaderAndIsr.put((topic, partitionId), new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)))
+      val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 2)).toMap
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch)
+
+      controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
+      TestUtils.waitUntilTrue(() => staleControllerEpochDetected, 1000)
+      assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
+    } finally {
+      controllerChannelManager.shutdown()
+    }
+  }
+
+  // Records whether the broker rejected the leaderAndIsr request because it
+  // carried a stale controller epoch (checked by the waiting test above).
+  private def staleControllerEpochCallback(response: RequestOrResponse): Unit = {
+    val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
+    staleControllerEpochDetected =
+      leaderAndIsrResponse.errorCode == ErrorMapping.StaleControllerEpochCode
+  }
 }
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/admin/AdminTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/admin/AdminTest.scala	(revision 1405048)
+++ core/src/test/scala/unit/kafka/admin/AdminTest.scala	(working copy)
@@ -159,7 +159,7 @@
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
@@ -189,7 +189,7 @@
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
 
     val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
     newTopicMetadata.errorCode match {
Index: core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala	(revision 1405048)
+++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala	(working copy)
@@ -87,7 +87,7 @@
     val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map, collection.immutable.Set[Broker]())
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1)
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
@@ -97,13 +97,13 @@
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(controllerEpoch = 1, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {
     val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
                           ((topic2, 0), ErrorMapping.NoError))
-    new StopReplicaResponse(1, responseMap)
+    new StopReplicaResponse(1, responseMap.toMap)
   }
 
   def createTestProducerRequest: ProducerRequest = {
Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(revision 1405048)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(working copy)
@@ -24,6 +24,7 @@
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.ErrorMapping
+import kafka.controller.KafkaController
 
 
 /**
@@ -45,6 +46,8 @@
   private val leaderIsrUpdateLock = new Object
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+  /* epoch of the controller that last changed the leader */
+  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@@ -118,7 +121,7 @@
    *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    *  4. set the new leader and ISR
    */
-  def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr): Boolean = {
+  def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, cEpoch: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
         info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
@@ -126,6 +129,9 @@
         return false
       }
       trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
+      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper path
+      controllerEpoch = cEpoch
       // stop replica fetcher thread, if any
       replicaFetcherManager.removeFetcher(topic, partitionId)
 
@@ -149,14 +155,18 @@
    *  3. set the leader and set ISR to empty
    *  4. start a fetcher to the new leader
    */
-  def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, liveBrokers: Set[Broker]): Boolean = {
+  def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, liveBrokers: Set[Broker],
+                   cEpoch: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follwer request"
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
           .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
       trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
+      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper path
+      controllerEpoch = cEpoch
       val newLeaderBrokerId: Int = leaderAndIsr.leader
       info("Starting the follower state transition to follow leader %d for topic %s partition %d"
         .format(newLeaderBrokerId, topic, partitionId))
@@ -291,8 +301,10 @@
   private def updateIsr(newIsr: Set[Replica]) {
     info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", ")))
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndIsr.toString(), zkVersion)
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
     if (updateSucceeded){
       inSyncReplicas = newIsr
       zkVersion = newVersion
Index: core/src/main/scala/kafka/controller/PartitionStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala	(revision 1405048)
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala	(working copy)
@@ -86,7 +86,7 @@
         partitionAndState => handleStateChange(partitionAndState._1.topic, partitionAndState._1.partition, OnlinePartition,
                                                offlinePartitionSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to the online state", e)
     }
@@ -105,7 +105,7 @@
       partitions.foreach { topicAndPartition =>
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
     }
@@ -235,7 +235,8 @@
         val leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), leaderAndIsr.toString)
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+            ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch))
           // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
@@ -245,9 +246,13 @@
           partitionState.put(topicAndPartition, OnlinePartition)
         }catch {
           case e: ZkNodeExistsException =>
+            // read the controller epoch
+            val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
+              topicAndPartition.partition).get
             ControllerStat.offlinePartitionRate.mark()
             throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
-              .format(topicAndPartition) + " since Leader and ISR path already exists")
+              .format(topicAndPartition) + " since Leader and isr path already exists with value " +
+              "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch))
         }
     }
   }
@@ -267,11 +272,18 @@
       var newLeaderAndIsr: LeaderAndIsr = null
       var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
       while(!zookeeperPathUpdateSucceeded) {
-        val currentLeaderAndIsr = getLeaderAndIsrOrThrowException(topic, partition)
+        val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
+        val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
+        val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
+        if(controllerEpoch > controller.epoch)
+          throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+            "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
+            "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
         // elect new leader or throw exception
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
         newLeaderAndIsr = leaderAndIsr
         newLeaderAndIsr.zkVersion = newVersion
         zookeeperPathUpdateSucceeded = updateSucceeded
@@ -301,9 +313,9 @@
     zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
   }
 
-  private def getLeaderAndIsrOrThrowException(topic: String, partition: Int): LeaderAndIsr = {
-    ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-      case Some(currentLeaderAndIsr) => currentLeaderAndIsr
+  private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
+    ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
+      case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
       case None =>
         throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
           "[%s, %d] in %s state".format(topic, partition, partitionState(TopicAndPartition(topic, partition))))
Index: core/src/main/scala/kafka/controller/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/controller/KafkaController.scala	(revision 1405048)
+++ core/src/main/scala/kafka/controller/KafkaController.scala	(working copy)
@@ -28,14 +28,16 @@
 import com.yammer.metrics.core.Gauge
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import kafka.utils.{Utils, ZkUtils, Logging}
-import org.I0Itec.zkclient.exception.ZkNoNodeException
 import java.lang.{IllegalStateException, Object}
 import kafka.admin.PreferredReplicaLeaderElectionCommand
-import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
+import kafka.common._
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
+                        var epoch: Int = KafkaController.InitialControllerEpoch - 1,
+                        var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1,
                         var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
                         val brokerShutdownLock: Object = new Object,
                         var allTopics: Set[String] = Set.empty,
@@ -68,6 +70,8 @@
 
 object KafkaController {
   val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
+  val InitialControllerEpoch = 1
+  val InitialControllerEpochZkVersion = 0
 }
 
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
@@ -82,6 +86,7 @@
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
+  zkClient.subscribeDataChanges(ZkUtils.ControllerEpochPath, new ControllerEpochListener(this))
 
   newGauge(
     "ActiveControllerCount",
@@ -90,6 +95,8 @@
     }
   )
 
+  def epoch = controllerContext.epoch
+
   /**
    * JMX operation to initiate clean shutdown of a broker. On clean shutdown,
    * the controller first determines the partitions that the shutting down
@@ -177,7 +184,7 @@
             }
           }
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.liveBrokers)
 
       val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
       debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
@@ -188,15 +195,18 @@
   /**
    * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
    * It does the following things on the become-controller state change -
-   * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
+   * 1. Increments the controller epoch
+   * 2. Initializes the controller's context object that holds cache objects for current topics, live brokers and
    *    leaders for all existing partitions.
-   * 2. Starts the controller's channel manager
-   * 3. Starts the replica state machine
-   * 4. Starts the partition state machine
+   * 3. Starts the controller's channel manager
+   * 4. Starts the replica state machine
+   * 5. Starts the partition state machine
    */
   def onControllerFailover() {
     if(isRunning) {
       info("Broker %d starting become controller state transition".format(config.brokerId))
+      // increment the controller epoch
+      incrementControllerEpoch(zkClient)
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
       registerReassignedPartitionsListener()
       registerPreferredReplicaElectionListener()
@@ -384,6 +394,33 @@
     controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback)
   }
 
+  def incrementControllerEpoch(zkClient: ZkClient): Unit = {
+    try {
+      val newControllerEpoch = controllerContext.epoch + 1
+      val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPathIfExists(zkClient,
+        ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
+      if(!updateSucceeded)
+        throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
+      else {
+        controllerContext.epochZkVersion = newVersion
+        controllerContext.epoch = newControllerEpoch
+      }
+    } catch {
+      case _: ZkNoNodeException =>
+        // if path doesn't exist, this is the first controller whose epoch should be 1
+        // the following call can still fail if another controller gets elected between checking if the path exists and
+        // trying to create the controller epoch path
+        try {
+          zkClient.createPersistent(ZkUtils.ControllerEpochPath, KafkaController.InitialControllerEpoch.toString)
+          controllerContext.epoch = KafkaController.InitialControllerEpoch
+          controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
+        } catch {
+          case _: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
+            "Aborting controller startup procedure")
+        }
+    }
+  }
+
   private def registerSessionExpirationListener() = {
     zkClient.subscribeStateChanges(new SessionExpirationListener())
   }
@@ -603,16 +640,23 @@
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-      zkWriteCompleteOrUnnecessary = leaderAndIsrOpt match {
-        case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
+      val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
+        case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
+          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
+          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
+          if(controllerEpoch > epoch)
+            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+              "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
+              "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
           if (leaderAndIsr.isr.contains(replicaId)) {
             val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
                                                leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
             // update the new leadership decision in zookeeper or retry
             val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
               zkClient,
-              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
+              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+              ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
               leaderAndIsr.zkVersion)
             newLeaderAndIsr.zkVersion = newVersion
 
@@ -860,11 +904,40 @@
   }
 }
 
+class ControllerEpochListener(controller: KafkaController) extends IZkDataListener with Logging {
+  this.logIdent = "[ControllerEpochListener on " + controller.config.brokerId + "]: "
+  val controllerContext = controller.controllerContext
+
+  /**
+   * Invoked when a controller writes a new value to the controller epoch path; caches the new
+   * epoch and its zookeeper version in the controller context. @throws Exception On any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataChange(dataPath: String, data: Object) {
+    debug("Controller epoch listener fired with new epoch " + data.toString)
+    controllerContext.controllerLock synchronized {
+      // re-read the epoch path solely to obtain the zk version of the value just written
+      controllerContext.epoch = data.toString.toInt
+      controllerContext.epochZkVersion = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath)._2.getVersion
+    }
+  }
+
+  /**
+   * Invoked when the controller epoch path is deleted; intentionally a no-op.
+   * @throws Exception On any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataDeleted(dataPath: String) {
+  }
+}
+
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
                                        var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
 
 case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
 
+/** A partition's leader and isr together with the epoch of the controller that last changed them. */
+case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
 object ControllerStat extends KafkaMetricsGroup {
   val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(revision 1405048)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(working copy)
@@ -83,7 +83,7 @@
     try {
       brokerRequestBatch.newBatch()
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
     }
Index: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
===================================================================
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala	(revision 1405048)
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala	(working copy)
@@ -183,13 +183,13 @@
     }
   }
 
-  def sendRequestsToBrokers(liveBrokers: Set[Broker]) {
+  def sendRequestsToBrokers(controllerEpoch: Int, liveBrokers: Set[Broker]) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
-      val partitionStateInfos = m._2
+      val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders)
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch)
       debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
@@ -201,7 +201,8 @@
             if (replicas.size > 0) {
               debug("The stop replica request (delete = %s) sent to broker %d is %s"
                 .format(deletePartitions, broker, replicas.mkString(",")))
-              sendRequest(broker, new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas), null)
+              sendRequest(broker, new StopReplicaRequest(deletePartitions,
+                Set.empty[(String, Int)] ++ replicas, controllerEpoch), null)
             }
         }
         m.clear()
Index: core/src/main/scala/kafka/common/ErrorMapping.scala
===================================================================
--- core/src/main/scala/kafka/common/ErrorMapping.scala	(revision 1405048)
+++ core/src/main/scala/kafka/common/ErrorMapping.scala	(working copy)
@@ -40,6 +40,7 @@
   val BrokerNotAvailableCode: Short = 8
   val ReplicaNotAvailableCode: Short = 9
   val MessageSizeTooLargeCode: Short = 10
+  val StaleControllerEpochCode: Short = 11
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -52,7 +53,8 @@
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
       classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
-      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode
+      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
+      classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1405048)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -24,17 +24,19 @@
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
 import kafka.api.LeaderAndIsr
+import mutable.HashMap
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
-import kafka.controller.{PartitionAndReplica, ReassignedPartitionsContext}
 import kafka.admin._
 import kafka.common.{TopicAndPartition, KafkaException, NoEpochForPartitionException}
+import kafka.controller.{LeaderIsrAndControllerEpoch, PartitionAndReplica, ReassignedPartitionsContext}
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
   val ControllerPath = "/controller"
+  val ControllerEpochPath = "/controllerEpoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
 
@@ -74,7 +76,7 @@
     brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }
 
-  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
+  def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
     val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
     val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
     val leaderAndIsrOpt = leaderAndIsrInfo._1
@@ -85,17 +87,30 @@
     }
   }
 
-  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = {
+  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
+    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
+    val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
+    val leaderAndIsrOpt = leaderAndIsrInfo._1
+    val stat = leaderAndIsrInfo._2
+    leaderAndIsrOpt match {
+      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, topic, partition, stat).map(_.leaderAndIsr)
+      case None => None
+    }
+  }
+
+  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
+  : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
         val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
         val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
         val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+        val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
         val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
           isr.toString(), zkPathVersion, topic, partition))
-        Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion))
+        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion), controllerEpoch))
       case None => None
     }
   }
@@ -189,6 +204,15 @@
     topicDirs.consumerOwnerDir + "/" + partition
   }
 
+  def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
+    val jsonDataMap = new HashMap[String, String]
+    jsonDataMap.put("leader", leaderAndIsr.leader.toString)
+    jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
+    jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
+    jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
+    Utils.stringMapToJson(jsonDataMap)
+  }
+
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
    */
@@ -314,6 +338,25 @@
   }
 
   /**
+   * Conditionally update the persistent path data; returns (true, newVersion) if it succeeds, otherwise (e.g. when the
+   * current version is not the expected version) returns (false, -1). If the path doesn't exist, throws ZkNoNodeException.
+   */
+  def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
+    try {
+      val stat = client.writeData(path, data, expectVersion)
+      info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
+        .format(path, data, expectVersion, stat.getVersion))
+      (true, stat.getVersion)
+    } catch {
+      case nne: ZkNoNodeException => throw nne
+      case e: Exception =>
+        error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path, data,
+          expectVersion), e)
+        (false, -1)
+    }
+  }
+
+  /**
    * Update the value of a persistent node with the given path and data.
    * create parrent directory if necessary. Never throw NodeExistException.
    */
Index: core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
===================================================================
--- core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala	(revision 1405048)
+++ core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala	(working copy)
@@ -44,8 +44,6 @@
     }
   }
 
-  def amILeader : Boolean = leaderId == brokerId
-
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
     try {
@@ -56,10 +54,12 @@
     } catch {
       case e: ZkNodeExistsException =>
         // If someone else has written the path, then
-        debug("Someone else was elected as leader other than " + brokerId)
         val data: String = controllerContext.zkClient.readData(electionPath, true)
-        if (data != null) leaderId = data.toInt
-      case e2 => throw e2
+        if (data != null) {
+          debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
+          leaderId = data.toInt
+        }
+      case e2 => error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
     }
     amILeader
   }
@@ -68,6 +68,8 @@
     leaderId = -1
   }
 
+  def amILeader : Boolean = leaderId == brokerId
+
   /**
    * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
    * have its own session expiration listener and handler
@@ -79,6 +81,10 @@
      */
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
+      controllerContext.controllerLock synchronized {
+        leaderId = data.toString.toInt
+        info("New leader is %d".format(leaderId))
+      }
     }
 
     /**
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1405048)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -18,6 +18,7 @@
 
 import kafka.cluster.{Broker, Partition, Replica}
 import collection._
+import mutable.HashMap
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils._
@@ -26,7 +27,8 @@
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
 import kafka.common.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
-import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.controller.KafkaController
 
 
 object ReplicaManager {
@@ -38,6 +40,8 @@
                      val zkClient: ZkClient, 
                      kafkaScheduler: KafkaScheduler,
                      val logManager: LogManager) extends Logging with KafkaMetricsGroup {
+  /* epoch of the controller that last changed the leader */
+  @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val allPartitions = new Pool[(String, Int), Partition]
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
@@ -111,6 +115,23 @@
     errorCode
   }
 
+  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
+    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+    if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
+      error("Received stop replica request from an old controller epoch %d.".format(stopReplicaRequest.controllerEpoch) +
+        " Latest known controller epoch is %d".format(controllerEpoch))
+      (responseMap, ErrorMapping.StaleControllerEpochCode)
+    } else {
+      controllerEpoch = stopReplicaRequest.controllerEpoch
+      // populate per-partition error codes into the responseMap declared above
+      for((topic, partitionId) <- stopReplicaRequest.partitions){
+        val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
+        responseMap.put((topic, partitionId), errorCode)
+      }
+      (responseMap, ErrorMapping.NoError)
+    }
+  }
+
   def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
     var partition = allPartitions.get((topic, partitionId))
     if (partition == null) {
@@ -159,49 +180,42 @@
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): collection.Map[(String, Int), Short] = {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
     info("Handling leader and isr request %s".format(leaderAndISRRequest))
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+    if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
+      error("Received leader and isr request from an old controller epoch %d.".format(leaderAndISRRequest.controllerEpoch) +
+        " Latest known controller epoch is %d".format(controllerEpoch))
+      (responseMap, ErrorMapping.StaleControllerEpochCode)
+    } else {
+      controllerEpoch = leaderAndISRRequest.controllerEpoch
+      for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
+        var errorCode = ErrorMapping.NoError
+        val topic = topicAndPartition._1
+        val partitionId = topicAndPartition._2
 
-    for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos){
-      var errorCode = ErrorMapping.NoError
-      val topic = topicAndPartition._1
-      val partitionId = topicAndPartition._2
-
-      val requestedLeaderId = partitionStateInfo.leaderAndIsr.leader
-      try {
-        if(requestedLeaderId == config.brokerId)
-          makeLeader(topic, partitionId, partitionStateInfo)
-        else
-          makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
-      } catch {
-        case e =>
-          error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
-          errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        val requestedLeaderId = partitionStateInfo.leaderAndIsr.leader
+        try {
+          if(requestedLeaderId == config.brokerId)
+            makeLeader(topic, partitionId, partitionStateInfo, leaderAndISRRequest.controllerEpoch)
+          else
+            makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders, leaderAndISRRequest.controllerEpoch)
+        } catch {
+          case e =>
+            error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
+            errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        }
+        responseMap.put(topicAndPartition, errorCode)
       }
-      responseMap.put(topicAndPartition, errorCode)
+      (responseMap, ErrorMapping.NoError)
     }
-
-    /**
-     *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
-     *  as deleted.
-     *  TODO: Handle this properly as part of KAFKA-330
-     */
-//    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
-//      startHighWaterMarksCheckPointThread
-//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.partitionStateInfos.contains(p._1)).map(entry => entry._1)
-//      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
-//      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
-//    }
-
-    responseMap
   }
 
-  private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
+  private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo, cEpoch: Int) = {
     val leaderAndIsr = partitionStateInfo.leaderAndIsr
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeader(topic, partitionId, leaderAndIsr)) {
+    if (partition.makeLeader(topic, partitionId, leaderAndIsr, cEpoch)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -210,14 +224,15 @@
     info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker]) {
+  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
+                           liveBrokers: Set[Broker], cEpoch: Int) {
     val leaderAndIsr = partitionStateInfo.leaderAndIsr
     val leaderBrokerId: Int = leaderAndIsr.leader
     info("Starting the follower state transition to follow leader %d for topic %s partition %d"
                  .format(leaderBrokerId, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(topic, partitionId, leaderAndIsr, liveBrokers)) {
+    if (partition.makeFollower(topic, partitionId, leaderAndIsr, liveBrokers, cEpoch)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
@@ -234,7 +249,7 @@
 
   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
     val partitionOpt = getPartition(topic, partitionId)
-    if(partitionOpt.isDefined){
+    if(partitionOpt.isDefined) {
       partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
     } else {
       warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1405048)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -24,7 +24,6 @@
 import kafka.utils.{Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
-import mutable.HashMap
 import kafka.network.RequestChannel.Response
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
@@ -127,8 +126,8 @@
       requestLogger.trace("Handling leader and ISR request " + leaderAndIsrRequest)
     trace("Handling leader and ISR request " + leaderAndIsrRequest)
     try {
-      val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
-      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, responseMap)
+      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, response, error)
       requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
@@ -144,13 +143,8 @@
       requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
     trace("Handling stop replica request " + stopReplicaRequest)
 
-    val responseMap = new HashMap[(String, Int), Short]
-    for((topic, partitionId) <- stopReplicaRequest.partitions) {
-      val errorCode = replicaManager.stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
-      responseMap.put((topic, partitionId), errorCode)
-    }
-
-    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
+    val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
+    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response.toMap, error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
   }
 
Index: core/src/main/scala/kafka/api/StopReplicaRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/StopReplicaRequest.scala	(revision 1405048)
+++ core/src/main/scala/kafka/api/StopReplicaRequest.scala	(working copy)
@@ -33,6 +33,7 @@
     val versionId = buffer.getShort
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerEpoch = buffer.getInt
     val deletePartitions = buffer.get match {
       case 1 => true
       case 0 => false
@@ -44,7 +45,7 @@
     (1 to topicPartitionPairCount) foreach { _ =>
       topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
     }
-    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet)
+    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
   }
 }
 
@@ -52,18 +53,20 @@
                               clientId: String,
                               ackTimeoutMs: Int,
                               deletePartitions: Boolean,
-                              partitions: Set[(String, Int)])
+                              partitions: Set[(String, Int)],
+                              controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
 
-  def this(deletePartitions: Boolean, partitions: Set[(String, Int)]) = {
+  def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = {
     this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
-         deletePartitions, partitions)
+         deletePartitions, partitions, controllerEpoch)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerEpoch)
     buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
     buffer.putInt(partitions.size)
     for ((topic, partitionId) <- partitions){
@@ -77,6 +80,7 @@
       2 + /* versionId */
       ApiUtils.shortStringLength(clientId) +
       4 + /* ackTimeoutMs */
+      4 + /* controller epoch */
       1 + /* deletePartitions */
       4 /* partition count */
     for ((topic, partitionId) <- partitions){
Index: core/src/main/scala/kafka/api/StopReplicaResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/StopReplicaResponse.scala	(revision 1405048)
+++ core/src/main/scala/kafka/api/StopReplicaResponse.scala	(working copy)
@@ -19,13 +19,15 @@
 
 import java.nio.ByteBuffer
 import collection.mutable.HashMap
-import collection.Map
+import collection.immutable.Map
+import kafka.common.ErrorMapping
 import kafka.api.ApiUtils._
 
 
 object StopReplicaResponse {
   def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
     val versionId = buffer.getShort
+    val errorCode = buffer.getShort
     val numEntries = buffer.getInt
 
     val responseMap = new HashMap[(String, Int), Short]()
@@ -35,23 +37,31 @@
       val partitionErrorCode = buffer.getShort()
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new StopReplicaResponse(versionId, responseMap)
+    new StopReplicaResponse(versionId, responseMap.toMap, errorCode)
   }
 }
 
 
 case class StopReplicaResponse(val versionId: Short,
-                               val responseMap: Map[(String, Int), Short]) extends RequestOrResponse{
+                               val responseMap: Map[(String, Int), Short],
+                               val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
   def sizeInBytes(): Int ={
-    var size = 2 + 4
-    for ((key, value) <- responseMap){
-      size += (2 + key._1.length) + 4 + 2
+    var size =
+      2 /* version id */ +
+      2 /* error code */ +
+      4 /* number of responses */
+    for ((key, value) <- responseMap) {
+      size +=
+        2 + key._1.length /* topic */ +
+        4 /* partition */ +
+        2 /* error code for this partition */
     }
     size
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){
       writeShortString(buffer, key._1)
Index: core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala	(revision 1405048)
+++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala	(working copy)
@@ -21,8 +21,6 @@
 import java.nio._
 import kafka.utils._
 import kafka.api.ApiUtils._
-import collection.mutable.Map
-import collection.mutable.HashMap
 import kafka.cluster.Broker
 
 
@@ -35,7 +33,7 @@
   def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
 
   override def toString(): String = {
-    val jsonDataMap = new HashMap[String, String]
+    val jsonDataMap = new collection.mutable.HashMap[String, String]
     jsonDataMap.put("leader", leader.toString)
     jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
     jsonDataMap.put("ISR", isr.mkString(","))
@@ -47,12 +45,12 @@
 object PartitionStateInfo {
   def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
     val leader = buffer.getInt
-    val leaderGenId = buffer.getInt
+    val leaderEpoch = buffer.getInt
     val isrString = readShortString(buffer)
     val isr = isrString.split(",").map(_.toInt).toList
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
-    PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, isr, zkVersion), replicationFactor)
+    PartitionStateInfo(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), replicationFactor)
   }
 }
 
@@ -66,7 +64,12 @@
   }
 
   def sizeInBytes(): Int = {
-    val size = 4 + 4 + (2 + leaderAndIsr.isr.mkString(",").length) + 4 + 4
+    val size =
+      4 /* leader broker id */ +
+      4 /* leader epoch */ +
+      (2 + leaderAndIsr.isr.mkString(",").length) +
+      4 /* zk version */ +
+      4 /* replication factor */
     size
   }
 }
@@ -83,8 +86,9 @@
     val versionId = buffer.getShort
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
-    val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo]
+    val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]
 
     for(i <- 0 until partitionStateInfosCount){
       val topic = readShortString(buffer)
@@ -99,7 +103,7 @@
     for (i <- 0 until leadersCount)
       leaders += Broker.readFrom(buffer)
 
-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos, leaders)
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
   }
 }
 
@@ -108,17 +112,20 @@
                                 clientId: String,
                                 ackTimeoutMs: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[Broker])
+                                leaders: Set[Broker],
+                                controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos, liveBrokers)
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
+      partitionStateInfos, liveBrokers, controllerEpoch)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerEpoch)
     buffer.putInt(partitionStateInfos.size)
     for((key, value) <- partitionStateInfos){
       writeShortString(buffer, key._1)
@@ -130,12 +137,17 @@
   }
 
   def sizeInBytes(): Int = {
-    var size = 1 + 2 + (2 + clientId.length) + 4 + 4
+    var size =
+      2 /* version id */ +
+      (2 + clientId.length) /* client id */ +
+      4 /* ack timeout */ +
+      4 /* controller epoch */ +
+      4 /* number of partitions */
     for((key, value) <- partitionStateInfos)
-      size += (2 + key._1.length) + 4 + value.sizeInBytes
-    size += 4
+      size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
+    size += 4 /* number of leader brokers */
     for(broker <- leaders)
-      size += broker.sizeInBytes
+      size += broker.sizeInBytes /* broker info */
     size
   }
 }
\ No newline at end of file
Index: core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala	(revision 1405048)
+++ core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala	(working copy)
@@ -17,6 +17,7 @@
 
 package kafka.api
 
+import kafka.common.ErrorMapping
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import collection.mutable.HashMap
@@ -26,6 +27,7 @@
 object LeaderAndIsrResponse {
   def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
     val versionId = buffer.getShort
+    val errorCode = buffer.getShort
     val numEntries = buffer.getInt
     val responseMap = new HashMap[(String, Int), Short]()
     for (i<- 0 until numEntries){
@@ -34,24 +36,32 @@
       val partitionErrorCode = buffer.getShort
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new LeaderAndIsrResponse(versionId, responseMap)
+    new LeaderAndIsrResponse(versionId, responseMap, errorCode)
   }
 }
 
 
 case class LeaderAndIsrResponse(versionId: Short,
-                                responseMap: Map[(String, Int), Short])
+                                responseMap: Map[(String, Int), Short],
+                                errorCode: Short = ErrorMapping.NoError)
         extends RequestOrResponse {
   def sizeInBytes(): Int ={
-    var size =  2 + 4
-    for ((key, value) <- responseMap){
-      size += 2 + key._1.length + 4 + 2
+    var size =
+      2 /* version id */ +
+      2 /* error code */ +
+      4 /* number of responses */
+    for ((key, value) <- responseMap) {
+      size +=
+        2 + key._1.length /* topic */ +
+        4 /* partition */ +
+        2 /* error code for this partition */
     }
     size
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){
       writeShortString(buffer, key._1)
