diff --git core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 26f2bd8..d1c910f 100644
--- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -73,6 +73,7 @@ object LeaderAndIsrRequest {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerEpoch = buffer.getInt
     val leaderAndISRRequestCount = buffer.getInt
     val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr]
 
@@ -83,7 +84,7 @@ object LeaderAndIsrRequest {
 
       leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
     }
-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, leaderAndISRInfos)
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, controllerEpoch, leaderAndISRInfos)
   }
 }
 
@@ -91,17 +92,20 @@ object LeaderAndIsrRequest {
 case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
                                 ackTimeoutMs: Int,
+                                controllerEpoch: Int,
                                 leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
+  def this(controllerEpoch: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
+      controllerEpoch, leaderAndISRInfos)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerEpoch)
     buffer.putInt(leaderAndISRInfos.size)
     for((key, value) <- leaderAndISRInfos){
       Utils.writeShortString(buffer, key._1, "UTF-8")
@@ -111,7 +115,7 @@ case class LeaderAndIsrRequest (versionId: Short,
   }
 
   def sizeInBytes(): Int = {
-    var size = 1 + 2 + (2 + clientId.length) + 4 + 4
+    var size = 2 + (2 + clientId.length) + 4 + 4 + 4
     for((key, value) <- leaderAndISRInfos)
       size += (2 + key._1.length) + 4 + value.sizeInBytes
     size
diff --git core/src/main/scala/kafka/api/StopReplicaRequest.scala core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 2f2ba44..52524c5 100644
--- core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -32,28 +32,32 @@ object StopReplicaRequest {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerEpoch = buffer.getInt
     val topicPartitionPairCount = buffer.getInt
     val topicPartitionPairSet = new HashSet[(String, Int)]()
     for (i <- 0 until topicPartitionPairCount){
       topicPartitionPairSet.add((Utils.readShortString(buffer, "UTF-8"), buffer.getInt))
     }
-    new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet)
+    new StopReplicaRequest(versionId, clientId, ackTimeoutMs, controllerEpoch, topicPartitionPairSet)
   }
 }
 
 case class StopReplicaRequest(versionId: Short,
                               clientId: String,
                               ackTimeoutMs: Int,
+                              controllerEpoch: Int,
                               stopReplicaSet: Set[(String, Int)])
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
-  def this(stopReplicaSet: Set[(String, Int)]) = {
-    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet)
+  def this(controllerEpoch: Int, stopReplicaSet: Set[(String, Int)]) = {
+    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
+      controllerEpoch, stopReplicaSet)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerEpoch)
     buffer.putInt(stopReplicaSet.size)
     for ((topic, partitionId) <- stopReplicaSet){
       Utils.writeShortString(buffer, topic, "UTF-8")
@@ -62,7 +66,7 @@ case class StopReplicaRequest(versionId: Short,
   }
 
   def sizeInBytes(): Int = {
-    var size = 2 + (2 + clientId.length()) + 4 + 4
+    var size = 2 + (2 + clientId.length()) + 4 + 4 + 4
     for ((topic, partitionId) <- stopReplicaSet){
       size += (2 + topic.length()) + 4
     }
diff --git core/src/main/scala/kafka/cluster/Partition.scala core/src/main/scala/kafka/cluster/Partition.scala
index 86ae2aa..a2e9668 100644
--- core/src/main/scala/kafka/cluster/Partition.scala
+++ core/src/main/scala/kafka/cluster/Partition.scala
@@ -113,7 +113,8 @@ class Partition(val topic: String,
   /**
    *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make it the new leader of follower to the new leader.
    */
-  def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, isMakingLeader: Boolean): Boolean = {
+  def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr,
+                           controllerEpoch: Int, isMakingLeader: Boolean): Boolean = {
     leaderISRUpdateLock synchronized {
       if (leaderEpoch >= leaderAndISR.leaderEpoch){
         info("Current leaderEpoch [%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request"
@@ -290,7 +291,8 @@ class Partition(val topic: String,
     info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(",")))
     val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+      ZkUtils.leaderAndIsrZkData(newLeaderAndISR, replicaManager.controllerEpoch), zkVersion)
     if (updateSucceeded){
       inSyncReplicas = newISR
       zkVersion = newVersion
diff --git core/src/main/scala/kafka/common/ErrorMapping.scala core/src/main/scala/kafka/common/ErrorMapping.scala
index ba6f352..44891c9 100644
--- core/src/main/scala/kafka/common/ErrorMapping.scala
+++ core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -40,6 +40,7 @@ object ErrorMapping {
   val BrokerNotAvailableCode: Short = 8
   val ReplicaNotAvailableCode: Short = 9
   val MessageSizeTooLargeCode: Short = 10
+  val StaleControllerEpochCode: Short = 11
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -52,7 +53,8 @@ object ErrorMapping {
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
       classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
-      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode
+      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
+      classOf[StaleControllerEpochException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */
diff --git core/src/main/scala/kafka/common/StaleControllerEpochException.scala core/src/main/scala/kafka/common/StaleControllerEpochException.scala
new file mode 100644
index 0000000..842afb8
--- /dev/null
+++ core/src/main/scala/kafka/common/StaleControllerEpochException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class StaleControllerEpochException(message: String) extends RuntimeException(message) {
+  def this(message: String, cause: Throwable) = this(message + " Root cause -> " + kafka.utils.Utils.stackTrace(cause))
+  def this() = this(null)
+}
\ No newline at end of file
diff --git core/src/main/scala/kafka/common/StateChangeFailedException.scala core/src/main/scala/kafka/common/StateChangeFailedException.scala
index a78ca6b..ed25828 100644
--- core/src/main/scala/kafka/common/StateChangeFailedException.scala
+++ core/src/main/scala/kafka/common/StateChangeFailedException.scala
@@ -18,6 +18,6 @@
 package kafka.common
 
 class StateChangeFailedException(message: String) extends RuntimeException(message) {
-  def this(message: String, cause: Throwable) = this(message + " Root cause -> " + cause.toString)
+  def this(message: String, cause: Throwable) = this(message + " Root cause -> " + kafka.utils.Utils.stackTrace(cause))
   def this() = this(null)
 }
\ No newline at end of file
diff --git core/src/main/scala/kafka/controller/ControllerChannelManager.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index f709780..3438be5 100644
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -158,11 +158,11 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
     }
   }
 
-  def sendRequestsToBrokers() {
+  def sendRequestsToBrokers(controllerEpoch: Int) {
     brokerRequestMap.foreach { m =>
       val broker = m._1
       val leaderAndIsr = m._2
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerEpoch, leaderAndIsr)
       debug(("The leaderAndIsr request sent to broker %d is %s").format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
diff --git core/src/main/scala/kafka/controller/KafkaController.scala core/src/main/scala/kafka/controller/KafkaController.scala
index 566beee..f4616d4 100644
--- core/src/main/scala/kafka/controller/KafkaController.scala
+++ core/src/main/scala/kafka/controller/KafkaController.scala
@@ -151,23 +151,23 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   /* TODO: kafka-330  This API is unused until we introduce the delete topic functionality.
   remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
-  def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
-    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
-    for((topicPartition, brokers) <- replicaAssignment){
-      for (broker <- brokers){
-        if (!brokerToPartitionToStopReplicaMap.contains(broker))
-          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
-        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
-      }
-      controllerContext.allLeaders.remove(topicPartition)
-      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
-    }
-    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
-      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
-      info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
-      sendRequest(broker, stopReplicaRequest)
-    }
-  }
+//  def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
+//    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
+//    for((topicPartition, brokers) <- replicaAssignment){
+//      for (broker <- brokers){
+//        if (!brokerToPartitionToStopReplicaMap.contains(broker))
+//          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
+//        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
+//      }
+//      controllerContext.allLeaders.remove(topicPartition)
+//      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
+//    }
+//    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
+//      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
+//      info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
+//      sendRequest(broker, stopReplicaRequest)
+//    }
+//  }
 
   /**
    * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
@@ -206,6 +206,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback)
   }
 
+  def epoch = controllerElector.epoch
+
   private def registerSessionExpirationListener() = {
     zkClient.subscribeStateChanges(new SessionExpireListener())
   }
diff --git core/src/main/scala/kafka/controller/PartitionStateMachine.scala core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index a9c094c..6504ca1 100644
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -84,7 +84,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach {
         partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition)
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
     }catch {
       case e => error("Error while moving some partitions to the online state", e)
     }
@@ -102,7 +102,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       partitions.foreach { topicAndPartition =>
         handleStateChange(topicAndPartition._1, topicAndPartition._2, targetState)
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
     }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
     }
@@ -172,9 +172,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       val partition = topicPartition._2
       // check if leader and isr path exists for partition. If not, then it is in NEW state
       ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-        case Some(currentLeaderAndIsr) =>
+        case Some(currentLeaderIsrAndEpoch) =>
           // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
-          controllerContext.liveBrokerIds.contains(currentLeaderAndIsr.leader) match {
+          controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch._1.leader) match {
             case true => // leader is alive
               partitionState.put(topicPartition, OnlinePartition)
             case false =>
@@ -233,8 +233,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         var leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString)
-          // TODO: the above write can fail only if the current controller lost its zk session and the new controller
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+            ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch))
+          // the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
           brokerRequestBatch.addRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr)
@@ -242,9 +243,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           partitionState.put((topic, partition), OnlinePartition)
         }catch {
           case e: ZkNodeExistsException =>
+            // read the controller epoch
+            val leaderIsrAndEpoch = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition).get
             ControllerStat.offlinePartitionRate.mark()
             throw new StateChangeFailedException("Error while changing partition [%s, %d]'s state from New to Online"
-              .format(topic, partition) + " since Leader and ISR path already exists")
+              .format(topic, partition) + " since Leader and isr path already exists with value " +
+              "%s and controller epoch %d".format(leaderIsrAndEpoch._1.toString(), leaderIsrAndEpoch._2))
         }
     }
   }
@@ -301,9 +305,16 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private def electLeaderForPartition(topic: String, partition: Int, assignedReplicas: Seq[Int]):LeaderAndIsr = {
     var zookeeperPathUpdateSucceeded: Boolean = false
     var newLeaderAndIsr: LeaderAndIsr = null
+    var retry = false
     while(!zookeeperPathUpdateSucceeded) {
       newLeaderAndIsr = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-        case Some(currentLeaderAndIsr) =>
+        case Some(currentLeaderIsrAndEpoch) =>
+          val currentLeaderAndIsr = currentLeaderIsrAndEpoch._1
+          val controllerEpoch = currentLeaderIsrAndEpoch._2
+          if(retry && (controllerEpoch != controller.epoch))
+            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+              "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
+              "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
           var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr
           val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
           val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
@@ -338,7 +349,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           // update the new leadership decision in zookeeper or retry
           val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+            ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
+          if(!updateSucceeded) retry = true
           newLeaderAndIsr.zkVersion = newVersion
           zookeeperPathUpdateSucceeded = updateSucceeded
           newLeaderAndIsr
diff --git core/src/main/scala/kafka/controller/ReplicaStateMachine.scala core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 3574d3a..b72d50d 100644
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -86,7 +86,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
         if(partitionsAssignedToThisBroker.size == 0)
           info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
     }
@@ -105,12 +105,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       targetState match {
         case OnlineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica, OfflineReplica), targetState)
-          // check if the leader for this partition is alive or even exists
-          // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper
-          // for the ISR anyways
           val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
           leaderAndIsrOpt match {
-            case Some(leaderAndIsr) =>
+            case Some(leaderIsrAndEpoch) =>
+              val leaderAndIsr = leaderIsrAndEpoch._1
               controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
                 case true => // leader is alive
                   brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
@@ -125,18 +123,26 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           // As an optimization, the controller removes dead replicas from the ISR
           var zookeeperPathUpdateSucceeded: Boolean = false
           var newLeaderAndIsr: LeaderAndIsr = null
+          var retry = false
           while(!zookeeperPathUpdateSucceeded) {
             // refresh leader and isr from zookeeper again
             val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
             leaderAndIsrOpt match {
-              case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
+              case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
+                val leaderAndIsr = leaderIsrAndEpoch._1
+                val controllerEpoch = leaderIsrAndEpoch._2
+                if(retry && (controllerEpoch != controller.epoch))
+                  throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+                    "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
+                    "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
                 newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
                   leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
                 info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
                 // update the new leadership decision in zookeeper or retry
                 val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-                  ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString,
-                  leaderAndIsr.zkVersion)
+                  ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+                  ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controller.epoch), leaderAndIsr.zkVersion)
+                if(!updateSucceeded) retry = true
                 newLeaderAndIsr.zkVersion = newVersion
                 zookeeperPathUpdateSucceeded = updateSucceeded
               case None => throw new StateChangeFailedException("Failed to change state of replica %d".format(replicaId) +
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index 149add7..32b72e0 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -70,8 +70,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
     trace("Handling leader and isr request " + leaderAndISRRequest)
 
-    val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest)
-    val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
+    val (responseMap, error) = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest)
+    val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap, error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse)))
   }
 
@@ -82,13 +82,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
     trace("Handling stop replica request " + stopReplicaRequest)
 
-    val responseMap = new HashMap[(String, Int), Short]
-
-    for((topic, partitionId) <- stopReplicaRequest.stopReplicaSet){
-      val errorCode = replicaManager.stopReplica(topic, partitionId)
-      responseMap.put((topic, partitionId), errorCode)
-    }
-    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
+    val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
+    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response, error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
   }
 
diff --git core/src/main/scala/kafka/server/ReplicaManager.scala core/src/main/scala/kafka/server/ReplicaManager.scala
index f078b99..56d1532 100644
--- core/src/main/scala/kafka/server/ReplicaManager.scala
+++ core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -22,11 +22,11 @@ import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils._
 import kafka.log.LogManager
-import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr}
 import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
+import kafka.api.{StopReplicaRequest, LeaderAndIsrRequest, LeaderAndIsr}
 
 object ReplicaManager {
   val UnknownLogEndOffset = -1L
@@ -39,7 +39,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
   private val leaderPartitionsLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
-
+  var controllerEpoch = 1
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
   info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
@@ -74,7 +74,23 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
     kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
   }
 
-  def stopReplica(topic: String, partitionId: Int): Short  = {
+  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
+    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+    (stopReplicaRequest.controllerEpoch < controllerEpoch) match {
+      case true =>
+        error("Received stop replica request from an old controller with epoch " + stopReplicaRequest.controllerEpoch)
+        (responseMap, ErrorMapping.StaleControllerEpochCode)
+      case false =>
+        controllerEpoch = stopReplicaRequest.controllerEpoch
+        for((topic, partitionId) <- stopReplicaRequest.stopReplicaSet){
+          val errorCode = stopReplica(topic, partitionId)
+          responseMap.put((topic, partitionId), errorCode)
+        }
+        (responseMap, ErrorMapping.NoError)
+    }
+  }
+
+  private def stopReplica(topic: String, partitionId: Int): Short  = {
     trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId))
     val errorCode = ErrorMapping.NoError
     getReplica(topic, partitionId) match {
@@ -137,48 +153,40 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): collection.Map[(String, Int), Short] = {
-    info("Handling leader and isr request %s".format(leaderAndISRRequest))
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
-
-    for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
-      var errorCode = ErrorMapping.NoError
-      val topic = partitionInfo._1
-      val partitionId = partitionInfo._2
-
-      val requestedLeaderId = leaderAndISR.leader
-      try {
-        if(requestedLeaderId == config.brokerId)
-          makeLeader(topic, partitionId, leaderAndISR)
-        else
-          makeFollower(topic, partitionId, leaderAndISR)
-      } catch {
-        case e =>
-          error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
-          errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+    if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
+      error("Received leader and isr request from an old controller with epoch " + leaderAndISRRequest.controllerEpoch)
+      (responseMap, ErrorMapping.StaleControllerEpochCode)
+    }else {
+      info("Handling leader and isr request %s".format(leaderAndISRRequest))
+      controllerEpoch = leaderAndISRRequest.controllerEpoch
+      for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
+        var errorCode = ErrorMapping.NoError
+        val topic = partitionInfo._1
+        val partitionId = partitionInfo._2
+
+        val requestedLeaderId = leaderAndISR.leader
+        try {
+          if(requestedLeaderId == config.brokerId)
+            makeLeader(topic, partitionId, leaderAndISR)
+          else
+            makeFollower(topic, partitionId, leaderAndISR)
+        } catch {
+          case e =>
+            error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
+            errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        }
+        responseMap.put(partitionInfo, errorCode)
       }
-      responseMap.put(partitionInfo, errorCode)
+      (responseMap, ErrorMapping.NoError)
     }
-
-    /**
-     *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
-     *  as deleted.
-     *  TODO: Handle this properly as part of KAFKA-330
-     */
-//    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
-//      startHighWaterMarksCheckPointThread
-//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
-//      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
-//      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
-//    }
-
-    responseMap
   }
 
   private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = {
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) {
+    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, controllerEpoch, true)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -193,7 +201,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
                  .format(leaderBrokerId, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, false)) {
+    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, controllerEpoch, false)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
diff --git core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index ef04d6a..f6839b1 100644
--- core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -32,6 +32,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
                              brokerId: Int)
   extends LeaderElector with Logging {
   var leaderId = -1
+  var currentLeaderEpoch = 1
   // create the election path in ZK, if one does not exist
   val index = electionPath.lastIndexOf("/")
   if (index > 0)
@@ -44,13 +45,13 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     }
   }
 
-  def amILeader : Boolean = leaderId == brokerId
-
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
     try {
-      createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
-      info(brokerId + " successfully elected as leader")
+      val newLeaderEpoch = currentLeaderEpoch + 1
+      createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, leaderAndEpoch(brokerId, newLeaderEpoch))
+      info(brokerId + " successfully elected as leader with epoch " + newLeaderEpoch)
+      currentLeaderEpoch = newLeaderEpoch
       leaderId = brokerId
       onBecomingLeader()
     } catch {
@@ -58,16 +59,34 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
         // If someone else has written the path, then
         debug("Someone else was elected as leader other than " + brokerId)
         val data: String = controllerContext.zkClient.readData(electionPath, true)
-        if (data != null) leaderId = data.toInt
+        if (data != null) {
+          val leaderAndEpoch = getLeaderAndEpoch(data)
+          leaderId = leaderAndEpoch._1
+          currentLeaderEpoch = leaderAndEpoch._2
+        }
       case e2 => throw e2
     }
     amILeader
   }
 
+  def epoch: Int = {
+    controllerContext.controllerLock synchronized {
+      currentLeaderEpoch
+    }
+  }
+
   def close = {
     leaderId = -1
   }
 
+  def amILeader : Boolean = leaderId == brokerId
+
+  private def leaderAndEpoch(leader: Int, epoch: Int): String = "%d,%d".format(leader, epoch)
+
+  private def getLeaderAndEpoch(data: String): (Int, Int) = {
+    val leaderAndEpochInfo = data.toString.split(",").map(_.toInt)
+    (leaderAndEpochInfo.head, leaderAndEpochInfo.last)
+  }
   /**
    * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
    * have its own session expiration listener and handler
@@ -79,6 +98,12 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
      */
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
+      controllerContext.controllerLock synchronized {
+        val leaderAndEpoch = getLeaderAndEpoch(data.toString)
+        leaderId = leaderAndEpoch._1
+        currentLeaderEpoch = leaderAndEpoch._2
+        info("New leader is %d and epoch is %d".format(leaderId, currentLeaderEpoch))
+      }
     }
 
     /**
diff --git core/src/main/scala/kafka/utils/ZkUtils.scala core/src/main/scala/kafka/utils/ZkUtils.scala
index 66332a4..9c029b8 100644
--- core/src/main/scala/kafka/utils/ZkUtils.scala
+++ core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -24,6 +24,7 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
 import kafka.api.LeaderAndIsr
+import mutable.HashMap
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
 import kafka.common.{KafkaException, NoEpochForPartitionException}
@@ -70,7 +71,7 @@ object ZkUtils extends Logging {
     brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }
 
-  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
+  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[(LeaderAndIsr, Int)] = {
     val leaderAndISRPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
     val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndISRPath)
     val leaderAndIsrOpt = leaderAndIsrInfo._1
@@ -82,11 +83,12 @@ object ZkUtils extends Logging {
             val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
             val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
             val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+            val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
             val isr = Utils.getCSVList(isrString).map(r => r.toInt)
             val zkPathVersion = stat.getVersion
             debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
               isr.toString(), zkPathVersion, topic, partition))
-            Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion))
+            Some((LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion), controllerEpoch))
           case None => None
         }
       case None => None // TODO: Handle if leader and isr info is not available in zookeeper
@@ -182,6 +184,15 @@ object ZkUtils extends Logging {
     topicDirs.consumerOwnerDir + "/" + partition
   }
 
+  def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
+    val jsonDataMap = new HashMap[String, String]
+    jsonDataMap.put("leader", leaderAndIsr.leader.toString)
+    jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
+    jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
+    jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
+    Utils.stringMapToJsonString(jsonDataMap)
+  }
+
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
    */
@@ -426,7 +437,7 @@ object ZkUtils extends Logging {
     for((topic, partitions) <- partitionsForTopics) {
       for(partition <- partitions) {
         ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition.toInt) match {
-          case Some(leaderAndIsr) => ret.put((topic, partition.toInt), leaderAndIsr)
+          case Some(leaderIsrAndEpoch) => ret.put((topic, partition.toInt), leaderIsrAndEpoch._1)
           case None =>
         }
       }
diff --git core/src/test/scala/unit/kafka/admin/AdminTest.scala core/src/test/scala/unit/kafka/admin/AdminTest.scala
index c9e2229..b132a5b 100644
--- core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -157,7 +157,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
                                   .partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
@@ -189,7 +189,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
 
     val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
     newTopicMetadata.errorCode match {
diff --git core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 203e0ff..f1f8075 100644
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -69,7 +69,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leaderForPartitionMap = Map(
       0 -> configs.head.brokerId
     )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val topicMetadata = mockLogManagerAndTestTopic(topic)
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
diff --git core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
index c463763..cc558a6 100644
--- core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
+++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
@@ -72,7 +72,7 @@ object RpcDataSerializationTestUtils{
     val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2)
     val map = Map(((topic1, 0), leaderAndISR1),
                   ((topic2, 0), leaderAndISR2))
-    new LeaderAndIsrRequest(map)
+    new LeaderAndIsrRequest(1, map)
   }
 
   def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
@@ -82,7 +82,7 @@ object RpcDataSerializationTestUtils{
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(1, Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {
diff --git core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index d926813..28475af 100644
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -23,6 +23,11 @@ import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.controller.{ControllerChannelManager, KafkaController}
+import kafka.cluster.Broker
+import kafka.api.{LeaderAndISRResponse, RequestOrResponse, LeaderAndIsr, LeaderAndIsrRequest}
+import kafka.common.ErrorMapping
+import collection.mutable.HashMap
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerId1 = 0
@@ -35,6 +40,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
+  var staleControllerEpochDetected = false
+
   override def setUp() {
     super.setUp()
     // start both servers
@@ -93,4 +100,47 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     else
       assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
   }
+
+  def testLeaderElectionWithStaleControllerEpoch() {
+    // start 2 brokers
+    val topic = "new-topic"
+    val partitionId = 0
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+    // wait until leader is elected
+    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
+    val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    debug("leader Epoch: " + leaderEpoch1)
+    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
+    assertTrue("Leader should get elected", leader1.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+    assertEquals("First epoch value should be 0", 0, leaderEpoch1)
+
+    // start another controller
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
+    val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port))
+    val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
+    controllerChannelManager.startup()
+    val staleControllerEpoch = 0
+    val leaderAndIsr = new HashMap[(String, Int), LeaderAndIsr]
+    leaderAndIsr.put((topic, partitionId), new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)))
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(staleControllerEpoch, leaderAndIsr)
+
+    controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
+    TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
+    assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
+
+    controllerChannelManager.shutdown()
+  }
+
+  private def staleControllerEpochCallback(response: RequestOrResponse): Unit = {
+    val leaderAndIsrResponse = response.asInstanceOf[LeaderAndISRResponse]
+    staleControllerEpochDetected = leaderAndIsrResponse.errorCode match {
+      case ErrorMapping.StaleControllerEpochCode => true
+      case _ => false
+    }
+  }
 }
\ No newline at end of file
diff --git core/src/test/scala/unit/kafka/utils/TestUtils.scala core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0e47daf..c020ec6 100644
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -372,7 +372,9 @@ object TestUtils extends Logging {
     new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }
 
-  def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
+  def makeLeaderForPartition(zkClient: ZkClient, topic: String,
+                             leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
+                             controllerEpoch: Int) {
     leaderPerPartitionMap.foreach
     {
       leaderForPartition => {
@@ -384,12 +386,13 @@ object TestUtils extends Logging {
           if(currentLeaderAndISROpt == None)
             newLeaderAndISR = new LeaderAndIsr(leader, List(leader))
           else{
-            newLeaderAndISR = currentLeaderAndISROpt.get
+            newLeaderAndISR = currentLeaderAndISROpt.get._1
             newLeaderAndISR.leader = leader
             newLeaderAndISR.leaderEpoch += 1
             newLeaderAndISR.zkVersion += 1
           }
-          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath( topic, partition), newLeaderAndISR.toString)
+          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath( topic, partition),
+            ZkUtils.leaderAndIsrZkData(newLeaderAndISR, controllerEpoch))
         } catch {
           case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
         }
diff --git core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 3600873..85eec6f 100644
--- core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package kafka.zk
 
@@ -35,7 +35,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
     try {
       ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
     } catch {                       
-      case e: Exception => println("Exception in creating ephemeral node")
+      case e: Exception =>
     }
 
     var testData: String = null
