diff --git core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index d6f0737..28dfb6b 100644
--- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -21,8 +21,6 @@ package kafka.api
 import java.nio._
 import kafka.utils._
 import kafka.api.ApiUtils._
-import collection.mutable.Map
-import collection.mutable.HashMap
 import kafka.cluster.Broker
 
 
@@ -35,7 +33,7 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int
   def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
 
   override def toString(): String = {
-    val jsonDataMap = new HashMap[String, String]
+    val jsonDataMap = new collection.mutable.HashMap[String, String]
     jsonDataMap.put("leader", leader.toString)
     jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
     jsonDataMap.put("ISR", isr.mkString(","))
@@ -47,12 +45,12 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int
 object PartitionStateInfo {
   def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
     val leader = buffer.getInt
-    val leaderGenId = buffer.getInt
+    val leaderEpoch = buffer.getInt
     val isrString = readShortString(buffer)
     val isr = isrString.split(",").map(_.toInt).toList
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
-    PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, isr, zkVersion), replicationFactor)
+    PartitionStateInfo(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), replicationFactor)
   }
 }
 
@@ -66,7 +64,8 @@ case class PartitionStateInfo(val leaderAndIsr: LeaderAndIsr, val replicationFac
   }
 
   def sizeInBytes(): Int = {
-    val size = 4 + 4 + (2 + leaderAndIsr.isr.mkString(",").length) + 4 + 4
+    val size = 4 /* leader broker id */ + 4 /* leader epoch */ + (2 + leaderAndIsr.isr.mkString(",").length) +
+      4 /* zk version */ + 4 /* replication factor */
     size
   }
 }
@@ -83,8 +82,9 @@ object LeaderAndIsrRequest {
     val versionId = buffer.getShort
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
-    val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo]
+    val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]
 
     for(i <- 0 until partitionStateInfosCount){
       val topic = readShortString(buffer)
@@ -99,7 +99,7 @@ object LeaderAndIsrRequest {
     for (i <- 0 until leadersCount)
       leaders += Broker.readFrom(buffer)
 
-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos, leaders)
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
   }
 }
 
@@ -108,17 +108,20 @@ case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
                                 ackTimeoutMs: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[Broker])
+                                leaders: Set[Broker],
+                                controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos, liveBrokers)
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
+      partitionStateInfos, liveBrokers, controllerEpoch)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerEpoch)
     buffer.putInt(partitionStateInfos.size)
     for((key, value) <- partitionStateInfos){
       writeShortString(buffer, key._1)
@@ -130,12 +133,13 @@ case class LeaderAndIsrRequest (versionId: Short,
   }
 
   def sizeInBytes(): Int = {
-    var size = 1 + 2 + (2 + clientId.length) + 4 + 4
+    var size = 2 /* version id */ + (2 + clientId.length) /* client id */ + 4 /* ack timeout */ +
+      4 /* controller epoch */ + 4 /* number of partitions */
     for((key, value) <- partitionStateInfos)
-      size += (2 + key._1.length) + 4 + value.sizeInBytes
-    size += 4
+      size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
+    size += 4 /* number of leader brokers */
     for(broker <- leaders)
-      size += broker.sizeInBytes
+      size += broker.sizeInBytes /* broker info */
     size
   }
 }
\ No newline at end of file
diff --git core/src/main/scala/kafka/api/StopReplicaRequest.scala core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 7993b3e..9088fa9 100644
--- core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -33,6 +33,7 @@ object StopReplicaRequest extends Logging {
     val versionId = buffer.getShort
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerEpoch = buffer.getInt
     val deletePartitions = buffer.get match {
       case 1 => true
       case 0 => false
@@ -44,7 +45,7 @@ object StopReplicaRequest extends Logging {
     (1 to topicPartitionPairCount) foreach { _ =>
       topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
     }
-    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet)
+    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
   }
 }
 
@@ -52,18 +53,20 @@ case class StopReplicaRequest(versionId: Short,
                               clientId: String,
                               ackTimeoutMs: Int,
                               deletePartitions: Boolean,
-                              partitions: Set[(String, Int)])
+                              partitions: Set[(String, Int)],
+                              controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
 
-  def this(deletePartitions: Boolean, partitions: Set[(String, Int)]) = {
+  def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = {
     this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
-         deletePartitions, partitions)
+         deletePartitions, partitions, controllerEpoch)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerEpoch)
     buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
     buffer.putInt(partitions.size)
     for ((topic, partitionId) <- partitions){
@@ -77,6 +80,7 @@ case class StopReplicaRequest(versionId: Short,
       2 + /* versionId */
       ApiUtils.shortStringLength(clientId) +
       4 + /* ackTimeoutMs */
+      4 + /* controller epoch */
       1 + /* deletePartitions */
       4 /* partition count */
     for ((topic, partitionId) <- partitions){
diff --git core/src/main/scala/kafka/cluster/Partition.scala core/src/main/scala/kafka/cluster/Partition.scala
index 87cd0f8..10a045d 100644
--- core/src/main/scala/kafka/cluster/Partition.scala
+++ core/src/main/scala/kafka/cluster/Partition.scala
@@ -152,7 +152,7 @@ class Partition(val topic: String,
   def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, liveBrokers: Set[Broker]): Boolean = {
     leaderIsrUpdateLock synchronized {
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follwer request"
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
           .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
@@ -292,7 +292,8 @@ class Partition(val topic: String,
     info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", ")))
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndIsr.toString(), zkVersion)
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, replicaManager.controllerEpoch), zkVersion)
     if (updateSucceeded){
       inSyncReplicas = newIsr
       zkVersion = newVersion
diff --git core/src/main/scala/kafka/common/ControllerMovedException.scala core/src/main/scala/kafka/common/ControllerMovedException.scala
new file mode 100644
index 0000000..39cf36d
--- /dev/null
+++ core/src/main/scala/kafka/common/ControllerMovedException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.common
+
+class ControllerMovedException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this() = this(null, null)
+}
\ No newline at end of file
diff --git core/src/main/scala/kafka/common/ErrorMapping.scala core/src/main/scala/kafka/common/ErrorMapping.scala
index fc06e9e..c8769e0 100644
--- core/src/main/scala/kafka/common/ErrorMapping.scala
+++ core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -40,6 +40,7 @@ object ErrorMapping {
   val BrokerNotAvailableCode: Short = 8
   val ReplicaNotAvailableCode: Short = 9
   val MessageSizeTooLargeCode: Short = 10
+  val StaleControllerEpochCode: Short = 11
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -52,7 +53,8 @@ object ErrorMapping {
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
       classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
-      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode
+      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
+      classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */
diff --git core/src/main/scala/kafka/controller/ControllerChannelManager.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index f1ba2ca..38b3770 100644
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -183,13 +183,13 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
     }
   }
 
-  def sendRequestsToBrokers(liveBrokers: Set[Broker]) {
+  def sendRequestsToBrokers(controllerEpoch: Int, liveBrokers: Set[Broker]) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
-      val partitionStateInfos = m._2
+      val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders)
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch)
       debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
@@ -201,7 +201,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
             if (replicas.size > 0) {
               debug("The stop replica request (delete = %s) sent to broker %d is %s"
                 .format(deletePartitions, broker, replicas.mkString(",")))
-              sendRequest(broker, new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas), null)
+              sendRequest(broker, new StopReplicaRequest(deletePartitions,
+                Set.empty[(String, Int)] ++ replicas, controllerEpoch), null)
             }
         }
         m.clear()
diff --git core/src/main/scala/kafka/controller/KafkaController.scala core/src/main/scala/kafka/controller/KafkaController.scala
index c434f9d..4bcaab8 100644
--- core/src/main/scala/kafka/controller/KafkaController.scala
+++ core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,10 +28,10 @@ import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import com.yammer.metrics.core.Gauge
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import kafka.utils.{Utils, ZkUtils, Logging}
-import org.I0Itec.zkclient.exception.ZkNoNodeException
 import java.lang.{IllegalStateException, Object}
 import kafka.admin.PreferredReplicaLeaderElectionCommand
-import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
+import kafka.common._
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
@@ -82,6 +82,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
+  var epoch = 1
 
   newGauge(
     "ActiveControllerCount",
@@ -177,7 +178,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             }
           }
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.liveBrokers)
 
       val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
       debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
@@ -188,15 +189,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   /**
    * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
    * It does the following things on the become-controller state change -
-   * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
+   * 1. Increments the controller epoch
+   * 2. Initializes the controller's context object that holds cache objects for current topics, live brokers and
    *    leaders for all existing partitions.
-   * 2. Starts the controller's channel manager
-   * 3. Starts the replica state machine
-   * 4. Starts the partition state machine
+   * 3. Starts the controller's channel manager
+   * 4. Starts the replica state machine
+   * 5. Starts the partition state machine
    */
   def onControllerFailover() {
     if(isRunning) {
       info("Broker %d starting become controller state transition".format(config.brokerId))
+      // increment the controller epoch
+      epoch = incrementControllerEpoch(zkClient)
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
       registerReassignedPartitionsListener()
       registerPreferredReplicaElectionListener()
@@ -384,6 +388,30 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback)
   }
 
+  def incrementControllerEpoch(zkClient: ZkClient): Int = {
+    var newControllerEpoch = 1
+    // if controller persistent path doesn't exist, create one
+    ZkUtils.pathExists(zkClient, ZkUtils.ControllerEpochPath) match {
+      case true => // read the current value and increment by one
+        val previousControllerData = ZkUtils.readData(zkClient, ZkUtils.ControllerEpochPath)
+        newControllerEpoch = previousControllerData._1.toInt + 1
+        val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.ControllerEpochPath,
+          newControllerEpoch.toString, previousControllerData._2.getVersion)
+        if(!updateSucceeded)
+          throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
+      case false => // if path doesn't exist, this is the first controller whose epoch should be 1
+        // the following call can still fail if another controller gets elected between checking if the path exists and
+        // trying to create the controller epoch path
+        try {
+          zkClient.createPersistent(ZkUtils.ControllerEpochPath, "1")
+        }catch {
+          case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
+            "Aborting controller startup procedure")
+        }
+    }
+    newControllerEpoch
+  }
+
   private def registerSessionExpirationListener() = {
     zkClient.subscribeStateChanges(new SessionExpirationListener())
   }
@@ -603,16 +631,23 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-      zkWriteCompleteOrUnnecessary = leaderAndIsrOpt match {
-        case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
+      val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
+        case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
+          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
+          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
+          if(controllerEpoch > epoch)
+            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+              "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
+              "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
           if (leaderAndIsr.isr.contains(replicaId)) {
             val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
                                                leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
             // update the new leadership decision in zookeeper or retry
             val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
               zkClient,
-              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
+              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+              ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
               leaderAndIsr.zkVersion)
             newLeaderAndIsr.zkVersion = newVersion
 
@@ -865,6 +900,8 @@ case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
 
 case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
 
+case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
+
 object ControllerStat extends KafkaMetricsGroup {
   val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
diff --git core/src/main/scala/kafka/controller/PartitionStateMachine.scala core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ec3bee5..42cefa0 100644
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -86,7 +86,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         partitionAndState => handleStateChange(partitionAndState._1.topic, partitionAndState._1.partition, OnlinePartition,
                                                offlinePartitionSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to the online state", e)
     }
@@ -105,7 +105,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       partitions.foreach { topicAndPartition =>
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
     }
@@ -235,7 +235,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), leaderAndIsr.toString)
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+            ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch))
           // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
@@ -245,9 +246,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           partitionState.put(topicAndPartition, OnlinePartition)
         }catch {
           case e: ZkNodeExistsException =>
+            // read the controller epoch
+            val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
+              topicAndPartition.partition).get
             ControllerStat.offlinePartitionRate.mark()
             throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
-              .format(topicAndPartition) + " since Leader and ISR path already exists")
+              .format(topicAndPartition) + " since Leader and isr path already exists with value " +
+              "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch))
         }
     }
   }
@@ -267,11 +272,18 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       var newLeaderAndIsr: LeaderAndIsr = null
       var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
       while(!zookeeperPathUpdateSucceeded) {
-        val currentLeaderAndIsr = getLeaderAndIsrOrThrowException(topic, partition)
+        val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
+        val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
+        val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
+        if(controllerEpoch > controller.epoch)
+          throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+            "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
+            "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
         // elect new leader or throw exception
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
         newLeaderAndIsr = leaderAndIsr
         newLeaderAndIsr.zkVersion = newVersion
         zookeeperPathUpdateSucceeded = updateSucceeded
@@ -301,9 +313,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
   }
 
-  private def getLeaderAndIsrOrThrowException(topic: String, partition: Int): LeaderAndIsr = {
-    ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-      case Some(currentLeaderAndIsr) => currentLeaderAndIsr
+  private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
+    ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
+      case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
       case None =>
         throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
           "[%s, %d] in %s state".format(topic, partition, partitionState(TopicAndPartition(topic, partition))))
diff --git core/src/main/scala/kafka/controller/ReplicaStateMachine.scala core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index cfa771a..be24561 100644
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -83,7 +83,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     try {
       brokerRequestBatch.newBatch()
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
     }
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index f192ed2..03e8b9a 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -24,7 +24,6 @@ import kafka.network._
 import kafka.utils.{Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
-import mutable.HashMap
 import kafka.network.RequestChannel.Response
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
@@ -70,8 +69,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestLogger.trace("Handling leader and ISR request " + leaderAndIsrRequest)
     trace("Handling leader and ISR request " + leaderAndIsrRequest)
     try {
-      val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
-      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, responseMap)
+      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, response, error)
       requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
@@ -87,13 +86,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
     trace("Handling stop replica request " + stopReplicaRequest)
 
-    val responseMap = new HashMap[(String, Int), Short]
-    for((topic, partitionId) <- stopReplicaRequest.partitions) {
-      val errorCode = replicaManager.stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
-      responseMap.put((topic, partitionId), errorCode)
-    }
-
-    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
+    val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
+    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response, error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
   }
 
diff --git core/src/main/scala/kafka/server/ReplicaManager.scala core/src/main/scala/kafka/server/ReplicaManager.scala
index 55a33f4..2e7ea6b 100644
--- core/src/main/scala/kafka/server/ReplicaManager.scala
+++ core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -18,6 +18,7 @@ package kafka.server
 
 import kafka.cluster.{Broker, Partition, Replica}
 import collection._
+import mutable.HashMap
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils._
@@ -26,7 +27,7 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
 import kafka.common.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
-import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 
 
 object ReplicaManager {
@@ -38,6 +39,7 @@ class ReplicaManager(val config: KafkaConfig,
                      val zkClient: ZkClient, 
                      kafkaScheduler: KafkaScheduler,
                      val logManager: LogManager) extends Logging with KafkaMetricsGroup {
+  var controllerEpoch = 1
   private val allPartitions = new Pool[(String, Int), Partition]
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
@@ -111,6 +113,23 @@ class ReplicaManager(val config: KafkaConfig,
     errorCode
   }
 
+  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
+    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+    if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
+      error("Received stop replica request from an old controller epoch %d.".format(stopReplicaRequest.controllerEpoch) +
+        " Latest known controller epoch is %d".format(controllerEpoch))
+      (responseMap, ErrorMapping.StaleControllerEpochCode)
+    } else {
+      controllerEpoch = stopReplicaRequest.controllerEpoch
+      // reuse the responseMap declared above; a second HashMap here would only shadow it
+      for((topic, partitionId) <- stopReplicaRequest.partitions){
+        val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
+        responseMap.put((topic, partitionId), errorCode)
+      }
+      (responseMap, ErrorMapping.NoError)
+    }
+  }
+
   def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
     var partition = allPartitions.get((topic, partitionId))
     if (partition == null) {
@@ -159,42 +178,35 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): collection.Map[(String, Int), Short] = {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
     info("Handling leader and isr request %s".format(leaderAndISRRequest))
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+    if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
+      error("Received leader and isr request from an old controller epoch %d.".format(leaderAndISRRequest.controllerEpoch) +
+        " Latest known controller epoch is %d".format(controllerEpoch))
+      (responseMap, ErrorMapping.StaleControllerEpochCode)
+    }else {
+      controllerEpoch = leaderAndISRRequest.controllerEpoch
+      for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
+        var errorCode = ErrorMapping.NoError
+        val topic = topicAndPartition._1
+        val partitionId = topicAndPartition._2
 
-    for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos){
-      var errorCode = ErrorMapping.NoError
-      val topic = topicAndPartition._1
-      val partitionId = topicAndPartition._2
-
-      val requestedLeaderId = partitionStateInfo.leaderAndIsr.leader
-      try {
-        if(requestedLeaderId == config.brokerId)
-          makeLeader(topic, partitionId, partitionStateInfo)
-        else
-          makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
-      } catch {
-        case e =>
-          error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
-          errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        val requestedLeaderId = partitionStateInfo.leaderAndIsr.leader
+        try {
+          if(requestedLeaderId == config.brokerId)
+            makeLeader(topic, partitionId, partitionStateInfo)
+          else
+            makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
+        } catch {
+          case e =>
+            error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
+            errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        }
+        responseMap.put(topicAndPartition, errorCode)
       }
-      responseMap.put(topicAndPartition, errorCode)
+      (responseMap, ErrorMapping.NoError)
     }
-
-    /**
-     *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
-     *  as deleted.
-     *  TODO: Handle this properly as part of KAFKA-330
-     */
-//    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
-//      startHighWaterMarksCheckPointThread
-//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.partitionStateInfos.contains(p._1)).map(entry => entry._1)
-//      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
-//      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
-//    }
-
-    responseMap
   }
 
   private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
@@ -234,7 +246,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
     val partitionOpt = getPartition(topic, partitionId)
-    if(partitionOpt.isDefined){
+    if(partitionOpt.isDefined) {
       partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
     } else {
       warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))
diff --git core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index ef04d6a..f5cc02c 100644
--- core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -44,8 +44,6 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     }
   }
 
-  def amILeader : Boolean = leaderId == brokerId
-
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
     try {
@@ -56,10 +54,12 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     } catch {
       case e: ZkNodeExistsException =>
         // If someone else has written the path, then
-        debug("Someone else was elected as leader other than " + brokerId)
         val data: String = controllerContext.zkClient.readData(electionPath, true)
-        if (data != null) leaderId = data.toInt
-      case e2 => throw e2
+        if (data != null) {
+          debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
+          leaderId = data.toInt
+        }
+      case e2 => error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
     }
     amILeader
   }
@@ -68,6 +68,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     leaderId = -1
   }
 
+  def amILeader : Boolean = leaderId == brokerId
+
   /**
    * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
    * have its own session expiration listener and handler
@@ -79,6 +81,10 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
      */
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
+      controllerContext.controllerLock synchronized {
+        leaderId = data.toString.toInt
+        info("New leader is %d".format(leaderId))
+      }
     }
 
     /**
diff --git core/src/main/scala/kafka/utils/ZkUtils.scala core/src/main/scala/kafka/utils/ZkUtils.scala
index c87ea69..2b5e25b 100644
--- core/src/main/scala/kafka/utils/ZkUtils.scala
+++ core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -24,17 +24,19 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
 import kafka.api.LeaderAndIsr
+import mutable.HashMap
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
-import kafka.controller.{PartitionAndReplica, ReassignedPartitionsContext}
 import kafka.admin._
 import kafka.common.{TopicAndPartition, KafkaException, NoEpochForPartitionException}
+import kafka.controller.{LeaderIsrAndControllerEpoch, PartitionAndReplica, ReassignedPartitionsContext}
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
   val ControllerPath = "/controller"
+  val ControllerEpochPath = "/controllerEpoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
 
@@ -74,7 +76,7 @@ object ZkUtils extends Logging {
     brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }
 
-  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
+  def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
     val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
     val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
     val leaderAndIsrOpt = leaderAndIsrInfo._1
@@ -85,17 +87,30 @@ object ZkUtils extends Logging {
     }
   }
 
-  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = {
+  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
+    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
+    val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
+    val leaderAndIsrOpt = leaderAndIsrInfo._1
+    val stat = leaderAndIsrInfo._2
+    leaderAndIsrOpt match {
+      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, topic, partition, stat).map(_.leaderAndIsr)
+      case None => None
+    }
+  }
+
+  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
+  : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
         val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
         val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
         val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+        val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
         val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
           isr.toString(), zkPathVersion, topic, partition))
-        Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion))
+        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion), controllerEpoch))
       case None => None
     }
   }
@@ -189,6 +204,15 @@ object ZkUtils extends Logging {
     topicDirs.consumerOwnerDir + "/" + partition
   }
 
+  def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
+    val jsonDataMap = new HashMap[String, String]
+    jsonDataMap.put("leader", leaderAndIsr.leader.toString)
+    jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
+    jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
+    jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
+    Utils.stringMapToJson(jsonDataMap)
+  }
+
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
    */
diff --git core/src/test/scala/unit/kafka/admin/AdminTest.scala core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 69973b8..17cabc7 100644
--- core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -159,7 +159,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
@@ -189,7 +189,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
 
     val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
     newTopicMetadata.errorCode match {
diff --git core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index c568e50..c732884 100644
--- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -87,7 +87,7 @@ object SerializationTestUtils{
     val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map, collection.immutable.Set[Broker]())
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1)
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
@@ -97,7 +97,7 @@ object SerializationTestUtils{
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(controllerEpoch = 1, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {
diff --git core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 58961ad..cf5937c 100644
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -69,7 +69,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leaderForPartitionMap = Map(
       0 -> configs.head.brokerId
     )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val topicMetadata = mockLogManagerAndTestTopic(topic)
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
diff --git core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 8239b64..38a89a2 100644
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -23,6 +23,10 @@ import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.controller.ControllerChannelManager
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+import kafka.api._
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerId1 = 0
@@ -35,6 +39,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
+  var staleControllerEpochDetected = false
+
   override def setUp() {
     super.setUp()
     // start both servers
@@ -95,4 +101,48 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     else
       assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
   }
+
+  def testLeaderElectionWithStaleControllerEpoch() {
+    // start 2 brokers
+    val topic = "new-topic"
+    val partitionId = 0
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+    // wait until leader is elected
+    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
+    val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    debug("leader Epoch: " + leaderEpoch1)
+    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
+    assertTrue("Leader should get elected", leader1.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+    assertEquals("First epoch value should be 0", 0, leaderEpoch1)
+
+    // start another controller
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
+    val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port))
+    val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
+    controllerChannelManager.startup()
+    val staleControllerEpoch = 0
+    val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderAndIsr]
+    leaderAndIsr.put((topic, partitionId), new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)))
+    val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 2)).toMap
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch)
+
+    controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
+    TestUtils.waitUntilTrue(() => staleControllerEpochDetected, 1000)
+    assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
+
+    controllerChannelManager.shutdown()
+  }
+
+  private def staleControllerEpochCallback(response: RequestOrResponse): Unit = {
+    val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
+    staleControllerEpochDetected = leaderAndIsrResponse.errorCode match {
+      case ErrorMapping.StaleControllerEpochCode => true
+      case _ => false
+    }
+  }
 }
\ No newline at end of file
diff --git core/src/test/scala/unit/kafka/utils/TestUtils.scala core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a5c663c..b3ebea1 100644
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -372,7 +372,9 @@ object TestUtils extends Logging {
     new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }
 
-  def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
+  def makeLeaderForPartition(zkClient: ZkClient, topic: String,
+                             leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
+                             controllerEpoch: Int) {
     leaderPerPartitionMap.foreach
     {
       leaderForPartition => {
@@ -390,7 +392,7 @@ object TestUtils extends Logging {
             newLeaderAndIsr.zkVersion += 1
           }
           ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            newLeaderAndIsr.toString)
+            ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
         } catch {
           case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
         }
diff --git core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 3600873..85eec6f 100644
--- core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package kafka.zk
 
@@ -35,7 +35,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
     try {
       ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
     } catch {                       
-      case e: Exception => println("Exception in creating ephemeral node")
+      case e: Exception =>
     }
 
     var testData: String = null
