Index: core/src/test/scala/unit/kafka/server/StateChangeTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/StateChangeTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/server/StateChangeTest.scala	(revision 0)
@@ -0,0 +1,99 @@
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+import kafka.zk.ZooKeeperTestHarness
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZKQueue, TestUtils, ZKStringSerializer}
+
+class StateChangeTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+  var zkClient: ZkClient = null
+  val brokerId1 = 0
+  val port1 = TestUtils.choosePort()
+  var stateChangeQ: ZKQueue = null
+  val config = new KafkaConfig(TestUtils.createBrokerConfig(brokerId1, port1))
+
+  override def setUp() {
+    super.setUp()
+
+    zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
+
+    // create a queue
+    val queuePath = "/brokers/state/" + config.brokerId
+    stateChangeQ = new ZKQueue(zkClient, queuePath)
+  }
+
+  override def tearDown() {
+    zkClient.close(); super.tearDown()
+  }
+
+  def testZkQueuePoll() {
+    // test put API
+    val itemPath = stateChangeQ.put("state-", "test:0:follower")
+    val item = itemPath.split("/").last.split("-").last.toInt
+    assertEquals(0, item)
+
+    // dequeue the above item
+    val dequeuedItem = stateChangeQ.poll()
+    assertTrue("Queue should have one item", dequeuedItem.isDefined)
+    assertEquals("test:0:follower", dequeuedItem.get._2)
+    assertEquals("state-0000000000", dequeuedItem.get._1)
+  }
+
+  def testZkQueuePeek() {
+    val itemPath = stateChangeQ.put("state-", "test:0:follower")
+    val item = itemPath.split("/").last.split("-").last.toInt
+    assertEquals(0, item)
+
+    val peekedItem = stateChangeQ.peek
+    assertTrue("Queue should have one item", peekedItem.isDefined)
+    assertEquals("test:0:follower", peekedItem.get._2)
+
+    val peekedItem2 = stateChangeQ.peek
+    assertTrue("Queue should have one item", peekedItem2.isDefined)
+    assertEquals("test:0:follower", peekedItem2.get._2)
+  }
+
+  def testZkQueueDrainAll() {
+    for(i <- 0 until 5) {
+      val itemPath = stateChangeQ.put("state-", "test:0:follower")
+      val item = itemPath.split("/").last.split("-").last.toInt
+      assertEquals(i, item)
+    }
+
+    val items = stateChangeQ.drainAll()
+    assertEquals(5, items.size)
+    for(i <- 0 until 5)
+      assertEquals("test:0:follower", items(i)._2)
+
+    for(i <- 5 until 10) {
+      val itemPath = stateChangeQ.put("state-", "test:1:follower")
+      val item = itemPath.split("/").last.split("-").last.toInt
+      assertEquals(i, item)
+    }
+
+    val moreItems = stateChangeQ.drainAll()
+    assertEquals(5, moreItems.size)
+    for(i <- 0 until 5)
+      assertEquals("test:1:follower", moreItems(i)._2)
+
+    val peekItem = stateChangeQ.peek
+    assertFalse("Queue should have no more items after drainAll", peekItem.isDefined)
+  }
+
+  // TODO: Do this after patch for becomeLeader is in
+  def testStateChangeRequestValidity() {
+    // mock out the StateChangeRequestHandler
+
+    // setup 3 replicas for one topic partition
+
+    // shutdown follower 1
+
+    // restart leader to trigger epoch change
+
+    // start follower 1
+
+    // test follower 1 acted only on one become follower request
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1304473)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -25,11 +25,13 @@
 import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
 import java.util.concurrent.locks.Condition
+import java.lang.IllegalStateException
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
+  val BrokerStatePath = "/brokers/state"
 
   def getTopicPath(topic: String): String ={
     BrokerTopicsPath + "/" + topic
@@ -59,6 +61,14 @@
     getTopicPartitionPath(topic, partitionId) + "/" + "leader"
   }
 
+  def getBrokerStateChangePath(brokerId: Int): String = {
+    BrokerStatePath + "/" + brokerId
+  }
+
+  def getTopicPartitionLeaderEpochPath(topic: String, partition: String): String = {
+    getTopicPartitionPath(topic, partition) + "/epochs/epoch-"
+  }
+
   def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
       ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
   }
@@ -74,6 +84,14 @@
     else Some(leader.toInt)
   }
 
+  def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = {
+    val allEpochPaths = getChildren(client, getTopicPartitionLeaderEpochPath(topic, partition.toString))
+    val allEpochs = allEpochPaths.map(e => e.split("-").last.toInt)
+    if(allEpochs.size == 0)
+      throw new IllegalStateException("Leader never existed for topic %s partition %d".format(topic, partition))
+    allEpochs.sorted.last
+  }
+
   def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
     val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString))
     if(replicaListString == null)
@@ -89,9 +107,11 @@
     replicas.contains(brokerId.toString)
   }
 
-  def tryToBecomeLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
+  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
     try {
-      createEphemeralPathExpectConflict(zkClient, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString)
+      createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString)
+      // increment epoch for topic partition
+      incrementEpochForPartition(client, topic, partition, brokerId)
       true
     } catch {
       case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); false
@@ -99,6 +119,11 @@
     }
   }
 
+  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int) = {
+    createSequentialPersistentPath(client, getTopicPartitionLeaderEpochPath(topic, partition.toString),
+      leader.toString)
+  }
+
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
@@ -186,7 +211,7 @@
   /**
    * Create an persistent node with the given path and data. Create parents if necessary.
    */
-  def createPersistentPath(client: ZkClient, path: String, data: String): Unit = {
+  def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
     try {
       client.createPersistent(path, data)
     }
@@ -198,6 +223,10 @@
     }
   }
 
+  def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = {
+    client.createPersistentSequential(path, data)
+  }
+
   /**
    * Update the value of a persistent node with the given path and data.
    * create parrent directory if necessary. Never throw NodeExistException.
Index: core/src/main/scala/kafka/utils/ZKQueue.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZKQueue.scala	(revision 0)
+++ core/src/main/scala/kafka/utils/ZKQueue.scala	(revision 0)
@@ -0,0 +1,55 @@
+package kafka.utils
+
+import kafka.utils.ZkUtils._
+import org.I0Itec.zkclient.ZkClient
+
+class ZKQueue(zkClient: ZkClient, path: String) {
+  // create the queue in ZK, if one does not exist
+  makeSurePersistentPathExists(zkClient, path)
+
+  /**
+   * TODO: This API will be used by the leader to enqueue state change requests to the followers
+   */
+  def put(item: String, data: String): String =
+    createSequentialPersistentPath(zkClient, path + "/" + item, data)
+
+  def poll(): Option[(String, String)] = {
+    val allRequests = getChildren(zkClient, path).sorted
+    allRequests.size match {
+      case 0 => None
+      case _ =>
+        val requestEpoch = allRequests.head
+        val stateChangePath = path + "/" + requestEpoch
+        val request = ZkUtils.readData(zkClient, stateChangePath)
+        deletePath(zkClient, stateChangePath)
+        Some(requestEpoch, request)
+    }
+  }
+
+  def peek: Option[(String, String)] = {
+    val allRequests = getChildren(zkClient, path).sorted
+    allRequests.size match {
+      case 0 => None
+      case _ =>
+        val requestEpoch = allRequests.head
+        val stateChangePath = path + "/" + requestEpoch
+        Some(requestEpoch, ZkUtils.readData(zkClient, stateChangePath))
+    }
+  }
+
+  def remove() = poll()
+
+  def drainAll(): Seq[(String, String)] = {
+    val allRequests = getChildren(zkClient, path).sorted
+    allRequests.size match {
+      case 0 => Seq.empty[(String, String)]
+      case _ => allRequests.map { requestEpoch =>
+        // read the data and delete the node
+        val stateChangePath = path + "/" + requestEpoch
+        val data = ZkUtils.readData(zkClient, stateChangePath)
+        ZkUtils.deletePath(zkClient, stateChangePath)
+        (requestEpoch, data)
+      }
+    }
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/server/StateChangeRequestHandler.scala
===================================================================
--- core/src/main/scala/kafka/server/StateChangeRequestHandler.scala	(revision 0)
+++ core/src/main/scala/kafka/server/StateChangeRequestHandler.scala	(revision 0)
@@ -0,0 +1,53 @@
+package kafka.server
+
+import kafka.utils.{ZKQueue, Logging}
+
+
+class StateChangeRequestHandler(config: KafkaConfig, stateChangeQ: ZKQueue) extends Logging {
+
+  def handleStateChanges(requestValidityCheck: (Int, String) => Boolean,
+                         epochValidityCheck: (Int, String) => Boolean) {
+    // get outstanding state change requests for this broker
+    val queuedStateChanges = stateChangeQ.drainAll()
+
+    queuedStateChanges.filter(req => requestValidityCheck(req._1.toInt, req._2)).foreach {
+      request =>
+        val stateChangeRequestInfo = request._2.split(":")
+        val topic = stateChangeRequestInfo.head
+        val partition = stateChangeRequestInfo.take(2).last.toInt
+        val stateChange = stateChangeRequestInfo.last
+        val stateChangeRequest = StateChangeRequest.getStateChangeRequest(stateChange)
+
+        stateChangeRequest match {
+          case BecomeFollower =>
+            if(epochValidityCheck(request._1.toInt, request._2))
+              handleBecomeFollower(topic, partition)
+          case StartReplica =>
+            handleStartReplica(topic, partition)
+          case CloseReplica =>
+            /**
+             * close replica requests are sent as part of delete topic or partition reassignment process
+             * To ensure that a topic will be deleted even if the broker is offline, this state change should not
+             * be protected with the epoch validity check
+             */
+            handleCloseReplica(topic, partition)
+        }
+    }
+  }
+
+  def handleBecomeFollower(topic: String, partition: Int) {
+    info("Received become follower state change request for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+  }
+
+  def handleStartReplica(topic: String, partition: Int) {
+    info("Received start replica state change request for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+    // TODO: implement this once the partition reassignment patch is in. Until then this is unused.
+  }
+
+  def handleCloseReplica(topic: String, partition: Int) {
+    info("Received close replica state change request for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/server/StateChangeRequest.scala
===================================================================
--- core/src/main/scala/kafka/server/StateChangeRequest.scala	(revision 0)
+++ core/src/main/scala/kafka/server/StateChangeRequest.scala	(revision 0)
@@ -0,0 +1,25 @@
+package kafka.server
+
+object StateChangeRequest {
+  def getStateChangeRequest(request: String): StateChangeRequest = {
+    request match {
+      case BecomeFollower.request => BecomeFollower
+      case StartReplica.request => StartReplica
+      case CloseReplica.request => CloseReplica
+      case _ => throw new kafka.common.UnknownCodecException("%s is an unknown state change request".format(request))
+    }
+  }
+}
+
+sealed trait StateChangeRequest { def request: String }
+
+/* The elected leader sends the become follower state change request to all the followers for a partition */
+case object BecomeFollower extends StateChangeRequest { val request = "follower" }
+
+/* The elected leader sends the start replica state change request to all the new replicas that have been assigned
+* a partition. Note that the followers must act on this request even if the epoch has changed */
+case object StartReplica extends StateChangeRequest { val request = "start-replica" }
+
+/* The elected leader sends the close replica state change request to all the replicas that have been un-assigned a partition
+*  OR if a topic has been deleted. Note that the followers must act on this request even if the epoch has changed */
+case object CloseReplica extends StateChangeRequest { val request = "close-replica" }
Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(revision 1304473)
+++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(working copy)
@@ -23,6 +23,7 @@
 import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
+import java.lang.IllegalStateException
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -37,17 +38,28 @@
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
   private var zkClient: ZkClient = null
   var topics: List[String] = Nil
-  val lock = new Object()
   var existingTopics: Set[String] = Set.empty[String]
   val leaderChangeListener = new LeaderChangeListener
   val topicPartitionsChangeListener = new TopicChangeListener
+  val stateChangeListener = new StateChangeListener
+  private var stateChangeHandler: StateChangeRequestHandler = null
+  private var stateChangeQ: ZKQueue = null
+
   private val topicListenerLock = new Object
   private val leaderChangeLock = new Object
+  private val stateChangeLock = new Object
 
   def startup() {
     /* start client */
     info("connecting to ZK: " + config.zkConnect)
     zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    val stateChangePath = ZkUtils.getBrokerStateChangePath(config.brokerId)
+    stateChangeQ = new ZKQueue(zkClient, stateChangePath)
+    stateChangeHandler = new StateChangeRequestHandler(config, stateChangeQ)
+    zkClient.subscribeChildChanges(stateChangePath, stateChangeListener)
+    // create state change path if one doesn't exist
+    // TODO: This should be moved to the create cluster admin command, when we implement that
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId))
     zkClient.subscribeStateChanges(new SessionExpireListener)
     registerBrokerInZk()
     subscribeToTopicAndPartitionsChanges(true)
@@ -184,7 +196,6 @@
       case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
       case None => // leader election
         leaderElection(replica)
-
     }
   }
 
@@ -233,6 +244,46 @@
     }
   }
 
+  class StateChangeListener extends IZkChildListener with Logging {
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      stateChangeLock.synchronized {
+        debug("State change listener fired for path %s in broker %d".format(parentPath, config.brokerId))
+        import scala.collection.JavaConversions._
+        val outstandingRequests = asBuffer(curChilds).sorted
+        debug("Sorted list of state change requests %s to broker id %d".format(outstandingRequests.mkString(","), config.brokerId))
+
+        stateChangeHandler.handleStateChanges(ensureStateChangeRequestIsValidOnThisBroker, ensureEpochValidity)
+      }
+    }
+
+    private def ensureStateChangeRequestIsValidOnThisBroker(epoch: Int, request: String): Boolean = {
+      // get the topic and partition that this request is meant for
+      val requestInfo = request.split(":")
+      val topic = requestInfo.head
+      val partition = requestInfo.take(2).last.toInt
+
+      // check if this broker hosts a replica for this topic and partition
+      ZkUtils.isPartitionOnBroker(zkClient, topic, partition, config.brokerId)
+    }
+
+    private def ensureEpochValidity(epoch: Int, request: String): Boolean = {
+      // get the topic and partition that this request is meant for
+      val requestInfo = request.split(":")
+      val topic = requestInfo.head
+      val partition = requestInfo.take(2).last.toInt
+
+      // check if the request's epoch matches the current leader's epoch
+      val currentLeaderEpoch = ZkUtils.getEpochForPartition(zkClient, topic, partition)
+      val validEpoch = currentLeaderEpoch == epoch
+      if(epoch > currentLeaderEpoch)
+        throw new IllegalStateException(("Illegal epoch state. Request's epoch %d larger than registered epoch %d for " +
+          "topic %s partition %d").format(epoch, currentLeaderEpoch, topic, partition))
+      validEpoch
+    }
+  }
+
   class LeaderChangeListener extends IZkDataListener with Logging {
 
     @throws(classOf[Exception])
