diff --git bin/kafka-check-reassignment-status.sh bin/kafka-check-reassignment-status.sh
new file mode 100644
index 0000000..d337c9e
--- /dev/null
+++ bin/kafka-check-reassignment-status.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname "$0")  # directory containing this script; quoted so a path with spaces is not word-split
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+"$base_dir"/kafka-run-class.sh kafka.admin.CheckReassignmentStatus "$@"  # "$@" forwards every argument unsplit
diff --git bin/kafka-reassign-partitions.sh bin/kafka-reassign-partitions.sh
new file mode 100644
index 0000000..8d006ac
--- /dev/null
+++ bin/kafka-reassign-partitions.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname "$0")  # directory containing this script; quoted so a path with spaces is not word-split
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+"$base_dir"/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"  # "$@" forwards every argument unsplit
diff --git config/server.properties config/server.properties
index 13a9815..b47fe94 100644
--- config/server.properties
+++ config/server.properties
@@ -113,10 +113,3 @@ zk.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000
-
-# metrics reporter properties
-# kafka.metrics.polling.interval.secs=5
-# kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
-# kafka.csv.metrics.dir=kafka_metrics
-# kafka.csv.metrics.reporter.enabled=true
-
diff --git core/lib/metrics-annotation-3.0.0-10ccc80c.jar core/lib/metrics-annotation-3.0.0-10ccc80c.jar
deleted file mode 100644
index 9f96c1a..0000000
Binary files core/lib/metrics-annotation-3.0.0-10ccc80c.jar and /dev/null differ
diff --git core/lib/metrics-core-3.0.0-10ccc80c.jar core/lib/metrics-core-3.0.0-10ccc80c.jar
deleted file mode 100644
index 5f04089..0000000
Binary files core/lib/metrics-core-3.0.0-10ccc80c.jar and /dev/null differ
diff --git core/src/main/scala/kafka/admin/AdminUtils.scala core/src/main/scala/kafka/admin/AdminUtils.scala
index 9e9a428..af4c382 100644
--- core/src/main/scala/kafka/admin/AdminUtils.scala
+++ core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -50,7 +50,7 @@ object AdminUtils extends Logging {
    */
   def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
                               fixedStartIndex: Int = -1)  // for testing only
-  : Map[Int, List[String]] = {
+  : Map[Int, Seq[String]] = {
     if (nPartitions <= 0)
       throw new AdministrationException("number of partitions must be larger than 0")
     if (replicationFactor <= 0)
@@ -74,7 +74,7 @@ object AdminUtils extends Logging {
     ret.toMap
   }
 
-  def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, List[String]], zkClient: ZkClient) {
+  def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[String]], zkClient: ZkClient) {
     try {
       val zkPath = ZkUtils.getTopicPath(topic)
       val jsonPartitionMap = Utils.mapToJson(replicaAssignment.map(e => (e._1.toString -> e._2)))
diff --git core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
new file mode 100644
index 0000000..981d510
--- /dev/null
+++ core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils._
+import scala.collection.Map
+
+/**
+ * Command line tool that reports, for every partition listed in a reassignment JSON file, whether the
+ * reassignment completed, failed or is still in progress, based on the state read through ZkUtils.
+ */
+object CheckReassignmentStatus extends Logging {
+
+  /**
+   * Parses the command line, reads the reassignment JSON file and prints one status line per partition.
+   * Exits with status 1 if a required option is missing.
+   */
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    // joptsimple only allows letters, digits, '?', '.' and '-' in option names, so the name must not contain spaces
+    val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
+      "new replicas they should be reassigned to")
+      .withRequiredArg
+      .describedAs("partition reassignment json file path")
+      .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
+      "form host:port. Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(jsonFileOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val jsonFile = options.valueOf(jsonFileOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val jsonString = Utils.readFileIntoString(jsonFile)
+    val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+
+    try {
+      // parse the json string into a map of (topic, partition) -> new replica list
+      val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match {
+        case Some(reassignedPartitions) =>
+          // NOTE(review): assumes SyncJSON.parseFull yields an Array of maps for a top-level JSON list - TODO confirm
+          val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
+          partitions.map { m =>
+            val topic = m.get("topic").get
+            val partition = m.get("partition").get.toInt
+            val replicasList = m.get("replicas").get
+            val newReplicas = replicasList.split(",").map(_.toInt)
+            ((topic, partition), newReplicas.toSeq)
+          }.toMap
+        case None => Map.empty[(String, Int), Seq[Int]]
+      }
+
+      val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
+      reassignedPartitionsStatus.foreach { case ((topic, partition), status) =>
+        val newReplicas = partitionsToBeReassigned((topic, partition))
+        status match {
+          case ReassignmentCompleted =>
+            println("Partition [%s,%d] reassignment to %s completed successfully".format(topic, partition, newReplicas))
+          case ReassignmentFailed =>
+            println("Partition [%s,%d] reassignment to %s failed".format(topic, partition, newReplicas))
+          case ReassignmentInProgress =>
+            println("Partition [%s,%d] reassignment to %s in progress".format(topic, partition, newReplicas))
+        }
+      }
+    } finally {
+      // always release the zookeeper connection, even if parsing or the status check fails
+      zkClient.close()
+    }
+  }
+
+  /**
+   * Determines the reassignment status of every partition in partitionsToBeReassigned.
+   *
+   * @param zkClient zookeeper client handed to ZkUtils to read the current reassignment and replica state
+   * @param partitionsToBeReassigned map of (topic, partition) to the replicas the partition should be reassigned to
+   * @return map of (topic, partition) to its reassignment status
+   */
+  def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[(String, Int), Seq[Int]])
+  :Map[(String, Int), ReassignmentStatus] = {
+    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
+    // classify every requested partition as in progress, completed or failed
+    partitionsToBeReassigned.map { topicAndPartition =>
+      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1._1, topicAndPartition._1._2,
+        topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
+    }
+  }
+
+  /**
+   * Determines the reassignment status of a single partition.
+   *
+   * @return ReassignmentInProgress if the partition is still listed among the partitions being reassigned;
+   *         otherwise ReassignmentCompleted if the replicas returned by ZkUtils.getReplicasForPartition equal
+   *         the requested replicas, and ReassignmentFailed if they differ
+   */
+  def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topic: String, partition: Int,
+                                            reassignedReplicas: Seq[Int],
+                                            partitionsToBeReassigned: Map[(String, Int), Seq[Int]],
+                                            partitionsBeingReassigned: Map[(String, Int), Seq[Int]]): ReassignmentStatus = {
+    val newReplicas = partitionsToBeReassigned((topic, partition))
+    partitionsBeingReassigned.get((topic, partition)) match {
+      case Some(_) => ReassignmentInProgress
+      case None =>
+        // check if AR == RAR
+        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition)
+        if(assignedReplicas == newReplicas)
+          ReassignmentCompleted
+        else
+          ReassignmentFailed
+    }
+  }
+}
\ No newline at end of file
diff --git core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
new file mode 100644
index 0000000..1a1a900
--- /dev/null
+++ core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import kafka.common.AdminCommandFailedException
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+
+object ReassignPartitionsCommand extends Logging {
+
+  /**
+   * Reads the partition reassignment JSON file and starts the reassignment by calling reassignPartitions() on a
+   * new ReassignPartitionsCommand. Exits with status 1 if a required option is missing.
+   */
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
+      "new replicas they should be reassigned to in the following format - \n" +
+       "[{\"topic\": \"foo\", \"partition\": \"1\", \"replicas\": \"1,2,3\" }]")
+      .withRequiredArg
+      .describedAs("partition reassignment json file path")
+      .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
+      "form host:port. Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(jsonFileOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val jsonFile = options.valueOf(jsonFileOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val jsonString = Utils.readFileIntoString(jsonFile)
+    var zkClient: ZkClient = null
+
+    try {
+      // parse the json string into a map of (topic, partition) -> new replica list
+      val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match {
+        case Some(reassignedPartitions) =>
+          val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
+          partitions.map { m =>
+            val topic = m.get("topic").get
+            val partition = m.get("partition").get.toInt
+            val replicasList = m.get("replicas").get
+            val newReplicas = replicasList.split(",").map(_.toInt)
+            ((topic, partition), newReplicas.toSeq)
+          }.toMap
+        case None => throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))
+      }
+
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+
+      // no shutdown hook deleting ZkUtils.ReassignPartitionsPath is registered: JVM shutdown hooks also run on
+      // normal exit, so it would try to delete the path of a successfully started reassignment, and would do
+      // so through a zkClient that the finally block below has already closed
+
+      if(reassignPartitionsCommand.reassignPartitions())
+        println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+      else
+        println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+    } catch {
+      case e =>
+        println("Partitions reassignment failed due to " + e.getMessage)
+        println(Utils.stackTrace(e))
+    } finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+}
+
+class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.immutable.Map[(String, Int), Seq[Int]])
+  extends Logging { // partitions maps (topic, partition) to the Seq[Int] of replicas supplied by the caller
+  def reassignPartitions(): Boolean = { // true once the reassignment data has been written; false on a logged failure
+    try {
+      val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1._1, p._1._2)) // drop partitions that fail validation
+      val jsonReassignmentData = Utils.mapToJson(validPartitions.map(p =>
+        ("%s,%s".format(p._1._1, p._1._2)) -> p._2.map(_.toString))) // key is "topic,partition", value is the replicas as strings
+      ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
+      true
+    }catch {
+      case ze: ZkNodeExistsException => // presumably raised when the reassignment path already exists - confirm against ZkUtils
+        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
+        throw new AdminCommandFailedException("Partition reassignment currently in " +
+        "progress for %s. Aborting operation".format(partitionsBeingReassigned))
+      case e => error("Admin command failed", e); false // any other failure is logged and reported as false
+    }
+  }
+
+  def validatePartition(zkClient: ZkClient, topic: String, partition: Int): Boolean = { // true only if topic and partition are both found
+    // check if partition exists
+    val partitionsOpt = ZkUtils.getPartitionsForTopics(zkClient, List(topic)).get(topic)
+    partitionsOpt match {
+      case Some(partitions) => // topic is known; this local name shadows the constructor parameter `partitions`
+        if(partitions.contains(partition)) {
+          true
+        }else{
+          error("Skipping reassignment of partition [%s,%d] ".format(topic, partition) +
+            "since it doesn't exist")
+          false
+        }
+      case None => error("Skipping reassignment of partition " + // topic has no entry in the lookup result
+        "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
+        false
+    }
+  }
+}
+
+sealed trait ReassignmentStatus { def status: Int } // outcome of a partition reassignment, with a numeric code
+case object ReassignmentCompleted extends ReassignmentStatus { val status = 1 } // numeric code 1
+case object ReassignmentInProgress extends ReassignmentStatus { val status = 0 } // numeric code 0
+case object ReassignmentFailed extends ReassignmentStatus { val status = -1 } // numeric code -1
diff --git core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 26f2bd8..b507851 100644
--- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -23,7 +23,6 @@ import kafka.utils._
 import collection.mutable.Map
 import collection.mutable.HashMap
 
-
 object LeaderAndIsr {
   val initialLeaderEpoch: Int = 0
   val initialZKVersion: Int = 0
diff --git core/src/main/scala/kafka/api/StopReplicaRequest.scala core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 2f2ba44..99a5f95 100644
--- core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -20,8 +20,6 @@ package kafka.api
 
 import java.nio._
 import kafka.utils._
-import collection.mutable.HashSet
-import collection.mutable.Set
 
 object StopReplicaRequest {
   val CurrentVersion = 1.shortValue()
@@ -33,29 +31,30 @@ object StopReplicaRequest {
     val clientId = Utils.readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
     val topicPartitionPairCount = buffer.getInt
-    val topicPartitionPairSet = new HashSet[(String, Int)]()
-    for (i <- 0 until topicPartitionPairCount){
-      topicPartitionPairSet.add((Utils.readShortString(buffer, "UTF-8"), buffer.getInt))
+    val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]()
+    for (i <- 0 until topicPartitionPairCount) {
+      topicPartitionPairSet.add(Utils.readShortString(buffer, "UTF-8"), buffer.getInt)
     }
-    new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet)
+    new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet.toSet)
   }
 }
 
 case class StopReplicaRequest(versionId: Short,
                               clientId: String,
                               ackTimeoutMs: Int,
-                              stopReplicaSet: Set[(String, Int)])
+                              partitions: Set[(String, Int)])
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
-  def this(stopReplicaSet: Set[(String, Int)]) = {
-    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet)
+  def this(partitions: Set[(String, Int)]) = {
+    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
+        partitions)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
-    buffer.putInt(stopReplicaSet.size)
-    for ((topic, partitionId) <- stopReplicaSet){
+    buffer.putInt(partitions.size)
+    for ((topic, partitionId) <- partitions){
       Utils.writeShortString(buffer, topic, "UTF-8")
       buffer.putInt(partitionId)
     }
@@ -63,7 +62,7 @@ case class StopReplicaRequest(versionId: Short,
 
   def sizeInBytes(): Int = {
     var size = 2 + (2 + clientId.length()) + 4 + 4
-    for ((topic, partitionId) <- stopReplicaSet){
+    for ((topic, partitionId) <- partitions){
       size += (2 + topic.length()) + 4
     }
     size
diff --git core/src/main/scala/kafka/cluster/Partition.scala core/src/main/scala/kafka/cluster/Partition.scala
index 3bd970e..41ed288 100644
--- core/src/main/scala/kafka/cluster/Partition.scala
+++ core/src/main/scala/kafka/cluster/Partition.scala
@@ -50,7 +50,7 @@ class Partition(val topic: String,
   newGauge(
     topic + "-" + partitionId + "UnderReplicated",
     new Gauge[Int] {
-      def getValue = {
+      def value() = {
         if (isUnderReplicated) 1 else 0
       }
     }
@@ -69,7 +69,7 @@ class Partition(val topic: String,
         if (isReplicaLocal(replicaId)) {
           val log = logManager.getOrCreateLog(topic, partitionId)
           val localReplica = new Replica(replicaId, this, time,
-            highwaterMarkCheckpoint.read(topic, partitionId).min(log.logEndOffset), Some(log))
+            highwaterMarkCheckpoint.read(topic, partitionId), Some(log))
           addReplicaIfNotExists(localReplica)
         }
         else {
diff --git core/src/main/scala/kafka/common/AdminCommandFailedException.scala core/src/main/scala/kafka/common/AdminCommandFailedException.scala
new file mode 100644
index 0000000..94e2864
--- /dev/null
+++ core/src/main/scala/kafka/common/AdminCommandFailedException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class AdminCommandFailedException(message: String, cause: Throwable) extends RuntimeException(message, cause) { // unchecked; cause is chained
+  def this(message: String) = this(message, null) // message only, no underlying cause
+  def this() = this(null, null) // neither message nor cause
+}
diff --git core/src/main/scala/kafka/common/PartitionOfflineException.scala core/src/main/scala/kafka/common/PartitionOfflineException.scala
index ab7a095..3367708 100644
--- core/src/main/scala/kafka/common/PartitionOfflineException.scala
+++ core/src/main/scala/kafka/common/PartitionOfflineException.scala
@@ -17,10 +17,12 @@
 
 package kafka.common
 
+
 /**
  * This exception is thrown by the leader elector in the controller when leader election fails for a partition since
  * all the replicas for a partition are offline
  */
-class PartitionOfflineException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
+class PartitionOfflineException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null) // message only, no underlying cause
+  def this() = this(null, null) // neither message nor cause
 }
\ No newline at end of file
diff --git core/src/main/scala/kafka/common/StateChangeFailedException.scala core/src/main/scala/kafka/common/StateChangeFailedException.scala
index a78ca6b..fd56796 100644
--- core/src/main/scala/kafka/common/StateChangeFailedException.scala
+++ core/src/main/scala/kafka/common/StateChangeFailedException.scala
@@ -17,7 +17,7 @@
 
 package kafka.common
 
-class StateChangeFailedException(message: String) extends RuntimeException(message) {
-  def this(message: String, cause: Throwable) = this(message + " Root cause -> " + cause.toString)
-  def this() = this(null)
+class StateChangeFailedException(message: String, cause: Throwable) extends RuntimeException(message, cause) { // cause is chained via RuntimeException
+  def this(message: String) = this(message, null) // message only, no underlying cause
+  def this() = this(null, null) // neither message nor cause
 }
\ No newline at end of file
diff --git core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 886388a..f5df1fc 100644
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -686,7 +686,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       newGauge(
         config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
         new Gauge[Int] {
-          def getValue = q.size
+          def value() = q.size
         }
       )
     })
diff --git core/src/main/scala/kafka/controller/ControllerChannelManager.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 90cf187..fb6ea76 100644
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -137,36 +137,49 @@ class RequestSendThread(val controllerId: Int,
   }
 }
 
-// TODO: When we add more types of requests, we can generalize this class a bit. Right now, it just handles LeaderAndIsr
-// request
 class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
   extends  Logging {
-  val brokerRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
+  val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
+  val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
 
   def newBatch() {
     // raise error if the previous batch is not empty
-    if(brokerRequestMap.size > 0)
+    if(leaderAndIsrRequestMap.size > 0 || stopReplicaRequestMap.size > 0) // either kind of pending request blocks a new batch
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
-        "a new one. Some state changes %s might be lost ".format(brokerRequestMap.toString()))
-    brokerRequestMap.clear()
+        "a new one. Some state changes %s might be lost ".format("leaderAndIsr: " + leaderAndIsrRequestMap + ", stopReplica: " + stopReplicaRequestMap))
+    leaderAndIsrRequestMap.clear()
+    stopReplicaRequestMap.clear()
   }
 
-  def addRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) {
+  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) {
     brokerIds.foreach { brokerId =>
-      brokerRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])
-      brokerRequestMap(brokerId).put((topic, partition), leaderAndIsr)
+      leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])
+      leaderAndIsrRequestMap(brokerId).put((topic, partition), leaderAndIsr)
+    }
+  }
+
+  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int) {
+    brokerIds.foreach { brokerId =>
+      // queue (topic, partition) for this broker; Seq is immutable, so the Seq returned by :+ must be stored back
+      stopReplicaRequestMap(brokerId) = stopReplicaRequestMap.getOrElse(brokerId, Seq.empty[(String, Int)]) :+ ((topic, partition))
+    }
+  }
 
   def sendRequestsToBrokers() {
-    brokerRequestMap.foreach { m =>
+    leaderAndIsrRequestMap.foreach { m => // one LeaderAndIsrRequest per broker, built from everything batched for it
       val broker = m._1
       val leaderAndIsr = m._2
       val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
-      info("Sending to broker %d leaderAndIsr request of %s".format(broker, leaderAndIsrRequest))
+      debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
-    brokerRequestMap.clear()
+    leaderAndIsrRequestMap.clear() // emptied so that the next newBatch() does not reject a leftover batch
+    stopReplicaRequestMap.foreach { r => // one StopReplicaRequest per broker
+      val broker = r._1
+      debug("The stop replica request sent to broker %d is %s".format(broker, r._2.mkString(",")))
+      sendRequest(broker, new StopReplicaRequest(Set.empty[(String, Int)] ++ r._2), null) // Seq becomes a Set, so duplicates collapse; null callback
+    }
+    stopReplicaRequestMap.clear() // emptied for the same reason as leaderAndIsrRequestMap
   }
 }
 
diff --git core/src/main/scala/kafka/controller/KafkaController.scala core/src/main/scala/kafka/controller/KafkaController.scala
index 165c1d9..768856c 100644
--- core/src/main/scala/kafka/controller/KafkaController.scala
+++ core/src/main/scala/kafka/controller/KafkaController.scala
@@ -20,14 +20,17 @@ import collection._
 import collection.immutable.Set
 import kafka.cluster.Broker
 import kafka.api._
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import kafka.utils.ZkUtils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.utils.{ZkUtils, Logging}
-import java.lang.Object
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import com.yammer.metrics.core.Gauge
+import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
+import kafka.utils.{Utils, ZkUtils, Logging}
+import org.I0Itec.zkclient.exception.ZkNoNodeException
+import java.lang.{IllegalStateException, Object}
+import kafka.common.KafkaException
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
@@ -36,21 +39,24 @@ class ControllerContext(val zkClient: ZkClient,
                         var liveBrokerIds: Set[Int] = null,
                         var allTopics: Set[String] = null,
                         var partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null,
-                        var allLeaders: mutable.Map[(String, Int), Int] = null)
+                        var allLeaders: mutable.Map[(String, Int), Int] = null,
+                        var partitionsBeingReassigned: mutable.Map[(String, Int), ReassignedPartitionsContext] =
+                        new mutable.HashMap)
 
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Controller " + config.brokerId + "], "
+  this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   val controllerContext = new ControllerContext(zkClient)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
     config.brokerId)
+  private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
 
   newGauge(
     "ActiveControllerCount",
     new Gauge[Int] {
-      def getValue = if (isActive) 1 else 0
+      def value() = if (isActive) 1 else 0
     }
   )
 
@@ -67,6 +73,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     if(isRunning) {
       info("Broker %d starting become controller state transition".format(config.brokerId))
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
+      registerReassignedPartitionsListener()
       partitionStateMachine.registerListeners()
       replicaStateMachine.registerListeners()
       initializeControllerContext()
@@ -98,7 +105,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     updateLeaderAndIsrCache()
     // update partition state machine
     partitionStateMachine.triggerOnlinePartitionStateChange()
-    replicaStateMachine.handleStateChanges(newBrokers, OnlineReplica)
+    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers),
+      OnlineReplica)
+    // check if reassignment of some partitions need to be restarted
+    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter(p =>
+      p._2.newReplicas.foldLeft(false)((a, replica) => newBrokers.contains(replica) || a))
+    partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1._1, p._1._2, p._2))
   }
 
   /**
@@ -121,7 +133,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // handle dead replicas
-    replicaStateMachine.handleStateChanges(deadBrokers, OfflineReplica)
+    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers),
+      OfflineReplica)
   }
 
   /**
@@ -146,29 +159,74 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   def onNewPartitionCreation(newPartitions: Seq[(String, Int)]) {
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
     partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
+    replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), NewReplica) // all cached replicas of the new partitions -> NewReplica
     partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition)
+    replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), OnlineReplica) // same replicas -> OnlineReplica, after the partitions went online
   }
 
-  /* TODO: kafka-330  This API is unused until we introduce the delete topic functionality.
-  remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
-  def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
-    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
-    for((topicPartition, brokers) <- replicaAssignment){
-      for (broker <- brokers){
-        if (!brokerToPartitionToStopReplicaMap.contains(broker))
-          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
-        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
-      }
-      controllerContext.allLeaders.remove(topicPartition)
-      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
-    }
-    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
-      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
-      info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
-      sendRequest(broker, stopReplicaRequest)
+  /**
+   * This callback is invoked by the reassigned partitions listener and is re-invoked by the ISR change listener of the
+   * partition, after new brokers hosting reassigned replicas come up and when pending reassignments are resumed.
+   * When an admin command initiates a partition reassignment, it creates the /admin/reassign_partitions path that
+   * triggers the zookeeper listener. Reassigning replicas for a partition goes through a few stages
+   * (RAR = Reassigned replicas, AR = Original list of replicas for partition) -
+   * 1. Register listener for ISR changes to detect when the RAR is a subset of the ISR
+   * 2. Start new replicas RAR - AR.
+   * 3. Wait until new replicas are in sync with the leader
+   * 4. If the leader is not in RAR, elect a new leader from RAR
+   * 5. Stop old replicas AR - RAR
+   * 6. Write new AR
+   * 7. Remove partition from the /admin/reassign_partitions path
+   */
+  def onPartitionReassignment(topic: String, partition: Int, reassignedPartitionContext: ReassignedPartitionsContext) {
+    val reassignedReplicas = reassignedPartitionContext.newReplicas
+    areReplicasInIsr(topic, partition, reassignedReplicas) match {
+      case true => // every reassigned replica is in the ISR read from zookeeper: run stages 4 to 7
+        // mark the new replicas as online
+        reassignedReplicas.foreach { replica =>
+          replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)),
+            OnlineReplica)
+        }
+        // check if current leader is in the new replicas list. If not, controller needs to trigger leader election
+        moveReassignedPartitionLeaderIfRequired(topic, partition, reassignedPartitionContext)
+        // stop older replicas
+        stopOldReplicasOfReassignedPartition(topic, partition, reassignedPartitionContext)
+        // write the new list of replicas for this partition in zookeeper (this also updates the cached assignment)
+        updateAssignedReplicasForPartition(topic, partition, reassignedPartitionContext)
+        // update the /admin/reassign_partitions path to remove this partition
+        removePartitionFromReassignedPartitions(topic, partition)
+        info("Removed partition [%s, %d] from the list of reassigned partitions in zookeeper".format(topic, partition))
+        controllerContext.partitionsBeingReassigned.remove((topic, partition)) // already removed by the call above; harmless repeat
+      case false => // some reassigned replica is missing from the ISR, or no leader/ISR info exists yet
+        info("New replicas %s for partition [%s, %d] being ".format(reassignedReplicas.mkString(","), topic, partition) +
+          "reassigned not yet caught up with the leader")
+        // start new replicas (only the ones in RAR that are not in the cached AR)
+        startNewReplicasForReassignedPartition(topic, partition, reassignedPartitionContext)
+        info("Waiting for new replicas %s for partition [%s, %d] being ".format(reassignedReplicas.mkString(","), topic, partition) +
+          "reassigned to catch up with the leader")
     }
   }
 
+  /* TODO: kafka-330  This API is unused until we introduce the delete topic functionality.
+  remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
+  //  def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
+  //    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
+  //    for((topicPartition, brokers) <- replicaAssignment){
+  //      for (broker <- brokers){
+  //        if (!brokerToPartitionToStopReplicaMap.contains(broker))
+  //          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
+  //        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
+  //      }
+  //      controllerContext.allLeaders.remove(topicPartition)
+  //      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
+  //    }
+  //    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
+  //      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
+  //      info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
+  //      sendRequest(broker, stopReplicaRequest)
+  //    }
+  //  }
+
   /**
    * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
    * is the controller. It merely registers the session expiration listener and starts the controller leader
@@ -207,7 +265,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def registerSessionExpirationListener() = {
-    zkClient.subscribeStateChanges(new SessionExpireListener())
+    zkClient.subscribeStateChanges(new SessionExpirationListener())
   }
 
   private def initializeControllerContext() {
@@ -223,6 +281,23 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     startChannelManager()
     info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
     info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
+    initializeReassignedPartitionsContext()
+  }
+
+  private def initializeReassignedPartitionsContext() {
+    // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
+    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
+    // completed = cache already holds the new replicas. keySet, because map(_._1) on a Map keeps one entry per topic
+    val reassignedPartitions = partitionsBeingReassigned.filter(partition =>
+      controllerContext.partitionReplicaAssignment(partition._1) == partition._2.newReplicas).keySet
+    reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p._1, p._2)) // drop completed ones from zookeeper
+    controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned // cache everything read from zookeeper ...
+    controllerContext.partitionsBeingReassigned --= reassignedPartitions // ... except the partitions that are already done
+    info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
+    info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
+    info("Resuming reassignment of partitions: %s".format(controllerContext.partitionsBeingReassigned.toString()))
+    controllerContext.partitionsBeingReassigned.foreach(partition => // resume every reassignment that is still pending
+      onPartitionReassignment(partition._1._1, partition._1._2, partition._2))
   }
 
   private def startChannelManager() {
@@ -244,8 +319,122 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  class SessionExpireListener() extends IZkStateListener with Logging {
-    this.logIdent = "[Controller " + config.brokerId + "], "
+  private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
+    // true only when leader/ISR info can be read for the partition and every given replica is part of that ISR
+    val leaderAndIsrOpt = getLeaderAndIsrForPartition(zkClient, topic, partition)
+    leaderAndIsrOpt match {
+      case None => false
+      case Some(current) => replicas.forall(r => current.isr.contains(r))
+    }
+  }
+
+  private def moveReassignedPartitionLeaderIfRequired(topic: String, partition: Int,
+                                                      reassignedPartitionContext: ReassignedPartitionsContext) {
+    val targetReplicas = reassignedPartitionContext.newReplicas
+    val leader = controllerContext.allLeaders((topic, partition))
+    // message prefix and replica list shared by all log messages below
+    val leaderDescription = "Leader %s for partition [%s, %d] being reassigned, ".format(leader, topic, partition)
+    val targetReplicaList = targetReplicas.mkString(",")
+    // asks the partition state machine to re-elect the leader using the reassigned partition leader selector
+    def reelectLeader() {
+      partitionStateMachine.handleStateChanges(List((topic, partition)), OnlinePartition,
+        reassignedPartitionLeaderSelector)
+    }
+    if(!targetReplicas.contains(leader)) {
+      info(leaderDescription + "is not in the new list of replicas %s. Re-electing leader".format(targetReplicaList))
+      // move the leader to one of the alive and caught up new replicas
+      reelectLeader()
+    }else if(controllerContext.liveBrokerIds.contains(leader)) {
+      info(leaderDescription + "is already in the new list of replicas %s and is alive".format(targetReplicaList))
+    }else {
+      // the leader is part of the new replica list but its broker is not alive
+      info(leaderDescription + "is already in the new list of replicas %s but is dead".format(targetReplicaList))
+      reelectLeader()
+    }
+  }
+
+  private def stopOldReplicasOfReassignedPartition(topic: String, partition: Int,
+                                                   reassignedPartitionContext: ReassignedPartitionsContext) {
+    // old replicas = currently assigned replicas that are absent from the reassigned replica list
+    val currentlyAssigned = controllerContext.partitionReplicaAssignment((topic, partition)).toSet
+    val replicasToStop = currentlyAssigned -- reassignedPartitionContext.newReplicas.toSet
+    // pass 1: move every old replica to the offline state (the controller removes it from the ISR)
+    for(oldReplica <- replicasToStop) {
+      val target = Seq(new PartitionAndReplica(topic, partition, oldReplica))
+      replicaStateMachine.handleStateChanges(target, OfflineReplica)
+    }
+    // pass 2: once all of them are offline, send the stop replica command to each old replica
+    for(oldReplica <- replicasToStop)
+      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, oldReplica)), NonExistentReplica)
+  }
+
+  private def updateAssignedReplicasForPartition(topic: String, partition: Int,
+                                                 reassignedPartitionContext: ReassignedPartitionsContext) {
+    val reassignedReplicas = reassignedPartitionContext.newReplicas
+    // filter() builds a new map, so the cached assignment stays untouched until the zookeeper write has succeeded
+    val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1._1.equals(topic))
+    partitionsAndReplicasForThisTopic.put((topic, partition), reassignedReplicas)
+    updateAssignedReplicasForPartition(topic, partition, partitionsAndReplicasForThisTopic)
+    info("Updated assigned replicas for partition [%s, %d] being reassigned ".format(topic, partition) +
+      "to %s".format(reassignedReplicas.mkString(",")))
+    // update the cached assigned replica list exactly once, after the successful zookeeper write
+    controllerContext.partitionReplicaAssignment.put((topic, partition), reassignedReplicas)
+    // stop watching the ISR changes for this partition; the listener is taken from the reassignment context that
+    // is cached for the partition, so the partition must still be present in partitionsBeingReassigned here
+    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+      controllerContext.partitionsBeingReassigned((topic, partition)).isrChangeListener)
+  }
+
+  private def startNewReplicasForReassignedPartition(topic: String, partition: Int,
+                                                     reassignedPartitionContext: ReassignedPartitionsContext) {
+    // only the brokers that appear in the reassigned replicas list but not in the currently assigned
+    // replicas list are moved to the NewReplica state
+    val currentReplicas = Set.empty[Int] ++ controllerContext.partitionReplicaAssignment((topic, partition))
+    val targetReplicas = Set.empty[Int] ++ reassignedPartitionContext.newReplicas
+    val replicasToStart: Seq[Int] = (targetReplicas -- currentReplicas).toSeq
+    for(brokerId <- replicasToStart) {
+      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, brokerId)), NewReplica)
+    }
+  }
+
+  private def registerReassignedPartitionsListener() = { // subscribes a data-change listener on the reassign partitions path
+    zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this))
+  }
+
+  def removePartitionFromReassignedPartitions(topic: String, partition: Int) {
+    val reassignedPartition = (topic, partition)
+    // re-read the list of partitions being reassigned from zookeeper and drop this partition from it
+    val remainingPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient) - reassignedPartition
+    // write back the remaining partitions, each mapped to its new replica list
+    val remainingReplicaAssignment = remainingPartitions.mapValues(_.newReplicas)
+    ZkUtils.updatePartitionReassignmentData(zkClient, remainingReplicaAssignment)
+    // finally drop the partition from the controller's cache
+    controllerContext.partitionsBeingReassigned.remove(reassignedPartition)
+  }
+
+  def updateAssignedReplicasForPartition(topic: String, partition: Int, // note: partition is not read in this overload
+                                         newReplicaAssignmentForTopic: Map[(String, Int), Seq[Int]]) {
+    try {
+      val zkPath = ZkUtils.getTopicPath(topic)
+      val jsonPartitionMap = Utils.mapToJson(newReplicaAssignmentForTopic.map(e => // partition id -> replica ids, all as strings
+        (e._1._2.toString -> e._2.map(_.toString))))
+      ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap) // writes the whole given assignment to the topic path
+      debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
+    } catch {
+      case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topic)) // cause is not chained
+      case e2 => throw new KafkaException(e2.toString) // any other failure is rethrown carrying only e2.toString
+    }
+  }
+
+  private def getAllReplicasForPartition(partitions: Seq[(String, Int)]): Seq[PartitionAndReplica] = {
+    // one PartitionAndReplica per (partition, cached assigned replica) pair, in the order of the input partitions
+    partitions.flatMap { case (topic, partitionId) =>
+      val assigned = controllerContext.partitionReplicaAssignment((topic, partitionId))
+      assigned.map(replicaId => new PartitionAndReplica(topic, partitionId, replicaId)) }
+  }
+
+  class SessionExpirationListener() extends IZkStateListener with Logging {
+    this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
       // do nothing, since zkclient will do reconnect for us.
@@ -274,6 +463,159 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 }
 
+/**
+ * Starts the partition reassignment process unless -
+ * 1. Partition doesn't exist
+ * 2. New replicas are the same as existing replicas
+ * 3. Any replica in the new set of replicas is dead
+ * If any of the above conditions are satisfied, it logs an error and removes the partition from the list of
+ * reassigned partitions.
+ */
+class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
+  this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
+  val zkClient = controller.controllerContext.zkClient
+  val controllerContext = controller.controllerContext
+
+  /**
+   * Invoked when the data of the watched path changes, i.e. when some partitions are reassigned by the admin command
+   * @throws Exception On any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataChange(dataPath: String, data: Object) {
+    debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
+      .format(dataPath, data))
+    val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
+    val newPartitions = partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1)) // skip partitions already tracked
+    newPartitions.foreach { partitionToBeReassigned =>
+      controllerContext.controllerLock synchronized {
+        val topic = partitionToBeReassigned._1._1
+        val partition = partitionToBeReassigned._1._2
+        val newReplicas = partitionToBeReassigned._2
+        val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+        try {
+          val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get((topic, partition))
+          assignedReplicasOpt match {
+            case Some(assignedReplicas) =>
+              if(assignedReplicas == newReplicas) {
+                throw new KafkaException("Partition [%s, %d] to be reassigned is already assigned to replicas"
+                  .format(topic, partition) +
+                  " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
+              }else {
+                if(aliveNewReplicas == newReplicas) {
+                  info("Handling reassignment of partition [%s, %d] to new replicas %s".format(topic, partition,
+                    newReplicas.mkString(",")))
+                  val context = createReassignmentContextForPartition(topic, partition, newReplicas)
+                  controllerContext.partitionsBeingReassigned.put((topic, partition), context)
+                  controller.onPartitionReassignment(topic, partition, context)
+                }else {
+                  // some replica in RAR (the new replica list) is not alive. Fail partition reassignment
+                  throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
+                    " %s for partition [%s, %d] to be reassigned are alive. ".format(newReplicas.mkString(","), topic, partition) +
+                    "Failing partition reassignment")
+                }
+              }
+            case None => throw new KafkaException("Attempt to reassign partition [%s, %d] that doesn't exist"
+              .format(topic, partition))
+          }
+        }catch {
+          case e => error("Error completing reassignment of partition [%s, %d]".format(topic, partition), e)
+          // still part of the catch case: remove the partition from the admin path to unblock the admin client
+          controller.removePartitionFromReassignedPartitions(topic, partition)
+        }
+      }
+    }
+  }
+
+  /**
+   * Invoked when the watched path is deleted. This listener deliberately does nothing in that case
+   * @throws Exception
+   *             On any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataDeleted(dataPath: String) {
+  }
+
+  private def createReassignmentContextForPartition(topic: String,
+                                                    partition: Int,
+                                                    newReplicas: Seq[Int]): ReassignedPartitionsContext = {
+    val context = new ReassignedPartitionsContext(newReplicas)
+    // first register ISR change listener; the call also stores the listener in the context
+    watchIsrChangesForReassignedPartition(topic, partition, context)
+    context
+  }
+
+  private def watchIsrChangesForReassignedPartition(topic: String, partition: Int,
+                                                    reassignedPartitionContext: ReassignedPartitionsContext) {
+    val reassignedReplicas = reassignedPartitionContext.newReplicas
+    val isrChangeListener = new ReassignedPartitionsIsrChangeListener(controller, topic, partition,
+      reassignedReplicas.toSet)
+    reassignedPartitionContext.isrChangeListener = isrChangeListener
+    // register listener on the leader and isr path to wait until the new replicas catch up with the current leader
+    zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
+  }
+}
+
+class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
+                                            reassignedReplicas: Set[Int])
+  extends IZkDataListener with Logging {
+  this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
+  val zkClient = controller.controllerContext.zkClient
+  val controllerContext = controller.controllerContext
+
+  /**
+   * Invoked when the data of the watched path changes; resumes the reassignment once all new replicas are in the ISR
+   * @throws Exception On any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataChange(dataPath: String, data: Object) {
+    try {
+      controllerContext.controllerLock synchronized {
+        debug("Reassigned partitions isr change listener fired for path %s with data %s".format(dataPath, data))
+        // check if this partition is still being reassigned or not
+        controllerContext.partitionsBeingReassigned.get((topic, partition)) match {
+          case Some(reassignedPartitionContext) =>
+            // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
+            val newLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+            newLeaderAndIsrOpt match {
+              case Some(leaderAndIsr) => // check if new replicas have joined ISR
+                val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
+                if(caughtUpReplicas == reassignedReplicas) {
+                  // resume the partition reassignment process
+                  info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
+                    .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
+                    " Resuming partition reassignment")
+                  controller.onPartitionReassignment(topic, partition, reassignedPartitionContext)
+                }else {
+                  info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
+                    .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
+                    " Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
+                }
+              case None => error("Error handling reassignment of partition [%s, %d] to replicas %s as it was never created"
+                .format(topic, partition, reassignedReplicas.mkString(",")))
+            }
+          case None => // the partition is no longer being reassigned, nothing to do
+        }
+      }
+    }catch {
+      case e => error("Error while handling partition reassignment", e) // logged and swallowed, nothing is rethrown
+    }
+  }
+
+  /**
+   * Invoked when the watched path is deleted. This listener deliberately does nothing in that case
+   * @throws Exception
+   *             On any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataDeleted(dataPath: String) {
+  }
+}
+
+case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty, // replicas the partition is being reassigned to
+                                       var isrChangeListener: ReassignedPartitionsIsrChangeListener = null) // stays null until a listener is assigned
+
+case class PartitionAndReplica(topic: String, partition: Int, replica: Int) // one replica of one partition; replica ids are compared against live broker ids
+
 object ControllerStat extends KafkaMetricsGroup {
   val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
diff --git core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
new file mode 100644
index 0000000..2d77d99
--- /dev/null
+++ core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import kafka.api.LeaderAndIsr
+import kafka.utils.Logging
+import kafka.common.{StateChangeFailedException, PartitionOfflineException}
+
+trait PartitionLeaderSelector {
+
+  /**
+   * This API selects a new leader for the input partition
+   * @param topic                      The topic of the partition whose leader needs to be elected
+   * @param partition                  The partition whose leader needs to be elected
+   * @param currentLeaderAndIsr        The current leader and isr of input partition read from zookeeper
+   * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
+   * @throws StateChangeFailedException Thrown by ReassignedPartitionLeaderSelector when it cannot pick a new leader
+   * @return A pair: the leader and isr request, with the newly selected leader info, to send to the brokers,
+   * and the list of replicas the returned leader and isr request should be sent to
+   */
+  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
+
+}
+
+/**
+ * This API selects a new leader for the input partition -
+ * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
+ * 2. Else, it picks some alive broker from the assigned replica list as the new leader
+ * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
+ * The selector only computes the new leader and isr; it does not write to zookeeper or update a cache itself
+ */
+class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+
+  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    controllerContext.partitionReplicaAssignment.get((topic, partition)) match {
+      case Some(assignedReplicas) =>
+        val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
+        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
+        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+        debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
+          .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
+          currentLeaderIsrZkPathVersion))
+        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
+          case true => // no live broker in the ISR: fall back to any live assigned replica (unclean election)
+            debug("No broker in ISR is alive, picking the leader from the alive assigned replicas: %s"
+              .format(liveAssignedReplicasToThisPartition.mkString(",")))
+            liveAssignedReplicasToThisPartition.isEmpty match {
+              case true => // nothing is alive at all: the partition is offline
+                ControllerStat.offlinePartitionRate.mark()
+                throw new PartitionOfflineException(("No replica for partition " +
+                  "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
+                  " Assigned replicas are: [%s]".format(assignedReplicas))
+              case false =>
+                ControllerStat.uncleanLeaderElectionRate.mark()
+                val newLeader = liveAssignedReplicasToThisPartition.head
+                warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
+                  "There's potential data loss")
+                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) // ISR shrinks to the new leader
+            }
+          case false => // clean election: the first live broker of the ISR becomes leader, the ISR keeps its live members
+            val newLeader = liveBrokersInIsr.head
+            debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
+            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
+        }
+        info("Selected new leader and ISR %s for offline partition [%s, %d]".format(newLeaderAndIsr.toString(), topic,
+          partition))
+        (newLeaderAndIsr, liveAssignedReplicasToThisPartition) // the request goes to the live assigned replicas
+      case None => // no cached replica assignment for the partition
+        ControllerStat.offlinePartitionRate.mark()
+        throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
+                                            " replicas assigned to it")
+    }
+  }
+}
+
+/**
+ * Picks one of the alive in-sync reassigned replicas as the new leader
+ */
+class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+
+  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    val reassignedReplicas = controllerContext.partitionsBeingReassigned((topic, partition)).newReplicas
+    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
+    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+    debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
+      .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
+      currentLeaderIsrZkPathVersion))
+    // pick the first replica from the newly assigned replicas list that is both alive and in the current ISR
+    val aliveReassignedInSyncReplicas = reassignedReplicas.filter(r =>
+      controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r))
+    aliveReassignedInSyncReplicas.headOption match {
+      case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
+                              currentLeaderIsrZkPathVersion + 1), reassignedReplicas)
+      case None => // no eligible replica: report whether the reassigned list was empty or merely not electable
+        reassignedReplicas.size match {
+          case 0 =>
+            throw new StateChangeFailedException("List of reassigned replicas for partition " +
+              "([%s, %d]) is empty. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+          case _ =>
+            throw new StateChangeFailedException("None of the reassigned replicas for partition " +
+              "([%s, %d]) are alive and in sync with the leader. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+        }
+    }
+  }
+}
\ No newline at end of file
diff --git core/src/main/scala/kafka/controller/PartitionStateMachine.scala core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index a9c094c..59fbae3 100644
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -21,7 +21,7 @@ import kafka.api.LeaderAndIsr
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import collection.JavaConversions._
-import kafka.common.{StateChangeFailedException, PartitionOfflineException, KafkaException}
+import kafka.common.{StateChangeFailedException, PartitionOfflineException}
 import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 
@@ -43,6 +43,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[(String, Int), PartitionState] = mutable.Map.empty
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private var isShuttingDown = new AtomicBoolean(false)
 
   /**
@@ -82,7 +83,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
       partitionState.filter(partitionAndState =>
         partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach {
-        partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition)
+        partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition,
+                                               offlinePartitionSelector)
       }
       brokerRequestBatch.sendRequestsToBrokers()
     }catch {
@@ -95,12 +97,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param partitions   The list of partitions that need to be transitioned to the target state
    * @param targetState  The state that the partitions should be moved to
    */
-  def handleStateChanges(partitions: Seq[(String, Int)], targetState: PartitionState) {
+  def handleStateChanges(partitions: Seq[(String, Int)], targetState: PartitionState,
+                         leaderSelector: PartitionLeaderSelector = offlinePartitionSelector) {
     info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
     try {
       brokerRequestBatch.newBatch()
       partitions.foreach { topicAndPartition =>
-        handleStateChange(topicAndPartition._1, topicAndPartition._2, targetState)
+        handleStateChange(topicAndPartition._1, topicAndPartition._2, targetState, leaderSelector)
       }
       brokerRequestBatch.sendRequestsToBrokers()
     }catch {
@@ -115,7 +118,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param partition   The partition for which the state transition is invoked
    * @param targetState The end state that the partition should be moved to
    */
-  private def handleStateChange(topic: String, partition: Int, targetState: PartitionState) {
+  private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
+                                leaderSelector: PartitionLeaderSelector) {
     try {
       partitionState.getOrElseUpdate((topic, partition), NonExistentPartition)
       targetState match {
@@ -128,17 +132,18 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
             "%s".format(controllerContext.partitionReplicaAssignment(topic, partition).mkString(",")))
         case OnlinePartition =>
-          // pre: partition should be in New state
-          assertValidPreviousStates(topic, partition, List(NewPartition, OfflinePartition), OnlinePartition)
+          assertValidPreviousStates(topic, partition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
           partitionState(topic, partition) match {
             case NewPartition =>
               // initialize leader and isr path for new partition
-              initializeLeaderAndIsrForPartition(topic, partition, brokerRequestBatch)
+              initializeLeaderAndIsrForPartition(topic, partition)
             case OfflinePartition =>
-              electLeaderForOfflinePartition(topic, partition, brokerRequestBatch)
+              electLeaderForPartition(topic, partition, leaderSelector)
+            case OnlinePartition => // invoked when the leader needs to be re-elected
+              electLeaderForPartition(topic, partition, leaderSelector)
             case _ => // should never come here since illegal previous states are checked above
           }
-          info("Partition [%s, %d] state changed from %s to Online with leader %d".format(topic, partition,
+          info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
             partitionState(topic, partition), controllerContext.allLeaders(topic, partition)))
           partitionState.put((topic, partition), OnlinePartition)
            // post: partition has a leader
@@ -215,8 +220,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
    *                      this state change
    */
-  private def initializeLeaderAndIsrForPartition(topic: String, partition: Int,
-                                                 brokerRequestBatch: ControllerBrokerRequestBatch) {
+  private def initializeLeaderAndIsrForPartition(topic: String, partition: Int) {
     debug("Initializing leader and isr for partition [%s, %d]".format(topic, partition))
     val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition))
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
@@ -234,10 +238,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString)
-          // TODO: the above write can fail only if the current controller lost its zk session and the new controller
+          // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
-          brokerRequestBatch.addRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr)
           controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader)
           partitionState.put((topic, partition), OnlinePartition)
         }catch {
@@ -257,101 +261,38 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
    *                      this state change
    */
-  private def electLeaderForOfflinePartition(topic: String, partition: Int,
-                                             brokerRequestBatch: ControllerBrokerRequestBatch) {
+  def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
     /** handle leader election for the partitions whose leader is no longer alive **/
-    info("Electing leader for Offline partition [%s, %d]".format(topic, partition))
+    info("Electing leader for partition [%s, %d]".format(topic, partition))
     try {
-      controllerContext.partitionReplicaAssignment.get((topic, partition)) match {
-        case Some(assignedReplicas) =>
-          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
-          try {
-            // elect new leader or throw exception
-            val newLeaderAndIsr = electLeaderForPartition(topic, partition, assignedReplicas)
-            info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
-            // store new leader and isr info in cache
-            brokerRequestBatch.addRequestForBrokers(liveAssignedReplicasToThisPartition, topic, partition,
-              newLeaderAndIsr)
-          }catch {
-            case e => throw new StateChangeFailedException(("Error while electing leader for partition" +
-              " [%s, %d]").format(topic, partition), e)
-          }
-        case None => throw new KafkaException(("While handling broker changes, the " +
-          "partition [%s, %d] doesn't have assigned replicas. The replica assignment for other partitions is %s")
-          .format(topic, partition, controllerContext.partitionReplicaAssignment))
+      var zookeeperPathUpdateSucceeded: Boolean = false
+      var newLeaderAndIsr: LeaderAndIsr = null
+      var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
+      while(!zookeeperPathUpdateSucceeded) {
+        val currentLeaderAndIsr = getLeaderAndIsrOrThrowException(topic, partition)
+        // elect new leader or throw exception
+        val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
+        val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+        newLeaderAndIsr = leaderAndIsr
+        newLeaderAndIsr.zkVersion = newVersion
+        zookeeperPathUpdateSucceeded = updateSucceeded
+        replicasForThisPartition = replicas
       }
+      // update the leader cache
+      controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
+      info("Elected leader %d for partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
+      // queue a leader and isr request for the replicas returned by the leader selector
+      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr)
     }catch {
-      case e => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
-        .format(topic, partition) + " Marking this partition offline")
+      case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
+        .format(topic, partition) + " Marking this partition offline", poe)
+      case sce => throw new StateChangeFailedException(("Error while electing leader for partition" +
+        " [%s, %d]").format(topic, partition), sce)
     }
     debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
   }
 
-  /**
-   * @param topic                      The topic of the partition whose leader needs to be elected
-   * @param partition                  The partition whose leader needs to be elected
-   * @param assignedReplicas           The list of replicas assigned to the input partition
-   * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
-   * This API selects a new leader for the input partition -
-   * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
-   * 2. Else, it picks some alive broker from the assigned replica list as the new leader
-   * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
-   * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
-   */
-  private def electLeaderForPartition(topic: String, partition: Int, assignedReplicas: Seq[Int]):LeaderAndIsr = {
-    var zookeeperPathUpdateSucceeded: Boolean = false
-    var newLeaderAndIsr: LeaderAndIsr = null
-    while(!zookeeperPathUpdateSucceeded) {
-      newLeaderAndIsr = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-        case Some(currentLeaderAndIsr) =>
-          var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr
-          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
-          val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
-          val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
-          val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
-          debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
-            .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
-            currentLeaderIsrZkPathVersion))
-          newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
-            case true =>
-              debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
-                .format(liveAssignedReplicasToThisPartition.mkString(",")))
-              liveAssignedReplicasToThisPartition.isEmpty match {
-                case true =>
-                  ControllerStat.offlinePartitionRate.mark()
-                  throw new PartitionOfflineException(("No replica for partition " +
-                  "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
-                  " Assigned replicas are: [%s]".format(assignedReplicas))
-                case false =>
-                  ControllerStat.uncleanLeaderElectionRate.mark()
-                  val newLeader = liveAssignedReplicasToThisPartition.head
-                  warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
-                    "There's potential data loss")
-                  new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
-              }
-            case false =>
-              val newLeader = liveBrokersInIsr.head
-              debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
-              new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
-          }
-          info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
-          // update the new leadership decision in zookeeper or retry
-          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
-          newLeaderAndIsr.zkVersion = newVersion
-          zookeeperPathUpdateSucceeded = updateSucceeded
-          newLeaderAndIsr
-        case None =>
-          throw new StateChangeFailedException("On broker changes, " +
-            "there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topic, partition))
-      }
-    }
-    // update the leader cache
-    controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
-    newLeaderAndIsr
-  }
-
   private def registerTopicChangeListener() = {
     zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, new TopicChangeListener())
   }
@@ -360,6 +301,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
   }
 
+  private def getLeaderAndIsrOrThrowException(topic: String, partition: Int): LeaderAndIsr = {
+    ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+      case Some(currentLeaderAndIsr) => currentLeaderAndIsr
+      case None =>
+        throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
+          "[%s, %d] in %s state".format(topic, partition, partitionState((topic, partition))))
+    }
+  }
+
   /**
    * This is the zookeeper listener that triggers all the state transitions for a partition
    */
diff --git core/src/main/scala/kafka/controller/ReplicaStateMachine.scala core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 3574d3a..fbef90d 100644
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -27,9 +27,15 @@ import org.I0Itec.zkclient.IZkChildListener
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
  * transitions to move the replica to another legal state. The different states that a replica can be in are -
- * 1. OnlineReplica     : Once a replica is started, it is in this state. Valid previous state are OnlineReplica or
- *                        OfflineReplica
- * 2. OfflineReplica    : If a replica dies, it moves to this state. Valid previous state is OnlineReplica
+ * 1. NewReplica        : The controller can create new replicas during partition reassignment. In this state, a
+ *                        replica can only receive a become-follower state change request. Valid previous
+ *                        state is NonExistentReplica
+ * 2. OnlineReplica     : Once a replica is started and part of the assigned replicas for its partition, it is in this
+ *                        state. In this state, it can receive either become-leader or become-follower state change
+ *                        requests. Valid previous states are NewReplica, OnlineReplica or OfflineReplica
+ * 3. OfflineReplica    : If a replica dies, it moves to this state. This happens when the broker hosting the replica
+ *                        is down. Valid previous states are NewReplica and OnlineReplica
+ * 4. NonExistentReplica: If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica
  */
 class ReplicaStateMachine(controller: KafkaController) extends Logging {
   this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: "
@@ -49,7 +55,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     // initialize replica state
     initializeReplicaState()
     // move all Online replicas to Online
-    handleStateChanges(controllerContext.liveBrokerIds.toSeq, OnlineReplica)
+    handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq,
+      controllerContext.liveBrokerIds.toSeq), OnlineReplica)
     info("Started replica state machine with initial state -> " + replicaState.toString())
   }
 
@@ -72,20 +79,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * @param targetState  The state that the replicas should be moved to
    * The controller's allLeaders cache should have been updated before this
    */
-  def handleStateChanges(brokerIds: Seq[Int], targetState: ReplicaState) {
-    info("Invoking state change to %s for brokers %s".format(targetState, brokerIds.mkString(",")))
+  def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState) {
+    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
     try {
       brokerRequestBatch.newBatch()
-      brokerIds.foreach { brokerId =>
-        // read all the partitions and their assigned replicas into a map organized by
-        // { replica id -> partition 1, partition 2...
-        val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(controllerContext.allTopics.toSeq, brokerId)
-        partitionsAssignedToThisBroker.foreach { topicAndPartition =>
-          handleStateChange(topicAndPartition._1, topicAndPartition._2, brokerId, targetState)
-        }
-        if(partitionsAssignedToThisBroker.size == 0)
-          info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
-      }
+      replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
       brokerRequestBatch.sendRequestsToBrokers()
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
@@ -100,28 +98,62 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * @param replicaId   The replica for which the state transition is invoked
    * @param targetState The end state that the replica should be moved to
    */
-  private def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
+  def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
     try {
+      replicaState.getOrElseUpdate((topic, partition, replicaId), NonExistentReplica)
       targetState match {
-        case OnlineReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica, OfflineReplica), targetState)
-          // check if the leader for this partition is alive or even exists
-          // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper
-          // for the ISR anyways
+        case NewReplica =>
+          assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState)
+          // start replica as a follower to the current leader for its partition
           val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
           leaderAndIsrOpt match {
             case Some(leaderAndIsr) =>
-              controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
-                case true => // leader is alive
-                  brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
-                  replicaState.put((topic, partition, replicaId), OnlineReplica)
-                  info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
-                case false => // ignore partitions whose leader is not alive
+              if(leaderAndIsr.leader == replicaId)
+                throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
+                  .format(replicaId, topic, partition) + " state as it is being requested to become leader")
+              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
+            case None => // new leader request will be sent to this replica when one gets elected
+          }
+          replicaState.put((topic, partition, replicaId), NewReplica)
+          info("Replica %d for partition [%s, %d] state changed to NewReplica".format(replicaId, topic, partition))
+        case NonExistentReplica =>
+          assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
+          // send stop replica command
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition)
+          // remove this replica from the assigned replicas list for its partition
+          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment((topic, partition))
+          controllerContext.partitionReplicaAssignment.put((topic, partition),
+            currentAssignedReplicas.filterNot(_ == replicaId))
+          info("Replica %d for partition [%s, %d] state changed to NonExistentReplica".format(replicaId, topic, partition))
+          replicaState.remove((topic, partition, replicaId))
+        case OnlineReplica =>
+          assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
+          replicaState((topic, partition, replicaId)) match {
+            case NewReplica =>
+              // add this replica to the assigned replicas list for its partition
+              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment((topic, partition))
+              controllerContext.partitionReplicaAssignment.put((topic, partition), currentAssignedReplicas :+ replicaId)
+              info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
+            case _ =>
+              // check if the leader for this partition is alive or even exists
+              // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper
+              // for the ISR anyways
+              val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+              leaderAndIsrOpt match {
+                case Some(leaderAndIsr) =>
+                  controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
+                    case true => // leader is alive
+                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
+                      replicaState.put((topic, partition, replicaId), OnlineReplica)
+                      info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
+                    case false => // ignore partitions whose leader is not alive
+                  }
+                case None => // ignore partitions who don't have a leader yet
               }
-            case None => // ignore partitions who don't have a leader yet
           }
+          replicaState.put((topic, partition, replicaId), OnlineReplica)
         case OfflineReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica), targetState)
+          assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
           // As an optimization, the controller removes dead replicas from the ISR
           var zookeeperPathUpdateSucceeded: Boolean = false
           var newLeaderAndIsr: LeaderAndIsr = null
@@ -144,7 +176,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             }
           }
           // send the shrunk ISR state change request only to the leader
-          brokerRequestBatch.addRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr)
           // update the local leader and isr cache
           controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
           replicaState.put((topic, partition, replicaId), OfflineReplica)
@@ -226,7 +258,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
 }
 
 sealed trait ReplicaState { def state: Byte }
-case object OnlineReplica extends ReplicaState { val state: Byte = 1 }
-case object OfflineReplica extends ReplicaState { val state: Byte = 2 }
+case object NewReplica extends ReplicaState { val state: Byte = 1 }
+case object OnlineReplica extends ReplicaState { val state: Byte = 2 }
+case object OfflineReplica extends ReplicaState { val state: Byte = 3 }
+case object NonExistentReplica extends ReplicaState { val state: Byte = 4 }
 
 
diff --git core/src/main/scala/kafka/log/FileMessageSet.scala core/src/main/scala/kafka/log/FileMessageSet.scala
index be60c24..bc188d9 100644
--- core/src/main/scala/kafka/log/FileMessageSet.scala
+++ core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -31,35 +31,45 @@ import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 /**
  * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
  * will fail on an immutable message set. An optional limit and start position can be applied to the message set
- * which will control the position in the file at which the set begins.
+ * which will control the position in the file at which the set begins
  */
 @nonthreadsafe
 class FileMessageSet private[kafka](val file: File,
                                     private[log] val channel: FileChannel,
-                                    private[log] val start: Long = 0L,
-                                    private[log] val limit: Long = Long.MaxValue) extends MessageSet with Logging {
+                                    private[log] val start: Long, // the starting position in the file
+                                    private[log] val limit: Long, // upper bound on the end position in the file; size = min(file size, limit) - start
+                                    val mutable: Boolean) extends MessageSet with Logging {
   
-  /* the size of the message set in bytes */
-  private val _size = new AtomicLong(scala.math.min(channel.size(), limit) - start)
-    
-  /* set the file position to the last byte in the file */
-  channel.position(channel.size)
+  private val setSize = new AtomicLong()
+
+  if(mutable) {
+    if(limit < Long.MaxValue || start > 0)
+      throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
+
+    setSize.set(channel.size())
+    channel.position(channel.size)
+  } else {
+    setSize.set(scala.math.min(channel.size(), limit) - start)
+  }
   
   /**
    * Create a file message set with no limit or offset
    */
-  def this(file: File, channel: FileChannel) = this(file, channel, 0, Long.MaxValue)
+  def this(file: File, channel: FileChannel, mutable: Boolean) = 
+    this(file, channel, 0, Long.MaxValue, mutable)
   
   /**
    * Create a file message set with no limit or offset
    */
-  def this(file: File) = this(file, Utils.openChannel(file, mutable = true))
+  def this(file: File, mutable: Boolean) = 
+    this(file, Utils.openChannel(file, mutable), mutable)
   
   /**
    * Return a message set which is a view into this set starting from the given position and with the given size limit.
    */
   def read(position: Long, size: Long): FileMessageSet = {
-    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()))
+    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()),
+      false)
   }
   
   /**
@@ -69,7 +79,7 @@ class FileMessageSet private[kafka](val file: File,
   private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
     var position = startingPosition
     val buffer = ByteBuffer.allocate(12)
-    val size = _size.get()
+    val size = setSize.get()
     while(position + 12 < size) {
       buffer.rewind()
       channel.read(buffer, position)
@@ -128,22 +138,29 @@ class FileMessageSet private[kafka](val file: File,
   /**
    * The number of bytes taken up by this file set
    */
-  def sizeInBytes(): Long = _size.get()
+  def sizeInBytes(): Long = setSize.get()
+
+  def checkMutable(): Unit = {
+    if(!mutable)
+      throw new KafkaException("Attempt to invoke mutation on immutable message set.")
+  }
   
   /**
    * Append this message to the message set
    */
   def append(messages: MessageSet): Unit = {
+    checkMutable()
     var written = 0L
     while(written < messages.sizeInBytes)
       written += messages.writeTo(channel, 0, messages.sizeInBytes)
-    _size.getAndAdd(written)
+    setSize.getAndAdd(written)
   }
  
   /**
    * Commit all written data to the physical disk
    */
   def flush() = {
+    checkMutable()
     LogFlushStats.logFlushTimer.time {
       channel.force(true)
     }
@@ -153,7 +170,8 @@ class FileMessageSet private[kafka](val file: File,
    * Close this message set
    */
   def close() {
-    flush()
+    if(mutable)
+      flush()
     channel.close()
   }
   
@@ -170,12 +188,13 @@ class FileMessageSet private[kafka](val file: File,
    * given size falls on a valid byte offset.
    */
   def truncateTo(targetSize: Long) = {
+    checkMutable() // throws KafkaException if this set is not mutable
     if(targetSize > sizeInBytes())
       throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
         " size of this log segment is only %d bytes".format(sizeInBytes()))
     channel.truncate(targetSize)
     channel.position(targetSize)
-    _size.set(targetSize)
+    setSize.set(targetSize) // keep the tracked size in step with the truncated file
   }
   
 }
diff --git core/src/main/scala/kafka/log/Log.scala core/src/main/scala/kafka/log/Log.scala
index 2309333..4279f86 100644
--- core/src/main/scala/kafka/log/Log.scala
+++ core/src/main/scala/kafka/log/Log.scala
@@ -101,8 +101,8 @@ object Log {
 private[kafka] class Log(val dir: File, 
                          val maxLogFileSize: Long, 
                          val maxMessageSize: Int, 
-                         val flushInterval: Int = Int.MaxValue,
-                         val rollIntervalMs: Long = Long.MaxValue, 
+                         val flushInterval: Int,
+                         val rollIntervalMs: Long, 
                          val needsRecovery: Boolean, 
                          val maxIndexSize: Int = (10*1024*1024),
                          val indexIntervalBytes: Int = 4096,
@@ -128,10 +128,10 @@ private[kafka] class Log(val dir: File,
   private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
 
   newGauge(name + "-" + "NumLogSegments",
-           new Gauge[Int] { def getValue = numberOfSegments })
+           new Gauge[Int] { def value() = numberOfSegments })
 
   newGauge(name + "-" + "LogEndOffset",
-           new Gauge[Long] { def getValue = logEndOffset })
+           new Gauge[Long] { def value() = logEndOffset })
 
   /* The name of this log */
   def name  = dir.getName()
@@ -151,7 +151,8 @@ private[kafka] class Log(val dir: File,
         if(!Log.indexFilename(dir, start).exists)
           throw new IllegalStateException("Found log file with no corresponding index file.")
         logSegments.add(new LogSegment(dir = dir, 
-                                       startOffset = start,
+                                       startOffset = start, 
+                                       mutable = false, 
                                        indexIntervalBytes = indexIntervalBytes, 
                                        maxIndexSize = maxIndexSize))
       }
@@ -160,7 +161,8 @@ private[kafka] class Log(val dir: File,
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment
       logSegments.add(new LogSegment(dir = dir, 
-                                     startOffset = 0,
+                                     startOffset = 0, 
+                                     mutable = true, 
                                      indexIntervalBytes = indexIntervalBytes, 
                                      maxIndexSize = maxIndexSize))
     } else {
@@ -174,9 +176,17 @@ private[kafka] class Log(val dir: File,
         }
       })
 
-      // run recovery on the last segment if necessary
+      // make the final segment mutable and run recovery on it if necessary
+      val last = logSegments.remove(logSegments.size - 1)
+      last.close()
+      val mutableSegment = new LogSegment(dir = dir, 
+                                          startOffset = last.start, 
+                                          mutable = true, 
+                                          indexIntervalBytes = indexIntervalBytes, 
+                                          maxIndexSize = maxIndexSize)
       if(needsRecovery)
-        recoverSegment(logSegments.get(logSegments.size - 1))
+        recoverSegment(mutableSegment)
+      logSegments.add(mutableSegment)
     }
     new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
   }
@@ -396,11 +406,12 @@ private[kafka] class Log(val dir: File,
     }
     debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
     segments.view.lastOption match {
-      case Some(segment) => segment.index.trimToSize()
+      case Some(segment) => segment.index.makeReadOnly()
       case None => 
     }
     val segment = new LogSegment(dir, 
                                  startOffset = newOffset,
+                                 mutable = true, 
                                  indexIntervalBytes = indexIntervalBytes, 
                                  maxIndexSize = maxIndexSize)
     segments.append(segment)
@@ -535,7 +546,8 @@ private[kafka] class Log(val dir: File,
       val deletedSegments = segments.trunc(segments.view.size)
       debug("Truncate and start log '" + name + "' to " + newOffset)
       segments.append(new LogSegment(dir, 
-                                     newOffset,
+                                     newOffset, 
+                                     mutable = true, 
                                      indexIntervalBytes = indexIntervalBytes, 
                                      maxIndexSize = maxIndexSize))
       deleteSegments(deletedSegments)
diff --git core/src/main/scala/kafka/log/LogSegment.scala core/src/main/scala/kafka/log/LogSegment.scala
index 4bf3939..aaf03e8 100644
--- core/src/main/scala/kafka/log/LogSegment.scala
+++ core/src/main/scala/kafka/log/LogSegment.scala
@@ -28,9 +28,9 @@ class LogSegment(val messageSet: FileMessageSet,
   
   @volatile var deleted = false
   
-  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) = 
-    this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), 
-         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+  def this(dir: File, startOffset: Long, mutable: Boolean, indexIntervalBytes: Int, maxIndexSize: Int) = 
+    this(new FileMessageSet(file = Log.logFilename(dir, startOffset), mutable = mutable), 
+         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, mutable = mutable, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,
          SystemTime)
diff --git core/src/main/scala/kafka/log/OffsetIndex.scala core/src/main/scala/kafka/log/OffsetIndex.scala
index e8eb554..aa5af1d 100644
--- core/src/main/scala/kafka/log/OffsetIndex.scala
+++ core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -49,36 +49,39 @@ import kafka.utils._
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1) extends Logging {
-  
+class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, maxIndexSize: Int = -1) extends Logging {
+
   /* the memory mapping */
   private var mmap: MappedByteBuffer = 
     {
       val newlyCreated = file.createNewFile()
       val raf = new RandomAccessFile(file, "rw")
       try {
-        /* pre-allocate the file if necessary */
-        if(newlyCreated) {
+        if(mutable) {
+          /* if mutable create and memory map a new sparse file */
           if(maxIndexSize < 8)
             throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
-          raf.setLength(roundToExactMultiple(maxIndexSize, 8))
-        }
           
-        val len = raf.length()  
-        if(len < 0 || len % 8 != 0)
-          throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + 
-                                          " bytes which is not positive or not a multiple of 8.")
+          /* pre-allocate the file if necessary */
+          if(newlyCreated)
+            raf.setLength(roundToExactMultiple(maxIndexSize, 8))
+          val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
           
-        /* memory-map the file */
-        val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
-          
-        /* set the position in the index for the next entry */
-        if(newlyCreated)
-          idx.position(0)
-        else
-          // if this is a pre-existing index, assume it is all valid and set position to last entry
-          idx.position(roundToExactMultiple(idx.limit, 8))
-        idx
+          /* set the position in the index for the next entry */
+          if(newlyCreated)
+            idx.position(0)
+          else
+            // if this is a pre-existing index, assume it is all valid and set position to last entry
+            idx.position(roundToExactMultiple(idx.limit, 8))
+          idx
+        } else {
+          /* if not mutable, just mmap what they gave us */
+          val len = raf.length()
+          if(len < 0 || len % 8 != 0)
+            throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + 
+                                            " bytes which is not positive or not a multiple of 8.")
+          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len)
+        }
       } finally {
         Utils.swallow(raf.close())
       }
@@ -88,7 +91,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1)
   val maxEntries = mmap.limit / 8
   
   /* the number of entries in the index */
-  private var size = new AtomicInteger(mmap.position / 8)
+  private var size = if(mutable) new AtomicInteger(mmap.position / 8) else new AtomicInteger(mmap.limit / 8)
   
   /* the last offset in the index */
   var lastOffset = readLastOffset()
@@ -112,6 +115,8 @@ class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1)
    * the pair (baseOffset, 0) is returned.
    */
   def lookup(targetOffset: Long): OffsetPosition = {
+    if(entries == 0)
+      return OffsetPosition(baseOffset, 0)
     val idx = mmap.duplicate
     val slot = indexSlotFor(idx, targetOffset)
     if(slot == -1)
@@ -123,20 +128,16 @@ class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1)
   /**
    * Find the slot in which the largest offset less than or equal to the given
    * target offset is stored.
-   * Return -1 if the least entry in the index is larger than the target offset or the index is empty
+   * Return -1 if the least entry in the index is larger than the target offset 
    */
   private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
     // we only store the difference from the baseoffset so calculate that
     val relativeOffset = targetOffset - baseOffset
     
-    // check if the index is empty
-    if(entries == 0)
-      return -1
-    
     // check if the target offset is smaller than the least offset
     if(logical(idx, 0) > relativeOffset)
       return -1
-      
+    
     // binary search for the entry
     var lo = 0
     var hi = entries-1
@@ -174,6 +175,8 @@ class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1)
    */
   def append(logicalOffset: Long, position: Int) {
     this synchronized {
+      if(!mutable)
+        throw new IllegalStateException("Attempt to append to an immutable offset index " + file.getName)
       if(isFull)
         throw new IllegalStateException("Attempt to append to a full index (size = " + size + ").")
       if(size.get > 0 && logicalOffset <= lastOffset)
@@ -224,17 +227,17 @@ class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1)
   }
   
   /**
-   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
-   * the file.
+   * Make this index read-only, flush any unsaved changes, and truncate any excess bytes
    */
-  def trimToSize() {
+  def makeReadOnly() {
     this synchronized {
+      mutable = false
       flush()
       val raf = new RandomAccessFile(file, "rws")
       try {
         val newLength = entries * 8
         raf.setLength(newLength)
-        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newLength)
+        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, newLength)
       } finally {
         Utils.swallow(raf.close())
       }
@@ -262,7 +265,8 @@ class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1)
   
   /** Close the index */
   def close() {
-    trimToSize()
+    if(mutable) // makeReadOnly() clears `mutable`, so the flush and trim run at most once
+      makeReadOnly()
   }
   
   /**
diff --git core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index d676b57..cfe7e34 100644
--- core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -50,10 +50,9 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
         if (!csvDir.exists())
           csvDir.mkdirs()
         underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
-        if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {
-          initialized = true
+        if (props.getBoolean("kafka.csv.metrics.reporter.enabled", false))
           startReporter(metricsConfig.pollingIntervalSecs)
-        }
+        initialized = true
       }
     }
   }
diff --git core/src/main/scala/kafka/network/RequestChannel.scala core/src/main/scala/kafka/network/RequestChannel.scala
index f227154..a39efa1 100644
--- core/src/main/scala/kafka/network/RequestChannel.scala
+++ core/src/main/scala/kafka/network/RequestChannel.scala
@@ -92,7 +92,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   newGauge(
     "RequestQueueSize",
     new Gauge[Int] {
-      def getValue = requestQueue.size
+      def value() = requestQueue.size
     }
   )
 
diff --git core/src/main/scala/kafka/producer/ProducerPool.scala core/src/main/scala/kafka/producer/ProducerPool.scala
index cf3e229..eb8ead3 100644
--- core/src/main/scala/kafka/producer/ProducerPool.scala
+++ core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -21,7 +21,7 @@ import kafka.cluster.Broker
 import java.util.Properties
 import collection.mutable.HashMap
 import java.lang.Object
-import kafka.utils.{Utils, Logging}
+import kafka.utils.Logging
 import kafka.api.TopicMetadata
 import kafka.common.UnavailableProducerException
 
diff --git core/src/main/scala/kafka/producer/async/ProducerSendThread.scala core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 25d995b..705cb44 100644
--- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -36,7 +36,7 @@ class ProducerSendThread[K,V](val threadName: String,
   newGauge(
     "ProducerQueueSize-" + getId,
     new Gauge[Int] {
-      def getValue = queue.size
+      def value() = queue.size
     }
   )
 
diff --git core/src/main/scala/kafka/server/AbstractFetcherManager.scala core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index c956a02..8b26be3 100644
--- core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -23,7 +23,7 @@ import kafka.cluster.Broker
 
 abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
-  private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread]
+  private val fetcherThreadMap = new mutable.HashMap[(Int, Int), AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "], "
 
@@ -37,7 +37,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
   def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
     mapLock synchronized {
       var fetcherThread: AbstractFetcherThread = null
-      val key = (sourceBroker, getFetcherId(topic, partitionId))
+      val key = (sourceBroker.id, getFetcherId(topic, partitionId))
       fetcherThreadMap.get(key) match {
         case Some(f) => fetcherThread = f
         case None =>
@@ -64,6 +64,15 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
     }
   }
 
+  /** Id of the source broker whose fetcher thread owns the given partition, or None when no fetcher has it. */
+  def fetcherSourceBroker(topic: String, partitionId: Int): Option[Int] = {
+    mapLock synchronized {
+      // find() stops at the first owning fetcher; only the broker-id half of its map key is returned
+      val owner = fetcherThreadMap.find { case (_, fetcher) => fetcher.hasPartition(topic, partitionId) }
+      owner.map { case ((sourceBrokerId, _), _) => sourceBrokerId }
+    }
+  }
+
   def closeAllFetchers() {
     mapLock synchronized {
       for ( (_, fetcher) <- fetcherThreadMap) {
@@ -72,4 +81,4 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
       fetcherThreadMap.clear()
     }
   }
-}
+}
\ No newline at end of file
diff --git core/src/main/scala/kafka/server/AbstractFetcherThread.scala core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 936e61a..fe1af99 100644
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -119,7 +119,7 @@ abstract class  AbstractFetcherThread(name: String, sourceBroker: Broker, socket
                   warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
                     .format(currentOffset.get, topic, partitionId, newOffset))
                 case _ =>
-                  error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
+                  error("error for %s %d to broker %s ".format(topic, partitionId, sourceBroker.host),
                     ErrorMapping.exceptionFor(partitionData.error))
                   partitionsWithError += topicAndPartition
                   fetchMap.remove(topicAndPartition)
@@ -165,7 +165,7 @@ class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
   newGauge(
     name._1 + "-" + name._2 + "-ConsumerLag",
     new Gauge[Long] {
-      def getValue = lagVal.get
+      def value() = lagVal.get
     }
   )
 
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index f648df3..f8a0d3f 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import java.io.IOException
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
 import kafka.message._
@@ -76,7 +75,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse)))
   }
 
-
   def handleStopReplicaRequest(request: RequestChannel.Request){
     val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
     if(requestLogger.isTraceEnabled)
@@ -85,7 +83,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseMap = new HashMap[(String, Int), Short]
 
-    for((topic, partitionId) <- stopReplicaRequest.stopReplicaSet){
+    for((topic, partitionId) <- stopReplicaRequest.partitions){
       val errorCode = replicaManager.stopReplica(topic, partitionId)
       responseMap.put((topic, partitionId), errorCode)
     }
diff --git core/src/main/scala/kafka/server/KafkaConfig.scala core/src/main/scala/kafka/server/KafkaConfig.scala
index 4252c89..1b94700 100644
--- core/src/main/scala/kafka/server/KafkaConfig.scala
+++ core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import java.net.InetAddress
-import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig}
+import kafka.utils.{Utils, VerifiableProperties, ZKConfig}
 
 /**
  * Configuration settings for the kafka server
diff --git core/src/main/scala/kafka/server/ReplicaFetcherManager.scala core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 69db208..05f5233 100644
--- core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -20,7 +20,8 @@ package kafka.server
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
-        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
+        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ",
+          brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
diff --git core/src/main/scala/kafka/server/ReplicaManager.scala core/src/main/scala/kafka/server/ReplicaManager.scala
index 515ba5a..e061924 100644
--- core/src/main/scala/kafka/server/ReplicaManager.scala
+++ core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -47,13 +47,13 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
   newGauge(
     "LeaderCount",
     new Gauge[Int] {
-      def getValue = leaderPartitions.size
+      def value() = leaderPartitions.size
     }
   )
   newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
-      def getValue = {
+      def value() = {
         leaderPartitionsLock synchronized {
           leaderPartitions.count(_.isUnderReplicated)
         }
@@ -159,19 +159,6 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
       }
       responseMap.put(partitionInfo, errorCode)
     }
-
-    /**
-     *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
-     *  as deleted.
-     *  TODO: Handle this properly as part of KAFKA-330
-     */
-//    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
-//      startHighWaterMarksCheckPointThread
-//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
-//      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
-//      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
-//    }
-
     responseMap
   }
 
diff --git core/src/main/scala/kafka/server/RequestPurgatory.scala core/src/main/scala/kafka/server/RequestPurgatory.scala
index 1a3dbd3..0aac6d1 100644
--- core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -69,7 +69,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
   newGauge(
     "NumDelayedRequests",
     new Gauge[Int] {
-      def getValue = expiredRequestReaper.unsatisfied.get()
+      def value() = expiredRequestReaper.unsatisfied.get()
     }
   )
 
diff --git core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 81b34ce..fdcc690 100644
--- core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -22,7 +22,6 @@ import joptsimple._
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import kafka.consumer.SimpleConsumer
-import collection.mutable.Map
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.TopicAndPartition
 import scala.collection._
@@ -34,9 +33,8 @@ object ConsumerOffsetChecker extends Logging {
 
   private val BidPidPattern = """(\d+)-(\d+)""".r
 
-  private val BrokerIpPattern = """.*:([^:]+):(\d+$)""".r
+  private val BrokerIpPattern = """.*:(\d+\.\d+\.\d+\.\d+):(\d+$)""".r
   // e.g., 127.0.0.1-1315436360737:127.0.0.1:9092
-  // e.g., host.domain.com-1315436360737:host.domain.com:9092
 
   private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = {
     val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
diff --git core/src/main/scala/kafka/tools/DumpLogSegments.scala core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 947aff3..7396a99 100644
--- core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -43,7 +43,7 @@ object DumpLogSegments {
   /* print out the contents of the index */
   def dumpIndex(file: File) {
     val startOffset = file.getName().split("\\.")(0).toLong
-    val index = new OffsetIndex(file = file, baseOffset = startOffset)
+    val index = new OffsetIndex(file = file, baseOffset = startOffset, mutable = false)
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
       // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
@@ -57,7 +57,7 @@ object DumpLogSegments {
   def dumpLog(file: File, printContents: Boolean) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
-    val messageSet = new FileMessageSet(file)
+    val messageSet = new FileMessageSet(file, false)
     var validBytes = 0L
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message
diff --git core/src/main/scala/kafka/utils/Utils.scala core/src/main/scala/kafka/utils/Utils.scala
index 70d3b02..f030d30 100644
--- core/src/main/scala/kafka/utils/Utils.scala
+++ core/src/main/scala/kafka/utils/Utils.scala
@@ -19,6 +19,7 @@ package kafka.utils
 
 import java.io._
 import java.nio._
+import charset.Charset
 import java.nio.channels._
 import java.lang.management._
 import java.util.zip.CRC32
@@ -638,7 +639,7 @@ object Utils extends Logging {
     builder.toString
   }
 
-  def mapToJson[T <: Any](map: Map[String, List[String]]): String = {
+  def mapToJson[T <: Any](map: Map[String, Seq[String]]): String = {
     val builder = new StringBuilder
     builder.append("{ ")
     var numElements = 0
@@ -715,6 +716,18 @@ object Utils extends Logging {
       for (forever <- Stream.continually(1); t <- coll) yield t
     stream.iterator
   }
+
+  /** Decode the full contents of the file at `path` using the JVM default charset. */
+  def readFileIntoString(path: String): String = {
+    val in = new FileInputStream(new File(path))
+    try {
+      val channel = in.getChannel()
+      val mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
+      Charset.defaultCharset().decode(mapped).toString()
+    } finally {
+      in.close()
+    }
+  }
 }
 
 /**
diff --git core/src/main/scala/kafka/utils/ZkUtils.scala core/src/main/scala/kafka/utils/ZkUtils.scala
index 9556b6e..dcc0cda 100644
--- core/src/main/scala/kafka/utils/ZkUtils.scala
+++ core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -27,12 +27,15 @@ import kafka.api.LeaderAndIsr
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
 import kafka.common.{KafkaException, NoEpochForPartitionException}
+import kafka.controller.{PartitionAndReplica, ReassignedPartitionsContext}
+import kafka.admin._
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
   val ControllerPath = "/controller"
+  val ReassignPartitionsPath = "/admin/reassign_partitions"
 
   def getTopicPath(topic: String): String ={
     BrokerTopicsPath + "/" + topic
@@ -76,20 +79,23 @@ object ZkUtils extends Logging {
     val leaderAndIsrOpt = leaderAndIsrInfo._1
     val stat = leaderAndIsrInfo._2
     leaderAndIsrOpt match {
-      case Some(leaderAndIsrStr) =>
-        SyncJSON.parseFull(leaderAndIsrStr) match {
-          case Some(m) =>
-            val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
-            val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
-            val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
-            val isr = Utils.getCSVList(isrString).map(r => r.toInt)
-            val zkPathVersion = stat.getVersion
-            debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
-              isr.toString(), zkPathVersion, topic, partition))
-            Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion))
-          case None => None
-        }
-      case None => None // TODO: Handle if leader and isr info is not available in zookeeper
+      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, topic, partition, stat)
+      case None => None
+    }
+  }
+
+  /** Decode a leader-and-isr json string into a LeaderAndIsr carrying the zookeeper version taken from `stat`. */
+  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = {
+    // Option.map keeps an empty parse result as None and wraps the decoded value in Some
+    SyncJSON.parseFull(leaderAndIsrStr).map { m =>
+      val fields = m.asInstanceOf[Map[String, String]]
+      val leader = fields.get("leader").get.toInt
+      val epoch = fields.get("leaderEpoch").get.toInt
+      val isr = Utils.getCSVList(fields.get("ISR").get).map(r => r.toInt)
+      val zkPathVersion = stat.getVersion
+      debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
+        isr.toString(), zkPathVersion, topic, partition))
+      LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion)
     }
   }
 
@@ -295,11 +301,13 @@ object ZkUtils extends Logging {
   def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
       val stat = client.writeData(path, data, expectVersion)
-      debug("Conditional update to the zookeeper path %s with expected version %d succeeded and returned the new version: %d".format(path, expectVersion, stat.getVersion))
+      info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
+        .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
       case e: Exception =>
-        debug("Conditional update to the zookeeper path %s with expected version %d failed".format(path, expectVersion), e)
+        error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path, data,
+          expectVersion), e) // any exception is logged here and reported to the caller as (false, -1)
         (false, -1)
     }
   }
@@ -511,6 +519,66 @@ object ZkUtils extends Logging {
     }.flatten[(String, Int)].toSeq
   }
 
+  /** Reads ReassignPartitionsPath and wraps the new replica list of every listed partition in a context. */
+  def getPartitionsBeingReassigned(zkClient: ZkClient): Map[(String, Int), ReassignedPartitionsContext] = {
+    // the first element of the read result is the optional JSON payload stored at the path
+    readDataMaybeNull(zkClient, ReassignPartitionsPath)._1 match {
+      case None =>
+        // no payload to decode, hence no partition is reported as being reassigned
+        Map.empty[(String, Int), ReassignedPartitionsContext]
+      case Some(pendingJson) =>
+        parsePartitionReassignmentData(pendingJson).map { case (topicAndPartition, targetReplicas) =>
+          topicAndPartition -> new ReassignedPartitionsContext(targetReplicas)
+        }
+    }
+  }
+
+  /** Decodes JSON of the form {"topic,partition": [replica ids]} into a map keyed by (topic, partition).
+   *  Yields an empty map when SyncJSON.parseFull produces no result; a key without a comma is rejected. */
+  def parsePartitionReassignmentData(jsonData: String):Map[(String, Int), Seq[Int]] = {
+    SyncJSON.parseFull(jsonData) match {
+      case Some(m) =>
+        m.asInstanceOf[Map[String, Seq[String]]].map { case (key, replicas) =>
+          val sep = key.lastIndexOf(',')  // split at the LAST comma so a topic name containing commas stays whole
+          require(sep >= 0, "Malformed reassignment key '%s', expected topic,partition".format(key))
+          (key.substring(0, sep), key.substring(sep + 1).toInt) -> replicas.map(_.toInt)
+        }
+      case None => Map.empty[(String, Int), Seq[Int]]
+    }
+  }
+
+  /** Writes the pending reassignments to ReassignPartitionsPath as JSON, or deletes that path when none remain. */
+  def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[(String, Int), Seq[Int]]) {
+    val reassignPath = ZkUtils.ReassignPartitionsPath
+    if (partitionsToBeReassigned.isEmpty) {  // nothing pending: the /admin/reassign_partitions path is removed
+      deletePath(zkClient, reassignPath)
+      info("No more partitions need to be reassigned. Deleting zk path %s".format(reassignPath))
+    } else {
+      val payload = Utils.mapToJson(partitionsToBeReassigned.map { case ((topic, partition), replicas) => "%s,%s".format(topic, partition) -> replicas.map(_.toString) })
+      try {
+        updatePersistentPath(zkClient, reassignPath, payload)
+        info("Updated partition reassignment path with %s".format(payload))
+      } catch {
+        case missing: ZkNoNodeException =>  // the update found no node, so the path is created with the same payload
+          ZkUtils.createPersistentPath(zkClient, reassignPath, payload)
+          debug("Created path %s with %s for partition reassignment".format(reassignPath, payload))
+        case other => throw new AdministrationException(other.toString)
+      }
+    }
+  }
+
+  /** Builds a PartitionAndReplica for every entry getPartitionsAssignedToBroker returns for each given broker id. */
+  def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]): Seq[PartitionAndReplica] = {
+    brokerIds.flatMap { brokerId =>
+      val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(zkClient, topics, brokerId)
+      // name only the broker that is actually empty; the other requested brokers may still own partitions
+      if(partitionsAssignedToThisBroker.isEmpty)
+        info("No state transitions triggered since no partitions are assigned to broker %d".format(brokerId))
+      partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId))
+    }
+  }
+
+
   def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
     val brokerIdPath = BrokerIdsPath + "/" + brokerId
     zkClient.delete(brokerIdPath)
diff --git core/src/test/scala/unit/kafka/admin/AdminTest.scala core/src/test/scala/unit/kafka/admin/AdminTest.scala
index c9e2229..e756239 100644
--- core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -20,8 +20,10 @@ import junit.framework.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
+import kafka.server.{KafkaServer, KafkaConfig}
+import collection.mutable.ListBuffer
 import kafka.common.ErrorMapping
-import kafka.utils.TestUtils
+import kafka.utils.{Utils, ZkUtils, TestUtils}
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -207,4 +209,182 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
         }
     }
   }
+
+  @Test
+  def testPartitionReassignmentWithLeaderInNewReplicas() {
+    val expectedReplicaAssignment = Map(0  -> List("0", "1", "2"))
+    val topic = "test"
+    // 4 brokers are started before the topic's assignment is written; try/finally stops them even when an assertion fails
+    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    try {
+      AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+      // move partition 0 from replicas (0, 1, 2) to (0, 2, 3)
+      val newReplicas = Seq(0, 2, 3)
+      val partitionToBeReassigned = 0
+      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
+      assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
+      // poll (limit 1000, presumably ms) until the reassignment is reported as completed
+      TestUtils.waitUntilTrue(() => {
+        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
+        CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned, newReplicas,
+          Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
+      }, 1000)
+      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+      assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
+    } finally servers.foreach(_.shutdown())
+  }
+
+  @Test
+  def testPartitionReassignmentWithLeaderNotInNewReplicas() {
+    val expectedReplicaAssignment = Map(0  -> List("0", "1", "2"))
+    val topic = "test"
+    // 4 brokers are started before the topic's assignment is written; try/finally stops them even when an assertion fails
+    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    try {
+      AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+      // move partition 0 from replicas (0, 1, 2) to (1, 2, 3)
+      val newReplicas = Seq(1, 2, 3)
+      val partitionToBeReassigned = 0
+      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
+      assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
+      // poll (limit 1000, presumably ms) until the reassignment is reported as completed
+      TestUtils.waitUntilTrue(() => {
+        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
+        CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned, newReplicas,
+          Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
+      }, 1000)
+      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+      // only the assigned replica list is checked; which broker leads afterwards is not asserted here
+      assertEquals("Partition should have been reassigned to 1, 2, 3", newReplicas, assignedReplicas)
+    } finally servers.foreach(_.shutdown())
+  }
+
+  @Test
+  def testPartitionReassignmentNonOverlappingReplicas() {
+    val expectedReplicaAssignment = Map(0  -> List("0", "1"))
+    val topic = "test"
+    // 4 brokers are started before the topic's assignment is written; try/finally stops them even when an assertion fails
+    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    try {
+      AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+      // move partition 0 from replicas (0, 1) to the disjoint set (2, 3)
+      val newReplicas = Seq(2, 3)
+      val partitionToBeReassigned = 0
+      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
+      assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
+      // poll (limit 1000, presumably ms) until the reassignment is reported as completed
+      TestUtils.waitUntilTrue(() => {
+        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
+        CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned, newReplicas,
+          Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
+      }, 1000)
+      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+      // only the assigned replica list is checked; which broker leads afterwards is not asserted here
+      assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
+    } finally servers.foreach(_.shutdown())
+  }
+
+  @Test
+  def testReassigningNonExistingPartition() {
+    val topic = "test"
+    // brokers are started but the topic is never created; try/finally stops them even when an assertion fails
+    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    try {
+      val newReplicas = Seq(2, 3)
+      val partitionToBeReassigned = 0
+      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
+      assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
+      // the request itself is accepted, yet the unknown partition must not be listed as being reassigned
+      val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient)
+      assertFalse("Partition should not be reassigned", reassignedPartitions.contains((topic, partitionToBeReassigned)))
+    } finally servers.foreach(_.shutdown())
+  }
+
+  @Test
+  def testResumePartitionReassignmentThatWasCompleted() {
+    val expectedReplicaAssignment = Map(0  -> List("0", "1"))
+    val topic = "test"
+    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    // request a reassignment of partition 0 onto the replicas it already has, before any broker is running
+    val newReplicas = Seq(0, 1)
+    val partitionToBeReassigned = 0
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
+    reassignPartitionsCommand.reassignPartitions
+    // the brokers start only now; try/finally stops them even when an assertion fails
+    val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    try {
+      // NOTE(review): this waits for the reassignment path to EXIST - confirm that waiting for its deletion was not intended
+      TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000)
+      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+      assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
+    } finally servers.foreach(_.shutdown())
+  }
+
+  @Test
+  def testResumePartitionReassignmentAfterLeaderWasMoved() {
+    val expectedReplicaAssignment = Map(0  -> List(1, 0, 2, 3))
+    val leaderForPartitionMap = Map(0 -> 2)
+    val topic = "test"
+    val serverConfigs = TestUtils.createBrokerConfigs(4).map(b => new KafkaConfig(b))
+    val servers = new ListBuffer[KafkaServer]
+    try {
+      AdminUtils.createTopicPartitionAssignmentPathInZK(topic,
+        expectedReplicaAssignment.map(r => (r._1) -> r._2.map(_.toString)), zkClient)
+      // bring up just brokers 0 and 1
+      servers.append(TestUtils.createServer(serverConfigs(0)))
+      servers.append(TestUtils.createServer(serverConfigs(1)))
+      val newReplicas = Seq(2, 3)
+      val partitionToBeReassigned = 0
+      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
+      reassignPartitionsCommand.reassignPartitions
+      // this partition reassignment request should be ignored since replicas 2 and 3 are not online
+      // and the admin path should be deleted as well
+      TestUtils.waitUntilTrue(checkIfReassignmentPathIsDeleted, 1000)
+      var assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+      assertEquals("Partition should not be reassigned to 2, 3 yet", expectedReplicaAssignment(0), assignedReplicas)
+      // now bring up brokers 2 and 3 as well
+      servers.append(TestUtils.createServer(serverConfigs(2)))
+      servers.append(TestUtils.createServer(serverConfigs(3)))
+      // wait until new replicas catch up with leader
+      TestUtils.waitUntilTrue(checkIfNewReplicasInIsr, 2000)
+      // change the assigned replicas to 0 and 1
+      updateAssignedReplicasForPartition("test", 0, List(0, 1))
+      // reissue the partition reassignment
+      reassignPartitionsCommand.reassignPartitions
+      // create leaders for the partition to be reassigned
+      TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+      // bounce controller
+      servers.head.shutdown()
+      servers.head.startup()
+      TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1500)
+      assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+      assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
+    } finally servers.foreach(_.shutdown())  // stops every broker started so far, even when an assertion fails
+  }
+
+  /** True when ZkUtils.ReassignPartitionsPath currently exists in zookeeper. */
+  private def checkIfReassignPartitionPathExists(): Boolean =
+    ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
+
+  /** True when ZkUtils.ReassignPartitionsPath is currently absent from zookeeper. */
+  private def checkIfReassignmentPathIsDeleted(): Boolean =
+    !ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
+
+  /**
+   * Reports whether brokers 2 and 3 are both in the ISR that zookeeper holds for
+   * partition 0 of topic "test"; false when no leader/ISR information is returned.
+   */
+  private def checkIfNewReplicasInIsr(): Boolean = {
+    val requiredReplicas = Seq(2, 3)
+    ZkUtils.getLeaderAndIsrForPartition(zkClient, "test", 0) match {
+      case None => false
+      case Some(leaderAndIsr) => requiredReplicas.forall(replica => leaderAndIsr.isr.contains(replica))
+    }
+  }
+
+  /** Overwrites the data at the topic's zk path with JSON that maps only this partition to the given replica ids. */
+  private def updateAssignedReplicasForPartition(topic: String, partition: Int, newAssignedReplicas: Seq[Int]) {
+    val replicasAsStrings = newAssignedReplicas.map(_.toString)
+    ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPath(topic), Utils.mapToJson(Map(partition.toString -> replicasAsStrings)))
+  }
 }
diff --git core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index ab42381..12cebe6 100644
--- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -95,7 +95,7 @@ object SerializationTestUtils{
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(collection.immutable.Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {
diff --git core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index d0044cf..f06e537 100644
--- core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -29,7 +29,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   val messageSet = createMessageSet(messages)
   
   def createMessageSet(messages: Seq[Message]): FileMessageSet = {
-    val set = new FileMessageSet(tempFile())
+    val set = new FileMessageSet(tempFile(), true)
     set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
     set.flush()
     set
diff --git core/src/test/scala/unit/kafka/log/LogOffsetTest.scala core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
index c39656f..f3a272e 100644
--- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
+++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
@@ -30,7 +30,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
-import kafka.common.{ErrorMapping, TopicAndPartition, UnknownTopicOrPartitionException}
+import kafka.common.{ErrorMapping, TopicAndPartition}
 
 object LogOffsetTest {
   val random = new Random()  
diff --git core/src/test/scala/unit/kafka/log/LogSegmentTest.scala core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 8e57514..4a13f0d 100644
--- core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -15,10 +15,10 @@ class LogSegmentTest extends JUnit3Suite {
   
   def createSegment(offset: Long): LogSegment = {
     val msFile = TestUtils.tempFile()
-    val ms = new FileMessageSet(msFile)
+    val ms = new FileMessageSet(msFile, true)
     val idxFile = TestUtils.tempFile()
     idxFile.delete()
-    val idx = new OffsetIndex(idxFile, offset, 100)
+    val idx = new OffsetIndex(idxFile, offset, true, 100)
     val seg = new LogSegment(ms, idx, offset, 10, SystemTime)
     segments += seg
     seg
diff --git core/src/test/scala/unit/kafka/log/LogTest.scala core/src/test/scala/unit/kafka/log/LogTest.scala
index 2f49139..48c10c1 100644
--- core/src/test/scala/unit/kafka/log/LogTest.scala
+++ core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -342,33 +342,6 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change log size", log.size, 0)
   }
 
-  @Test
-  def testReopenThenTruncate() {
-    val set = TestUtils.singleMessageSet("test".getBytes())
-
-    // create a log
-    var log = new Log(logDir, 
-                      maxLogFileSize = set.sizeInBytes * 5, 
-                      maxMessageSize = config.maxMessageSize, 
-                      maxIndexSize = 1000, 
-                      indexIntervalBytes = 10000, 
-                      needsRecovery = true)
-    
-    // add enough messages to roll over several segments then close and re-open and attempt to truncate
-    for(i <- 0 until 100)
-      log.append(set)
-    log.close()
-    log = new Log(logDir, 
-                  maxLogFileSize = set.sizeInBytes * 5, 
-                  maxMessageSize = config.maxMessageSize, 
-                  maxIndexSize = 1000, 
-                  indexIntervalBytes = 10000, 
-                  needsRecovery = true)
-    log.truncateTo(3)
-    assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
-    assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
-  }
-  
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
       case Some(range) => 
diff --git core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 91c9881..6def192 100644
--- core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -33,7 +33,7 @@ class OffsetIndexTest extends JUnitSuite {
   
   @Before
   def setup() {
-    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
+    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, mutable = true, maxIndexSize = 30 * 8)
   }
   
   @After
@@ -41,7 +41,7 @@ class OffsetIndexTest extends JUnitSuite {
     if(this.idx != null)
       this.idx.file.delete()
   }
-  
+
   @Test
   def randomLookupTest() {
     assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset, 0), idx.lookup(92L))
@@ -88,6 +88,25 @@ class OffsetIndexTest extends JUnitSuite {
     }
     assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalStateException])
   }
+
+  
+  @Test
+  def testReadOnly() {
+    // three fixed (logical offset, physical position) pairs, appended in increasing offset order
+    val entries = List((49, 1), (52, 2), (55, 3))
+    entries.foreach { case (logical, physical) => idx.append(logical, physical) }
+    idx.makeReadOnly()
+
+    // the file is expected to hold 8 bytes per appended entry and nothing more
+    assertEquals("File length should just contain added entries.", entries.size * 8L, idx.file.length())
+    assertEquals("Last offset field should be initialized", entries.last._1, idx.lastOffset)
+
+    // lookups must keep working on the read-only index, whereas a further append must be rejected
+    entries.foreach { case (logical, physical) =>
+      assertEquals("Should still be able to find everything.", OffsetPosition(logical, physical), idx.lookup(logical))
+    }
+    assertWriteFails("Append should fail on read-only index", idx, 60, classOf[IllegalStateException])
+  }
   
   @Test(expected = classOf[IllegalArgumentException])
   def appendOutOfOrder() {
@@ -96,13 +115,13 @@ class OffsetIndexTest extends JUnitSuite {
   }
   
   @Test
-  def testReopen() {
+  def reopenAsReadonly() {
     val first = OffsetPosition(51, 0)
     val sec = OffsetPosition(52, 1)
     idx.append(first.offset, first.position)
     idx.append(sec.offset, sec.position)
     idx.close()
-    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset)
+    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset, mutable = false)
     assertEquals(first, idxRo.lookup(first.offset))
     assertEquals(sec, idxRo.lookup(sec.offset))
     assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalStateException])
@@ -110,8 +129,7 @@ class OffsetIndexTest extends JUnitSuite {
   
   @Test
   def truncate() {
-	val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
-	idx.truncate()
+	val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, mutable = true, maxIndexSize = 10 * 8)
     for(i <- 1 until 10)
       idx.append(i, i)
       
@@ -137,6 +155,13 @@ class OffsetIndexTest extends JUnitSuite {
       case e: Exception => assertEquals("Got an unexpected exception.", klass, e.getClass)
     }
   }
+  
+  /** Creates an index on a file from nonExistantTempFile(), sized for twice the given entries, and appends them in order. */
+  def makeIndex(baseOffset: Long, mutable: Boolean, vals: Seq[(Long, Int)]): OffsetIndex = {
+    val index = new OffsetIndex(file = nonExistantTempFile(), baseOffset = baseOffset, mutable = mutable, maxIndexSize = 2 * vals.size * 8)
+    vals.foreach { case (logical, physical) => index.append(logical, physical) }
+    index
+  }
 
   def monotonicSeq(base: Int, len: Int): Seq[Int] = {
     val rand = new Random(1L)
diff --git core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index 8b9381f..c436f3d 100644
--- core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -67,7 +67,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
       val channel = new RandomAccessFile(file, "rw").getChannel()
       val written = set.writeTo(channel, 0, 1024)
       assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
-      val newSet = new FileMessageSet(file, channel)
+      val newSet = new FileMessageSet(file, channel, false)
       checkEquals(set.iterator, newSet.iterator)
     }
   }
diff --git core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index aac82dd..8b5f38a 100644
--- core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -60,7 +60,6 @@ class MessageCompressionTest extends JUnitSuite {
       true
     } catch {
       case e: UnsatisfiedLinkError => false
-      case e: org.xerial.snappy.SnappyError => false
     }
   }
 }
diff --git core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
index a3f85cf..fe5bc09 100644
--- core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
+++ core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
@@ -35,20 +35,20 @@ class KafkaTimerTest extends JUnit3Suite {
     timer.time {
       clock.addMillis(1000)
     }
-    assertEquals(1, metric.getCount())
-    assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon)
-    assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon)
+    assertEquals(1, metric.count())
+    assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
+    assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
   }
 
   private class ManualClock extends Clock {
 
     private var ticksInNanos = 0L
 
-    override def getTick() = {
+    override def tick() = {
       ticksInNanos
     }
 
-    override def getTime() = {
+    override def time() = {
       TimeUnit.NANOSECONDS.toMillis(ticksInNanos)
     }
 
diff --git core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index d926813..8239b64 100644
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -70,20 +70,22 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     // kill the server hosting the preferred replica
     servers.last.shutdown()
     // check if leader moves to the other server
-    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, if(leader1.get == 0) None else leader1)
+    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500,
+      if(leader1.get == 0) None else leader1)
     val leaderEpoch2 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
     debug("leader Epoc: " + leaderEpoch2)
     assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
     if(leader1.get == leader2.get)
-      assertEquals("Second epoch value should be " + leaderEpoch1, leaderEpoch1, leaderEpoch2)
+      assertEquals("Second epoch value should be " + (leaderEpoch1+1), leaderEpoch1+1, leaderEpoch2)  // parenthesised: print the sum, not the epoch followed by "1"
     else
       assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2)
 
     servers.last.startup()
     servers.head.shutdown()
     Thread.sleep(zookeeper.tickTime)
-    val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, if(leader2.get == 1) None else leader2)
+    val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500,
+      if(leader2.get == 1) None else leader2)
     val leaderEpoch3 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
     debug("leader Epoc: " + leaderEpoch3)
     debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))
diff --git core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
index 4f61f84..f44cc0b 100644
--- core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
+++ core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package kafka.server
 
@@ -69,9 +69,10 @@ class RequestPurgatoryTest extends JUnit3Suite {
     purgatory.watch(r2)
     purgatory.awaitExpiration(r1)
     val elapsed = System.currentTimeMillis - start
+    println("Start = %d, Elapsed = %d".format(start, elapsed))
     assertTrue("r1 expired", purgatory.expired.contains(r1))
     assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
-    assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
+    assertTrue("Time for expiration was about 20ms", (elapsed - expiration).abs < 10L)
   }
   
   class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
diff --git core/src/test/scala/unit/kafka/utils/TestUtils.scala core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 31cc051..b82f1af 100644
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -389,7 +389,8 @@ object TestUtils extends Logging {
             newLeaderAndISR.leaderEpoch += 1
             newLeaderAndISR.zkVersion += 1
           }
-          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath( topic, partition), newLeaderAndISR.toString)
+          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+            newLeaderAndISR.toString)
         } catch {
           case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
         }
diff --git project/build/KafkaProject.scala project/build/KafkaProject.scala
index 2c7ab03..230c28f 100644
--- project/build/KafkaProject.scala
+++ project/build/KafkaProject.scala
@@ -66,42 +66,17 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
         <scope>compile</scope>
       </dependency>
 
-    def metricsDeps =
-      <dependencies>
-        <dependency>
-          <groupId>com.yammer.metrics</groupId>
-          <artifactId>metrics-core</artifactId>
-          <version>3.0.0-10ccc80c</version>
-          <scope>compile</scope>
-        </dependency>
-        <dependency>
-          <groupId>com.yammer.metrics</groupId>
-          <artifactId>metrics-annotations</artifactId>
-          <version>3.0.0-10ccc80c</version>
-          <scope>compile</scope>
-        </dependency>
-      </dependencies>
-
     object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
       override def transform(node: Node): Seq[Node] = node match {
         case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
-          Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep:_*)
-        }
-        case other => other
-      }
-    })
-
-    object MetricsDepAdder extends RuleTransformer(new RewriteRule() {
-      override def transform(node: Node): Seq[Node] = node match {
-        case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
-          Elem(prefix, "dependencies", attribs, scope, deps ++ metricsDeps:_*)
+          Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep :_*)
         }
         case other => other
       }
     })
 
     override def pomPostProcess(pom: Node): Node = {
-      MetricsDepAdder(ZkClientDepAdder(pom))
+      ZkClientDepAdder(pom)
     }
 
     override def artifactID = "kafka"
@@ -276,6 +251,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
   trait CoreDependencies {
     val log4j = "log4j" % "log4j" % "1.2.15"
     val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
+    val metricsCore = "com.yammer.metrics" % "metrics-core" % "latest.release"
     val slf4jSimple = "org.slf4j" % "slf4j-simple" % "latest.release"
   }
   
