Index: core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(revision 1386941)
+++ core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(working copy)
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
 package kafka.server
 
 import org.scalatest.junit.JUnit3Suite
@@ -111,7 +127,8 @@
     server1.startup()
 
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
-    assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
+    assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0",
+      leader.isDefined && (leader.get == 0 || leader.get == 1))
 
     assertEquals(30L, hwFile1.read(topic, 0))
     // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
@@ -120,7 +137,8 @@
 
     server2.startup()
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
-    assertEquals("Leader must remain on broker 0", 0, leader.getOrElse(-1))
+    assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it can move to broker 1",
+      leader.isDefined && (leader.get == 0 || leader.get == 1))
 
     sendMessages()
     // give some time for follower 1 to record leader HW of 60
@@ -249,4 +267,4 @@
       producer.send(new ProducerData[Int, Message](topic, 0, sent1))
     }
   }
-}
\ No newline at end of file
+}
Index: core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala	(revision 1386941)
+++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala	(working copy)
@@ -72,7 +72,7 @@
     val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2)
     val map = Map(((topic1, 0), leaderAndISR1),
                   ((topic2, 0), leaderAndISR2))
-    new LeaderAndIsrRequest( LeaderAndIsrRequest.NotInit, map)
+    new LeaderAndIsrRequest(map)
   }
 
   def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
Index: core/src/main/scala/kafka/controller/PartitionStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala	(revision 0)
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala	(revision 0)
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import collection._
+import kafka.api.LeaderAndIsr
+import kafka.utils.{Logging, ZkUtils}
+import org.I0Itec.zkclient.IZkChildListener
+import collection.JavaConversions._
+import kafka.common.{StateChangeFailedException, PartitionOfflineException, KafkaException}
+import java.util.concurrent.atomic.AtomicBoolean
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+
+/**
+ * This class represents the state machine for partitions. It defines the states that a partition can be in, and
+ * transitions to move the partition to another legal state. The different states that a partition can be in are -
+ * 1. NonExistentPartition: This state indicates that the partition was either never created or was created and then
+ *                          deleted. Valid previous state, if one exists, is OfflinePartition
+ * 2. NewPartition        : After creation, the partition is in the NewPartition state. In this state, the partition should have
+ *                          replicas assigned to it, but no leader/isr yet. Valid previous states are NonExistentPartition
+ * 3. OnlinePartition     : Once a leader is elected for a partition, it is in the OnlinePartition state.
+ *                          Valid previous states are NewPartition/OfflinePartition
+ * 4. OfflinePartition    : If, after successful leader election, the leader for partition dies, then the partition
+ *                          moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
+ */
+class PartitionStateMachine(controller: KafkaController) extends Logging {
+  this.logIdent = "[Partition state machine on Controller " + controller.config.brokerId + "]: "
+  private val controllerContext = controller.controllerContext
+  private val zkClient = controllerContext.zkClient
+  var partitionState: mutable.Map[(String, Int), PartitionState] = mutable.Map.empty
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  private var isShuttingDown = new AtomicBoolean(false)
+
+  /**
+   * Invoked on successful controller election. First registers a topic change listener since that triggers all
+   * state transitions for partitions. Initializes the state of partitions by reading from zookeeper. Then triggers
+   * the OnlinePartition state change for all new or offline partitions.
+   */
+  def startup() {
+    isShuttingDown.set(false)
+    // initialize partition state
+    initializePartitionState()
+    // try to move partitions to online state
+    triggerOnlinePartitionStateChange()
+    info("Started partition state machine with initial state -> " + partitionState.toString())
+  }
+
+  // register topic and partition change listeners
+  def registerListeners() {
+    registerTopicChangeListener()
+  }
+
+  /**
+   * Invoked on controller shutdown.
+   */
+  def shutdown() {
+    isShuttingDown.compareAndSet(false, true)
+    partitionState.clear()
+  }
+
+  /**
+   * This API invokes the OnlinePartition state change on all partitions in either the NewPartition or OfflinePartition
+   * state. This is called on a successful controller election and on broker changes
+   */
+  def triggerOnlinePartitionStateChange() {
+    try {
+      brokerRequestBatch.newBatch()
+      // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
+      partitionState.filter(partitionAndState =>
+        partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach {
+        partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition)
+      }
+      brokerRequestBatch.sendRequestsToBrokers()
+    }catch {
+      case e => error("Error while moving some partitions to the online state", e)
+    }
+  }
+
+  /**
+   * This API is invoked by the partition change zookeeper listener
+   * @param partitions   The list of partitions that need to be transitioned to the target state
+   * @param targetState  The state that the partitions should be moved to
+   */
+  def handleStateChanges(partitions: Seq[(String, Int)], targetState: PartitionState) {
+    info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
+    try {
+      brokerRequestBatch.newBatch()
+      partitions.foreach { topicAndPartition =>
+        handleStateChange(topicAndPartition._1, topicAndPartition._2, targetState)
+      }
+      brokerRequestBatch.sendRequestsToBrokers()
+    }catch {
+      case e => error("Error while moving some partitions to %s state".format(targetState), e)
+    }
+  }
+
+  /**
+   * This API exercises the partition's state machine. It ensures that every state transition happens from a legal
+   * previous state to the target state.
+   * @param topic       The topic of the partition for which the state transition is invoked
+   * @param partition   The partition for which the state transition is invoked
+   * @param targetState The end state that the partition should be moved to
+   */
+  private def handleStateChange(topic: String, partition: Int, targetState: PartitionState) {
+    try {
+      partitionState.getOrElseUpdate((topic, partition), NonExistentPartition)
+      targetState match {
+        case NewPartition =>
+          // pre: partition did not exist before this
+          // post: partition has been assigned replicas
+          assertValidPreviousStates(topic, partition, List(NonExistentPartition), NewPartition)
+          assignReplicasToPartitions(topic, partition)
+          partitionState.put((topic, partition), NewPartition)
+          info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
+            "%s".format(controllerContext.partitionReplicaAssignment(topic, partition).mkString(",")))
+        case OnlinePartition =>
+          // pre: partition should be in New state
+          assertValidPreviousStates(topic, partition, List(NewPartition, OfflinePartition), OnlinePartition)
+          partitionState(topic, partition) match {
+            case NewPartition =>
+              // initialize leader and isr path for new partition
+              initializeLeaderAndIsrForPartition(topic, partition, brokerRequestBatch)
+            case OfflinePartition =>
+              electLeaderForOfflinePartition(topic, partition, brokerRequestBatch)
+            case _ => // should never come here since illegal previous states are checked above
+          }
+          info("Partition [%s, %d] state changed from %s to Online with leader %d".format(topic, partition,
+            partitionState(topic, partition), controllerContext.allLeaders(topic, partition)))
+          partitionState.put((topic, partition), OnlinePartition)
+           // post: partition has a leader
+        case OfflinePartition =>
+          // pre: partition should be in Online state
+          assertValidPreviousStates(topic, partition, List(NewPartition, OnlinePartition), OfflinePartition)
+          // should be called when the leader for a partition is no longer alive
+          info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
+          partitionState.put((topic, partition), OfflinePartition)
+          // post: partition has no alive leader
+        case NonExistentPartition =>
+          // pre: partition could be in either of the above states
+          assertValidPreviousStates(topic, partition, List(OfflinePartition), NonExistentPartition)
+          info("Partition [%s, %d] state changed from Offline to NotExists".format(topic, partition))
+          partitionState.put((topic, partition), NonExistentPartition)
+          // post: partition state is deleted from all brokers and zookeeper
+      }
+    }catch {
+      case e => error("State change for partition [%s, %d] ".format(topic, partition) +
+        "from %s to %s failed".format(partitionState(topic, partition), targetState), e)
+    }
+  }
+
+  /**
+   * Invoked on startup of the partition's state machine to set the initial state for all existing partitions in
+   * zookeeper
+   */
+  private def initializePartitionState() {
+    for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
+      val topic = topicPartition._1
+      val partition = topicPartition._2
+      // check if leader and isr path exists for partition. If not, then it is in NEW state
+      ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+        case Some(currentLeaderAndIsr) =>
+          // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
+          controllerContext.liveBrokerIds.contains(currentLeaderAndIsr.leader) match {
+            case true => // leader is alive
+              partitionState.put(topicPartition, OnlinePartition)
+            case false =>
+              partitionState.put(topicPartition, OfflinePartition)
+          }
+        case None =>
+          partitionState.put(topicPartition, NewPartition)
+      }
+    }
+  }
+
+  private def assertValidPreviousStates(topic: String, partition: Int, fromStates: Seq[PartitionState],
+                                        targetState: PartitionState) {
+    if(!fromStates.contains(partitionState((topic, partition))))
+      throw new IllegalStateException("Partition [%s, %d] should be in the %s states before moving to %s state"
+        .format(topic, partition, fromStates.mkString(","), targetState) + ". Instead it is in %s state"
+        .format(partitionState((topic, partition))))
+  }
+
+  /**
+   * Invoked on the NonExistentPartition->NewPartition state transition to update the controller's cache with the
+   * partition's replica assignment.
+   * @topic     The topic of the partition whose replica assignment is to be cached
+   * @partition The partition whose replica assignment is to be cached
+   */
+  private def assignReplicasToPartitions(topic: String, partition: Int) {
+    val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition)
+    controllerContext.partitionReplicaAssignment += (topic, partition) -> assignedReplicas
+  }
+
+  /**
+   * Invoked on the NewPartition->OnlinePartition state change. When a partition is in the New state, it does not have
+   * a leader and isr path in zookeeper. Once the partition moves to the OnlinePartition state, it's leader and isr
+   * path gets initialized and it never goes back to the NewPartition state. From here, it can only go to the
+   * OfflinePartition state.
+   * @topic               The topic of the partition whose leader and isr path is to be initialized
+   * @partition           The partition whose leader and isr path is to be initialized
+   * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
+   *                      this state change
+   */
+  private def initializeLeaderAndIsrForPartition(topic: String, partition: Int,
+                                                 brokerRequestBatch: ControllerBrokerRequestBatch) {
+    debug("Initializing leader and isr for partition [%s, %d]".format(topic, partition))
+    val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition))
+    val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
+    liveAssignedReplicas.size match {
+      case 0 =>
+        ControllerStat.offlinePartitionRate.mark()
+        throw new StateChangeFailedException(("During state change of partition (%s, %d) from NEW to ONLINE, assigned replicas are " +
+          "[%s], live brokers are [%s]. No assigned replica is alive").format(topic, partition,
+          replicaAssignment.mkString(","), controllerContext.liveBrokerIds))
+      case _ =>
+        debug("Live assigned replicas for partition [%s, %d] are: [%s]".format(topic, partition, liveAssignedReplicas))
+        // make the first replica in the list of assigned replicas, the leader
+        val leader = liveAssignedReplicas.head
+        var leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
+        try {
+          ZkUtils.createPersistentPath(controllerContext.zkClient,
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString)
+          // TODO: the above write can fail only if the current controller lost its zk session and the new controller
+          // took over and initialized this partition. This can happen if the current controller went into a long
+          // GC pause
+          brokerRequestBatch.addRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr)
+          controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader)
+          partitionState.put((topic, partition), OnlinePartition)
+        }catch {
+          case e: ZkNodeExistsException =>
+            ControllerStat.offlinePartitionRate.mark()
+            throw new StateChangeFailedException("Error while changing partition [%s, %d]'s state from New to Online"
+              .format(topic, partition) + " since Leader and ISR path already exists")
+        }
+    }
+  }
+
+  /**
+   * Invoked on the OfflinePartition->OnlinePartition state change. It invokes the leader election API to elect a leader
+   * for the input offline partition
+   * @topic               The topic of the offline partition
+   * @partition           The offline partition
+   * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
+   *                      this state change
+   */
+  private def electLeaderForOfflinePartition(topic: String, partition: Int,
+                                             brokerRequestBatch: ControllerBrokerRequestBatch) {
+    /** handle leader election for the partitions whose leader is no longer alive **/
+    info("Electing leader for Offline partition [%s, %d]".format(topic, partition))
+    try {
+      controllerContext.partitionReplicaAssignment.get((topic, partition)) match {
+        case Some(assignedReplicas) =>
+          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+          try {
+            // elect new leader or throw exception
+            val newLeaderAndIsr = electLeaderForPartition(topic, partition, assignedReplicas)
+            info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
+            // store new leader and isr info in cache
+            brokerRequestBatch.addRequestForBrokers(liveAssignedReplicasToThisPartition, topic, partition,
+              newLeaderAndIsr)
+          }catch {
+            case e => throw new StateChangeFailedException(("Error while electing leader for partition" +
+              " [%s, %d]").format(topic, partition), e)
+          }
+        case None => throw new KafkaException(("While handling broker changes, the " +
+          "partition [%s, %d] doesn't have assigned replicas. The replica assignment for other partitions is %s")
+          .format(topic, partition, controllerContext.partitionReplicaAssignment))
+      }
+    }catch {
+      case e => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
+        .format(topic, partition) + " Marking this partition offline")
+    }
+    debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
+  }
+
+  /**
+   * @param topic                      The topic of the partition whose leader needs to be elected
+   * @param partition                  The partition whose leader needs to be elected
+   * @param assignedReplicas           The list of replicas assigned to the input partition
+   * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
+   * This API selects a new leader for the input partition -
+   * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
+   * 2. Else, it picks some alive broker from the assigned replica list as the new leader
+   * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
+   * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
+   */
+  private def electLeaderForPartition(topic: String, partition: Int, assignedReplicas: Seq[Int]):LeaderAndIsr = {
+    var zookeeperPathUpdateSucceeded: Boolean = false
+    var newLeaderAndIsr: LeaderAndIsr = null
+    while(!zookeeperPathUpdateSucceeded) {
+      newLeaderAndIsr = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+        case Some(currentLeaderAndIsr) =>
+          var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr
+          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+          val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
+          val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
+          val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+          debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
+            .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
+            currentLeaderIsrZkPathVersion))
+          newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
+            case true =>
+              debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
+                .format(liveAssignedReplicasToThisPartition.mkString(",")))
+              liveAssignedReplicasToThisPartition.isEmpty match {
+                case true =>
+                  ControllerStat.offlinePartitionRate.mark()
+                  throw new PartitionOfflineException(("No replica for partition " +
+                  "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
+                  " Assigned replicas are: [%s]".format(assignedReplicas))
+                case false =>
+                  ControllerStat.uncleanLeaderElectionRate.mark()
+                  val newLeader = liveAssignedReplicasToThisPartition.head
+                  warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
+                    "There's potential data loss")
+                  new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
+              }
+            case false =>
+              val newLeader = liveBrokersInIsr.head
+              debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
+              new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
+          }
+          info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
+          // update the new leadership decision in zookeeper or retry
+          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+            newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+          newLeaderAndIsr.zkVersion = newVersion
+          zookeeperPathUpdateSucceeded = updateSucceeded
+          newLeaderAndIsr
+        case None =>
+          throw new StateChangeFailedException("On broker changes, " +
+            "there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topic, partition))
+      }
+    }
+    // update the leader cache
+    controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
+    newLeaderAndIsr
+  }
+
+  private def registerTopicChangeListener() = {
+    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, new TopicChangeListener())
+  }
+
+  def registerPartitionChangeListener(topic: String) = {
+    zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
+  }
+
+  /**
+   * This is the zookeeper listener that triggers all the state transitions for a partition
+   */
+  class TopicChangeListener extends IZkChildListener with Logging {
+    this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, children : java.util.List[String]) {
+      if(!isShuttingDown.get()) {
+        controllerContext.controllerLock synchronized {
+          try {
+            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
+            val currentChildren = JavaConversions.asBuffer(children).toSet
+            val newTopics = currentChildren -- controllerContext.allTopics
+            val deletedTopics = controllerContext.allTopics -- currentChildren
+            //        val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
+            controllerContext.allTopics = currentChildren
+
+            val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
+            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1._1))
+            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
+            info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
+              deletedTopics, addedPartitionReplicaAssignment))
+            if(newTopics.size > 0)
+              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSeq)
+          } catch {
+            case e => error("Error while handling new topic", e )
+          }
+          // TODO: kafka-330  Handle deleted topics
+          // handleDeletedTopics(deletedTopics, deletedPartitionReplicaAssignment)
+        }
+      }
+    }
+  }
+
+  class PartitionChangeListener(topic: String) extends IZkChildListener with Logging {
+    this.logIdent = "[Controller " + controller.config.brokerId + "], "
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, children : java.util.List[String]) {
+      controllerContext.controllerLock synchronized {
+        // TODO: To be completed as part of KAFKA-41
+      }
+    }
+  }
+}
+
+sealed trait PartitionState { def state: Byte }
+case object NewPartition extends PartitionState { val state: Byte = 0 }
+case object OnlinePartition extends PartitionState { val state: Byte = 1 }
+case object OfflinePartition extends PartitionState { val state: Byte = 2 }
+case object NonExistentPartition extends PartitionState { val state: Byte = 3 }
+
+
Index: core/src/main/scala/kafka/controller/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/controller/KafkaController.scala	(revision 0)
+++ core/src/main/scala/kafka/controller/KafkaController.scala	(revision 0)
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import collection._
+import collection.immutable.Set
+import kafka.cluster.Broker
+import kafka.api._
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+import kafka.utils.{ZkUtils, Logging}
+import java.lang.Object
+import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import com.yammer.metrics.core.Gauge
+
+class ControllerContext(val zkClient: ZkClient,
+                        var controllerChannelManager: ControllerChannelManager = null,
+                        val controllerLock: Object = new Object,
+                        var liveBrokers: Set[Broker] = null,
+                        var liveBrokerIds: Set[Int] = null,
+                        var allTopics: Set[String] = null,
+                        var partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null,
+                        var allLeaders: mutable.Map[(String, Int), Int] = null)
+
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup {
+  this.logIdent = "[Controller " + config.brokerId + "], "
+  private var isRunning = true
+  val controllerContext = new ControllerContext(zkClient)
+  private val partitionStateMachine = new PartitionStateMachine(this)
+  private val replicaStateMachine = new ReplicaStateMachine(this)
+  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
+    config.brokerId)
+
+  newGauge(
+    "ActiveControllerCount",
+    new Gauge[Int] {
+      def value() = if (isActive) 1 else 0
+    }
+  )
+
+  /**
+   * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
+   * It does the following things on the become-controller state change -
+   * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
+   *    leaders for all existing partitions.
+   * 2. Starts the controller's channel manager
+   * 3. Starts the replica state machine
+   * 4. Starts the partition state machine
+   */
+  def onControllerFailover() {
+    if(isRunning) {
+      info("Broker %d starting become controller state transition".format(config.brokerId))
+      // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
+      partitionStateMachine.registerListeners()
+      replicaStateMachine.registerListeners()
+      initializeControllerContext()
+      partitionStateMachine.startup()
+      replicaStateMachine.startup()
+      info("Broker %d is ready to serve as the new controller".format(config.brokerId))
+    }else
+      info("Controller has been shut down, aborting startup/failover")
+  }
+
+  /**
+   * Returns true if this broker is the current controller.
+   */
+  def isActive(): Boolean = {
+    controllerContext.controllerChannelManager != null
+  }
+
+  /**
+   * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
+   * brokers as input. It does the following -
+   * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update
+   *    leader and ISR for every partition as they take place
+   * 2. Triggers the OnlinePartition state change for all new/offline partitions
+   * 3. Invokes the OnlineReplica state change on the input list of newly started brokers
+   */
+  def onBrokerStartup(newBrokers: Seq[Int]) {
+    info("New broker startup callback for %s".format(newBrokers.mkString(",")))
+    // update leader and isr cache for broker
+    updateLeaderAndIsrCache()
+    // update partition state machine
+    partitionStateMachine.triggerOnlinePartitionStateChange()
+    replicaStateMachine.handleStateChanges(newBrokers, OnlineReplica)
+  }
+
+  /**
+   * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers
+   * as input. It does the following -
+   * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update
+   *    leader and ISR for every partition as they take place
+   * 2. Mark partitions with dead leaders offline
+   * 3. Triggers the OnlinePartition state change for all new/offline partitions
+   * 4. Invokes the OfflineReplica state change on the input list of newly started brokers
+   */
+  def onBrokerFailure(deadBrokers: Seq[Int]) {
+    info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
+    // update leader and isr cache for broker
+    updateLeaderAndIsrCache()
+    // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
+    val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
+      deadBrokers.contains(partitionAndLeader._2)).map(_._1).toSeq
+    partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
+    // trigger OnlinePartition state changes for offline or new partitions
+    partitionStateMachine.triggerOnlinePartitionStateChange()
+    // handle dead replicas
+    replicaStateMachine.handleStateChanges(deadBrokers, OfflineReplica)
+  }
+
+  /**
+   * This callback is invoked by the partition state machine's topic change listener with the list of failed brokers
+   * as input. It does the following -
+   * 1. Registers partition change listener. This is not required until KAFKA-347
+   * 2. Invokes the new partition callback
+   */
+  def onNewTopicCreation(topics: Set[String], newPartitions: Seq[(String, Int)]) {
+    info("New topic creation callback for %s".format(newPartitions.mkString(",")))
+    // subscribe to partition changes
+    topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
+    onNewPartitionCreation(newPartitions)
+  }
+
+  /**
+   * This callback is invoked by the topic change callback with the list of failed brokers as input.
+   * It does the following -
+   * 1. Move the newly created partitions to the NewPartition state
+   * 2. Move the newly created partitions from NewPartition->OnlinePartition state
+   */
+  def onNewPartitionCreation(newPartitions: Seq[(String, Int)]) {
+    info("New partition creation callback for %s".format(newPartitions.mkString(",")))
+    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
+    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition)
+  }
+
+  /* TODO: kafka-330  This API is unused until we introduce the delete topic functionality.
+  remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
+  def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
+    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
+    for((topicPartition, brokers) <- replicaAssignment){
+      for (broker <- brokers){
+        if (!brokerToPartitionToStopReplicaMap.contains(broker))
+          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
+        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
+      }
+      controllerContext.allLeaders.remove(topicPartition)
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
+    }
+    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
+      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
+      info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
+      sendRequest(broker, stopReplicaRequest)
+    }
+  }
+
+  /**
+   * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
+   * is the controller. It merely registers the session expiration listener and starts the controller leader
+   * elector
+   */
+  def startup() = {
+    controllerContext.controllerLock synchronized {
+      info("Controller starting up");
+      registerSessionExpirationListener()
+      isRunning = true
+      controllerElector.startup
+      info("Controller startup complete")
+    }
+  }
+
+  /**
+   * Invoked when the controller module of a Kafka server is shutting down. If the broker was the current controller,
+   * it shuts down the partition and replica state machines. If not, those are a no-op. In addition to that, it also
+   * shuts down the controller channel manager, if one exists (i.e. if it was the current controller)
+   */
+  def shutdown() = {
+    controllerContext.controllerLock synchronized {
+      isRunning = false
+      partitionStateMachine.shutdown()
+      replicaStateMachine.shutdown()
+      if(controllerContext.controllerChannelManager != null) {
+        controllerContext.controllerChannelManager.shutdown()
+        controllerContext.controllerChannelManager = null
+        info("Controller shutdown complete")
+      }
+    }
+  }
+
+  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
+    controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback)
+  }
+
+  private def registerSessionExpirationListener() = {
+    zkClient.subscribeStateChanges(new SessionExpireListener())
+  }
+
+  private def initializeControllerContext() {
+    controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+    controllerContext.liveBrokerIds = controllerContext.liveBrokers.map(_.id)
+    controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
+    controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,
+      controllerContext.allTopics.toSeq)
+    controllerContext.allLeaders = new mutable.HashMap[(String, Int), Int]
+    // update the leader and isr cache for all existing partitions from Zookeeper
+    updateLeaderAndIsrCache()
+    // start the channel manager
+    startChannelManager()
+    info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
+    info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
+  }
+
+  private def startChannelManager() {
+    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext.liveBrokers, config)
+    controllerContext.controllerChannelManager.startup()
+  }
+
+  private def updateLeaderAndIsrCache() {
+    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.allTopics.toSeq)
+    for((topicPartition, leaderAndIsr) <- leaderAndIsrInfo) {
+      // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
+      controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
+        case true =>
+          controllerContext.allLeaders.put(topicPartition, leaderAndIsr.leader)
+        case false =>
+          debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderAndIsr.leader) +
+            "partition [%s, %d] is dead, just ignore it".format(topicPartition._1, topicPartition._2))
+      }
+    }
+  }
+
+  class SessionExpireListener() extends IZkStateListener with Logging {
+    this.logIdent = "[Controller " + config.brokerId + "], "
+    @throws(classOf[Exception])
+    def handleStateChanged(state: KeeperState) {
+      // do nothing, since zkclient will do reconnect for us.
+    }
+
+    /**
+     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
+     * any ephemeral nodes here.
+     *
+     * @throws Exception
+     *             On any error.
+     */
+    @throws(classOf[Exception])
+    def handleNewSession() {
+      controllerContext.controllerLock synchronized {
+        partitionStateMachine.shutdown()
+        replicaStateMachine.shutdown()
+        if(controllerContext.controllerChannelManager != null) {
+          info("session expires, clean up the state")
+          controllerContext.controllerChannelManager.shutdown()
+          controllerContext.controllerChannelManager = null
+        }
+        controllerElector.elect
+      }
+    }
+  }
+}
+
+object ControllerStat extends KafkaMetricsGroup {
+  val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
+  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
+  val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+}
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(revision 0)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(revision 0)
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import collection._
+import kafka.utils.{ZkUtils, Logging}
+import collection.JavaConversions._
+import kafka.api.LeaderAndIsr
+import kafka.common.StateChangeFailedException
+import java.util.concurrent.atomic.AtomicBoolean
+import org.I0Itec.zkclient.{IZkChildListener}
+
+/**
+ * This class represents the state machine for replicas. It defines the states that a replica can be in, and
+ * transitions to move the replica to another legal state. The different states that a replica can be in are -
+ * 1. OnlineReplica     : Once a replica is started, it is in this state. Valid previous state are OnlineReplica or
+ *                        OfflineReplica
+ * 2. OfflineReplica    : If a replica dies, it moves to this state. Valid previous state is OnlineReplica
+ */
+class ReplicaStateMachine(controller: KafkaController) extends Logging {
+  this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: "
+  private val controllerContext = controller.controllerContext
+  private val zkClient = controllerContext.zkClient
+  var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  private var isShuttingDown = new AtomicBoolean(false)
+
+  /**
+   * Invoked on successful controller election. First registers a broker change listener since that triggers all
+   * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
+   * Then triggers the OnlineReplica state change for all replicas.
+   */
+  def startup() {
+    isShuttingDown.set(false)
+    // initialize replica state
+    initializeReplicaState()
+    // move all Online replicas to Online
+    handleStateChanges(controllerContext.liveBrokerIds.toSeq, OnlineReplica)
+    info("Started replica state machine with initial state -> " + replicaState.toString())
+  }
+
+  // register broker change listener
+  def registerListeners() {
+    registerBrokerChangeListener()
+  }
+
+  /**
+   * Invoked on controller shutdown.
+   */
+  def shutdown() {
+    isShuttingDown.compareAndSet(false, true)
+    replicaState.clear()
+  }
+
+  /**
+   * This API is invoked by the broker change controller callbacks and the startup API of the state machine
+   * @param brokerIds    The list of brokers that need to be transitioned to the target state
+   * @param targetState  The state that the replicas should be moved to
+   * The controller's allLeaders cache should have been updated before this
+   */
+  def handleStateChanges(brokerIds: Seq[Int], targetState: ReplicaState) {
+    info("Invoking state change to %s for brokers %s".format(targetState, brokerIds.mkString(",")))
+    try {
+      brokerRequestBatch.newBatch()
+      brokerIds.foreach { brokerId =>
+        // read all the partitions and their assigned replicas into a map organized by
+        // { replica id -> partition 1, partition 2...
+        val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(controllerContext.allTopics.toSeq, brokerId)
+        partitionsAssignedToThisBroker.foreach { topicAndPartition =>
+          handleStateChange(topicAndPartition._1, topicAndPartition._2, brokerId, targetState)
+        }
+        if(partitionsAssignedToThisBroker.size == 0)
+          info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
+      }
+      brokerRequestBatch.sendRequestsToBrokers(true)
+    }catch {
+      case e => error("Error while moving some replicas to %s state".format(targetState), e)
+    }
+  }
+
+  /**
+   * This API exercises the replica's state machine. It ensures that every state transition happens from a legal
+   * previous state to the target state.
+   * @param topic       The topic of the replica for which the state transition is invoked
+   * @param partition   The partition of the replica for which the state transition is invoked
+   * @param replicaId   The replica for which the state transition is invoked
+   * @param targetState The end state that the replica should be moved to
+   */
+  private def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
+    try {
+      targetState match {
+        case OnlineReplica =>
+          assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica, OfflineReplica), targetState)
+          // check if the leader for this partition is alive or even exists
+          // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper
+          // for the ISR anyways
+          val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+          leaderAndIsrOpt match {
+            case Some(leaderAndIsr) =>
+              controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
+                case true => // leader is alive
+                  brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
+                  replicaState.put((topic, partition, replicaId), OnlineReplica)
+                  info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
+                case false => // ignore partitions whose leader is not alive
+              }
+            case None => // ignore partitions who don't have a leader yet
+          }
+        case OfflineReplica =>
+          assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica), targetState)
+          // As an optimization, the controller removes dead replicas from the ISR
+          var zookeeperPathUpdateSucceeded: Boolean = false
+          var newLeaderAndIsr: LeaderAndIsr = null
+          while(!zookeeperPathUpdateSucceeded) {
+            // refresh leader and isr from zookeeper again
+            val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+            leaderAndIsrOpt match {
+              case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
+                newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+                  leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
+                info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
+                // update the new leadership decision in zookeeper or retry
+                val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+                  ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString,
+                  leaderAndIsr.zkVersion)
+                newLeaderAndIsr.zkVersion = newVersion
+                zookeeperPathUpdateSucceeded = updateSucceeded
+              case None => throw new StateChangeFailedException("Failed to change state of replica %d".format(replicaId) +
+                " for partition [%s, %d] since the leader and isr path in zookeeper is empty".format(topic, partition))
+            }
+          }
+          // send the shrunk ISR state change request only to the leader
+          brokerRequestBatch.addRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr)
+          // update the local leader and isr cache
+          controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
+          replicaState.put((topic, partition, replicaId), OfflineReplica)
+          info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
+          info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
+      }
+    }catch {
+      case e => error("Error while changing state of replica %d for partition ".format(replicaId) +
+        "[%s, %d] to %s".format(topic, partition, targetState), e)
+    }
+  }
+
+  private def assertValidPreviousStates(topic: String, partition: Int, replicaId: Int, fromStates: Seq[ReplicaState],
+                                        targetState: ReplicaState) {
+    assert(fromStates.contains(replicaState((topic, partition, replicaId))),
+      "Replica %s for partition [%s, %d] should be in the %s states before moving to %s state"
+        .format(replicaId, topic, partition, fromStates.mkString(","), targetState) +
+        ". Instead it is in %s state".format(replicaState((topic, partition, replicaId))))
+  }
+
+  private def registerBrokerChangeListener() = {
+    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener())
+  }
+
+  /**
+   * Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions
+   * in zookeeper
+   */
+  private def initializeReplicaState() {
+    for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
+      val topic = topicPartition._1
+      val partition = topicPartition._2
+      assignedReplicas.foreach { replicaId =>
+        controllerContext.liveBrokerIds.contains(replicaId) match {
+          case true => replicaState.put((topic, partition, replicaId), OnlineReplica)
+          case false => replicaState.put((topic, partition, replicaId), OfflineReplica)
+        }
+      }
+    }
+  }
+
+  def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[(String, Int)] = {
+    controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
+  }
+
+  /**
+   * This is the zookeeper listener that triggers all the state transitions for a replica
+   */
+  class BrokerChangeListener() extends IZkChildListener with Logging {
+    this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
+    def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
+      ControllerStat.leaderElectionTimer.time {
+        info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
+        if(!isShuttingDown.get()) {
+          controllerContext.controllerLock synchronized {
+            try {
+              val curBrokerIds = currentBrokerList.map(_.toInt).toSet
+              val newBrokerIds = curBrokerIds -- controllerContext.liveBrokerIds
+              val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+              val deadBrokerIds = controllerContext.liveBrokerIds -- curBrokerIds
+              controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+              controllerContext.liveBrokerIds = controllerContext.liveBrokers.map(_.id)
+              info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
+                .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
+              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
+              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
+              if(newBrokerIds.size > 0)
+                controller.onBrokerStartup(newBrokerIds.toSeq)
+              if(deadBrokerIds.size > 0)
+                controller.onBrokerFailure(deadBrokerIds.toSeq)
+            } catch {
+              case e => error("Error while handling broker changes", e)
+            }
+          }
+        }
+      }
+    }
+  }
+}
+
+sealed trait ReplicaState { def state: Byte }
+case object OnlineReplica extends ReplicaState { val state: Byte = 1 }
+case object OfflineReplica extends ReplicaState { val state: Byte = 2 }
+
+
Index: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
===================================================================
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala	(revision 0)
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala	(revision 0)
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import kafka.network.{Receive, BlockingChannel}
+import kafka.utils.{Logging, ShutdownableThread}
+import collection.mutable.HashMap
+import kafka.cluster.Broker
+import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
+import kafka.server.KafkaConfig
+import collection.mutable
+import kafka.api._
+
+class ControllerChannelManager private (config: KafkaConfig) extends Logging {
+  private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
+  private val brokerLock = new Object
+  this.logIdent = "[Channel manager on controller " + config.brokerId + "], "
+
+  def this(allBrokers: Set[Broker], config : KafkaConfig) {
+    this(config)
+    allBrokers.foreach(addNewBroker(_))
+  }
+
+  def startup() = {
+    brokerLock synchronized {
+      brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
+    }
+  }
+
+  def shutdown() = {
+    brokerLock synchronized {
+      brokerStateInfo.foreach(brokerState => removeExistingBroker(brokerState._1))
+    }
+  }
+
+  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
+    brokerLock synchronized {
+      brokerStateInfo(brokerId).messageQueue.put((request, callback))
+    }
+  }
+
+  def addBroker(broker: Broker) {
+    // be careful here. Maybe the startup() API has already started the request send thread
+    brokerLock synchronized {
+      if(!brokerStateInfo.contains(broker.id)) {
+        addNewBroker(broker)
+        startRequestSendThread(broker.id)
+      }
+    }
+  }
+
+  def removeBroker(brokerId: Int) {
+    brokerLock synchronized {
+      removeExistingBroker(brokerId)
+    }
+  }
+
+  private def addNewBroker(broker: Broker) {
+    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
+    val channel = new BlockingChannel(broker.host, broker.port,
+      BlockingChannel.UseDefaultBufferSize,
+      BlockingChannel.UseDefaultBufferSize,
+      config.controllerSocketTimeoutMs)
+    channel.connect()
+    val requestThread = new RequestSendThread(config.brokerId, broker.id, messageQueue, channel)
+    requestThread.setDaemon(false)
+    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread))
+  }
+
+  private def removeExistingBroker(brokerId: Int) {
+    try {
+      brokerStateInfo(brokerId).channel.disconnect()
+      brokerStateInfo(brokerId).requestSendThread.shutdown()
+      brokerStateInfo.remove(brokerId)
+    }catch {
+      case e => error("Error while removing broker by the controller", e)
+    }
+  }
+
+  private def startRequestSendThread(brokerId: Int) {
+    val requestThread = brokerStateInfo(brokerId).requestSendThread
+    if(requestThread.getState == Thread.State.NEW)
+      requestThread.start()
+  }
+}
+
+class RequestSendThread(val controllerId: Int,
+                        val toBrokerId: Int,
+                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                        val channel: BlockingChannel)
+  extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
+  private val lock = new Object()
+
+  override def doWork(): Unit = {
+    val queueItem = queue.take()
+    val request = queueItem._1
+    val callback = queueItem._2
+
+    var receive: Receive = null
+
+    try{
+      lock synchronized {
+        channel.send(request)
+        receive = channel.receive()
+        var response: RequestOrResponse = null
+        request.requestId.get match {
+          case RequestKeys.LeaderAndIsrKey =>
+            response = LeaderAndISRResponse.readFrom(receive.buffer)
+          case RequestKeys.StopReplicaKey =>
+            response = StopReplicaResponse.readFrom(receive.buffer)
+        }
+        trace("got a response %s".format(controllerId, response, toBrokerId))
+
+        if(callback != null){
+          callback(response)
+        }
+      }
+    } catch {
+      case e =>
+        // log it and let it go. Let controller shut it down.
+        debug("Exception occurs", e)
+    }
+  }
+}
+
+// TODO: When we add more types of requests, we can generalize this class a bit. Right now, it just handles LeaderAndIsr
+// request
+class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
+  extends  Logging {
+  val brokerRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
+
+  def newBatch() {
+    // raise error if the previous batch is not empty
+    if(brokerRequestMap.size > 0)
+      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
+        "a new one. Some state changes %s might be lost ".format(brokerRequestMap.toString()))
+    brokerRequestMap.clear()
+  }
+
+  def addRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) {
+    brokerIds.foreach { brokerId =>
+      brokerRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])
+      brokerRequestMap(brokerId).put((topic, partition), leaderAndIsr)
+    }
+  }
+
+  def sendRequestsToBrokers(isInit: Boolean = false) {
+    brokerRequestMap.foreach { m =>
+      val broker = m._1
+      val leaderAndIsr = m._2
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
+      debug(("The leaderAndIsr request sent to broker %d is %s").format(broker, leaderAndIsrRequest))
+      sendRequest(broker, leaderAndIsrRequest, null)
+    }
+    brokerRequestMap.clear()
+  }
+}
+
+case class ControllerBrokerStateInfo(channel: BlockingChannel,
+                                     broker: Broker,
+                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                                     requestSendThread: RequestSendThread)
+
Index: core/src/main/scala/kafka/common/StateChangeFailedException.scala
===================================================================
--- core/src/main/scala/kafka/common/StateChangeFailedException.scala	(revision 0)
+++ core/src/main/scala/kafka/common/StateChangeFailedException.scala	(revision 0)
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class StateChangeFailedException(message: String) extends RuntimeException(message) {
+  def this(message: String, cause: Throwable) = this(message + " Root cause -> " + cause.toString)
+  def this() = this(null)
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/admin/CreateTopicCommand.scala
===================================================================
--- core/src/main/scala/kafka/admin/CreateTopicCommand.scala	(revision 1386941)
+++ core/src/main/scala/kafka/admin/CreateTopicCommand.scala	(working copy)
@@ -99,7 +99,6 @@
       AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
     else
       getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
-
     debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
   }
Index: core/src/main/scala/kafka/admin/AdminUtils.scala
===================================================================
--- core/src/main/scala/kafka/admin/AdminUtils.scala	(revision 1386941)
+++ core/src/main/scala/kafka/admin/AdminUtils.scala	(working copy)
@@ -89,8 +89,8 @@
   def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[TopicMetadata] = {
     val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
     topics.map { topic =>
-      if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
-        val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic).iterator).get(topic).get
+      if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
+        val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
         val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
 
         val partitionMetadata = sortedPartitions.map { partitionMap =>
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 1386941)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -410,7 +410,7 @@
     private def rebalance(cluster: Cluster): Boolean = {
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
-      val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
+      val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq)
 
       /**
        * fetchers must be stopped to avoid data duplication, since if the current
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1386941)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -419,7 +419,7 @@
     ret
   }
 
-  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Iterator[String]):
+  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Seq[String]):
   mutable.Map[(String, Int), LeaderAndIsr] = {
     val ret = new mutable.HashMap[(String, Int), LeaderAndIsr]
     val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
@@ -434,7 +434,28 @@
     ret
   }
 
-  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]):
+  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[(String, Int), Seq[Int]] = {
+    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
+    topics.foreach { topic =>
+      val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+      jsonPartitionMapOpt match {
+        case Some(jsonPartitionMap) =>
+          SyncJSON.parseFull(jsonPartitionMap) match {
+            case Some(m) =>
+              val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
+              for((partition, replicas) <- replicaMap){
+                ret.put((topic, partition.toInt), replicas.map(_.toInt))
+                debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
+              }
+            case None =>
+          }
+        case None =>
+      }
+    }
+    ret
+  }
+
+  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]):
   mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
     val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
     topics.foreach{ topic =>
@@ -466,7 +487,7 @@
     ret
   }
 
-  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[Int]] = {
+  def getPartitionsForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
     getPartitionAssignmentForTopics(zkClient, topics).map
     { topicAndPartitionMap =>
       val topic = topicAndPartitionMap._1
@@ -477,20 +498,17 @@
   }
 
   def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int):
-  Map[(String, Int), Seq[Int]] = {
-    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
-    val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator)
-    topicsAndPartitions.map
-    {
-      topicAndPartitionMap =>
-        val topic = topicAndPartitionMap._1
-        val partitionMap = topicAndPartitionMap._2
-        val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
-        for((relevantPartition, replicaAssignment) <- relevantPartitionsMap){
-          ret.put((topic, relevantPartition), replicaAssignment)
-        }
-    }
-    ret
+    Seq[(String, Int)] = {
+    val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics)
+    topicsAndPartitions.map { topicAndPartitionMap =>
+      val topic = topicAndPartitionMap._1
+      val partitionMap = topicAndPartitionMap._2
+      val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
+      val relevantPartitions = relevantPartitionsMap.map(_._1)
+      for(relevantPartition <- relevantPartitions) yield {
+        (topic, relevantPartition)
+      }
+    }.flatten[(String, Int)].toSeq
   }
 
   def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
Index: core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
===================================================================
--- core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala	(revision 1386941)
+++ core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala	(working copy)
@@ -44,7 +44,7 @@
 
   private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
     val cluster = ZkUtils.getCluster(zkClient)
-    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
+    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic))
     var partitions: Seq[Int] = Nil
 
     partitionsPerTopicMap.get(topic) match {
Index: core/src/main/scala/kafka/utils/ShutdownableThread.scala
===================================================================
--- core/src/main/scala/kafka/utils/ShutdownableThread.scala	(revision 1386941)
+++ core/src/main/scala/kafka/utils/ShutdownableThread.scala	(working copy)
@@ -33,7 +33,7 @@
     isRunning.set(false)
     interrupt()
     shutdownLatch.await()
-    info("Shutted down completed")
+    info("Shutdown completed")
   }
 
     /**
Index: core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
===================================================================
--- core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala	(revision 1386941)
+++ core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala	(working copy)
@@ -79,7 +79,7 @@
      * under /consumers/[consumer_group]/owners/[topic]/[broker_id-partition_id]
      */
     val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group)
-    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keys.iterator)
+    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keySet.toSeq)
 
     partitionsPerTopicMap.foreach { partitionsForTopic =>
       val topic = partitionsForTopic._1
Index: core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
===================================================================
--- core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala	(revision 0)
+++ core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala	(revision 0)
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.ZkUtils._
+import kafka.utils.Logging
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.I0Itec.zkclient.{IZkDataListener}
+import kafka.controller.ControllerContext
+
+/**
+ * This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
+ * session expiration, instead it assumes the caller will handle it by probably try to re-elect again. If the existing
+ * leader is dead, this class will handle automatic re-election and if it succeeds, it invokes the leader state change
+ * callback
+ */
+class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit,
+                             brokerId: Int)
+  extends LeaderElector with Logging {
+  var leaderId = -1
+  // create the election path in ZK, if one does not exist
+  val index = electionPath.lastIndexOf("/")
+  if (index > 0)
+    makeSurePersistentPathExists(controllerContext.zkClient, electionPath.substring(0, index))
+  val leaderChangeListener = new LeaderChangeListener
+
+  def startup {
+    controllerContext.controllerLock synchronized {
+      elect
+    }
+  }
+
+  def amILeader : Boolean = leaderId == brokerId
+
+  def elect: Boolean = {
+    controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
+    try {
+      createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
+      info(brokerId + " successfully elected as leader")
+      leaderId = brokerId
+      onBecomingLeader()
+    } catch {
+      case e: ZkNodeExistsException =>
+        // If someone else has written the path, then
+        debug("Someone else was elected as leader other than " + brokerId)
+        val data: String = controllerContext.zkClient.readData(electionPath, true)
+        if (data != null) leaderId = data.toInt
+      case e2 => throw e2
+    }
+    amILeader
+  }
+
+  def close = {
+    leaderId = -1
+  }
+
+  /**
+   * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
+   * have its own session expiration listener and handler
+   */
+  class LeaderChangeListener extends IZkDataListener with Logging {
+    /**
+     * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
+     * @throws Exception On any error.
+     */
+    @throws(classOf[Exception])
+    def handleDataChange(dataPath: String, data: Object) {
+      controllerContext.controllerLock synchronized {
+        debug("%s leader change listener fired for path %s to handle data changed: record the new leader %s in memory"
+          .format(brokerId, dataPath, data))
+        val oldLeader = leaderId
+        leaderId = data.toString.toInt
+        if (leaderId == brokerId) {
+          debug("Now " + brokerId + " is the new leader")
+        }
+      }
+    }
+
+    /**
+     * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
+     * @throws Exception
+     *             On any error.
+     */
+    @throws(classOf[Exception])
+    def handleDataDeleted(dataPath: String) {
+      controllerContext.controllerLock synchronized {
+        debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
+          .format(brokerId, dataPath))
+        elect
+      }
+    }
+  }
+}
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1386941)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -24,6 +24,7 @@
 import java.util.concurrent._
 import atomic.AtomicBoolean
 import org.I0Itec.zkclient.ZkClient
+import kafka.controller.KafkaController
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1386941)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -163,13 +163,14 @@
     /**
      *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
      *  as deleted.
+     *  TODO: Handle this properly as part of KAFKA-330
      */
-    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
-      startHighWaterMarksCheckPointThread
-      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
-      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
-      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
-    }
+//    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
+//      startHighWaterMarksCheckPointThread
+//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
+//      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
+//      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
+//    }
 
     responseMap
   }
Index: core/src/main/scala/kafka/server/LeaderElector.scala
===================================================================
--- core/src/main/scala/kafka/server/LeaderElector.scala	(revision 0)
+++ core/src/main/scala/kafka/server/LeaderElector.scala	(revision 0)
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.Logging
+
+/**
+ * This trait defines a leader elector If the existing leader is dead, this class will handle automatic
+ * re-election and if it succeeds, it invokes the leader state change callback
+ */
+trait LeaderElector extends Logging {
+  def startup
+
+  def amILeader : Boolean
+
+//  def electAndBecomeLeader: Unit
+//
+  def elect: Boolean
+
+  def close
+}
Index: core/src/main/scala/kafka/server/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaController.scala	(revision 1386941)
+++ core/src/main/scala/kafka/server/KafkaController.scala	(working copy)
@@ -1,615 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import collection.mutable.HashMap
-import collection._
-import collection.immutable.Set
-import kafka.cluster.Broker
-import kafka.api._
-import kafka.network.{Receive, BlockingChannel}
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
-import org.apache.zookeeper.Watcher.Event.KeeperState
-import collection.JavaConversions._
-import kafka.utils.{ShutdownableThread, ZkUtils, Logging}
-import java.lang.Object
-import com.yammer.metrics.core.Gauge
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
-import kafka.common.{PartitionOfflineException, KafkaException}
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-
-
-class RequestSendThread(val controllerId: Int,
-                        val toBrokerId: Int,
-                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
-                        val channel: BlockingChannel)
-  extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
-  private val lock = new Object()
-
-  override def doWork(): Unit = {
-    val queueItem = queue.take()
-    val request = queueItem._1
-    val callback = queueItem._2
-
-    var receive: Receive = null
-
-    try{
-      lock synchronized {
-        channel.send(request)
-        receive = channel.receive()
-        var response: RequestOrResponse = null
-        request.requestId.get match {
-          case RequestKeys.LeaderAndIsrKey =>
-            response = LeaderAndISRResponse.readFrom(receive.buffer)
-          case RequestKeys.StopReplicaKey =>
-            response = StopReplicaResponse.readFrom(receive.buffer)
-        }
-        trace("got a response %s".format(controllerId, response, toBrokerId))
-
-        if(callback != null){
-          callback(response)
-        }
-      }
-    } catch {
-      case e =>
-        // log it and let it go. Let controller shut it down.
-        debug("Exception occurs", e)
-    }
-  }
-}
-
-class ControllerChannelManager private (config: KafkaConfig) extends Logging {
-  private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
-  private val brokerLock = new Object
-  this.logIdent = "[Channel manager on controller " + config.brokerId + "], "
-
-  def this(allBrokers: Set[Broker], config : KafkaConfig) {
-    this(config)
-    allBrokers.foreach(addNewBroker(_))
-  }
-
-  def startup() = {
-    brokerLock synchronized {
-      brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
-    }
-  }
-
-  def shutdown() = {
-    brokerLock synchronized {
-      brokerStateInfo.foreach(brokerState => removeExistingBroker(brokerState._1))
-    }
-  }
-
-  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
-    brokerLock synchronized {
-      brokerStateInfo(brokerId).messageQueue.put((request, callback))
-    }
-  }
-
-  def addBroker(broker: Broker) {
-    brokerLock synchronized {
-      addNewBroker(broker)
-      startRequestSendThread(broker.id)
-    }
-  }
-
-  def removeBroker(brokerId: Int) {
-    brokerLock synchronized {
-      removeExistingBroker(brokerId)
-    }
-  }
-
-  private def addNewBroker(broker: Broker) {
-    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
-    val channel = new BlockingChannel(broker.host, broker.port,
-      BlockingChannel.UseDefaultBufferSize,
-      BlockingChannel.UseDefaultBufferSize,
-      config.controllerSocketTimeoutMs)
-    channel.connect()
-    val requestThread = new RequestSendThread(config.brokerId, broker.id, messageQueue, channel)
-    requestThread.setDaemon(false)
-    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread))
-  }
-
-  private def removeExistingBroker(brokerId: Int) {
-    try {
-      brokerStateInfo(brokerId).channel.disconnect()
-      brokerStateInfo(brokerId).requestSendThread.shutdown()
-      brokerStateInfo.remove(brokerId)
-    }catch {
-      case e => error("Error while removing broker by the controller", e)
-    }
-  }
-
-  private def startRequestSendThread(brokerId: Int) {
-    brokerStateInfo(brokerId).requestSendThread.start()
-  }
-}
-
-case class ControllerBrokerStateInfo(channel: BlockingChannel,
-                                     broker: Broker,
-                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
-                                     requestSendThread: RequestSendThread)
-
-class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup{
-  this.logIdent = "[Controller " + config.brokerId + "], "
-  private var isRunning = true
-  private val controllerLock = new Object
-  private var controllerChannelManager: ControllerChannelManager = null
-  private var liveBrokers : Set[Broker] = null
-  private var liveBrokerIds : Set[Int] = null
-  private var allTopics: Set[String] = null
-  private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null
-  private var allLeaders: mutable.Map[(String, Int), Int] = null
-
-  newGauge(
-    "ActiveControllerCount",
-    new Gauge[Int] {
-      def value() = if (isActive) 1 else 0
-    }
-  )
-
-  // Return true if this controller succeeds in the controller leader election
-  private def tryToBecomeController(): Boolean = {
-    val controllerStatus =
-      try {
-        ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString)
-        // Only the broker elected as the new controller can execute following code, otherwise
-        // some exception will be thrown.
-        registerBrokerChangeListener()
-        registerTopicChangeListener()
-        liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
-        liveBrokerIds = liveBrokers.map(_.id)
-        info("Currently active brokers in the cluster: %s".format(liveBrokerIds))
-        allTopics = ZkUtils.getAllTopics(zkClient).toSet
-        info("Current list of topics in the cluster: %s".format(allTopics))
-        allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator)
-        info("Partition replica assignment: %s".format(allPartitionReplicaAssignment))
-        allLeaders = new mutable.HashMap[(String, Int), Int]
-        controllerChannelManager = new ControllerChannelManager(liveBrokers, config)
-        controllerChannelManager.startup()
-        true
-      } catch {
-        case e: ZkNodeExistsException =>
-          registerControllerExistsListener()
-          false
-        case e2 => throw e2
-      }
-    controllerStatus
-  }
-
-  private def controllerRegisterOrFailover() {
-    if(isRunning) {
-      if(tryToBecomeController()) {
-        readAndSendLeaderAndIsrFromZookeeper(liveBrokerIds, allTopics)
-        onBrokerChange()
-        // If there are some partition with leader not initialized, init the leader for them
-        val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(m => !allLeaders.contains(m._1))
-        debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders))
-        initLeaders(partitionReplicaAssignment)
-      }
-    }else
-      info("Controller has been shut down, aborting startup procedure")
-  }
-
-  def isActive(): Boolean = {
-    controllerChannelManager != null
-  }
-
-  def startup() = {
-    controllerLock synchronized {
-      info("Controller starting up");
-      registerSessionExpirationListener()
-      registerControllerExistsListener()
-      isRunning = true
-      controllerRegisterOrFailover()
-      info("Controller startup complete")
-    }
-  }
-
-  def shutdown() = {
-    controllerLock synchronized {
-      if(controllerChannelManager != null) {
-        info("Controller shutting down")
-        controllerChannelManager.shutdown()
-        controllerChannelManager = null
-        info("Controller shutdown complete")
-      }
-      isRunning = false
-    }
-  }
-
-  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
-    controllerChannelManager.sendRequest(brokerId, request, callback)
-  }
-
-  private def registerBrokerChangeListener() = {
-    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener())
-  }
-
-  private def registerTopicChangeListener() = {
-    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, new TopicChangeListener())
-  }
-
-  private def registerSessionExpirationListener() = {
-    zkClient.subscribeStateChanges(new SessionExpireListener())
-  }
-
-  private def registerControllerExistsListener(){
-    zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistsListener())
-  }
-
-  class SessionExpireListener() extends IZkStateListener with Logging {
-    this.logIdent = "[Controller " + config.brokerId + "], "
-    @throws(classOf[Exception])
-    def handleStateChanged(state: KeeperState) {
-      // do nothing, since zkclient will do reconnect for us.
-    }
-
-    /**
-     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
-     * any ephemeral nodes here.
-     *
-     * @throws Exception
-     *             On any error.
-     */
-    @throws(classOf[Exception])
-    def handleNewSession() {
-      controllerLock synchronized {
-        if(controllerChannelManager != null) {
-          info("session expires, clean up the state")
-          controllerChannelManager.shutdown()
-          controllerChannelManager = null
-        }
-        controllerRegisterOrFailover()
-      }
-    }
-  }
-
-  /**
-   * @param brokerIds The set of currently active brokers in the cluster, as known to the controller
-   * @param topics The set of topics known to the controller by reading from zookeeper
-   * This API reads the list of partitions that exist for all the topics in the specified list of input topics.
-   * For each of those partitions, it reads the assigned replica list so that it can send the appropriate leader and
-   * isr state change request to all the brokers in the assigned replica list. It arranges the leader and isr state
-   * change requests by broker id. At the end, it circles through this map, sending the required INIT state change requests
-   * to each broker. This API is called when -
-   * 1. A new broker starts up
-   * 2. A new controller is elected
-   */
-  private def readAndSendLeaderAndIsrFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = {
-    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topics.iterator)
-    val brokerToLeaderAndIsrInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
-    for((topicPartition, leaderAndIsr) <- leaderAndIsrInfo) {
-      // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
-      liveBrokerIds.contains(leaderAndIsr.leader) match {
-        case true =>
-          val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
-          brokersAssignedToThisPartitionOpt match {
-            case Some(brokersAssignedToThisPartition) =>
-              val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_))
-              relatedBrokersAssignedToThisPartition.foreach(b => {
-                brokerToLeaderAndIsrInfoMap.getOrElseUpdate(b, new mutable.HashMap[(String, Int), LeaderAndIsr])
-                brokerToLeaderAndIsrInfoMap(b).put(topicPartition, leaderAndIsr)
-              })
-              allLeaders.put(topicPartition, leaderAndIsr.leader)
-            case None => warn(("While refreshing controller's leader and isr cache, no replica assignment was found " +
-              "for partition [%s, %d]. Rest of the partition replica assignment is %s").format(topicPartition._1,
-              topicPartition._2, allPartitionReplicaAssignment))
-          }
-        case false =>
-          debug("While refreshing controller's leader and isr cache, broker %d is not alive any more, just ignore it"
-            .format(leaderAndIsr.leader))
-      }
-    }
-    debug(("While refreshing controller's leader and isr cache, the state change requests for each broker is " +
-      "[%s]").format(brokerToLeaderAndIsrInfoMap.toString()))
-
-    brokerToLeaderAndIsrInfoMap.foreach(m =>{
-      val broker = m._1
-      val leaderAndIsrs = m._2
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.IsInit, leaderAndIsrs)
-      info("After refreshing controller's leader and isr cache, the leader and ISR change state change request sent to" +
-        " new broker [%s] is [%s]".format(broker, leaderAndIsrRequest.toString))
-      sendRequest(broker, leaderAndIsrRequest)
-    })
-    info("After refreshing controller's leader and isr cache for brokers %s, the leaders assignment is %s"
-      .format(brokerIds, allLeaders))
-  }
-
-  private def initLeaders(partitionReplicaAssignment: collection.mutable.Map[(String, Int), Seq[Int]]) {
-    val brokerToLeaderAndISRInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndIsr]]
-    for((topicPartition, replicaAssignment) <- partitionReplicaAssignment) {
-      val liveAssignedReplicas = replicaAssignment.filter(r => liveBrokerIds.contains(r))
-      debug("for topic [%s], partition [%d], live assigned replicas are: [%s]"
-        .format(topicPartition._1,
-        topicPartition._2,
-        liveAssignedReplicas))
-      if(!liveAssignedReplicas.isEmpty) {
-        debug("live assigned replica is not empty, check zkClient: %s".format(zkClient))
-        val leader = liveAssignedReplicas.head
-        var leaderAndISR: LeaderAndIsr = null
-        var updateLeaderISRZKPathSucceeded: Boolean = false
-        while(!updateLeaderISRZKPathSucceeded) {
-          val curLeaderAndISROpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topicPartition._1, topicPartition._2)
-          debug("curLeaderAndISROpt is %s, zkClient is %s ".format(curLeaderAndISROpt, zkClient))
-          if(curLeaderAndISROpt == None){
-            debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is empty".format(topicPartition._1, topicPartition._2))
-            leaderAndISR = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
-            ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2), leaderAndISR.toString)
-            updateLeaderISRZKPathSucceeded = true
-          } else {
-            debug("During initializing leader of parition (%s, %d),".format(topicPartition._1, topicPartition._2) +
-              " the current leader and isr in zookeeper is not empty")
-            val curZkPathVersion = curLeaderAndISROpt.get.zkVersion
-            leaderAndISR = new LeaderAndIsr(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList,
-              curLeaderAndISROpt.get.zkVersion + 1)
-            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-              ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2),
-              leaderAndISR.toString, curZkPathVersion)
-            if(updateSucceeded) {
-              leaderAndISR.zkVersion = newVersion
-            }
-            updateLeaderISRZKPathSucceeded = updateSucceeded
-          }
-        }
-        liveAssignedReplicas.foreach(b => {
-          if(!brokerToLeaderAndISRInfoMap.contains(b))
-            brokerToLeaderAndISRInfoMap.put(b, new mutable.HashMap[(String, Int), LeaderAndIsr])
-          brokerToLeaderAndISRInfoMap(b).put(topicPartition, leaderAndISR)
-        }
-        )
-        allLeaders.put(topicPartition, leaderAndISR.leader)
-      }
-      else{
-        warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), liveBrokerIds))
-        ControllerStat.offlinePartitionRate.mark()
-      }
-    }
-
-    info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfoMap))
-    brokerToLeaderAndISRInfoMap.foreach(m =>{
-      val broker = m._1
-      val leaderAndISRs = m._2
-      val leaderAndISRRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.NotInit, leaderAndISRs)
-      info("at initializing leaders for new partitions, the leaderAndISR request sent to broker %d is %s".format(broker, leaderAndISRRequest))
-      sendRequest(broker, leaderAndISRRequest)
-    })
-  }
-
-  /**
-   * @param newBrokers The list of brokers that are started up. This is an optional argument that can be empty when
-   * new controller is being elected
-   * The purpose of this API is to send the leader state change request to all live replicas of partitions that
-   * currently don't have an alive leader. It first finds the partitions with dead leaders, then it looks up the list
-   * of assigned replicas for those partitions that are alive. It reads the leader and isr info for those partitions
-   * from zookeeper.
-   * It can happen that when the controller is in the middle of updating the new leader info in zookeeper,
-   * the leader changes the ISR for the partition. Due to this, the zookeeper path's version will be different than
-   * what was known to the controller. So it's new leader update will fail. The controller retries the leader election
-   * based on the new ISR until it's leader update in zookeeper succeeds.
-   * Once the write to zookeeper succeeds, it sends the leader state change request to the live assigned replicas for
-   * each affected partition.
-   */
-  private def onBrokerChange(newBrokers: Set[Int] = Set.empty[Int]) {
-    /** handle the new brokers, send request for them to initialize the local log **/
-    if(newBrokers.size != 0)
-      readAndSendLeaderAndIsrFromZookeeper(newBrokers, allTopics)
-
-    /** handle leader election for the partitions whose leader is no longer alive **/
-    val brokerToLeaderAndIsrInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
-    // retain only partitions whose leaders are not alive
-    val partitionsWithDeadLeaders = allLeaders.filter(partitionAndLeader => !liveBrokerIds.contains(partitionAndLeader._2))
-    partitionsWithDeadLeaders.foreach { partitionAndLeader =>
-      val topic = partitionAndLeader._1._1
-      val partition = partitionAndLeader._1._2
-
-      try {
-        allPartitionReplicaAssignment.get((topic, partition)) match {
-          case Some(assignedReplicas) =>
-            val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => liveBrokerIds.contains(r))
-            ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-              case Some(currentLeaderAndIsr) =>
-                try {
-                  // elect new leader or throw exception
-                  val newLeaderAndIsr = electLeaderForPartition(topic, partition, currentLeaderAndIsr, assignedReplicas)
-                  // store new leader and isr info in cache
-                  liveAssignedReplicasToThisPartition.foreach { b =>
-                    brokerToLeaderAndIsrInfoMap.getOrElseUpdate(b, new mutable.HashMap[(String, Int), LeaderAndIsr])
-                    brokerToLeaderAndIsrInfoMap(b).put((topic, partition), newLeaderAndIsr)
-                  }
-                }catch {
-                  case e => error("Error while electing leader for partition [%s, %d]".format(topic, partition))
-                }
-              case None => throw new KafkaException(("On broker changes, " +
-                "there's no leaderAndISR information for partition (%s, %d) in zookeeper").format(topic, partition))
-            }
-          case None => throw new KafkaException(("While handling broker changes, the " +
-            "partition [%s, %d] doesn't have assigned replicas. The replica assignment for other partitions is %s")
-            .format(topic, partition, allPartitionReplicaAssignment))
-        }
-      }catch {
-        case e: PartitionOfflineException =>
-          error("All replicas for partition [%s, %d] are dead.".format(topic, partition) +
-            " Marking this partition offline")
-      }
-    }
-    debug("After leader election, leader cache is updated to %s".format(allLeaders))
-    brokerToLeaderAndIsrInfoMap.foreach(m => {
-      val broker = m._1
-      val leaderAndISRInfo = m._2
-      val leaderAndISRRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.NotInit, leaderAndISRInfo)
-      sendRequest(broker, leaderAndISRRequest)
-      info("On broker changes, the LeaderAndIsrRequest send to broker [%d] is [%s]".format(broker, leaderAndISRRequest))
-    })
-  }
-
-  /**
-   * @param topic                      The topic of the partition whose leader needs to be elected
-   * @param partition                  The partition whose leader needs to be elected
-   * @param currentLeaderAndIsr        The leader and isr information stored for this partition in zookeeper
-   * @param assignedReplicas           The list of replicas assigned to the input partition
-   * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
-   * This API selects a new leader for the input partition -
-   * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
-   * 2. Else, it picks some alive broker from the assigned replica list as the new leader
-   * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
-   * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
-   * TODO: If a leader cannot be elected for a partition, it should be marked offline and exposed through some metric
-   */
-  private def electLeaderForPartition(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr,
-                                      assignedReplicas: Seq[Int]):LeaderAndIsr = {
-    var zookeeperPathUpdateSucceeded: Boolean = false
-    var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr
-    while(!zookeeperPathUpdateSucceeded) {
-      val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => liveBrokerIds.contains(r))
-      val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => liveBrokerIds.contains(r))
-      val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
-      val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
-      debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
-        .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
-        currentLeaderIsrZkPathVersion))
-      newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
-        case true =>
-          debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
-            .format(liveAssignedReplicasToThisPartition.mkString(",")))
-          liveAssignedReplicasToThisPartition.isEmpty match {
-            case true =>
-              ControllerStat.offlinePartitionRate.mark()
-              throw new PartitionOfflineException(("No replica for partition " +
-                "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) +
-                " Assigned replicas are: [%s]".format(assignedReplicas))
-            case false =>
-              ControllerStat.uncleanLeaderElectionRate.mark()
-              val newLeader = liveAssignedReplicasToThisPartition.head
-              warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
-                "There's potential data loss")
-              new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
-          }
-        case false =>
-          val newLeader = liveBrokersInIsr.head
-          debug("Some broker in ISR is alive, picking the leader from the ISR: " + newLeader)
-          new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
-      }
-      info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
-      // update the new leadership decision in zookeeper or retry
-      val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-        ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-        newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
-      newLeaderAndIsr.zkVersion = newVersion
-      zookeeperPathUpdateSucceeded = updateSucceeded
-    }
-    // update the leader cache
-    allLeaders.put((topic, partition), newLeaderAndIsr.leader)
-    newLeaderAndIsr
-  }
-
-  class BrokerChangeListener() extends IZkChildListener with Logging {
-    this.logIdent = "[Controller " + config.brokerId + "], "
-    def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
-      ControllerStat.leaderElectionTimer.time {
-        controllerLock synchronized {
-          val curBrokerIds = currentBrokerList.map(_.toInt).toSet
-          val newBrokerIds = curBrokerIds -- liveBrokerIds
-          val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
-          val deletedBrokerIds = liveBrokerIds -- curBrokerIds
-          liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
-          liveBrokerIds = liveBrokers.map(_.id)
-          info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
-            .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(",")))
-          newBrokers.foreach(controllerChannelManager.addBroker(_))
-          deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
-          onBrokerChange(newBrokerIds)
-        }
-      }
-    }
-  }
-
-  private def handleNewTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
-    // get relevant partitions to this broker
-    val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => topics.contains(p._1._1))
-    debug("handling new topics, the partition replica assignment to be handled is %s".format(partitionReplicaAssignment))
-    initLeaders(partitionReplicaAssignment)
-  }
-
-  private def handleDeletedTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
-    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
-    for((topicPartition, brokers) <- partitionReplicaAssignment){
-      for (broker <- brokers){
-        if (!brokerToPartitionToStopReplicaMap.contains(broker))
-          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
-        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
-      }
-      allLeaders.remove(topicPartition)
-      info("after deleting topics %s, allLeader is updated to %s and the broker to stop replia request map is %s".format(topics, allLeaders, brokerToPartitionToStopReplicaMap))
-      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
-    }
-    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
-      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
-      info("handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
-      sendRequest(broker, stopReplicaRequest)
-    }
-    /* TODO: kafka-330  remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
-  }
-
-  class TopicChangeListener extends IZkChildListener with Logging {
-    this.logIdent = "[Controller " + config.brokerId + "], "
-
-    @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, children : java.util.List[String]) {
-      controllerLock synchronized {
-        info("topic/partition change listener fired for path " + parentPath)
-        val currentChildren = JavaConversions.asBuffer(children).toSet
-        val newTopics = currentChildren -- allTopics
-        val deletedTopics = allTopics -- currentChildren
-        val deletedPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => deletedTopics.contains(p._1._1))
-        allTopics = currentChildren
-
-        val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.iterator)
-        allPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1._1))
-        allPartitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
-        info("new topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, deletedTopics, allPartitionReplicaAssignment))
-        handleNewTopics(newTopics, addedPartitionReplicaAssignment)
-        handleDeletedTopics(deletedTopics, deletedPartitionReplicaAssignment)
-      }
-    }
-  }
-
-  class ControllerExistsListener extends IZkDataListener with Logging {
-    this.logIdent = "[Controller " + config.brokerId + "], "
-
-    @throws(classOf[Exception])
-    def handleDataChange(dataPath: String, data: Object) {
-      // do nothing, since No logic is needed here
-    }
-
-    @throws(classOf[Exception])
-    def handleDataDeleted(dataPath: String) {
-      controllerLock synchronized {
-        info("Current controller failed, participating in election for a new controller")
-        controllerRegisterOrFailover()
-      }
-    }
-  }
-}
-
-object ControllerStat extends KafkaMetricsGroup {
-  val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
-  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
-  val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-}
Index: core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala	(revision 1386941)
+++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala	(working copy)
@@ -72,7 +72,6 @@
   def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
-    val isInit = if(buffer.get() == 1.toByte) true else false
     val ackTimeoutMs = buffer.getInt
     val leaderAndISRRequestCount = buffer.getInt
     val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr]
@@ -84,25 +83,24 @@
 
       leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
     }
-    new LeaderAndIsrRequest(versionId, clientId, isInit, ackTimeoutMs, leaderAndISRInfos)
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, leaderAndISRInfos)
   }
 }
 
 
 case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
-                                isInit: Boolean,
                                 ackTimeoutMs: Int,
                                 leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
-  def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, isInit, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
+
+  def this(leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
-    buffer.put(if(isInit) 1.toByte else 0.toByte)
     buffer.putInt(ackTimeoutMs)
     buffer.putInt(leaderAndISRInfos.size)
     for((key, value) <- leaderAndISRInfos){
