Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(revision 1324859)
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(working copy)
@@ -24,7 +24,6 @@
 import org.junit.Test
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.common.{InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.producer.async._
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
@@ -35,6 +34,7 @@
 import collection.mutable.ListBuffer
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
+import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException, InvalidConfigException, QueueFullException}
 
 class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -56,7 +56,7 @@
     val mockEventHandler = new EventHandler[String,String] {
 
       def handle(events: Seq[ProducerData[String,String]]) {
-        Thread.sleep(1000000)
+        Thread.sleep(500)
       }
 
       def close {}
@@ -79,6 +79,8 @@
     }
     catch {
       case e: QueueFullException => //expected
+    } finally {
+      producer.close()
     }
   }
 
@@ -319,6 +321,8 @@
       fail("Should fail with ClassCastException due to incompatible Encoder")
     } catch {
       case e: ClassCastException =>
+    } finally {
+      producer.close()
     }
   }
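For context, a minimal sketch (not part of this patch) of how producer callers catch the relocated exception now that QueueFullException lives in kafka.common; the helper name and the single-argument send call are illustrative assumptions about the producer API of this era.

import kafka.common.QueueFullException
import kafka.producer.{Producer, ProducerData}

// Hypothetical helper: returns false instead of propagating when the async send queue is full.
def sendOrDrop[K, V](producer: Producer[K, V], data: ProducerData[K, V]): Boolean = {
  try {
    producer.send(data)
    true
  } catch {
    case e: QueueFullException =>
      // the send queue stayed full for the configured enqueue timeout; drop or retry
      false
  }
}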
 
Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(revision 1324859)
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(working copy)
@@ -22,7 +22,8 @@
 import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{ZkUtils, ZKStringSerializer, Utils, TestUtils}
+import org.I0Itec.zkclient.ZkClient
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -35,27 +36,22 @@
   val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
 
-  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   override def setUp() {
     super.setUp()
-
-    // start both servers
-    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
-    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
-
-    servers ++= List(server1, server2)
   }
 
   override def tearDown() {
-    // shutdown the servers and delete data hosted on them
-    servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDir))
-
     super.tearDown()
   }
 
   def testLeaderElectionWithCreateTopic {
+    var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+    // start both servers
+    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+
+    servers ++= List(server1, server2)
     // start 2 brokers
     val topic = "new-topic"
     val partitionId = 0
@@ -64,15 +60,16 @@
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 200)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
-    assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
-
     // kill the server hosting the preferred replica
     servers.head.shutdown()
 
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 5000)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     Thread.sleep(zookeeper.tickTime)
@@ -81,7 +78,6 @@
     servers.head.startup()
 
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
-    // TODO: Once the optimization for preferred replica re-election is in, this check should change to broker 0
     assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
 
     // shutdown current leader (broker 1)
@@ -90,5 +86,41 @@
 
     // test if the leader is the preferred replica
     assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+    // shutdown the servers and delete data hosted on them
+    servers.map(server => server.shutdown())
+    servers.map(server => Utils.rm(server.config.logDir))
   }
+
+  // Assuming leader election happens correctly, test if epoch changes as expected
+  def testEpoch() {
+    // keep switching leaders to see if epoch changes correctly
+    val topic = "new-topic"
+    val partitionId = 0
+
+    // setup 2 brokers in ZK
+    val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
+
+    try {
+      // create topic with 1 partition, 2 replicas, one on each broker
+      CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+      var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+      assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
+      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get)
+
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+      newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
+      assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
+      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get)
+
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+      newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+      assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
+      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get)
+
+    } finally {
+      TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
+    }
+
+  }
 }
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/server/StateChangeTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/StateChangeTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/server/StateChangeTest.scala	(revision 0)
@@ -0,0 +1,111 @@
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{ZKQueue, TestUtils}
+import kafka.common.QueueFullException
+
+class StateChangeTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+  val brokerId1 = 0
+  val port1 = TestUtils.choosePort()
+  var stateChangeQ: ZKQueue = null
+  val config = new KafkaConfig(TestUtils.createBrokerConfig(brokerId1, port1))
+
+  override def setUp() {
+    super.setUp()
+
+    // create a queue
+    val queuePath = "/brokers/state/" + config.brokerId
+    stateChangeQ = new ZKQueue(zkClient, queuePath, 10)
+  }
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  def testZkQueuePoll() {
+    // test put API
+    val itemPath = stateChangeQ.put("test:0:follower")
+    val item = itemPath.split("/").last.split("-").last.toInt
+    assertEquals(0, item)
+
+    // dequeue the above item
+    val dequeuedItem = stateChangeQ.poll()
+    assertTrue("Queue should have one item", dequeuedItem.isDefined)
+    assertEquals("test:0:follower", dequeuedItem.get._2)
+    assertEquals("item-0000000000", dequeuedItem.get._1)
+  }
+
+  def testZkQueuePeek() {
+    val itemPath = stateChangeQ.put("test:0:follower")
+    val item = itemPath.split("/").last.split("-").last.toInt
+    assertEquals(0, item)
+
+    val peekedItem = stateChangeQ.peek
+    assertTrue("Queue should have one item", peekedItem.isDefined)
+    assertEquals("test:0:follower", peekedItem.get._2)
+
+    val peekedItem2 = stateChangeQ.peek
+    assertTrue("Queue should have one item", peekedItem2.isDefined)
+    assertEquals("test:0:follower", peekedItem2.get._2)
+  }
+
+  def testZkQueueDrainAll() {
+    for(i <- 0 until 5) {
+      val itemPath = stateChangeQ.put("test:0:follower")
+      val item = itemPath.split("/").last.split("-").last.toInt
+      assertEquals(i, item)
+    }
+
+    val items = stateChangeQ.drainAll()
+    assertEquals(5, items.size)
+    for(i <- 0 until 5)
+      assertEquals("test:0:follower", items(i)._2)
+
+    for(i <- 5 until 10) {
+      val itemPath = stateChangeQ.put("test:1:follower")
+      val item = itemPath.split("/").last.split("-").last.toInt
+      assertEquals(i+5, item)
+    }
+
+    val moreItems = stateChangeQ.drainAll()
+    assertEquals(5, moreItems.size)
+    for(i <- 0 until 5)
+      assertEquals("test:1:follower", moreItems(i)._2)
+
+    val peekItem = stateChangeQ.peek
+    assertFalse("Queue should have no more items after drainAll", peekItem.isDefined)
+  }
+
+  def testZkQueueFull() {
+    for(i <- 0 until 10) {
+      val itemPath = stateChangeQ.put("test:0:follower")
+      val item = itemPath.split("/").last.split("-").last.toInt
+      assertEquals(i, item)
+    }
+
+    try {
+      stateChangeQ.put("test:0:follower")
+      fail("Queue should be full")
+    } catch {
+      case e:QueueFullException => // expected
+    }
+  }
+
+  // TODO: Do this after patch for delete topic/delete partition is in
+  def testStateChangeRequestValidity() {
+    // mock out the StateChangeRequestHandler
+
+    // setup 3 replicas for one topic partition
+
+    // shutdown follower 1
+
+    // restart leader to trigger epoch change
+
+    // start follower 1
+
+    // test follower 1 acted only on one become follower request
+  }
+}
\ No newline at end of file
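The assertions above recover the ZooKeeper sequence number from the znode path returned by put(); a standalone sketch of that parsing (the path value is illustrative):

val itemPath = "/brokers/state/0/item-0000000004"
val itemName = itemPath.split("/").last               // "item-0000000004"
val sequenceNumber = itemName.split("-").last.toInt   // 4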
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision 1324859)
+++ core/src/main/scala/kafka/producer/Producer.scala	(working copy)
@@ -18,11 +18,11 @@
 
 import async._
 import kafka.utils._
-import kafka.common.InvalidConfigException
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.serializer.Encoder
 import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
 import org.I0Itec.zkclient.ZkClient
+import kafka.common.{QueueFullException, InvalidConfigException}
 
 class Producer[K,V](config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V]) // for testing only
@@ -120,6 +120,7 @@
   def close() = {
     val canShutdown = hasShutdown.compareAndSet(false, true)
     if(canShutdown) {
+      info("Shutting down producer")
       if (producerSendThread != null)
         producerSendThread.shutdown
       eventHandler.close
Index: core/src/main/scala/kafka/producer/async/QueueFullException.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/QueueFullException.scala	(revision 1324859)
+++ core/src/main/scala/kafka/producer/async/QueueFullException.scala	(working copy)
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer.async
-
-/* Indicates the queue for sending messages is full of unsent messages */
-class QueueFullException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
Index: core/src/main/scala/kafka/admin/AdminUtils.scala
===================================================================
--- core/src/main/scala/kafka/admin/AdminUtils.scala	(revision 1324859)
+++ core/src/main/scala/kafka/admin/AdminUtils.scala	(working copy)
@@ -69,7 +69,6 @@
         replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
       ret(i) = replicaList.reverse
     }
-
     ret
   }
 
@@ -102,14 +101,14 @@
 
         for (i <-0 until partitionMetadata.size) {
           val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
-          val inSyncReplicas = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitions(i).toString))
+          val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitions(i))
           val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitions(i))
           debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
 
           partitionMetadata(i) = new PartitionMetadata(partitions(i),
             leader match { case None => None case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l.toInt)).head) },
             getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
-            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(inSyncReplicas).map(id => id.toInt)),
+            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas),
             None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
         }
         Some(new TopicMetadata(topic, partitionMetadata))
@@ -117,7 +116,6 @@
         None
       }
     }
-
     metadataList.toList
   }
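An illustration of what this hunk accounts for (values made up): the ISR znode now carries an epoch suffix, so the raw CSV the old code fed to Utils.getCSVList is no longer the whole payload; getInSyncReplicasForPartition strips the epoch and returns broker ids directly.

val rawIsrZnode = "0,1;3"   // "<isrCsv>;<epoch>", illustrative value
// old style: Utils.getCSVList(rawIsrZnode) yields List("0", "1;3"), and "1;3".toInt fails
val inSyncReplicas: Seq[Int] =
  rawIsrZnode.split(";").head.split(",").filter(_.nonEmpty).map(_.toInt)   // Seq(0, 1)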
 
Index: core/src/main/scala/kafka/common/NoEpochForPartitionException.scala
===================================================================
--- core/src/main/scala/kafka/common/NoEpochForPartitionException.scala	(revision 0)
+++ core/src/main/scala/kafka/common/NoEpochForPartitionException.scala	(revision 0)
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a get epoch request is made for partition, but no epoch exists for that partition
+ */
+class NoEpochForPartitionException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/common/QueueFullException.scala
===================================================================
--- core/src/main/scala/kafka/common/QueueFullException.scala	(revision 0)
+++ core/src/main/scala/kafka/common/QueueFullException.scala	(revision 0)
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/* Indicates the queue for sending messages is full of unsent messages */
+class QueueFullException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
Index: core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
===================================================================
--- core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala	(revision 1324859)
+++ core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala	(working copy)
@@ -59,7 +59,7 @@
         if(validateRebalancingOperation(zkClient, group))
           info("Rebalance operation successful !")
         else
-          error("Rebalance operation failed !")
+            error("Rebalance operation failed !")
       } catch {
         case e2: Throwable => error("Error while verifying current rebalancing operation", e2)
       }
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1324859)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -25,11 +25,13 @@
 import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
 import java.util.concurrent.locks.Condition
+import kafka.common.NoEpochForPartitionException
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
+  val BrokerStatePath = "/brokers/state"
 
   def getTopicPath(topic: String): String ={
     BrokerTopicsPath + "/" + topic
@@ -59,6 +61,10 @@
     getTopicPartitionPath(topic, partitionId) + "/" + "leader"
   }
 
+  def getBrokerStateChangePath(brokerId: Int): String = {
+    BrokerStatePath + "/" + brokerId
+  }
+
   def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
       ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
   }
@@ -69,11 +75,39 @@
   }
 
   def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
-    val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
-    if(leader == null) None
-    else Some(leader.toInt)
+    val leaderAndEpoch = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
+    if(leaderAndEpoch == null) None
+    else {
+      val leaderAndEpochInfo = leaderAndEpoch.split(";")
+      Some(leaderAndEpochInfo.head.toInt)
+    }
   }
 
+  /**
+   * Reads the epoch from the ISR path. It is sufficient to read the epoch from the ISR path alone: if the leader
+   * fails after updating the epoch in the leader path but before updating it in the ISR path, some other broker
+   * will simply retry becoming leader with the same new epoch value.
+   */
+  def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = {
+    val lastKnownEpoch = try {
+      val isrAndEpoch = readData(client, getTopicPartitionInSyncPath(topic, partition.toString))
+      if(isrAndEpoch != null) {
+        val isrAndEpochInfo = isrAndEpoch.split(";")
+        if(isrAndEpochInfo.last.isEmpty)
+          throw new NoEpochForPartitionException("Epoch in ISR path for topic %s partition %d is empty".format(topic, partition))
+        else
+          isrAndEpochInfo.last.toInt
+      } else {
+        throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
+      }
+    } catch {
+      case e: ZkNoNodeException =>
+        throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
+      case e1 => throw e1
+    }
+    lastKnownEpoch
+  }
+
   def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
     val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString))
     if(replicaListString == null)
@@ -83,22 +117,61 @@
     }
   }
 
+  def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
+    val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
+    if(replicaListAndEpochString == null)
+      Seq.empty[Int]
+    else {
+      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+      Utils.getCSVList(replicasAndEpochInfo.head).map(r => r.toInt)
+    }
+  }
+
   def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
     val replicas = getReplicasForPartition(zkClient, topic, partition)
     debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas))
     replicas.contains(brokerId.toString)
   }
 
-  def tryToBecomeLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
+  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[Int] = {
     try {
-      createEphemeralPathExpectConflict(zkClient, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString)
-      true
+      // NOTE: first increment epoch, then become leader
+      val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
+      // TODO: Check the order of updating epoch in leader and ISR path, for failure cases
+      createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
+        "%d;%d".format(brokerId, newEpoch))
+      val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+      updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
+        "%s;%d".format(currentISR.mkString(","), newEpoch))
+      info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
+      Some(newEpoch)
     } catch {
-      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); false
-      case oe => false
+      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
+      case oe => None
     }
   }
 
+  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int) = {
+    // read the previous epoch and increment it; the caller writes the new epoch to both the leader path and the ISR path
+    val epoch = try {
+      Some(getEpochForPartition(client, topic, partition))
+    } catch {
+      case e: NoEpochForPartitionException => None
+      case e1 => throw e1
+    }
+
+    val newEpoch = epoch match {
+      case Some(partitionEpoch) =>
+        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
+        partitionEpoch + 1
+      case None =>
+        // this is the first time leader is elected for this partition. So set epoch to 1
+        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
+        1
+    }
+    newEpoch
+  }
+
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
@@ -186,7 +259,7 @@
   /**
    * Create an persistent node with the given path and data. Create parents if necessary.
    */
-  def createPersistentPath(client: ZkClient, path: String, data: String): Unit = {
+  def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
     try {
       client.createPersistent(path, data)
     }
@@ -198,6 +271,10 @@
     }
   }
 
+  def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = {
+    client.createPersistentSequential(path, data)
+  }
+
   /**
    * Update the value of a persistent node with the given path and data.
    * create parrent directory if necessary. Never throw NodeExistException.
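For readers of the new helpers above, a sketch of the two znode encodings this patch introduces; the concrete values are illustrative only.

// leader path value: "<leaderBrokerId>;<epoch>", e.g. "0;3"
val leaderAndEpoch = "0;3".split(";")
val leader      = leaderAndEpoch.head.toInt   // 0
val leaderEpoch = leaderAndEpoch.last.toInt   // 3

// ISR path value: "<isrCsv>;<epoch>", e.g. "0,1;3"
val isrAndEpoch = "0,1;3".split(";")
val isr      = isrAndEpoch.head.split(",").map(_.toInt).toSeq   // Seq(0, 1)
val isrEpoch = isrAndEpoch.last.toInt                           // 3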
Index: core/src/main/scala/kafka/utils/ZKQueue.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZKQueue.scala	(revision 0)
+++ core/src/main/scala/kafka/utils/ZKQueue.scala	(revision 0)
@@ -0,0 +1,78 @@
+package kafka.utils
+
+import kafka.utils.ZkUtils._
+import org.I0Itec.zkclient.ZkClient
+import kafka.common.QueueFullException
+
+class ZKQueue(zkClient: ZkClient, path: String, size: Int) {
+  // create the queue in ZK, if one does not exist
+  makeSurePersistentPathExists(zkClient, path)
+
+  // TODO: This API will be used by the leader to enqueue state change requests to the followers
+  def put(data: String): String = {
+    // if queue is full, throw QueueFullException
+    if(isFull)
+      throw new QueueFullException("Queue is full. Item %s will be rejected".format(data))
+    val queueLocation = createSequentialPersistentPath(zkClient, path + "/item-", data)
+    debug("Added item %s to queue at location %s".format(data, queueLocation))
+    queueLocation
+  }
+
+  def poll(): Option[(String, String)] = {
+    val allItems = getChildren(zkClient, path).sorted
+    allItems.size match {
+      case 0 => None
+      case _ =>
+        val item = allItems.head
+        val stateChangePath = path + "/" + item
+        val request = ZkUtils.readData(zkClient, stateChangePath)
+        deletePath(zkClient, stateChangePath)
+        Some(item, request)
+    }
+  }
+
+  def peek: Option[(String, String)] = {
+    val allItems = getChildren(zkClient, path).sorted
+    allItems.size match {
+      case 0 => None
+      case _ =>
+        val item = allItems.head
+        val stateChangePath = path + "/" + item
+        Some(item, ZkUtils.readData(zkClient, stateChangePath))
+    }
+  }
+
+  def remove() = poll()
+
+  def drainAll(): Seq[(String, String)] = {
+    val allItems = getChildren(zkClient, path).sorted
+    allItems.size match {
+      case 0 => Seq.empty[(String, String)]
+      case _ => allItems.map { item =>
+        // read the data and delete the node
+        val stateChangePath = path + "/" + item
+        val data = ZkUtils.readData(zkClient, stateChangePath)
+        ZkUtils.deletePath(zkClient, stateChangePath)
+        (item, data)
+      }
+    }
+  }
+
+  def readAll(): Seq[(String, String)] = {
+    val allItems = getChildren(zkClient, path).sorted
+    allItems.size match {
+      case 0 => Seq.empty[(String, String)]
+      case _ => allItems.map { item =>
+        // read the data and delete the node
+        val stateChangePath = path + "/" + item
+        val data = ZkUtils.readData(zkClient, stateChangePath)
+        (item, data)
+      }
+    }
+  }
+
+  def isEmpty: Boolean = (readAll().size == 0)
+
+  // TODO: Implement the queue shrink operation if the queue is full, as part of create/delete topic
+  def isFull: Boolean = (readAll().size == size)
+}
\ No newline at end of file
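A minimal usage sketch of the queue semantics implemented above, given a connected ZkClient (path and payload are illustrative):

val q = new ZKQueue(zkClient, "/brokers/state/0", 10)
val itemPath = q.put("new-topic:0:start-replica")   // e.g. ".../item-0000000000"
q.peek        // Some(("item-0000000000", "new-topic:0:start-replica")); item stays queued
q.poll()      // same pair, but the znode is deleted
q.drainAll()  // reads and deletes everything left, in sequence order
q.isEmpty     // true once drained

Note that isEmpty/isFull re-read every item, so each put pays an extra listing and read of the queue's children; acceptable for a small, bounded state change queue.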
Index: core/src/main/scala/kafka/server/StateChangeRequestHandler.scala
===================================================================
--- core/src/main/scala/kafka/server/StateChangeRequestHandler.scala	(revision 0)
+++ core/src/main/scala/kafka/server/StateChangeRequestHandler.scala	(revision 0)
@@ -0,0 +1,47 @@
+package kafka.server
+
+import kafka.utils.{ZKQueue, Logging}
+
+
+class StateChangeRequestHandler(config: KafkaConfig, stateChangeQ: ZKQueue) extends Logging {
+
+  def handleStateChanges(requestValidityCheck: (Int, String) => Boolean,
+                         epochValidityCheck: (Int, String) => Boolean) {
+    // get outstanding state change requests for this broker
+    val queuedStateChanges = stateChangeQ.drainAll()
+
+    queuedStateChanges.filter(req => requestValidityCheck(req._1.toInt, req._2)).foreach {
+      request =>
+        val stateChangeRequestInfo = request._2.split(":")
+        val topic = stateChangeRequestInfo.head
+        val partition = stateChangeRequestInfo.take(2).last.toInt
+        val stateChange = stateChangeRequestInfo.last
+        val stateChangeRequest = StateChangeRequest.getStateChangeRequest(stateChange)
+
+        stateChangeRequest match {
+          case StartReplica =>
+            if(epochValidityCheck(request._1.toInt, request._2))
+              handleStartReplica(topic, partition)
+          case CloseReplica =>
+            /**
+             * close replica requests are sent as part of delete topic or partition reassignment process
+             * To ensure that a topic will be deleted even if the broker is offline, this state change should not
+             * be protected with the epoch validity check
+             */
+            handleCloseReplica(topic, partition)
+        }
+    }
+  }
+
+  def handleStartReplica(topic: String, partition: Int) {
+    info("Received start replica state change request for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+    // TODO: implement this as part of create topic support or partition reassignment support. Until then, it is unused
+  }
+
+  def handleCloseReplica(topic: String, partition: Int) {
+    info("Received close replica state change request for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+    // TODO: implement this as part of delete topic support. Until then, it is unused
+  }
+}
\ No newline at end of file
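The handler assumes each queued payload is encoded as "topic:partition:request"; a standalone sketch of the parsing it performs (the payload value is illustrative, and whoever enqueues these strings is outside this patch):

val payload = "new-topic:0:start-replica"
val info = payload.split(":")
val topic       = info.head                 // "new-topic"
val partition   = info.take(2).last.toInt   // 0
val stateChange = info.last                 // "start-replica"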
Index: core/src/main/scala/kafka/server/StateChangeRequest.scala
===================================================================
--- core/src/main/scala/kafka/server/StateChangeRequest.scala	(revision 0)
+++ core/src/main/scala/kafka/server/StateChangeRequest.scala	(revision 0)
@@ -0,0 +1,21 @@
+package kafka.server
+
+object StateChangeRequest {
+  def getStateChangeRequest(request: String): StateChangeRequest = {
+    request match {
+      case StartReplica.request => StartReplica
+      case CloseReplica.request => CloseReplica
+      case _ => throw new kafka.common.UnknownCodecException("%s is an unknown state change request".format(request))
+    }
+  }
+}
+
+sealed trait StateChangeRequest { def request: String }
+
+/* The elected leader sends the start replica state change request to all the new replicas that have been assigned
+* a partition. Note that the followers must act on this request only if the request epoch == latest partition epoch or -1 */
+case object StartReplica extends StateChangeRequest { val request = "start-replica" }
+
+/* The elected leader sends the close replica state change request to all the replicas that have been un-assigned a partition
+*  OR if a topic has been deleted. Note that the followers must act on this request even if the epoch has changed */
+case object CloseReplica extends StateChangeRequest { val request = "close-replica" }
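Dispatch on the parsed request is a plain match over the sealed trait; a small sketch:

StateChangeRequest.getStateChangeRequest("start-replica") match {
  case StartReplica => println("start replica: act only if the request epoch is current")
  case CloseReplica => println("close replica: act regardless of epoch, e.g. for topic deletion")
}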
Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(revision 1324859)
+++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(working copy)
@@ -23,6 +23,7 @@
 import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
+import java.lang.IllegalStateException
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -37,17 +38,27 @@
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
   private var zkClient: ZkClient = null
   var topics: List[String] = Nil
-  val lock = new Object()
   var existingTopics: Set[String] = Set.empty[String]
   val leaderChangeListener = new LeaderChangeListener
   val topicPartitionsChangeListener = new TopicChangeListener
+  val stateChangeListener = new StateChangeListener
+  private var stateChangeHandler: StateChangeRequestHandler = null
+  private var stateChangeQ: ZKQueue = null
+
   private val topicListenerLock = new Object
   private val leaderChangeLock = new Object
+  private val stateChangeLock = new Object
 
   def startup() {
     /* start client */
     info("connecting to ZK: " + config.zkConnect)
     zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    val stateChangePath = ZkUtils.getBrokerStateChangePath(config.brokerId)
+    stateChangeQ = new ZKQueue(zkClient, stateChangePath, config.stateChangeQSize)
+    stateChangeHandler = new StateChangeRequestHandler(config, stateChangeQ)
+    zkClient.subscribeChildChanges(stateChangePath, stateChangeListener)
+    // create state change path if one doesn't exist. This is needed here in the absence of a create cluster admin command
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId))
     zkClient.subscribeStateChanges(new SessionExpireListener)
     registerBrokerInZk()
     subscribeToTopicAndPartitionsChanges(true)
@@ -184,7 +195,6 @@
       case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
       case None => // leader election
         leaderElection(replica)
-
     }
   }
 
@@ -201,9 +211,12 @@
       }catch {
         case e => // ignoring
       }
-      if(ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)) {
-        info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
-        // TODO: Become leader as part of KAFKA-302
+      val newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)
+      newLeaderEpoch match {
+        case Some(epoch) =>
+          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
+          // TODO: Become leader as part of KAFKA-302
+        case None =>
       }
     }
   }
@@ -233,6 +246,46 @@
     }
   }
 
+  class StateChangeListener extends IZkChildListener with Logging {
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      stateChangeLock.synchronized {
+        debug("State change listener fired for path %s in broker %d".format(parentPath, config.brokerId))
+        import scala.collection.JavaConversions._
+        val outstandingRequests = asBuffer(curChilds).sorted
+        debug("Sorted list of state change requests %s to broker id %d".format(outstandingRequests.mkString(","), config.brokerId))
+
+        stateChangeHandler.handleStateChanges(ensureStateChangeRequestIsValidOnThisBroker, ensureEpochValidity)
+      }
+    }
+
+    private def ensureStateChangeRequestIsValidOnThisBroker(epoch: Int, request: String): Boolean = {
+      // get the topic and partition that this request is meant for
+      val requestInfo = request.split(":")
+      val topic = requestInfo.head
+      val partition = requestInfo.take(2).last.toInt
+
+      // check if this broker hosts a replica for this topic and partition
+      ZkUtils.isPartitionOnBroker(zkClient, topic, partition, config.brokerId)
+    }
+
+    private def ensureEpochValidity(epoch: Int, request: String): Boolean = {
+      // get the topic and partition that this request is meant for
+      val requestInfo = request.split(":")
+      val topic = requestInfo.head
+      val partition = requestInfo.take(2).last.toInt
+
+      // check if the request's epoch matches the current leader's epoch
+      val currentLeaderEpoch = ZkUtils.getEpochForPartition(zkClient, topic, partition)
+      val validEpoch = currentLeaderEpoch == epoch
+      if(epoch > currentLeaderEpoch)
+        throw new IllegalStateException(("Illegal epoch state. Request's epoch %d larger than registered epoch %d for " +
+          "topic %s partition %d").format(epoch, currentLeaderEpoch, topic, partition))
+      validEpoch
+    }
+  }
+
   class LeaderChangeListener extends IZkDataListener with Logging {
 
     @throws(classOf[Exception])
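A worked example of the epoch validity rule enforced by ensureEpochValidity, restated standalone (numbers are illustrative): with a registered epoch of 3, a request carrying epoch 3 is processed, epoch 2 is treated as coming from a stale leader and skipped, and epoch 4 is an impossible state that raises IllegalStateException.

def isValidEpoch(requestEpoch: Int, registeredEpoch: Int): Boolean = {
  if (requestEpoch > registeredEpoch)
    throw new IllegalStateException(
      "request epoch %d is larger than registered epoch %d".format(requestEpoch, registeredEpoch))
  requestEpoch == registeredEpoch   // smaller (stale) epochs are simply ignored
}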
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1324859)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -105,4 +105,7 @@
   * leader election on all replicas minus the preferred replica */
   val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
 
+  /* size of the state change request queue in Zookeeper */
+  val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
+
  }
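A sketch of overriding the new setting (TestUtils.createBrokerConfig fills in the other required broker properties; the values are illustrative):

val props = TestUtils.createBrokerConfig(0, 9092)
props.put("state.change.queue.size", "500")
val config = new KafkaConfig(props)
assert(config.stateChangeQSize == 500)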
